[ARVADOS] updated: 74d5bfb293b2acf76d639df12ff8769bc333a5f2
Git user
git at public.curoverse.com
Fri Nov 18 18:08:25 EST 2016
Summary of changes:
services/ws/handler.go | 58 +++++++++++++++++++++++++++--------------------
services/ws/permission.go | 5 ++++
services/ws/session_v0.go | 6 ++++-
3 files changed, 43 insertions(+), 26 deletions(-)
via 74d5bfb293b2acf76d639df12ff8769bc333a5f2 (commit)
via 3c1b51391b320539eb4cfbc02ea9e9363df1b370 (commit)
from c52c3788fbbf161db40118261c4cfff52ebf8ceb (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 74d5bfb293b2acf76d639df12ff8769bc333a5f2
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Nov 18 18:07:15 2016 -0500
8460: Fix "send to closed channel" race by using context lib to release goroutines.
diff --git a/services/ws/handler.go b/services/ws/handler.go
index ab25805..91a7702 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,6 +1,7 @@
package main
import (
+ "context"
"io"
"time"
@@ -22,7 +23,8 @@ type handlerStats struct {
}
func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
- log := logger(ws.Request().Context())
+ ctx, cancel := context.WithCancel(ws.Request().Context())
+ log := logger(ctx)
queue := make(chan interface{}, h.QueueSize)
sess, err := h.NewSession(ws, queue)
if err != nil {
@@ -30,14 +32,11 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
return
}
- stopped := make(chan struct{})
- stop := make(chan error, 5)
-
go func() {
buf := make([]byte, 2<<20)
for {
select {
- case <-stopped:
+ case <-ctx.Done():
return
default:
}
@@ -52,19 +51,30 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
if err != io.EOF {
log.WithError(err).Info("read error")
}
- stop <- err
+ cancel()
return
}
err = sess.Receive(buf)
if err != nil {
- stop <- err
+ log.WithError(err).Error("sess.Receive() failed")
+ cancel()
return
}
}
}()
go func() {
- for data := range queue {
+ for {
+ var ok bool
+ var data interface{}
+ select {
+ case <-ctx.Done():
+ return
+ case data, ok = <-queue:
+ if !ok {
+ return
+ }
+ }
var e *event
var buf []byte
var err error
@@ -79,7 +89,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- stop <- err
+ cancel()
break
} else if len(buf) == 0 {
log.Debug("skip")
@@ -96,7 +106,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
_, err = ws.Write(buf)
if err != nil {
log.WithError(err).Error("write failed")
- stop <- err
+ cancel()
break
}
log.Debug("sent")
@@ -108,25 +118,20 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
stats.EventBytes += uint64(len(buf))
stats.EventCount++
}
- for _ = range queue {
- // Ensure queue can't fill up and block other
- // goroutines after we hit a write error.
- }
}()
// Filter incoming events against the current subscription
// list, and forward matching events to the outgoing message
- // queue. Close the queue and return when the "stopped"
- // channel closes or the incoming event stream ends. Shut down
- // the handler if the outgoing queue fills up.
+ // queue. Return when the request context is done/cancelled;
+ // cancel the context and return if the incoming event stream
+ // ends or the outgoing queue fills up.
go func() {
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
for {
select {
- case <-stopped:
- close(queue)
+ case <-ctx.Done():
return
case <-ticker.C:
// If the outgoing queue is empty,
@@ -135,12 +140,15 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
// socket, and prevent an idle socket
// from being closed.
if len(queue) == 0 {
- queue <- []byte(`{}`)
+ select {
+ case queue <- []byte(`{}`):
+ default:
+ }
}
continue
case e, ok := <-incoming:
if !ok {
- close(queue)
+ cancel()
return
}
if !sess.Filter(e) {
@@ -149,14 +157,14 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
select {
case queue <- e:
default:
- stop <- errQueueFull
+ log.WithError(errQueueFull).Error("terminate")
+ cancel()
+ return
}
}
}
}()
- <-stop
- close(stopped)
-
+ <-ctx.Done()
return
}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 2bcce60..29a7ade 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -171,7 +171,11 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
db: sess.db,
}
if sub.match(sess, e) {
- sess.sendq <- e
+ select {
+ case sess.sendq <- e:
+ case <-sess.ws.Request().Context().Done():
+ return
+ }
}
}
if err := rows.Err(); err != nil {
commit 3c1b51391b320539eb4cfbc02ea9e9363df1b370
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Nov 18 18:00:59 2016 -0500
8460: Accept 403 response from API as "not permitted", instead of "unexpected error".
diff --git a/services/ws/permission.go b/services/ws/permission.go
index 30276e4..42f7b37 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -66,6 +66,11 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
allowed = true
} else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
allowed = false
+ } else if txErr.StatusCode == http.StatusForbidden {
+ // Some requests are expressly forbidden for reasons
+ // other than "you aren't allowed to know whether this
+ // UUID exists" (404).
+ allowed = false
} else {
logger.WithError(err).Error("lookup error")
return false, err
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list