[ARVADOS] updated: f9eb135e4420352198729534f115f233cda8c261

Git user git at public.curoverse.com
Sun Nov 13 22:24:26 EST 2016


Summary of changes:
 services/ws/config.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

  discards  4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f (commit)
       via  f9eb135e4420352198729534f115f233cda8c261 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the history of the old revision is not a strict subset of the
history of the new revision.  This situation occurs when you push a change
with --force, leaving the repository with a history something like this:

 * -- * -- B -- O -- O -- O (4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f)
            \
             N -- N -- N (f9eb135e4420352198729534f115f233cda8c261)

When this happens, we assume that you have already received notification
emails for all of the O revisions, so here we report only the revisions
on the N branch, starting from the common base, B.

The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.


commit f9eb135e4420352198729534f115f233cda8c261
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 22:24:22 2016 -0500

    8460: Ping clients only when read times out and outgoing queue is empty.

diff --git a/services/ws/config.go b/services/ws/config.go
index 9a2bb3c..3e3d91f 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"time"
+
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
@@ -10,6 +12,7 @@ type Config struct {
 	Listen   string
 	Debug    bool
 
+	PingTimeout      arvados.Duration
 	ClientEventQueue int
 	ServerEventQueue int
 }
@@ -27,6 +30,8 @@ func DefaultConfig() Config {
 			"connect_timeout": "30",
 			"sslmode":         "disable",
 		},
+		PingTimeout:      arvados.Duration(time.Minute),
 		ClientEventQueue: 64,
+		ServerEventQueue: 4,
 	}
 }
diff --git a/services/ws/handler.go b/services/ws/handler.go
index cd3374e..fe47a62 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,6 +3,7 @@ package main
 import (
 	"io"
 	"net/http"
+	"time"
 )
 
 type handler interface {
@@ -12,4 +13,9 @@ type handler interface {
 type wsConn interface {
 	io.ReadWriter
 	Request() *http.Request
+	SetReadDeadline(time.Time) error
+}
+
+type timeouter interface {
+	Timeout() bool
 }
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index ea70138..c728d12 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -3,15 +3,20 @@ package main
 import (
 	"encoding/json"
 	"errors"
+	"io"
 	"log"
 	"sync"
 	"time"
 )
 
-var errQueueFull = errors.New("client queue full")
+var (
+	errQueueFull   = errors.New("client queue full")
+	errFrameTooBig = errors.New("frame too big")
+)
 
 type handlerV0 struct {
-	QueueSize int
+	PingTimeout time.Duration
+	QueueSize   int
 }
 
 func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
@@ -30,10 +35,32 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	go func() {
 		buf := make([]byte, 2<<20)
 		for {
+			select {
+			case <-stopped:
+				return
+			default:
+			}
+			ws.SetReadDeadline(time.Now().Add(h.PingTimeout))
 			n, err := ws.Read(buf)
 			h.debugLogf(ws, "received frame: %q", buf[:n])
-			if err != nil || n == len(buf) {
-				h.debugLogf(ws, "handlerV0: read: %s", err)
+			if err == nil && n == len(buf) {
+				err = errFrameTooBig
+			}
+			if err, ok := err.(timeouter); ok && err.Timeout() {
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- nil
+				}
+				continue
+			}
+			if err != nil {
+				if err != io.EOF {
+					h.debugLogf(ws, "handlerV0: read: %s", err)
+				}
 				stop <- err
 				return
 			}
@@ -53,7 +80,12 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	go func() {
 		for e := range queue {
 			if e == nil {
-				ws.Write([]byte("{}\n"))
+				_, err := ws.Write([]byte("{}\n"))
+				if err != nil {
+					h.debugLogf(ws, "handlerV0: write: %s", err)
+					stop <- err
+					break
+				}
 				continue
 			}
 			detail := e.Detail()
@@ -77,9 +109,10 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			if err != nil {
 				h.debugLogf(ws, "handlerV0: write: %s", err)
 				stop <- err
-				return
+				break
 			}
 		}
+		for _ = range queue {}
 	}()
 
 	// Filter incoming events against the current subscription
@@ -96,24 +129,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			}
 		}
 
-		// Once a minute, if the queue is empty, send an empty
-		// message. This can help detect a disconnected
-		// network socket.
-		ticker := time.NewTicker(time.Minute)
-		defer ticker.Stop()
-
 		for {
 			var e *event
+			var ok bool
 			select {
 			case <-stopped:
 				close(queue)
 				return
-			case <-ticker.C:
-				if len(queue) == 0 {
-					send(nil)
+			case e, ok = <-events:
+				if !ok {
+					close(queue)
+					return
 				}
-				continue
-			case e = <-events:
 			}
 			detail := e.Detail()
 			mtx.Lock()
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 91f3d34..4160d86 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,10 +1,12 @@
 package main
 
 import (
+	"time"
 )
 
 type handlerV1 struct {
-	QueueSize int
+	PingTimeout time.Duration
+	QueueSize   int
 }
 
 func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
diff --git a/services/ws/router.go b/services/ws/router.go
index e4e102b..685b613 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -21,10 +21,12 @@ type router struct {
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
 	rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
-		QueueSize: rtr.Config.ClientEventQueue,
+		PingTimeout: rtr.Config.PingTimeout.Duration(),
+		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
-		QueueSize: rtr.Config.ClientEventQueue,
+		PingTimeout: rtr.Config.PingTimeout.Duration(),
+		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list