[ARVADOS] updated: 1a17734f7264bc74463e1e6fe115cdad6ec4c521

Git user git at public.curoverse.com
Thu Nov 17 15:02:49 EST 2016


Summary of changes:
 sdk/python/tests/run_test_server.py |   2 +
 services/ws/config.go               |  11 ++--
 services/ws/event.go                |   5 +-
 services/ws/handler.go              | 124 +++++++++++++++++-------------------
 services/ws/log.go                  |  43 ++++---------
 services/ws/main.go                 |  24 +++++--
 services/ws/permission.go           |  11 ++--
 services/ws/pg.go                   |  28 ++++----
 services/ws/router.go               |  65 ++++++++++++-------
 services/ws/session.go              |   8 +--
 services/ws/session_v0.go           | 119 +++++++++++++++++-----------------
 services/ws/session_v1.go           |   2 +-
 12 files changed, 232 insertions(+), 210 deletions(-)

       via  1a17734f7264bc74463e1e6fe115cdad6ec4c521 (commit)
       via  d3a6d626ab4534865a14e8a34295a65e92036f37 (commit)
      from  f675fb2c202516021b961b5aa2de4528ba9f0d1f (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 1a17734f7264bc74463e1e6fe115cdad6ec4c521
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Nov 17 14:49:46 2016 -0500

    8460: Refactor "old events / other messages" mechanism to use the outgoing message queue.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index 2b94693..3d42b9a 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -13,7 +13,7 @@ type handler struct {
 	Client      arvados.Client
 	PingTimeout time.Duration
 	QueueSize   int
-	NewSession  func(wsConn) (session, error)
+	NewSession  func(wsConn, chan<- interface{}) (session, error)
 }
 
 type handlerStats struct {
@@ -23,18 +23,19 @@ type handlerStats struct {
 	EventCount   uint64
 }
 
-func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
 	ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{
 		"RemoteAddr": ws.Request().RemoteAddr,
 	}))
-	sess, err := h.NewSession(ws)
+
+	queue := make(chan interface{}, h.QueueSize)
+	sess, err := h.NewSession(ws, queue)
+	log := logger(ctx)
 	if err != nil {
-		logger(ctx).WithError(err).Error("NewSession failed")
+		log.WithError(err).Error("NewSession failed")
 		return
 	}
 
-	queue := make(chan interface{}, h.QueueSize)
-
 	stopped := make(chan struct{})
 	stop := make(chan error, 5)
 
@@ -48,54 +49,53 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 			}
 			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
 			n, err := ws.Read(buf)
-			logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame")
-			if err == nil && n == len(buf) {
+			buf := buf[:n]
+			log.WithField("frame", string(buf[:n])).Debug("received frame")
+			if err == nil && n == cap(buf) {
 				err = errFrameTooBig
 			}
 			if err != nil {
 				if err != io.EOF {
-					logger(ctx).WithError(err).Info("read error")
+					log.WithError(err).Info("read error")
 				}
 				stop <- err
 				return
 			}
 			msg := make(map[string]interface{})
-			err = json.Unmarshal(buf[:n], &msg)
+			err = json.Unmarshal(buf, &msg)
 			if err != nil {
-				logger(ctx).WithError(err).Info("invalid json from client")
+				log.WithError(err).Info("invalid json from client")
 				stop <- err
 				return
 			}
-			for _, buf := range sess.Receive(msg, buf[:n]) {
-				logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive")
-				queue <- buf
-			}
+			sess.Receive(msg, buf)
 		}
 	}()
 
 	go func() {
-		for e := range queue {
-			if buf, ok := e.([]byte); ok {
-				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-				logger(ctx).WithField("frame", string(buf)).Debug("send msg buf")
-				_, err := ws.Write(buf)
+		for data := range queue {
+			var e *event
+			var buf []byte
+			var err error
+			log := log
+
+			switch data := data.(type) {
+			case []byte:
+				buf = data
+			case *event:
+				e = data
+				log = log.WithField("serial", e.Serial)
+				buf, err = sess.EventMessage(e)
 				if err != nil {
-					logger(ctx).WithError(err).Error("write failed")
+					log.WithError(err).Error("EventMessage failed")
 					stop <- err
 					break
+				} else if len(buf) == 0 {
+					log.Debug("skip")
+					continue
 				}
-				continue
-			}
-			e := e.(*event)
-			log := logger(ctx).WithField("serial", e.Serial)
-
-			buf, err := sess.EventMessage(e)
-			if err != nil {
-				log.WithError(err).Error("EventMessage failed")
-				stop <- err
-				break
-			} else if len(buf) == 0 {
-				log.Debug("skip")
+			default:
+				log.WithField("data", data).Error("bad object in client queue")
 				continue
 			}
 
@@ -118,6 +118,8 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 			stats.EventCount++
 		}
 		for _ = range queue {
+			// Ensure queue can't fill up and block other
+			// goroutines after we hit a write error.
 		}
 	}()
 
@@ -127,20 +129,10 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 	// channel closes or the incoming event stream ends. Shut down
 	// the handler if the outgoing queue fills up.
 	go func() {
-		send := func(e *event) {
-			select {
-			case queue <- e:
-			default:
-				stop <- errQueueFull
-			}
-		}
-
 		ticker := time.NewTicker(h.PingTimeout)
 		defer ticker.Stop()
 
 		for {
-			var e *event
-			var ok bool
 			select {
 			case <-stopped:
 				close(queue)
@@ -155,14 +147,19 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 					queue <- []byte(`{}`)
 				}
 				continue
-			case e, ok = <-events:
+			case e, ok := <-incoming:
 				if !ok {
 					close(queue)
 					return
 				}
-			}
-			if sess.Filter(e) {
-				send(e)
+				if !sess.Filter(e) {
+					continue
+				}
+				select {
+				case queue <- e:
+				default:
+					stop <- errQueueFull
+				}
 			}
 		}
 	}()
diff --git a/services/ws/router.go b/services/ws/router.go
index 34656ad..e6cec0f 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -31,18 +31,20 @@ type router struct {
 	lastReqMtx sync.Mutex
 }
 
+type sessionFactory func(wsConn, chan<- interface{}, arvados.Client, *sql.DB) (session, error)
+
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
 	rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
 }
 
-func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
+func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
 	handler := &handler{
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
-		NewSession: func(ws wsConn) (session, error) {
-			return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+		NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) {
+			return newSession(ws, sendq, rtr.Config.Client, rtr.eventSource.DB())
 		},
 	}
 	return &websocket.Server{
diff --git a/services/ws/session.go b/services/ws/session.go
index d148f59..9c3cef1 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -1,10 +1,8 @@
 package main
 
 type session interface {
-	// Receive processes a message received from the client. If
-	// the returned list of messages is non-nil, they will be
-	// queued for sending to the client.
-	Receive(map[string]interface{}, []byte) [][]byte
+	// Receive processes a message received from the client.
+	Receive(map[string]interface{}, []byte)
 
 	// Filter returns true if the event should be queued for
 	// sending to the client. It should return as fast as
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 210c8c5..4143282 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -23,16 +24,19 @@ var (
 
 type v0session struct {
 	ws            wsConn
+	sendq         chan<- interface{}
 	db            *sql.DB
 	permChecker   permChecker
 	subscriptions []v0subscribe
+	lastMsgID     uint64
 	log           *log.Entry
 	mtx           sync.Mutex
 	setupOnce     sync.Once
 }
 
-func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
 	sess := &v0session{
+		sendq:       sendq,
 		ws:          ws,
 		db:          db,
 		permChecker: NewPermChecker(ac),
@@ -51,23 +55,24 @@ func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
 	return sess, nil
 }
 
-func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) {
 	sess.log.WithField("data", msg).Debug("received message")
 	var sub v0subscribe
 	if err := json.Unmarshal(buf, &sub); err != nil {
 		sess.log.WithError(err).Info("ignored invalid request")
-		return nil
+		return
 	}
 	if sub.Method == "subscribe" {
 		sub.prepare(sess)
 		sess.log.WithField("sub", sub).Debug("sub prepared")
+		sess.sendq <- v0subscribeOK
 		sess.mtx.Lock()
 		sess.subscriptions = append(sess.subscriptions, sub)
 		sess.mtx.Unlock()
-
-		return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
+		sub.sendOldEvents(sess)
+		return
 	}
-	return [][]byte{v0subscribeFail}
+	sess.sendq <- v0subscribeFail
 }
 
 func (sess *v0session) EventMessage(e *event) ([]byte, error) {
@@ -82,7 +87,7 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
 	}
 
 	msg := map[string]interface{}{
-		"msgID":             e.Serial,
+		"msgID":             atomic.AddUint64(&sess.lastMsgID, 1),
 		"id":                detail.ID,
 		"uuid":              detail.UUID,
 		"object_uuid":       detail.ObjectUUID,
@@ -122,7 +127,7 @@ func (sess *v0session) Filter(e *event) bool {
 	return false
 }
 
-func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
 	if sub.LastLogID == 0 {
 		return
 	}
@@ -151,27 +156,27 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
 			sess.log.WithError(err).Error("row Scan failed")
 			continue
 		}
+		for len(sess.sendq)*2 > cap(sess.sendq) {
+			// Ugly... but if we fill up the whole client
+			// queue with a backlog of old events, a
+			// single new event will overflow it and
+			// terminate the connection, and then the
+			// client will probably reconnect and do the
+			// same thing all over again.
+			time.Sleep(100 * time.Millisecond)
+		}
 		e := &event{
 			LogID:    id,
 			Received: time.Now(),
 			db:       sess.db,
 		}
-		if !sub.match(sess, e) {
-			sess.log.WithField("event", e).Debug("skip old event")
-			continue
-		}
-		msg, err := sess.EventMessage(e)
-		if err != nil {
-			sess.log.WithError(err).Error("event marshal failed")
-			continue
+		if sub.match(sess, e) {
+			sess.sendq <- e
 		}
-		sess.log.WithField("data", msg).Debug("will queue old event")
-		msgs = append(msgs, msg)
 	}
 	if err := rows.Err(); err != nil {
 		sess.log.WithError(err).Error("db.Query failed")
 	}
-	return
 }
 
 type v0subscribe struct {
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
index 60d12c4..88e2414 100644
--- a/services/ws/session_v1.go
+++ b/services/ws/session_v1.go
@@ -7,6 +7,6 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV1(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
 	return nil, errors.New("Not implemented")
 }

commit d3a6d626ab4534865a14e8a34295a65e92036f37
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Nov 17 14:10:15 2016 -0500

    8460: Structured logging.

diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index 5ef5e2a..bd37daa 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -379,6 +379,7 @@ Client:
   APIHost: {}
   Insecure: true
 Listen: :{}
+LogLevel: {}
 Postgres:
   host: {}
   dbname: {}
@@ -387,6 +388,7 @@ Postgres:
   sslmode: require
         """.format(os.environ['ARVADOS_API_HOST'],
                    port,
+                   ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
                    _dbconfig('host'),
                    _dbconfig('database'),
                    _dbconfig('username'),
diff --git a/services/ws/config.go b/services/ws/config.go
index 9c2e80a..e2d69d0 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -7,10 +7,11 @@ import (
 )
 
 type Config struct {
-	Client   arvados.Client
-	Postgres pgConfig
-	Listen   string
-	Debug    bool
+	Client    arvados.Client
+	Postgres  pgConfig
+	Listen    string
+	LogLevel  string
+	LogFormat string
 
 	PingTimeout      arvados.Duration
 	ClientEventQueue int
@@ -30,6 +31,8 @@ func DefaultConfig() Config {
 			"connect_timeout": "30",
 			"sslmode":         "require",
 		},
+		LogLevel:         "info",
+		LogFormat:        "json",
 		PingTimeout:      arvados.Duration(time.Minute),
 		ClientEventQueue: 64,
 		ServerEventQueue: 4,
diff --git a/services/ws/event.go b/services/ws/event.go
index 77acf44..280035b 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -2,7 +2,6 @@ package main
 
 import (
 	"database/sql"
-	"log"
 	"sync"
 	"time"
 
@@ -51,12 +50,12 @@ func (e *event) Detail() *arvados.Log {
 		&logRow.CreatedAt,
 		&propYAML)
 	if e.err != nil {
-		log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+		logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
 		return nil
 	}
 	e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
 	if e.err != nil {
-		log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+		logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
 		return nil
 	}
 	e.logRow = &logRow
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 1470c66..2b94693 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,20 +3,12 @@ package main
 import (
 	"encoding/json"
 	"io"
-	"log"
-	"net/http"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	log "github.com/Sirupsen/logrus"
 )
 
-type wsConn interface {
-	io.ReadWriter
-	Request() *http.Request
-	SetReadDeadline(time.Time) error
-	SetWriteDeadline(time.Time) error
-}
-
 type handler struct {
 	Client      arvados.Client
 	PingTimeout time.Duration
@@ -25,16 +17,19 @@ type handler struct {
 }
 
 type handlerStats struct {
-	QueueDelay time.Duration
-	WriteDelay time.Duration
-	EventBytes uint64
-	EventCount uint64
+	QueueDelayNs time.Duration
+	WriteDelayNs time.Duration
+	EventBytes   uint64
+	EventCount   uint64
 }
 
 func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+	ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{
+		"RemoteAddr": ws.Request().RemoteAddr,
+	}))
 	sess, err := h.NewSession(ws)
 	if err != nil {
-		log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+		logger(ctx).WithError(err).Error("NewSession failed")
 		return
 	}
 
@@ -53,13 +48,13 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 			}
 			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
 			n, err := ws.Read(buf)
-			sess.debugLogf("received frame: %q", buf[:n])
+			logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame")
 			if err == nil && n == len(buf) {
 				err = errFrameTooBig
 			}
 			if err != nil {
 				if err != io.EOF {
-					sess.debugLogf("handler: read: %s", err)
+					logger(ctx).WithError(err).Info("read error")
 				}
 				stop <- err
 				return
@@ -67,12 +62,12 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 			msg := make(map[string]interface{})
 			err = json.Unmarshal(buf[:n], &msg)
 			if err != nil {
-				sess.debugLogf("handler: unmarshal: %s", err)
+				logger(ctx).WithError(err).Info("invalid json from client")
 				stop <- err
 				return
 			}
 			for _, buf := range sess.Receive(msg, buf[:n]) {
-				sess.debugLogf("handler: to queue: %s", string(buf))
+				logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive")
 				queue <- buf
 			}
 		}
@@ -82,39 +77,43 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 		for e := range queue {
 			if buf, ok := e.([]byte); ok {
 				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-				sess.debugLogf("handler: send msg: %s", string(buf))
+				logger(ctx).WithField("frame", string(buf)).Debug("send msg buf")
 				_, err := ws.Write(buf)
 				if err != nil {
-					sess.debugLogf("handler: write {}: %s", err)
+					logger(ctx).WithError(err).Error("write failed")
 					stop <- err
 					break
 				}
 				continue
 			}
 			e := e.(*event)
+			log := logger(ctx).WithField("serial", e.Serial)
 
 			buf, err := sess.EventMessage(e)
 			if err != nil {
-				sess.debugLogf("EventMessage %d: err %s", err)
+				log.WithError(err).Error("EventMessage failed")
 				stop <- err
 				break
 			} else if len(buf) == 0 {
-				sess.debugLogf("EventMessage %d: skip", e.Serial)
+				log.Debug("skip")
 				continue
 			}
 
-			sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+			log.WithField("frame", string(buf)).Debug("send event")
 			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
 			t0 := time.Now()
 			_, err = ws.Write(buf)
 			if err != nil {
-				sess.debugLogf("handler: write: %s", err)
+				log.WithError(err).Error("write failed")
 				stop <- err
 				break
 			}
-			sess.debugLogf("handler: sent event %d", e.Serial)
-			stats.WriteDelay += time.Since(t0)
-			stats.QueueDelay += t0.Sub(e.Received)
+			log.Debug("sent")
+
+			if e != nil {
+				stats.QueueDelayNs += t0.Sub(e.Received)
+			}
+			stats.WriteDelayNs += time.Since(t0)
 			stats.EventBytes += uint64(len(buf))
 			stats.EventCount++
 		}
diff --git a/services/ws/log.go b/services/ws/log.go
index 1511691..d3aa82d 100644
--- a/services/ws/log.go
+++ b/services/ws/log.go
@@ -1,41 +1,22 @@
 package main
 
 import (
-	"encoding/json"
-	"fmt"
-	"log"
-	"time"
-)
-
-func init() {
-	log.SetFlags(0)
-}
+	"context"
 
-func errorLogf(f string, args ...interface{}) {
-	log.Print(`{"error":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+	log "github.com/Sirupsen/logrus"
+)
 
-var debugLogf = func(f string, args ...interface{}) {
-	log.Print(`{"debug":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+var loggerCtxKey = new(int)
 
-func mustMarshal(v interface{}) []byte {
-	buf, err := json.Marshal(v)
-	if err != nil {
-		panic(err)
-	}
-	return buf
+func contextWithLogger(ctx context.Context, logger *log.Entry) context.Context {
+	return context.WithValue(ctx, loggerCtxKey, logger)
 }
 
-func logj(args ...interface{}) {
-	m := map[string]interface{}{"Time": time.Now().UTC()}
-	for i := 0; i < len(args)-1; i += 2 {
-		m[fmt.Sprintf("%s", args[i])] = args[i+1]
-	}
-	buf, err := json.Marshal(m)
-	if err != nil {
-		errorLogf("logj: %s", err)
-		return
+func logger(ctx context.Context) *log.Entry {
+	if ctx != nil {
+		if logger, ok := ctx.Value(loggerCtxKey).(*log.Entry); ok {
+			return logger
+		}
 	}
-	log.Print(string(buf))
+	return log.WithFields(nil)
 }
diff --git a/services/ws/main.go b/services/ws/main.go
index 719128f..c83f8d9 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -3,11 +3,11 @@ package main
 import (
 	"flag"
 	"fmt"
-	"log"
 	"net/http"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/config"
+	log "github.com/Sirupsen/logrus"
 )
 
 func main() {
@@ -20,8 +20,24 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
-	if !cfg.Debug {
-		debugLogf = func(string, ...interface{}) {}
+
+	lvl, err := log.ParseLevel(cfg.LogLevel)
+	if err != nil {
+		log.Fatal(err)
+	}
+	log.SetLevel(lvl)
+	switch cfg.LogFormat {
+	case "text":
+		log.SetFormatter(&log.TextFormatter{
+			FullTimestamp:   true,
+			TimestampFormat: time.RFC3339Nano,
+		})
+	case "json":
+		log.SetFormatter(&log.JSONFormatter{
+			TimestampFormat: time.RFC3339Nano,
+		})
+	default:
+		log.WithField("LogFormat", cfg.LogFormat).Fatal("unknown log format")
 	}
 
 	if *dumpConfig {
@@ -49,6 +65,6 @@ func main() {
 	}
 	eventSource.NewSink().Stop()
 
-	log.Printf("listening at %s", srv.Addr)
+	log.WithField("Listen", srv.Addr).Info("listening")
 	log.Fatal(srv.ListenAndServe())
 }
diff --git a/services/ws/permission.go b/services/ws/permission.go
index 1dc06b8..30276e4 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -43,10 +43,13 @@ func (pc *cachingPermChecker) SetToken(token string) {
 }
 
 func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+	logger := logger(nil).
+		WithField("token", pc.Client.AuthToken).
+		WithField("uuid", uuid)
 	pc.tidy()
 	now := time.Now()
 	if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
-		debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
+		logger.WithField("allowed", perm.allowed).Debug("cache hit")
 		return perm.allowed, nil
 	}
 	var buf map[string]interface{}
@@ -61,13 +64,13 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 	var allowed bool
 	if err == nil {
 		allowed = true
-	} else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+	} else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
 		allowed = false
 	} else {
-		errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
+		logger.WithError(err).Error("lookup error")
 		return false, err
 	}
-	debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
+	logger.WithField("allowed", allowed).Debug("cache miss")
 	pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
 	return allowed, nil
 }
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 08fbee1..206cfeb 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -2,8 +2,6 @@ package main
 
 import (
 	"database/sql"
-	"fmt"
-	"log"
 	"strconv"
 	"strings"
 	"sync"
@@ -45,10 +43,10 @@ func (ps *pgEventSource) setup() {
 
 	db, err := sql.Open("postgres", ps.DataSource)
 	if err != nil {
-		log.Fatalf("sql.Open: %s", err)
+		logger(nil).WithError(err).Fatal("sql.Open failed")
 	}
 	if err = db.Ping(); err != nil {
-		log.Fatalf("db.Ping: %s", err)
+		logger(nil).WithError(err).Fatal("db.Ping failed")
 	}
 	ps.db = db
 
@@ -58,14 +56,15 @@ func (ps *pgEventSource) setup() {
 			// on missed events, we cannot recover from a
 			// dropped connection without breaking our
 			// promises to clients.
-			ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+			logger(nil).WithError(err).Error("listener problem")
+			ps.shutdown <- err
 		}
 	})
 	err = ps.pqListener.Listen("logs")
 	if err != nil {
-		log.Fatal(err)
+		logger(nil).WithError(err).Fatal("pq Listen failed")
 	}
-	debugLogf("pgEventSource listening")
+	logger(nil).Debug("pgEventSource listening")
 
 	go ps.run()
 }
@@ -81,7 +80,12 @@ func (ps *pgEventSource) run() {
 			// concurrent queries would be bounded by
 			// client_count X client_queue_size.
 			e.Detail()
-			debugLogf("event %d detail %+v", e.Serial, e.Detail())
+
+			logger(nil).
+				WithField("serial", e.Serial).
+				WithField("detail", e.Detail()).
+				Debug("event ready")
+
 			ps.mtx.Lock()
 			for sink := range ps.sinks {
 				sink.channel <- e
@@ -97,13 +101,13 @@ func (ps *pgEventSource) run() {
 		select {
 		case err, ok := <-ps.shutdown:
 			if ok {
-				debugLogf("shutdown on error: %s", err)
+				logger(nil).WithError(err).Info("shutdown")
 			}
 			close(eventQueue)
 			return
 
 		case <-ticker.C:
-			debugLogf("pgEventSource listener ping")
+			logger(nil).Debug("listener ping")
 			ps.pqListener.Ping()
 
 		case pqEvent, ok := <-ps.pqListener.Notify:
@@ -116,7 +120,7 @@ func (ps *pgEventSource) run() {
 			}
 			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
 			if err != nil {
-				log.Printf("bad notify payload: %+v", pqEvent)
+				logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
 				continue
 			}
 			serial++
@@ -126,7 +130,7 @@ func (ps *pgEventSource) run() {
 				Serial:   serial,
 				db:       ps.db,
 			}
-			debugLogf("event %d %+v", e.Serial, e)
+			logger(nil).WithField("event", e).Debug("incoming")
 			eventQueue <- e
 			go e.Detail()
 		}
diff --git a/services/ws/router.go b/services/ws/router.go
index 2a4e52e..34656ad 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -2,22 +2,33 @@ package main
 
 import (
 	"database/sql"
-	"encoding/json"
-	"log"
+	"io"
 	"net/http"
+	"strconv"
 	"sync"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	log "github.com/Sirupsen/logrus"
 	"golang.org/x/net/websocket"
 )
 
+type wsConn interface {
+	io.ReadWriter
+	Request() *http.Request
+	SetReadDeadline(time.Time) error
+	SetWriteDeadline(time.Time) error
+}
+
 type router struct {
 	Config *Config
 
 	eventSource eventSource
 	mux         *http.ServeMux
 	setupOnce   sync.Once
+
+	lastReqID  int64
+	lastReqMtx sync.Mutex
 }
 
 func (rtr *router) setup() {
@@ -30,7 +41,7 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
 	handler := &handler{
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
-		NewSession:  func(ws wsConn) (session, error) {
+		NewSession: func(ws wsConn) (session, error) {
 			return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
 		},
 	}
@@ -39,17 +50,16 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
 			return nil
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
-			logj("Type", "connect",
-				"RemoteAddr", ws.Request().RemoteAddr)
 			t0 := time.Now()
-
 			sink := rtr.eventSource.NewSink()
+			logger(ws.Request().Context()).Info("connected")
+
 			stats := handler.Handle(ws, sink.Channel())
 
-			logj("Type", "disconnect",
-				"RemoteAddr", ws.Request().RemoteAddr,
-				"Elapsed", time.Now().Sub(t0).Seconds(),
-				"Stats", stats)
+			logger(ws.Request().Context()).WithFields(log.Fields{
+				"Elapsed": time.Now().Sub(t0).Seconds(),
+				"Stats":   stats,
+			}).Info("disconnect")
 
 			sink.Stop()
 			ws.Close()
@@ -57,18 +67,25 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
 	}
 }
 
+func (rtr *router) newReqID() string {
+	rtr.lastReqMtx.Lock()
+	defer rtr.lastReqMtx.Unlock()
+	id := time.Now().UnixNano()
+	if id <= rtr.lastReqID {
+		id = rtr.lastReqID + 1
+	}
+	return strconv.FormatInt(id, 36)
+}
+
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	rtr.setupOnce.Do(rtr.setup)
-	logj("Type", "request",
-		"RemoteAddr", req.RemoteAddr,
-		"X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
+	logger := logger(req.Context()).
+		WithField("RequestID", rtr.newReqID())
+	ctx := contextWithLogger(req.Context(), logger)
+	req = req.WithContext(ctx)
+	logger.WithFields(log.Fields{
+		"RemoteAddr":      req.RemoteAddr,
+		"X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+	}).Info("accept request")
 	rtr.mux.ServeHTTP(resp, req)
 }
-
-func reqLog(m map[string]interface{}) {
-	j, err := json.Marshal(m)
-	if err != nil {
-		log.Fatal(err)
-	}
-	log.Print(string(j))
-}
diff --git a/services/ws/session.go b/services/ws/session.go
index a0658d9..d148f59 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -22,6 +22,4 @@ type session interface {
 	// incoming events will be queued. If the event queue fills
 	// up, the connection will be dropped.
 	EventMessage(*event) ([]byte, error)
-
-	debugLogf(string, ...interface{})
 }
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 33cdb2f..210c8c5 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -4,11 +4,11 @@ import (
 	"database/sql"
 	"encoding/json"
 	"errors"
-	"log"
 	"sync"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	log "github.com/Sirupsen/logrus"
 )
 
 var (
@@ -26,6 +26,7 @@ type v0session struct {
 	db            *sql.DB
 	permChecker   permChecker
 	subscriptions []v0subscribe
+	log           *log.Entry
 	mtx           sync.Mutex
 	setupOnce     sync.Once
 }
@@ -35,35 +36,31 @@ func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
 		ws:          ws,
 		db:          db,
 		permChecker: NewPermChecker(ac),
+		log:         logger(ws.Request().Context()),
 	}
 
 	err := ws.Request().ParseForm()
 	if err != nil {
-		log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+		sess.log.WithError(err).Error("ParseForm failed")
 		return nil, err
 	}
 	token := ws.Request().Form.Get("api_token")
 	sess.permChecker.SetToken(token)
-	sess.debugLogf("token = %+q", token)
+	sess.log.WithField("token", token).Debug("set token")
 
 	return sess, nil
 }
 
-func (sess *v0session) debugLogf(s string, args ...interface{}) {
-	args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
-	debugLogf("%s "+s, args...)
-}
-
 func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
-	sess.debugLogf("received message: %+v", msg)
+	sess.log.WithField("data", msg).Debug("received message")
 	var sub v0subscribe
 	if err := json.Unmarshal(buf, &sub); err != nil {
-		sess.debugLogf("ignored unrecognized request: %s", err)
+		sess.log.WithError(err).Info("ignored invalid request")
 		return nil
 	}
 	if sub.Method == "subscribe" {
-		sub.prepare()
-		sess.debugLogf("subscription: %v", sub)
+		sub.prepare(sess)
+		sess.log.WithField("sub", sub).Debug("sub prepared")
 		sess.mtx.Lock()
 		sess.subscriptions = append(sess.subscriptions, sub)
 		sess.mtx.Unlock()
@@ -118,7 +115,7 @@ func (sess *v0session) Filter(e *event) bool {
 	sess.mtx.Lock()
 	defer sess.mtx.Unlock()
 	for _, sub := range sess.subscriptions {
-		if sub.match(e) {
+		if sub.match(sess, e) {
 			return true
 		}
 	}
@@ -129,7 +126,7 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
 	if sub.LastLogID == 0 {
 		return
 	}
-	debugLogf("getOldEvents(%d)", sub.LastLogID)
+	sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
 	// Here we do a "select id" query and queue an event for every
 	// log since the given ID, then use (*event)Detail() to
 	// retrieve the whole row and decide whether to send it. This
@@ -144,14 +141,14 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
 		sub.LastLogID,
 		time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
 	if err != nil {
-		errorLogf("db.Query: %s", err)
+		sess.log.WithError(err).Error("db.Query failed")
 		return
 	}
 	for rows.Next() {
 		var id uint64
 		err := rows.Scan(&id)
 		if err != nil {
-			errorLogf("Scan: %s", err)
+			sess.log.WithError(err).Error("row Scan failed")
 			continue
 		}
 		e := &event{
@@ -159,20 +156,20 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
 			Received: time.Now(),
 			db:       sess.db,
 		}
-		if !sub.match(e) {
-			debugLogf("skip old event %+v", e)
+		if !sub.match(sess, e) {
+			sess.log.WithField("event", e).Debug("skip old event")
 			continue
 		}
 		msg, err := sess.EventMessage(e)
 		if err != nil {
-			debugLogf("event marshal: %s", err)
+			sess.log.WithError(err).Error("event marshal failed")
 			continue
 		}
-		debugLogf("old event: %s", string(msg))
+		sess.log.WithField("data", msg).Debug("will queue old event")
 		msgs = append(msgs, msg)
 	}
 	if err := rows.Err(); err != nil {
-		errorLogf("db.Query: %s", err)
+		sess.log.WithError(err).Error("db.Query failed")
 	}
 	return
 }
@@ -187,23 +184,25 @@ type v0subscribe struct {
 
 type v0filter [3]interface{}
 
-func (sub *v0subscribe) match(e *event) bool {
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+	log := sess.log.WithField("LogID", e.LogID)
 	detail := e.Detail()
 	if detail == nil {
-		debugLogf("match(%d): failed on no detail", e.LogID)
+		log.Error("match failed, no detail")
 		return false
 	}
+	log = log.WithField("funcs", len(sub.funcs))
 	for i, f := range sub.funcs {
 		if !f(e) {
-			debugLogf("match(%d): failed on func %d", e.LogID, i)
+			log.WithField("func", i).Debug("match failed")
 			return false
 		}
 	}
-	debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
+	log.Debug("match passed")
 	return true
 }
 
-func (sub *v0subscribe) prepare() {
+func (sub *v0subscribe) prepare(sess *v0session) {
 	for _, f := range sub.Filters {
 		if len(f) != 3 {
 			continue
@@ -224,7 +223,6 @@ func (sub *v0subscribe) prepare() {
 				}
 			}
 			sub.funcs = append(sub.funcs, func(e *event) bool {
-				debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
 				for _, s := range strs {
 					if s == e.Detail().EventType {
 						return true
@@ -243,36 +241,36 @@ func (sub *v0subscribe) prepare() {
 			}
 			t, err := time.Parse(time.RFC3339Nano, tstr)
 			if err != nil {
-				debugLogf("time.Parse(%q): %s", tstr, err)
+				sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
 				continue
 			}
+			var fn func(*event) bool
 			switch op {
 			case ">=":
-				sub.funcs = append(sub.funcs, func(e *event) bool {
-					debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+				fn = func(e *event) bool {
 					return !e.Detail().CreatedAt.Before(t)
-				})
+				}
 			case "<=":
-				sub.funcs = append(sub.funcs, func(e *event) bool {
-					debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+				fn = func(e *event) bool {
 					return !e.Detail().CreatedAt.After(t)
-				})
+				}
 			case ">":
-				sub.funcs = append(sub.funcs, func(e *event) bool {
-					debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+				fn = func(e *event) bool {
 					return e.Detail().CreatedAt.After(t)
-				})
+				}
 			case "<":
-				sub.funcs = append(sub.funcs, func(e *event) bool {
-					debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+				fn = func(e *event) bool {
 					return e.Detail().CreatedAt.Before(t)
-				})
+				}
 			case "=":
-				sub.funcs = append(sub.funcs, func(e *event) bool {
-					debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+				fn = func(e *event) bool {
 					return e.Detail().CreatedAt.Equal(t)
-				})
+				}
+			default:
+				sess.log.WithField("operator", op).Info("bogus operator")
+				continue
 			}
+			sub.funcs = append(sub.funcs, fn)
 		}
 	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list