[ARVADOS] updated: f9eb135e4420352198729534f115f233cda8c261
Git user
git at public.curoverse.com
Sun Nov 13 22:24:26 EST 2016
Summary of changes:
services/ws/config.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
discards 4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f (commit)
via f9eb135e4420352198729534f115f233cda8c261 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f)
\
N -- N -- N (f9eb135e4420352198729534f115f233cda8c261)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit f9eb135e4420352198729534f115f233cda8c261
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 22:24:22 2016 -0500
8460: Ping clients only when read times out and outgoing queue is empty.
diff --git a/services/ws/config.go b/services/ws/config.go
index 9a2bb3c..3e3d91f 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -1,6 +1,8 @@
package main
import (
+ "time"
+
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
@@ -10,6 +12,7 @@ type Config struct {
Listen string
Debug bool
+ PingTimeout arvados.Duration
ClientEventQueue int
ServerEventQueue int
}
@@ -27,6 +30,8 @@ func DefaultConfig() Config {
"connect_timeout": "30",
"sslmode": "disable",
},
+ PingTimeout: arvados.Duration(time.Minute),
ClientEventQueue: 64,
+ ServerEventQueue: 4,
}
}
diff --git a/services/ws/handler.go b/services/ws/handler.go
index cd3374e..fe47a62 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,6 +3,7 @@ package main
import (
"io"
"net/http"
+ "time"
)
type handler interface {
@@ -12,4 +13,9 @@ type handler interface {
type wsConn interface {
io.ReadWriter
Request() *http.Request
+ SetReadDeadline(time.Time) error
+}
+
+type timeouter interface {
+ Timeout() bool
}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index ea70138..c728d12 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -3,15 +3,20 @@ package main
import (
"encoding/json"
"errors"
+ "io"
"log"
"sync"
"time"
)
-var errQueueFull = errors.New("client queue full")
+var (
+ errQueueFull = errors.New("client queue full")
+ errFrameTooBig = errors.New("frame too big")
+)
type handlerV0 struct {
- QueueSize int
+ PingTimeout time.Duration
+ QueueSize int
}
func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
@@ -30,10 +35,32 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
go func() {
buf := make([]byte, 2<<20)
for {
+ select {
+ case <-stopped:
+ return
+ default:
+ }
+ ws.SetReadDeadline(time.Now().Add(h.PingTimeout))
n, err := ws.Read(buf)
h.debugLogf(ws, "received frame: %q", buf[:n])
- if err != nil || n == len(buf) {
- h.debugLogf(ws, "handlerV0: read: %s", err)
+ if err == nil && n == len(buf) {
+ err = errFrameTooBig
+ }
+ if err, ok := err.(timeouter); ok && err.Timeout() {
+ // If the outgoing queue is empty,
+ // send an empty message. This can
+ // help detect a disconnected network
+ // socket, and prevent an idle socket
+ // from being closed.
+ if len(queue) == 0 {
+ queue <- nil
+ }
+ continue
+ }
+ if err != nil {
+ if err != io.EOF {
+ h.debugLogf(ws, "handlerV0: read: %s", err)
+ }
stop <- err
return
}
@@ -53,7 +80,12 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
go func() {
for e := range queue {
if e == nil {
- ws.Write([]byte("{}\n"))
+ _, err := ws.Write([]byte("{}\n"))
+ if err != nil {
+ h.debugLogf(ws, "handlerV0: write: %s", err)
+ stop <- err
+ break
+ }
continue
}
detail := e.Detail()
@@ -77,9 +109,10 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
if err != nil {
h.debugLogf(ws, "handlerV0: write: %s", err)
stop <- err
- return
+ break
}
}
+ for _ = range queue {}
}()
// Filter incoming events against the current subscription
@@ -96,24 +129,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
}
}
- // Once a minute, if the queue is empty, send an empty
- // message. This can help detect a disconnected
- // network socket.
- ticker := time.NewTicker(time.Minute)
- defer ticker.Stop()
-
for {
var e *event
+ var ok bool
select {
case <-stopped:
close(queue)
return
- case <-ticker.C:
- if len(queue) == 0 {
- send(nil)
+ case e, ok = <-events:
+ if !ok {
+ close(queue)
+ return
}
- continue
- case e = <-events:
}
detail := e.Detail()
mtx.Lock()
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 91f3d34..4160d86 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,10 +1,12 @@
package main
import (
+ "time"
)
type handlerV1 struct {
- QueueSize int
+ PingTimeout time.Duration
+ QueueSize int
}
func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
diff --git a/services/ws/router.go b/services/ws/router.go
index e4e102b..685b613 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -21,10 +21,12 @@ type router struct {
func (rtr *router) setup() {
rtr.mux = http.NewServeMux()
rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
- QueueSize: rtr.Config.ClientEventQueue,
+ PingTimeout: rtr.Config.PingTimeout.Duration(),
+ QueueSize: rtr.Config.ClientEventQueue,
}))
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
- QueueSize: rtr.Config.ClientEventQueue,
+ PingTimeout: rtr.Config.PingTimeout.Duration(),
+ QueueSize: rtr.Config.ClientEventQueue,
}))
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list