[ARVADOS] updated: 74d5bfb293b2acf76d639df12ff8769bc333a5f2

Git user git at public.curoverse.com
Fri Nov 18 18:08:25 EST 2016


Summary of changes:
 services/ws/handler.go    | 58 +++++++++++++++++++++++++++--------------------
 services/ws/permission.go |  5 ++++
 services/ws/session_v0.go |  6 ++++-
 3 files changed, 43 insertions(+), 26 deletions(-)

       via  74d5bfb293b2acf76d639df12ff8769bc333a5f2 (commit)
       via  3c1b51391b320539eb4cfbc02ea9e9363df1b370 (commit)
      from  c52c3788fbbf161db40118261c4cfff52ebf8ceb (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 74d5bfb293b2acf76d639df12ff8769bc333a5f2
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Nov 18 18:07:15 2016 -0500

    8460: Fix "send to closed channel" race by using context lib to release goroutines.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index ab25805..91a7702 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"io"
 	"time"
 
@@ -22,7 +23,8 @@ type handlerStats struct {
 }
 
 func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
-	log := logger(ws.Request().Context())
+	ctx, cancel := context.WithCancel(ws.Request().Context())
+	log := logger(ctx)
 	queue := make(chan interface{}, h.QueueSize)
 	sess, err := h.NewSession(ws, queue)
 	if err != nil {
@@ -30,14 +32,11 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 		return
 	}
 
-	stopped := make(chan struct{})
-	stop := make(chan error, 5)
-
 	go func() {
 		buf := make([]byte, 2<<20)
 		for {
 			select {
-			case <-stopped:
+			case <-ctx.Done():
 				return
 			default:
 			}
@@ -52,19 +51,30 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 				if err != io.EOF {
 					log.WithError(err).Info("read error")
 				}
-				stop <- err
+				cancel()
 				return
 			}
 			err = sess.Receive(buf)
 			if err != nil {
-				stop <- err
+				log.WithError(err).Error("sess.Receive() failed")
+				cancel()
 				return
 			}
 		}
 	}()
 
 	go func() {
-		for data := range queue {
+		for {
+			var ok bool
+			var data interface{}
+			select {
+			case <-ctx.Done():
+				return
+			case data, ok = <-queue:
+				if !ok {
+					return
+				}
+			}
 			var e *event
 			var buf []byte
 			var err error
@@ -79,7 +89,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 				buf, err = sess.EventMessage(e)
 				if err != nil {
 					log.WithError(err).Error("EventMessage failed")
-					stop <- err
+					cancel()
 					break
 				} else if len(buf) == 0 {
 					log.Debug("skip")
@@ -96,7 +106,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 			_, err = ws.Write(buf)
 			if err != nil {
 				log.WithError(err).Error("write failed")
-				stop <- err
+				cancel()
 				break
 			}
 			log.Debug("sent")
@@ -108,25 +118,20 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 			stats.EventBytes += uint64(len(buf))
 			stats.EventCount++
 		}
-		for _ = range queue {
-			// Ensure queue can't fill up and block other
-			// goroutines after we hit a write error.
-		}
 	}()
 
 	// Filter incoming events against the current subscription
 	// list, and forward matching events to the outgoing message
-	// queue. Close the queue and return when the "stopped"
-	// channel closes or the incoming event stream ends. Shut down
-	// the handler if the outgoing queue fills up.
+	// queue. Close the queue and return when the request context
+	// is done/cancelled or the incoming event stream ends. Shut
+	// down the handler if the outgoing queue fills up.
 	go func() {
 		ticker := time.NewTicker(h.PingTimeout)
 		defer ticker.Stop()
 
 		for {
 			select {
-			case <-stopped:
-				close(queue)
+			case <-ctx.Done():
 				return
 			case <-ticker.C:
 				// If the outgoing queue is empty,
@@ -135,12 +140,15 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 				// socket, and prevent an idle socket
 				// from being closed.
 				if len(queue) == 0 {
-					queue <- []byte(`{}`)
+					select {
+					case queue <- []byte(`{}`):
+					default:
+					}
 				}
 				continue
 			case e, ok := <-incoming:
 				if !ok {
-					close(queue)
+					cancel()
 					return
 				}
 				if !sess.Filter(e) {
@@ -149,14 +157,14 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 				select {
 				case queue <- e:
 				default:
-					stop <- errQueueFull
+					log.WithError(errQueueFull).Error("terminate")
+					cancel()
+					return
 				}
 			}
 		}
 	}()
 
-	<-stop
-	close(stopped)
-
+	<-ctx.Done()
 	return
 }
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 2bcce60..29a7ade 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -171,7 +171,11 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
 			db:       sess.db,
 		}
 		if sub.match(sess, e) {
-			sess.sendq <- e
+			select {
+			case sess.sendq <- e:
+			case <-sess.ws.Request().Context().Done():
+				return
+			}
 		}
 	}
 	if err := rows.Err(); err != nil {

commit 3c1b51391b320539eb4cfbc02ea9e9363df1b370
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Nov 18 18:00:59 2016 -0500

    8460: Accept 403 response from API as "not permitted", instead of "unexpected error".

diff --git a/services/ws/permission.go b/services/ws/permission.go
index 30276e4..42f7b37 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -66,6 +66,11 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 		allowed = true
 	} else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
 		allowed = false
+	} else if txErr.StatusCode == http.StatusForbidden {
+		// Some requests are expressly forbidden for reasons
+		// other than "you aren't allowed to know whether this
+		// UUID exists" (404).
+		allowed = false
 	} else {
 		logger.WithError(err).Error("lookup error")
 		return false, err

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list