[ARVADOS] updated: b0ba939812720869fca0a75b07d42518d4953345
Git user
git at public.curoverse.com
Sat Nov 19 03:11:12 EST 2016
Summary of changes:
services/ws/handler.go | 53 +++++++++++++++++++++++++++++++----
services/ws/main.go | 5 ++--
services/ws/pg.go | 31 +++++++++++++++++----
services/ws/router.go | 70 +++++++++++++++++++++++++++++++++++------------
services/ws/session_v0.go | 5 ++--
services/ws/session_v1.go | 4 +--
6 files changed, 133 insertions(+), 35 deletions(-)
via b0ba939812720869fca0a75b07d42518d4953345 (commit)
via 05666b170533c79800bea1d472c284e23c9ec5e0 (commit)
via 88ab51c83676a75efbb7b83e7d17927816ecaac4 (commit)
from 74d5bfb293b2acf76d639df12ff8769bc333a5f2 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit b0ba939812720869fca0a75b07d42518d4953345
Author: Tom Clegg <tom at curoverse.com>
Date: Sat Nov 19 03:10:24 2016 -0500
8460: Report status of outgoing queues.
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 91a7702..d2c119a 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"io"
+ "sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -12,7 +13,10 @@ type handler struct {
Client arvados.Client
PingTimeout time.Duration
QueueSize int
- NewSession func(wsConn, chan<- interface{}) (session, error)
+
+ mtx sync.Mutex
+ queues map[chan interface{}]struct{}
+ setupOnce sync.Once
}
type handlerStats struct {
@@ -22,13 +26,28 @@ type handlerStats struct {
EventCount uint64
}
-func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsConn, chan<- interface{}) (session, error)) (stats handlerStats) {
+ h.setupOnce.Do(h.setup)
+
ctx, cancel := context.WithCancel(ws.Request().Context())
log := logger(ctx)
+
+ incoming := eventSource.NewSink()
+ defer incoming.Stop()
+
queue := make(chan interface{}, h.QueueSize)
- sess, err := h.NewSession(ws, queue)
+ h.mtx.Lock()
+ h.queues[queue] = struct{}{}
+ h.mtx.Unlock()
+ defer func() {
+ h.mtx.Lock()
+ delete(h.queues, queue)
+ h.mtx.Unlock()
+ }()
+
+ sess, err := newSession(ws, queue)
if err != nil {
- log.WithError(err).Error("NewSession failed")
+ log.WithError(err).Error("newSession failed")
return
}
@@ -146,7 +165,7 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
}
}
continue
- case e, ok := <-incoming:
+ case e, ok := <-incoming.Channel():
if !ok {
cancel()
return
@@ -168,3 +187,27 @@ func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats)
<-ctx.Done()
return
}
+
+func (h *handler) Status() interface{} {
+ h.mtx.Lock()
+ defer h.mtx.Unlock()
+
+ var s struct {
+ QueueCount int
+ QueueMax int
+ QueueTotal uint64
+ }
+ for q := range h.queues {
+ n := len(q)
+ s.QueueTotal += uint64(n)
+ if s.QueueMax < n {
+ s.QueueMax = n
+ }
+ }
+ s.QueueCount = len(h.queues)
+ return &s
+}
+
+func (h *handler) setup() {
+ h.queues = make(map[chan interface{}]struct{})
+}
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 4e2da08..b6b064e 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -2,6 +2,7 @@ package main
import (
"database/sql"
+ "fmt"
"strconv"
"strings"
"sync"
@@ -36,6 +37,8 @@ type pgEventSource struct {
setupOnce sync.Once
mtx sync.Mutex
shutdown chan error
+
+ lastQDelay time.Duration
}
func (ps *pgEventSource) setup() {
@@ -86,6 +89,7 @@ func (ps *pgEventSource) run() {
WithField("serial", e.Serial).
WithField("detail", e.Detail()).
Debug("event ready")
+ ps.lastQDelay = time.Now().Sub(e.Received)
ps.mtx.Lock()
for sink := range ps.sinks {
@@ -171,7 +175,8 @@ func (ps *pgEventSource) Status() interface{} {
}
return map[string]interface{}{
"Queue": len(ps.queue),
- "QueueMax": cap(ps.queue),
+ "QueueLimit": cap(ps.queue),
+ "QueueDelay": fmt.Sprintf("%.06f", ps.lastQDelay.Seconds()),
"Sinks": len(ps.sinks),
"SinksBlocked": blocked,
}
diff --git a/services/ws/router.go b/services/ws/router.go
index 19f7d18..18eaf73 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -26,6 +26,7 @@ type router struct {
eventSource eventSource
newPermChecker func() permChecker
+ handler *handler
mux *http.ServeMux
setupOnce sync.Once
@@ -46,6 +47,10 @@ type Statuser interface {
type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
func (rtr *router) setup() {
+ rtr.handler = &handler{
+ PingTimeout: rtr.Config.PingTimeout.Duration(),
+ QueueSize: rtr.Config.ClientEventQueue,
+ }
rtr.mux = http.NewServeMux()
rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
@@ -53,30 +58,24 @@ func (rtr *router) setup() {
}
func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
- handler := &handler{
- PingTimeout: rtr.Config.PingTimeout.Duration(),
- QueueSize: rtr.Config.ClientEventQueue,
- NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) {
- return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
- },
- }
return &websocket.Server{
Handshake: func(c *websocket.Config, r *http.Request) error {
return nil
},
Handler: websocket.Handler(func(ws *websocket.Conn) {
t0 := time.Now()
- sink := rtr.eventSource.NewSink()
log := logger(ws.Request().Context())
log.Info("connected")
- stats := handler.Handle(ws, sink.Channel())
+ stats := rtr.handler.Handle(ws, rtr.eventSource,
+ func(ws wsConn, sendq chan<- interface{}) (session, error) {
+ return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
+ })
log.WithFields(logrus.Fields{
"Elapsed": time.Now().Sub(t0).Seconds(),
"Stats": stats,
}).Info("disconnect")
- sink.Stop()
ws.Close()
}),
}
@@ -94,10 +93,11 @@ func (rtr *router) newReqID() string {
func (rtr *router) Status() interface{} {
s := map[string]interface{}{
- "Router": rtr.status,
+ "HTTP": rtr.status,
+ "Outgoing": rtr.handler.Status(),
}
if es, ok := rtr.eventSource.(Statuser); ok {
- s["EventSource"] = es.Status()
+ s["Incoming"] = es.Status()
}
return s
}
commit 05666b170533c79800bea1d472c284e23c9ec5e0
Author: Tom Clegg <tom at curoverse.com>
Date: Sat Nov 19 02:15:57 2016 -0500
8460: Add /status.json
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 206cfeb..4e2da08 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -31,6 +31,7 @@ type pgEventSource struct {
db *sql.DB
pqListener *pq.Listener
+ queue chan *event
sinks map[*pgEventSink]bool
setupOnce sync.Once
mtx sync.Mutex
@@ -70,10 +71,10 @@ func (ps *pgEventSource) setup() {
}
func (ps *pgEventSource) run() {
- eventQueue := make(chan *event, ps.QueueSize)
+ ps.queue = make(chan *event, ps.QueueSize)
go func() {
- for e := range eventQueue {
+ for e := range ps.queue {
// Wait for the "select ... from logs" call to
// finish. This limits max concurrent queries
// to ps.QueueSize. Without this, max
@@ -103,7 +104,7 @@ func (ps *pgEventSource) run() {
if ok {
logger(nil).WithError(err).Info("shutdown")
}
- close(eventQueue)
+ close(ps.queue)
return
case <-ticker.C:
@@ -112,7 +113,7 @@ func (ps *pgEventSource) run() {
case pqEvent, ok := <-ps.pqListener.Notify:
if !ok {
- close(eventQueue)
+ close(ps.queue)
return
}
if pqEvent.Channel != "logs" {
@@ -131,7 +132,7 @@ func (ps *pgEventSource) run() {
db: ps.db,
}
logger(nil).WithField("event", e).Debug("incoming")
- eventQueue <- e
+ ps.queue <- e
go e.Detail()
}
}
@@ -161,6 +162,21 @@ func (ps *pgEventSource) DB() *sql.DB {
return ps.db
}
+func (ps *pgEventSource) Status() interface{} {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+ blocked := 0
+ for sink := range ps.sinks {
+ blocked += len(sink.channel)
+ }
+ return map[string]interface{}{
+ "Queue": len(ps.queue),
+ "QueueMax": cap(ps.queue),
+ "Sinks": len(ps.sinks),
+ "SinksBlocked": blocked,
+ }
+}
+
type pgEventSink struct {
channel chan *event
source *pgEventSource
diff --git a/services/ws/router.go b/services/ws/router.go
index 1558a1d..19f7d18 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -2,10 +2,12 @@ package main
import (
"database/sql"
+ "encoding/json"
"io"
"net/http"
"strconv"
"sync"
+ "sync/atomic"
"time"
"github.com/Sirupsen/logrus"
@@ -29,6 +31,16 @@ type router struct {
lastReqID int64
lastReqMtx sync.Mutex
+
+ status routerStatus
+}
+
+type routerStatus struct {
+ Connections int64
+}
+
+type Statuser interface {
+ Status() interface{}
}
type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
@@ -37,6 +49,7 @@ func (rtr *router) setup() {
rtr.mux = http.NewServeMux()
rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+ rtr.mux.HandleFunc("/status.json", rtr.serveStatus)
}
func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
@@ -63,7 +76,6 @@ func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
"Elapsed": time.Now().Sub(t0).Seconds(),
"Stats": stats,
}).Info("disconnect")
-
sink.Stop()
ws.Close()
}),
@@ -80,8 +92,32 @@ func (rtr *router) newReqID() string {
return strconv.FormatInt(id, 36)
}
+func (rtr *router) Status() interface{} {
+ s := map[string]interface{}{
+ "Router": rtr.status,
+ }
+ if es, ok := rtr.eventSource.(Statuser); ok {
+ s["EventSource"] = es.Status()
+ }
+ return s
+}
+
+func (rtr *router) serveStatus(resp http.ResponseWriter, req *http.Request) {
+ rtr.setupOnce.Do(rtr.setup)
+ logger := logger(req.Context())
+ logger.Debug("status")
+ enc := json.NewEncoder(resp)
+ err := enc.Encode(rtr.Status())
+ if err != nil {
+ logger.WithError(err).Error("status encode failed")
+ }
+}
+
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup)
+ atomic.AddInt64(&rtr.status.Connections, 1)
+ defer atomic.AddInt64(&rtr.status.Connections, -1)
+
logger := logger(req.Context()).
WithField("RequestID", rtr.newReqID())
ctx := contextWithLogger(req.Context(), logger)
commit 88ab51c83676a75efbb7b83e7d17927816ecaac4
Author: Tom Clegg <tom at curoverse.com>
Date: Sat Nov 19 01:35:01 2016 -0500
8460: Inject permChecker from main.
diff --git a/services/ws/main.go b/services/ws/main.go
index 16fe87f..33728dc 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -61,8 +61,9 @@ func main() {
WriteTimeout: time.Minute,
MaxHeaderBytes: 1 << 20,
Handler: &router{
- Config: &cfg,
- eventSource: eventSource,
+ Config: &cfg,
+ eventSource: eventSource,
+ newPermChecker: func() permChecker { return NewPermChecker(cfg.Client) },
},
}
eventSource.NewSink().Stop()
diff --git a/services/ws/router.go b/services/ws/router.go
index 6aef647..1558a1d 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -8,7 +8,6 @@ import (
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Sirupsen/logrus"
"golang.org/x/net/websocket"
)
@@ -21,17 +20,18 @@ type wsConn interface {
}
type router struct {
- Config *Config
+ Config *Config
+ eventSource eventSource
+ newPermChecker func() permChecker
- eventSource eventSource
- mux *http.ServeMux
- setupOnce sync.Once
+ mux *http.ServeMux
+ setupOnce sync.Once
lastReqID int64
lastReqMtx sync.Mutex
}
-type sessionFactory func(wsConn, chan<- interface{}, arvados.Client, *sql.DB) (session, error)
+type sessionFactory func(wsConn, chan<- interface{}, *sql.DB, permChecker) (session, error)
func (rtr *router) setup() {
rtr.mux = http.NewServeMux()
@@ -44,7 +44,7 @@ func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
PingTimeout: rtr.Config.PingTimeout.Duration(),
QueueSize: rtr.Config.ClientEventQueue,
NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) {
- return newSession(ws, sendq, rtr.Config.Client, rtr.eventSource.DB())
+ return newSession(ws, sendq, rtr.eventSource.DB(), rtr.newPermChecker())
},
}
return &websocket.Server{
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 29a7ade..a60a4a3 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -8,7 +8,6 @@ import (
"sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Sirupsen/logrus"
)
@@ -34,12 +33,12 @@ type v0session struct {
setupOnce sync.Once
}
-func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV0(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
sess := &v0session{
sendq: sendq,
ws: ws,
db: db,
- permChecker: NewPermChecker(ac),
+ permChecker: pc,
log: logger(ws.Request().Context()),
}
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
index 88e2414..763fe59 100644
--- a/services/ws/session_v1.go
+++ b/services/ws/session_v1.go
@@ -3,10 +3,8 @@ package main
import (
"database/sql"
"errors"
-
- "git.curoverse.com/arvados.git/sdk/go/arvados"
)
-func NewSessionV1(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV1(ws wsConn, sendq chan<- interface{}, db *sql.DB, pc permChecker) (session, error) {
return nil, errors.New("Not implemented")
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list