[ARVADOS] updated: 4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f

Git user git at public.curoverse.com
Sun Nov 13 22:23:54 EST 2016


Summary of changes:
 services/ws/config.go     |   5 +++
 services/ws/event.go      |  11 ++---
 services/ws/handler.go    |   6 +++
 services/ws/handler_v0.go | 105 ++++++++++++++++++++++++++++++++++++----------
 services/ws/handler_v1.go |   4 +-
 services/ws/pg.go         |   8 ++--
 services/ws/router.go     |   6 ++-
 7 files changed, 113 insertions(+), 32 deletions(-)

       via  4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f (commit)
       via  39b1824cff7cae632a19dbe9c011b8b5d8fb9375 (commit)
      from  219daa3b227f147cb628a27b823b9b4e1f8d32b1 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 22:23:48 2016 -0500

    8460: Ping clients only when read times out and outgoing queue is empty.

diff --git a/services/ws/config.go b/services/ws/config.go
index 9a2bb3c..290109a 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"time"
+
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
@@ -10,6 +12,7 @@ type Config struct {
 	Listen   string
 	Debug    bool
 
+	PingTimeout      arvados.Duration
 	ClientEventQueue int
 	ServerEventQueue int
 }
@@ -27,6 +30,8 @@ func DefaultConfig() Config {
 			"connect_timeout": "30",
 			"sslmode":         "disable",
 		},
+		PingTimeout:      arvados.Duration(5 * time.Second),
 		ClientEventQueue: 64,
+		ServerEventQueue: 4,
 	}
 }
diff --git a/services/ws/handler.go b/services/ws/handler.go
index cd3374e..fe47a62 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,6 +3,7 @@ package main
 import (
 	"io"
 	"net/http"
+	"time"
 )
 
 type handler interface {
@@ -12,4 +13,9 @@ type handler interface {
 type wsConn interface {
 	io.ReadWriter
 	Request() *http.Request
+	SetReadDeadline(time.Time) error
+}
+
+type timeouter interface {
+	Timeout() bool
 }
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index ea70138..c728d12 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -3,15 +3,20 @@ package main
 import (
 	"encoding/json"
 	"errors"
+	"io"
 	"log"
 	"sync"
 	"time"
 )
 
-var errQueueFull = errors.New("client queue full")
+var (
+	errQueueFull   = errors.New("client queue full")
+	errFrameTooBig = errors.New("frame too big")
+)
 
 type handlerV0 struct {
-	QueueSize int
+	PingTimeout time.Duration
+	QueueSize   int
 }
 
 func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
@@ -30,10 +35,32 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	go func() {
 		buf := make([]byte, 2<<20)
 		for {
+			select {
+			case <-stopped:
+				return
+			default:
+			}
+			ws.SetReadDeadline(time.Now().Add(h.PingTimeout))
 			n, err := ws.Read(buf)
 			h.debugLogf(ws, "received frame: %q", buf[:n])
-			if err != nil || n == len(buf) {
-				h.debugLogf(ws, "handlerV0: read: %s", err)
+			if err == nil && n == len(buf) {
+				err = errFrameTooBig
+			}
+			if err, ok := err.(timeouter); ok && err.Timeout() {
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- nil
+				}
+				continue
+			}
+			if err != nil {
+				if err != io.EOF {
+					h.debugLogf(ws, "handlerV0: read: %s", err)
+				}
 				stop <- err
 				return
 			}
@@ -53,7 +80,12 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	go func() {
 		for e := range queue {
 			if e == nil {
-				ws.Write([]byte("{}\n"))
+				_, err := ws.Write([]byte("{}\n"))
+				if err != nil {
+					h.debugLogf(ws, "handlerV0: write: %s", err)
+					stop <- err
+					break
+				}
 				continue
 			}
 			detail := e.Detail()
@@ -77,9 +109,10 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			if err != nil {
 				h.debugLogf(ws, "handlerV0: write: %s", err)
 				stop <- err
-				return
+				break
 			}
 		}
+		for _ = range queue {}
 	}()
 
 	// Filter incoming events against the current subscription
@@ -96,24 +129,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			}
 		}
 
-		// Once a minute, if the queue is empty, send an empty
-		// message. This can help detect a disconnected
-		// network socket.
-		ticker := time.NewTicker(time.Minute)
-		defer ticker.Stop()
-
 		for {
 			var e *event
+			var ok bool
 			select {
 			case <-stopped:
 				close(queue)
 				return
-			case <-ticker.C:
-				if len(queue) == 0 {
-					send(nil)
+			case e, ok = <-events:
+				if !ok {
+					close(queue)
+					return
 				}
-				continue
-			case e = <-events:
 			}
 			detail := e.Detail()
 			mtx.Lock()
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 91f3d34..4160d86 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,10 +1,12 @@
 package main
 
 import (
+	"time"
 )
 
 type handlerV1 struct {
-	QueueSize int
+	PingTimeout time.Duration
+	QueueSize   int
 }
 
 func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
diff --git a/services/ws/router.go b/services/ws/router.go
index e4e102b..685b613 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -21,10 +21,12 @@ type router struct {
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
 	rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
-		QueueSize: rtr.Config.ClientEventQueue,
+		PingTimeout: rtr.Config.PingTimeout.Duration(),
+		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
-		QueueSize: rtr.Config.ClientEventQueue,
+		PingTimeout: rtr.Config.PingTimeout.Duration(),
+		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
 }
 

commit 39b1824cff7cae632a19dbe9c011b8b5d8fb9375
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 21:47:34 2016 -0500

    8460: Fix connection cleanup and db connection handling.

diff --git a/services/ws/event.go b/services/ws/event.go
index 7e27e09..b6dda49 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -23,23 +23,24 @@ type event struct {
 	Received time.Time
 	Serial   uint64
 
+	db     *sql.DB
 	logRow *arvados.Log
-	err error
-	mtx sync.Mutex
+	err    error
+	mtx    sync.Mutex
 }
 
 // Detail returns the database row corresponding to the event. It can
 // be called safely from multiple goroutines. Only one attempt will be
 // made. If the database row cannot be retrieved, Detail returns nil.
-func (e *event) Detail(db *sql.DB) *arvados.Log {
+func (e *event) Detail() *arvados.Log {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
-	if e.logRow != nil || e.err != nil || db == nil {
+	if e.logRow != nil || e.err != nil {
 		return e.logRow
 	}
 	var logRow arvados.Log
 	var oldAttrs, newAttrs []byte
-	e.err = db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
 		&logRow.ID,
 		&logRow.UUID,
 		&logRow.ObjectUUID,
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index 451af86..ea70138 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -2,10 +2,14 @@ package main
 
 import (
 	"encoding/json"
+	"errors"
 	"log"
 	"sync"
+	"time"
 )
 
+var errQueueFull = errors.New("client queue full")
+
 type handlerV0 struct {
 	QueueSize int
 }
@@ -16,32 +20,43 @@ func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
 }
 
 func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
-	done := make(chan struct{}, 3)
 	queue := make(chan *event, h.QueueSize)
 	mtx := sync.Mutex{}
 	subscribed := make(map[string]bool)
+
+	stopped := make(chan struct{})
+	stop := make(chan error, 5)
+
 	go func() {
 		buf := make([]byte, 2<<20)
 		for {
 			n, err := ws.Read(buf)
 			h.debugLogf(ws, "received frame: %q", buf[:n])
 			if err != nil || n == len(buf) {
-				break
+				h.debugLogf(ws, "handlerV0: read: %s", err)
+				stop <- err
+				return
 			}
 			msg := make(map[string]interface{})
 			err = json.Unmarshal(buf[:n], &msg)
 			if err != nil {
-				break
+				h.debugLogf(ws, "handlerV0: unmarshal: %s", err)
+				stop <- err
+				return
 			}
 			h.debugLogf(ws, "received message: %+v", msg)
 			h.debugLogf(ws, "subscribing to *")
 			subscribed["*"] = true
 		}
-		done <- struct{}{}
 	}()
-	go func(queue <-chan *event) {
+
+	go func() {
 		for e := range queue {
-			detail := e.Detail(nil)
+			if e == nil {
+				ws.Write([]byte("{}\n"))
+				continue
+			}
+			detail := e.Detail()
 			if detail == nil {
 				continue
 			}
@@ -59,28 +74,48 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 				continue
 			}
 			_, err = ws.Write(append(buf, byte('\n')))
-			if  err != nil {
+			if err != nil {
 				h.debugLogf(ws, "handlerV0: write: %s", err)
-				break
+				stop <- err
+				return
 			}
 		}
-		done <- struct{}{}
-	}(queue)
+	}()
+
+	// Filter incoming events against the current subscription
+	// list, and forward matching events to the outgoing message
+	// queue. Close the queue and return when the "stopped"
+	// channel closes or the incoming event stream ends. Shut down
+	// the handler if the outgoing queue fills up.
 	go func() {
 		send := func(e *event) {
-			if queue == nil {
-				return
-			}
 			select {
 			case queue <- e:
 			default:
-				close(queue)
-				queue = nil
-				done <- struct{}{}
+				stop <- errQueueFull
 			}
 		}
-		for e := range events {
-			detail := e.Detail(nil)
+
+		// Once a minute, if the queue is empty, send an empty
+		// message. This can help detect a disconnected
+		// network socket.
+		ticker := time.NewTicker(time.Minute)
+		defer ticker.Stop()
+
+		for {
+			var e *event
+			select {
+			case <-stopped:
+				close(queue)
+				return
+			case <-ticker.C:
+				if len(queue) == 0 {
+					send(nil)
+				}
+				continue
+			case e = <-events:
+			}
+			detail := e.Detail()
 			mtx.Lock()
 			switch {
 			case subscribed["*"]:
@@ -93,7 +128,8 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			}
 			mtx.Unlock()
 		}
-		done <- struct{}{}
 	}()
-	<-done
+
+	<-stop
+	close(stopped)
 }
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 6bce668..51bc92c 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -73,7 +73,7 @@ func (ps *pgEventSource) run() {
 			// to ps.QueueSize. Without this, max
 			// concurrent queries would be bounded by
 			// client_count X client_queue_size.
-			e.Detail(db)
+			e.Detail()
 			debugLogf("%+v", e)
 			ps.mtx.Lock()
 			for sink := range ps.sinks {
@@ -93,10 +93,11 @@ func (ps *pgEventSource) run() {
 			LogUUID:  pqEvent.Extra,
 			Received: time.Now(),
 			Serial:   serial,
+			db:       db,
 		}
 		debugLogf("%+v", e)
 		eventQueue <- e
-		go e.Detail(db)
+		go e.Detail()
 	}
 }
 
@@ -136,7 +137,8 @@ func (sink *pgEventSink) Stop() {
 		// Ensure this sink cannot fill up and block the
 		// server-side queue (which otherwise could in turn
 		// block our mtx.Lock() here)
-		for _ = range sink.channel {}
+		for _ = range sink.channel {
+		}
 	}()
 	sink.source.mtx.Lock()
 	delete(sink.source.sinks, sink)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list