[ARVADOS] updated: 4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f
Git user
git at public.curoverse.com
Sun Nov 13 22:23:54 EST 2016
Summary of changes:
services/ws/config.go | 5 +++
services/ws/event.go | 11 ++---
services/ws/handler.go | 6 +++
services/ws/handler_v0.go | 105 ++++++++++++++++++++++++++++++++++++----------
services/ws/handler_v1.go | 4 +-
services/ws/pg.go | 8 ++--
services/ws/router.go | 6 ++-
7 files changed, 113 insertions(+), 32 deletions(-)
via 4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f (commit)
via 39b1824cff7cae632a19dbe9c011b8b5d8fb9375 (commit)
from 219daa3b227f147cb628a27b823b9b4e1f8d32b1 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.
commit 4d8580c854210b8d4c1fbcbbfe13fb74a5858c4f
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 22:23:48 2016 -0500
8460: Ping clients only when read times out and outgoing queue is empty.
diff --git a/services/ws/config.go b/services/ws/config.go
index 9a2bb3c..290109a 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -1,6 +1,8 @@
package main
import (
+ "time"
+
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
@@ -10,6 +12,7 @@ type Config struct {
Listen string
Debug bool
+ PingTimeout arvados.Duration
ClientEventQueue int
ServerEventQueue int
}
@@ -27,6 +30,8 @@ func DefaultConfig() Config {
"connect_timeout": "30",
"sslmode": "disable",
},
+ PingTimeout: arvados.Duration(5 * time.Second),
ClientEventQueue: 64,
+ ServerEventQueue: 4,
}
}
diff --git a/services/ws/handler.go b/services/ws/handler.go
index cd3374e..fe47a62 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,6 +3,7 @@ package main
import (
"io"
"net/http"
+ "time"
)
type handler interface {
@@ -12,4 +13,9 @@ type handler interface {
type wsConn interface {
io.ReadWriter
Request() *http.Request
+ SetReadDeadline(time.Time) error
+}
+
+type timeouter interface {
+ Timeout() bool
}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index ea70138..c728d12 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -3,15 +3,20 @@ package main
import (
"encoding/json"
"errors"
+ "io"
"log"
"sync"
"time"
)
-var errQueueFull = errors.New("client queue full")
+var (
+ errQueueFull = errors.New("client queue full")
+ errFrameTooBig = errors.New("frame too big")
+)
type handlerV0 struct {
- QueueSize int
+ PingTimeout time.Duration
+ QueueSize int
}
func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
@@ -30,10 +35,32 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
go func() {
buf := make([]byte, 2<<20)
for {
+ select {
+ case <-stopped:
+ return
+ default:
+ }
+ ws.SetReadDeadline(time.Now().Add(h.PingTimeout))
n, err := ws.Read(buf)
h.debugLogf(ws, "received frame: %q", buf[:n])
- if err != nil || n == len(buf) {
- h.debugLogf(ws, "handlerV0: read: %s", err)
+ if err == nil && n == len(buf) {
+ err = errFrameTooBig
+ }
+ if err, ok := err.(timeouter); ok && err.Timeout() {
+ // If the outgoing queue is empty,
+ // send an empty message. This can
+ // help detect a disconnected network
+ // socket, and prevent an idle socket
+ // from being closed.
+ if len(queue) == 0 {
+ queue <- nil
+ }
+ continue
+ }
+ if err != nil {
+ if err != io.EOF {
+ h.debugLogf(ws, "handlerV0: read: %s", err)
+ }
stop <- err
return
}
@@ -53,7 +80,12 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
go func() {
for e := range queue {
if e == nil {
- ws.Write([]byte("{}\n"))
+ _, err := ws.Write([]byte("{}\n"))
+ if err != nil {
+ h.debugLogf(ws, "handlerV0: write: %s", err)
+ stop <- err
+ break
+ }
continue
}
detail := e.Detail()
@@ -77,9 +109,10 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
if err != nil {
h.debugLogf(ws, "handlerV0: write: %s", err)
stop <- err
- return
+ break
}
}
+ for _ = range queue {}
}()
// Filter incoming events against the current subscription
@@ -96,24 +129,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
}
}
- // Once a minute, if the queue is empty, send an empty
- // message. This can help detect a disconnected
- // network socket.
- ticker := time.NewTicker(time.Minute)
- defer ticker.Stop()
-
for {
var e *event
+ var ok bool
select {
case <-stopped:
close(queue)
return
- case <-ticker.C:
- if len(queue) == 0 {
- send(nil)
+ case e, ok = <-events:
+ if !ok {
+ close(queue)
+ return
}
- continue
- case e = <-events:
}
detail := e.Detail()
mtx.Lock()
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 91f3d34..4160d86 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,10 +1,12 @@
package main
import (
+ "time"
)
type handlerV1 struct {
- QueueSize int
+ PingTimeout time.Duration
+ QueueSize int
}
func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
diff --git a/services/ws/router.go b/services/ws/router.go
index e4e102b..685b613 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -21,10 +21,12 @@ type router struct {
func (rtr *router) setup() {
rtr.mux = http.NewServeMux()
rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
- QueueSize: rtr.Config.ClientEventQueue,
+ PingTimeout: rtr.Config.PingTimeout.Duration(),
+ QueueSize: rtr.Config.ClientEventQueue,
}))
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
- QueueSize: rtr.Config.ClientEventQueue,
+ PingTimeout: rtr.Config.PingTimeout.Duration(),
+ QueueSize: rtr.Config.ClientEventQueue,
}))
}
commit 39b1824cff7cae632a19dbe9c011b8b5d8fb9375
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 21:47:34 2016 -0500
8460: Fix connection cleanup and db connection handling.
diff --git a/services/ws/event.go b/services/ws/event.go
index 7e27e09..b6dda49 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -23,23 +23,24 @@ type event struct {
Received time.Time
Serial uint64
+ db *sql.DB
logRow *arvados.Log
- err error
- mtx sync.Mutex
+ err error
+ mtx sync.Mutex
}
// Detail returns the database row corresponding to the event. It can
// be called safely from multiple goroutines. Only one attempt will be
// made. If the database row cannot be retrieved, Detail returns nil.
-func (e *event) Detail(db *sql.DB) *arvados.Log {
+func (e *event) Detail() *arvados.Log {
e.mtx.Lock()
defer e.mtx.Unlock()
- if e.logRow != nil || e.err != nil || db == nil {
+ if e.logRow != nil || e.err != nil {
return e.logRow
}
var logRow arvados.Log
var oldAttrs, newAttrs []byte
- e.err = db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+ e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
&logRow.ID,
&logRow.UUID,
&logRow.ObjectUUID,
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index 451af86..ea70138 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -2,10 +2,14 @@ package main
import (
"encoding/json"
+ "errors"
"log"
"sync"
+ "time"
)
+var errQueueFull = errors.New("client queue full")
+
type handlerV0 struct {
QueueSize int
}
@@ -16,32 +20,43 @@ func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
}
func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
- done := make(chan struct{}, 3)
queue := make(chan *event, h.QueueSize)
mtx := sync.Mutex{}
subscribed := make(map[string]bool)
+
+ stopped := make(chan struct{})
+ stop := make(chan error, 5)
+
go func() {
buf := make([]byte, 2<<20)
for {
n, err := ws.Read(buf)
h.debugLogf(ws, "received frame: %q", buf[:n])
if err != nil || n == len(buf) {
- break
+ h.debugLogf(ws, "handlerV0: read: %s", err)
+ stop <- err
+ return
}
msg := make(map[string]interface{})
err = json.Unmarshal(buf[:n], &msg)
if err != nil {
- break
+ h.debugLogf(ws, "handlerV0: unmarshal: %s", err)
+ stop <- err
+ return
}
h.debugLogf(ws, "received message: %+v", msg)
h.debugLogf(ws, "subscribing to *")
subscribed["*"] = true
}
- done <- struct{}{}
}()
- go func(queue <-chan *event) {
+
+ go func() {
for e := range queue {
- detail := e.Detail(nil)
+ if e == nil {
+ ws.Write([]byte("{}\n"))
+ continue
+ }
+ detail := e.Detail()
if detail == nil {
continue
}
@@ -59,28 +74,48 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
continue
}
_, err = ws.Write(append(buf, byte('\n')))
- if err != nil {
+ if err != nil {
h.debugLogf(ws, "handlerV0: write: %s", err)
- break
+ stop <- err
+ return
}
}
- done <- struct{}{}
- }(queue)
+ }()
+
+ // Filter incoming events against the current subscription
+ // list, and forward matching events to the outgoing message
+ // queue. Close the queue and return when the "stopped"
+ // channel closes or the incoming event stream ends. Shut down
+ // the handler if the outgoing queue fills up.
go func() {
send := func(e *event) {
- if queue == nil {
- return
- }
select {
case queue <- e:
default:
- close(queue)
- queue = nil
- done <- struct{}{}
+ stop <- errQueueFull
}
}
- for e := range events {
- detail := e.Detail(nil)
+
+ // Once a minute, if the queue is empty, send an empty
+ // message. This can help detect a disconnected
+ // network socket.
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+
+ for {
+ var e *event
+ select {
+ case <-stopped:
+ close(queue)
+ return
+ case <-ticker.C:
+ if len(queue) == 0 {
+ send(nil)
+ }
+ continue
+ case e = <-events:
+ }
+ detail := e.Detail()
mtx.Lock()
switch {
case subscribed["*"]:
@@ -93,7 +128,8 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
}
mtx.Unlock()
}
- done <- struct{}{}
}()
- <-done
+
+ <-stop
+ close(stopped)
}
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 6bce668..51bc92c 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -73,7 +73,7 @@ func (ps *pgEventSource) run() {
// to ps.QueueSize. Without this, max
// concurrent queries would be bounded by
// client_count X client_queue_size.
- e.Detail(db)
+ e.Detail()
debugLogf("%+v", e)
ps.mtx.Lock()
for sink := range ps.sinks {
@@ -93,10 +93,11 @@ func (ps *pgEventSource) run() {
LogUUID: pqEvent.Extra,
Received: time.Now(),
Serial: serial,
+ db: db,
}
debugLogf("%+v", e)
eventQueue <- e
- go e.Detail(db)
+ go e.Detail()
}
}
@@ -136,7 +137,8 @@ func (sink *pgEventSink) Stop() {
// Ensure this sink cannot fill up and block the
// server-side queue (which otherwise could in turn
// block our mtx.Lock() here)
- for _ = range sink.channel {}
+ for _ = range sink.channel {
+ }
}()
sink.source.mtx.Lock()
delete(sink.source.sinks, sink)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list