[ARVADOS] updated: b0ba939812720869fca0a75b07d42518d4953345

Git user git at public.curoverse.com
Sat Nov 19 03:11:12 EST 2016


Summary of changes:
 services/ws/handler.go    | 53 +++++++++++++++++++++++++++++++----
 services/ws/main.go       |  5 ++--
 services/ws/pg.go         | 31 +++++++++++++++++----
 services/ws/router.go     | 70 +++++++++++++++++++++++++++++++++++------------
 services/ws/session_v0.go |  5 ++--
 services/ws/session_v1.go |  4 +--
 6 files changed, 133 insertions(+), 35 deletions(-)

       via  b0ba939812720869fca0a75b07d42518d4953345 (commit)
       via  05666b170533c79800bea1d472c284e23c9ec5e0 (commit)
       via  88ab51c83676a75efbb7b83e7d17927816ecaac4 (commit)
      from  74d5bfb293b2acf76d639df12ff8769bc333a5f2 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b0ba939812720869fca0a75b07d42518d4953345
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Nov 19 03:10:24 2016 -0500

    8460: Report status of outgoing queues.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index 91a7702..d2c119a 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,6 +3,7 @@ package main
 import (
 	"context"
 	"io"
+	"sync"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -12,7 +13,10 @@ type handler struct {
 	Client      arvados.Client
 	PingTimeout time.Duration
 	QueueSize   int
-	NewSession  func(wsConn, chan<- interface{}) (session, error)
+
+	mtx       sync.Mutex
+	queues    map[chan interface{}]struct{}
+	setupOnce sync.Once
 }
 
 type handlerStats struct {
@@ -22,13 +26,28 @@ type handlerStats struct {
 	EventCount   uint64
 }
 
-func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (stats handlerStats) {
+	h.setupOnce.Do(h.setup)
+
 	ctx, cancel := context.WithCancel(ws.Request().Context())
 	log := logger(ctx)
+
+	incoming := eventSource.NewSink()
+	defer incoming.Stop()
+
 	queue := make(chan interface{}, h.QueueSize)
-	sess, err := h.NewSession(ws, queue)
+	h.mtx.Lock()
+	h.queues[queue] = struct{}{}
+	h.mtx.Unlock()
+	defer func() {
+		h.mtx.Lock()
+		delete(h.queues, queue)
+		h.mtx.Unlock()
+	}()
+
+	sess, err := newSession(ws, queue)
 	if err != nil {
-		log.WithError(err).Error("NewSession failed")
+		log.WithError(err).Error("newSession failed")
 		return
 	}
 
@@ -146,7 +165,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 					}
 				}
 				continue
-			case e, ok := <-incoming:
+			case e, ok := <-incoming.Channel():
 				if !ok {
 					cancel()
 					return
@@ -168,3 +187,27 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
 	<-ctx.Done()
 	return
 }
+
+func (h *handler) Status() interface{} {
+	h.mtx.Lock()
+	defer h.mtx.Unlock()
+
+	var s struct {
+		QueueCount int
+		QueueMax   int
+		QueueTotal uint64
+	}
+	for q := range h.queues {
+		n := len(q)
+		s.QueueTotal += uint64(n)
+		if s.QueueMax < n {
+			s.QueueMax = n
+		}
+	}
+	s.QueueCount = len(h.queues)
+	return &s
+}
+
+func (h *handler) setup() {
+	h.queues = make(map[chan interface{}]struct{})
+}
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 4e2da08..b6b064e 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"database/sql"
+	"fmt"
 	"strconv"
 	"strings"
 	"sync"
@@ -36,6 +37,8 @@ type pgEventSource struct {
 	setupOnce  sync.Once
 	mtx        sync.Mutex
 	shutdown   chan error
+
+	lastQDelay time.Duration
 }
 
 func (ps *pgEventSource) setup() {
@@ -86,6 +89,7 @@ func (ps *pgEventSource) run() {
 				WithField("serial", e.Serial).
 				WithField("detail", e.Detail()).
 				Debug("event ready")
+			ps.lastQDelay = time.Now().Sub(e.Received)
 
 			ps.mtx.Lock()
 			for sink := range ps.sinks {
@@ -171,7 +175,8 @@ func (ps *pgEventSource) Status() interface{} {
 	}
 	return map[string]interface{}{
 		"Queue":        len(ps.queue),
-		"QueueMax":     cap(ps.queue),
+		"QueueLimit":   cap(ps.queue),
+		"QueueDelay":   fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),
 		"Sinks":        len(ps.sinks),
 		"SinksBlocked": blocked,
 	}
diff --git a/services/ws/router.go b/services/ws/router.go
index 19f7d18..18eaf73 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -26,6 +26,7 @@ type router struct {
 	eventSource    eventSource
 	newPermChecker func() permChecker
 
+	handler   *handler
 	mux       *http.ServeMux
 	setupOnce sync.Once
 
@@ -46,6 +47,10 @@ type Statuser interface {
 type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
 
 func (rtr *router) setup() {
+	rtr.handler = &handler{
+		PingTimeout: rtr.Config.PingTimeout.Duration(),
+		QueueSize:   rtr.Config.ClientEventQueue,
+	}
 	rtr.mux = http.NewServeMux()
 	rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
@@ -53,30 +58,24 @@ func (rtr *router) setup() {
 }
 
 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
-	handler := &handler{
-		PingTimeout: rtr.Config.PingTimeout.Duration(),
-		QueueSize:   rtr.Config.ClientEventQueue,
-		NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) {
-			return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
-		},
-	}
 	return &websocket.Server{
 		Handshake: func(c *websocket.Config, r *http.Request) error {
 			return nil
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
 			t0 := time.Now()
-			sink := rtr.eventSource.NewSink()
 			log := logger(ws.Request().Context())
 			log.Info("connected")
 
-			stats := handler.Handle(ws, sink.Channel())
+			stats := rtr.handler.Handle(ws, rtr.eventSource,
+				func(ws wsConn, sendq chan<- interface{}) (session, error) {
+					return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
+				})
 
 			log.WithFields(logrus.Fields{
 				"Elapsed": time.Now().Sub(t0).Seconds(),
 				"Stats":   stats,
 			}).Info("disconnect")
-			sink.Stop()
 			ws.Close()
 		}),
 	}
@@ -94,10 +93,11 @@ func (rtr *router) newReqID() string {
 
 func (rtr *router) Status() interface{} {
 	s := map[string]interface{}{
-		"Router": rtr.status,
+		"HTTP":     rtr.status,
+		"Outgoing": rtr.handler.Status(),
 	}
 	if es, ok := rtr.eventSource.(Statuser); ok {
-		s["EventSource"] = es.Status()
+		s["Incoming"] = es.Status()
 	}
 	return s
 }

commit 05666b170533c79800bea1d472c284e23c9ec5e0
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Nov 19 02:15:57 2016 -0500

    8460: Add /status.json

diff --git a/services/ws/pg.go b/services/ws/pg.go
index 206cfeb..4e2da08 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -31,6 +31,7 @@ type pgEventSource struct {
 
 	db         *sql.DB
 	pqListener *pq.Listener
+	queue      chan *event
 	sinks      map[*pgEventSink]bool
 	setupOnce  sync.Once
 	mtx        sync.Mutex
@@ -70,10 +71,10 @@ func (ps *pgEventSource) setup() {
 }
 
 func (ps *pgEventSource) run() {
-	eventQueue := make(chan *event, ps.QueueSize)
+	ps.queue = make(chan *event, ps.QueueSize)
 
 	go func() {
-		for e := range eventQueue {
+		for e := range ps.queue {
 			// Wait for the "select ... from logs" call to
 			// finish. This limits max concurrent queries
 			// to ps.QueueSize. Without this, max
@@ -103,7 +104,7 @@ func (ps *pgEventSource) run() {
 			if ok {
 				logger(nil).WithError(err).Info("shutdown")
 			}
-			close(eventQueue)
+			close(ps.queue)
 			return
 
 		case <-ticker.C:
@@ -112,7 +113,7 @@ func (ps *pgEventSource) run() {
 
 		case pqEvent, ok := <-ps.pqListener.Notify:
 			if !ok {
-				close(eventQueue)
+				close(ps.queue)
 				return
 			}
 			if pqEvent.Channel != "logs" {
@@ -131,7 +132,7 @@ func (ps *pgEventSource) run() {
 				db:       ps.db,
 			}
 			logger(nil).WithField("event", e).Debug("incoming")
-			eventQueue <- e
+			ps.queue <- e
 			go e.Detail()
 		}
 	}
@@ -161,6 +162,21 @@ func (ps *pgEventSource) DB() *sql.DB {
 	return ps.db
 }
 
+func (ps *pgEventSource) Status() interface{} {
+	ps.mtx.Lock()
+	defer ps.mtx.Unlock()
+	blocked := 0
+	for sink := range ps.sinks {
+		blocked += len(sink.channel)
+	}
+	return map[string]interface{}{
+		"Queue":        len(ps.queue),
+		"QueueMax":     cap(ps.queue),
+		"Sinks":        len(ps.sinks),
+		"SinksBlocked": blocked,
+	}
+}
+
 type pgEventSink struct {
 	channel chan *event
 	source  *pgEventSource
diff --git a/services/ws/router.go b/services/ws/router.go
index 1558a1d..19f7d18 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -2,10 +2,12 @@ package main
 
 import (
 	"database/sql"
+	"encoding/json"
 	"io"
 	"net/http"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Sirupsen/logrus"
@@ -29,6 +31,16 @@ type router struct {
 
 	lastReqID  int64
 	lastReqMtx sync.Mutex
+
+	status routerStatus
+}
+
+type routerStatus struct {
+	Connections int64
+}
+
+type Statuser interface {
+	Status() interface{}
 }
 
 type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
@@ -37,6 +49,7 @@ func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
 	rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+	rtr.mux.HandleFunc("/status.json", rtr.serveStatus)
 }
 
 func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
@@ -63,7 +76,6 @@ func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
 				"Elapsed": time.Now().Sub(t0).Seconds(),
 				"Stats":   stats,
 			}).Info("disconnect")
-
 			sink.Stop()
 			ws.Close()
 		}),
@@ -80,8 +92,32 @@ func (rtr *router) newReqID() string {
 	return strconv.FormatInt(id, 36)
 }
 
+func (rtr *router) Status() interface{} {
+	s := map[string]interface{}{
+		"Router": rtr.status,
+	}
+	if es, ok := rtr.eventSource.(Statuser); ok {
+		s["EventSource"] = es.Status()
+	}
+	return s
+}
+
+func (rtr *router) serveStatus(resp http.ResponseWriter, req *http.Request) {
+	rtr.setupOnce.Do(rtr.setup)
+	logger := logger(req.Context())
+	logger.Debug("status")
+	enc := json.NewEncoder(resp)
+	err := enc.Encode(rtr.Status())
+	if err != nil {
+		logger.WithError(err).Error("status encode failed")
+	}
+}
+
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	rtr.setupOnce.Do(rtr.setup)
+	atomic.AddInt64(&rtr.status.Connections, 1)
+	defer atomic.AddInt64(&rtr.status.Connections, -1)
+
 	logger := logger(req.Context()).
 		WithField("RequestID", rtr.newReqID())
 	ctx := contextWithLogger(req.Context(), logger)

commit 88ab51c83676a75efbb7b83e7d17927816ecaac4
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Nov 19 01:35:01 2016 -0500

    8460: Inject permChecker from main.

diff --git a/services/ws/main.go b/services/ws/main.go
index 16fe87f..33728dc 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -61,8 +61,9 @@ func main() {
 		WriteTimeout:   time.Minute,
 		MaxHeaderBytes: 1 << 20,
 		Handler: &router{
-			Config:      &cfg,
-			eventSource: eventSource,
+			Config:         &cfg,
+			eventSource:    eventSource,
+			newPermChecker: func() permChecker { return NewPermChecker(cfg.Client) },
 		},
 	}
 	eventSource.NewSink().Stop()
diff --git a/services/ws/router.go b/services/ws/router.go
index 6aef647..1558a1d 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -8,7 +8,6 @@ import (
 	"sync"
 	"time"
 
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"github.com/Sirupsen/logrus"
 	"golang.org/x/net/websocket"
 )
@@ -21,17 +20,18 @@ type wsConn interface {
 }
 
 type router struct {
-	Config *Config
+	Config         *Config
+	eventSource    eventSource
+	newPermChecker func() permChecker
 
-	eventSource eventSource
-	mux         *http.ServeMux
-	setupOnce   sync.Once
+	mux       *http.ServeMux
+	setupOnce sync.Once
 
 	lastReqID  int64
 	lastReqMtx sync.Mutex
 }
 
-type sessionFactory func(wsConn, chan<- interface{}, arvados.Client, *sql.DB) (session, error)
+type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
 
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
@@ -44,7 +44,7 @@ func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
 		NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) {
-			return newSession(ws, sendq, rtr.Config.Client, rtr.eventSource.DB())
+			return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
 		},
 	}
 	return &websocket.Server{
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 29a7ade..a60a4a3 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -8,7 +8,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"github.com/Sirupsen/logrus"
 )
 
@@ -34,12 +33,12 @@ type v0session struct {
 	setupOnce     sync.Once
 }
 
-func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
 	sess := &v0session{
 		sendq:       sendq,
 		ws:          ws,
 		db:          db,
-		permChecker: NewPermChecker(ac),
+		permChecker: pc,
 		log:         logger(ws.Request().Context()),
 	}
 
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
index 88e2414..763fe59 100644
--- a/services/ws/session_v1.go
+++ b/services/ws/session_v1.go
@@ -3,10 +3,8 @@ package main
 import (
 	"database/sql"
 	"errors"
-
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-func NewSessionV1(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV1(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
 	return nil, errors.New("Not implemented")
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list