[ARVADOS] updated: 1a17734f7264bc74463e1e6fe115cdad6ec4c521
Git user
git at public.curoverse.com
Thu Nov 17 15:02:49 EST 2016
Summary of changes:
sdk/python/tests/run_test_server.py | 2 +
services/ws/config.go | 11 ++--
services/ws/event.go | 5 +-
services/ws/handler.go | 124 +++++++++++++++++-------------------
services/ws/log.go | 43 ++++---------
services/ws/main.go | 24 +++++--
services/ws/permission.go | 11 ++--
services/ws/pg.go | 28 ++++----
services/ws/router.go | 65 ++++++++++++-------
services/ws/session.go | 8 +--
services/ws/session_v0.go | 119 +++++++++++++++++-----------------
services/ws/session_v1.go | 2 +-
12 files changed, 232 insertions(+), 210 deletions(-)
via 1a17734f7264bc74463e1e6fe115cdad6ec4c521 (commit)
via d3a6d626ab4534865a14e8a34295a65e92036f37 (commit)
from f675fb2c202516021b961b5aa2de4528ba9f0d1f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 1a17734f7264bc74463e1e6fe115cdad6ec4c521
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Nov 17 14:49:46 2016 -0500
8460: Refactor "old events / other messages" mechanism to use the outgoing message queue.
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 2b94693..3d42b9a 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -13,7 +13,7 @@ type handler struct {
Client arvados.Client
PingTimeout time.Duration
QueueSize int
- NewSession func(wsConn) (session, error)
+ NewSession func(wsConn, chan<- interface{}) (session, error)
}
type handlerStats struct {
@@ -23,18 +23,19 @@ type handlerStats struct {
EventCount uint64
}
-func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+func (h *handler) Handle(ws wsConn, incoming <-chan *event) (stats handlerStats) {
ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{
"RemoteAddr": ws.Request().RemoteAddr,
}))
- sess, err := h.NewSession(ws)
+
+ queue := make(chan interface{}, h.QueueSize)
+ sess, err := h.NewSession(ws, queue)
+ log := logger(ctx)
if err != nil {
- logger(ctx).WithError(err).Error("NewSession failed")
+ log.WithError(err).Error("NewSession failed")
return
}
- queue := make(chan interface{}, h.QueueSize)
-
stopped := make(chan struct{})
stop := make(chan error, 5)
@@ -48,54 +49,53 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
}
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
- logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame")
- if err == nil && n == len(buf) {
+ buf := buf[:n]
+ log.WithField("frame", string(buf[:n])).Debug("received frame")
+ if err == nil && n == cap(buf) {
err = errFrameTooBig
}
if err != nil {
if err != io.EOF {
- logger(ctx).WithError(err).Info("read error")
+ log.WithError(err).Info("read error")
}
stop <- err
return
}
msg := make(map[string]interface{})
- err = json.Unmarshal(buf[:n], &msg)
+ err = json.Unmarshal(buf, &msg)
if err != nil {
- logger(ctx).WithError(err).Info("invalid json from client")
+ log.WithError(err).Info("invalid json from client")
stop <- err
return
}
- for _, buf := range sess.Receive(msg, buf[:n]) {
- logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive")
- queue <- buf
- }
+ sess.Receive(msg, buf)
}
}()
go func() {
- for e := range queue {
- if buf, ok := e.([]byte); ok {
- ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
- logger(ctx).WithField("frame", string(buf)).Debug("send msg buf")
- _, err := ws.Write(buf)
+ for data := range queue {
+ var e *event
+ var buf []byte
+ var err error
+ log := log
+
+ switch data := data.(type) {
+ case []byte:
+ buf = data
+ case *event:
+ e = data
+ log = log.WithField("serial", e.Serial)
+ buf, err = sess.EventMessage(e)
if err != nil {
- logger(ctx).WithError(err).Error("write failed")
+ log.WithError(err).Error("EventMessage failed")
stop <- err
break
+ } else if len(buf) == 0 {
+ log.Debug("skip")
+ continue
}
- continue
- }
- e := e.(*event)
- log := logger(ctx).WithField("serial", e.Serial)
-
- buf, err := sess.EventMessage(e)
- if err != nil {
- log.WithError(err).Error("EventMessage failed")
- stop <- err
- break
- } else if len(buf) == 0 {
- log.Debug("skip")
+ default:
+ log.WithField("data", data).Error("bad object in client queue")
continue
}
@@ -118,6 +118,8 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
stats.EventCount++
}
for _ = range queue {
+ // Ensure queue can't fill up and block other
+ // goroutines after we hit a write error.
}
}()
@@ -127,20 +129,10 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
// channel closes or the incoming event stream ends. Shut down
// the handler if the outgoing queue fills up.
go func() {
- send := func(e *event) {
- select {
- case queue <- e:
- default:
- stop <- errQueueFull
- }
- }
-
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
for {
- var e *event
- var ok bool
select {
case <-stopped:
close(queue)
@@ -155,14 +147,19 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
queue <- []byte(`{}`)
}
continue
- case e, ok = <-events:
+ case e, ok := <-incoming:
if !ok {
close(queue)
return
}
- }
- if sess.Filter(e) {
- send(e)
+ if !sess.Filter(e) {
+ continue
+ }
+ select {
+ case queue <- e:
+ default:
+ stop <- errQueueFull
+ }
}
}
}()
diff --git a/services/ws/router.go b/services/ws/router.go
index 34656ad..e6cec0f 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -31,18 +31,20 @@ type router struct {
lastReqMtx sync.Mutex
}
+type sessionFactory func(wsConn, chan<- interface{}, arvados.Client, *sql.DB) (session, error)
+
func (rtr *router) setup() {
rtr.mux = http.NewServeMux()
rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
}
-func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
+func (rtr *router) makeServer(newSession sessionFactory) *websocket.Server {
handler := &handler{
PingTimeout: rtr.Config.PingTimeout.Duration(),
QueueSize: rtr.Config.ClientEventQueue,
- NewSession: func(ws wsConn) (session, error) {
- return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+ NewSession: func(ws wsConn, sendq chan<- interface{}) (session, error) {
+ return newSession(ws, sendq, rtr.Config.Client, rtr.eventSource.DB())
},
}
return &websocket.Server{
diff --git a/services/ws/session.go b/services/ws/session.go
index d148f59..9c3cef1 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -1,10 +1,8 @@
package main
type session interface {
- // Receive processes a message received from the client. If
- // the returned list of messages is non-nil, they will be
- // queued for sending to the client.
- Receive(map[string]interface{}, []byte) [][]byte
+ // Receive processes a message received from the client.
+ Receive(map[string]interface{}, []byte)
// Filter returns true if the event should be queued for
// sending to the client. It should return as fast as
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 210c8c5..4143282 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"sync"
+ "sync/atomic"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -23,16 +24,19 @@ var (
type v0session struct {
ws wsConn
+ sendq chan<- interface{}
db *sql.DB
permChecker permChecker
subscriptions []v0subscribe
+ lastMsgID uint64
log *log.Entry
mtx sync.Mutex
setupOnce sync.Once
}
-func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV0(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
sess := &v0session{
+ sendq: sendq,
ws: ws,
db: db,
permChecker: NewPermChecker(ac),
@@ -51,23 +55,24 @@ func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
return sess, nil
}
-func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) {
sess.log.WithField("data", msg).Debug("received message")
var sub v0subscribe
if err := json.Unmarshal(buf, &sub); err != nil {
sess.log.WithError(err).Info("ignored invalid request")
- return nil
+ return
}
if sub.Method == "subscribe" {
sub.prepare(sess)
sess.log.WithField("sub", sub).Debug("sub prepared")
+ sess.sendq <- v0subscribeOK
sess.mtx.Lock()
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
-
- return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
+ sub.sendOldEvents(sess)
+ return
}
- return [][]byte{v0subscribeFail}
+ sess.sendq <- v0subscribeFail
}
func (sess *v0session) EventMessage(e *event) ([]byte, error) {
@@ -82,7 +87,7 @@ func (sess *v0session) EventMessage(e *event) ([]byte, error) {
}
msg := map[string]interface{}{
- "msgID": e.Serial,
+ "msgID": atomic.AddUint64(&sess.lastMsgID, 1),
"id": detail.ID,
"uuid": detail.UUID,
"object_uuid": detail.ObjectUUID,
@@ -122,7 +127,7 @@ func (sess *v0session) Filter(e *event) bool {
return false
}
-func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+func (sub *v0subscribe) sendOldEvents(sess *v0session) {
if sub.LastLogID == 0 {
return
}
@@ -151,27 +156,27 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
sess.log.WithError(err).Error("row Scan failed")
continue
}
+ for len(sess.sendq)*2 > cap(sess.sendq) {
+ // Ugly... but if we fill up the whole client
+ // queue with a backlog of old events, a
+ // single new event will overflow it and
+ // terminate the connection, and then the
+ // client will probably reconnect and do the
+ // same thing all over again.
+ time.Sleep(100 * time.Millisecond)
+ }
e := &event{
LogID: id,
Received: time.Now(),
db: sess.db,
}
- if !sub.match(sess, e) {
- sess.log.WithField("event", e).Debug("skip old event")
- continue
- }
- msg, err := sess.EventMessage(e)
- if err != nil {
- sess.log.WithError(err).Error("event marshal failed")
- continue
+ if sub.match(sess, e) {
+ sess.sendq <- e
}
- sess.log.WithField("data", msg).Debug("will queue old event")
- msgs = append(msgs, msg)
}
if err := rows.Err(); err != nil {
sess.log.WithError(err).Error("db.Query failed")
}
- return
}
type v0subscribe struct {
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
index 60d12c4..88e2414 100644
--- a/services/ws/session_v1.go
+++ b/services/ws/session_v1.go
@@ -7,6 +7,6 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
-func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
+func NewSessionV1(ws wsConn, sendq chan<- interface{}, ac arvados.Client, db *sql.DB) (session, error) {
return nil, errors.New("Not implemented")
}
commit d3a6d626ab4534865a14e8a34295a65e92036f37
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Nov 17 14:10:15 2016 -0500
8460: Structured logging.
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index 5ef5e2a..bd37daa 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -379,6 +379,7 @@ Client:
APIHost: {}
Insecure: true
Listen: :{}
+LogLevel: {}
Postgres:
host: {}
dbname: {}
@@ -387,6 +388,7 @@ Postgres:
sslmode: require
""".format(os.environ['ARVADOS_API_HOST'],
port,
+ ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
_dbconfig('host'),
_dbconfig('database'),
_dbconfig('username'),
diff --git a/services/ws/config.go b/services/ws/config.go
index 9c2e80a..e2d69d0 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -7,10 +7,11 @@ import (
)
type Config struct {
- Client arvados.Client
- Postgres pgConfig
- Listen string
- Debug bool
+ Client arvados.Client
+ Postgres pgConfig
+ Listen string
+ LogLevel string
+ LogFormat string
PingTimeout arvados.Duration
ClientEventQueue int
@@ -30,6 +31,8 @@ func DefaultConfig() Config {
"connect_timeout": "30",
"sslmode": "require",
},
+ LogLevel: "info",
+ LogFormat: "json",
PingTimeout: arvados.Duration(time.Minute),
ClientEventQueue: 64,
ServerEventQueue: 4,
diff --git a/services/ws/event.go b/services/ws/event.go
index 77acf44..280035b 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -2,7 +2,6 @@ package main
import (
"database/sql"
- "log"
"sync"
"time"
@@ -51,12 +50,12 @@ func (e *event) Detail() *arvados.Log {
&logRow.CreatedAt,
&propYAML)
if e.err != nil {
- log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("QueryRow failed")
return nil
}
e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
if e.err != nil {
- log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+ logger(nil).WithField("LogID", e.LogID).WithError(e.err).Error("yaml decode failed")
return nil
}
e.logRow = &logRow
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 1470c66..2b94693 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -3,20 +3,12 @@ package main
import (
"encoding/json"
"io"
- "log"
- "net/http"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
)
-type wsConn interface {
- io.ReadWriter
- Request() *http.Request
- SetReadDeadline(time.Time) error
- SetWriteDeadline(time.Time) error
-}
-
type handler struct {
Client arvados.Client
PingTimeout time.Duration
@@ -25,16 +17,19 @@ type handler struct {
}
type handlerStats struct {
- QueueDelay time.Duration
- WriteDelay time.Duration
- EventBytes uint64
- EventCount uint64
+ QueueDelayNs time.Duration
+ WriteDelayNs time.Duration
+ EventBytes uint64
+ EventCount uint64
}
func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
+ ctx := contextWithLogger(ws.Request().Context(), log.WithFields(log.Fields{
+ "RemoteAddr": ws.Request().RemoteAddr,
+ }))
sess, err := h.NewSession(ws)
if err != nil {
- log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+ logger(ctx).WithError(err).Error("NewSession failed")
return
}
@@ -53,13 +48,13 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
}
ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
n, err := ws.Read(buf)
- sess.debugLogf("received frame: %q", buf[:n])
+ logger(ctx).WithField("frame", string(buf[:n])).Debug("received frame")
if err == nil && n == len(buf) {
err = errFrameTooBig
}
if err != nil {
if err != io.EOF {
- sess.debugLogf("handler: read: %s", err)
+ logger(ctx).WithError(err).Info("read error")
}
stop <- err
return
@@ -67,12 +62,12 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
msg := make(map[string]interface{})
err = json.Unmarshal(buf[:n], &msg)
if err != nil {
- sess.debugLogf("handler: unmarshal: %s", err)
+ logger(ctx).WithError(err).Info("invalid json from client")
stop <- err
return
}
for _, buf := range sess.Receive(msg, buf[:n]) {
- sess.debugLogf("handler: to queue: %s", string(buf))
+ logger(ctx).WithField("frame", string(buf)).Debug("queued message from sess.Receive")
queue <- buf
}
}
@@ -82,39 +77,43 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
for e := range queue {
if buf, ok := e.([]byte); ok {
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
- sess.debugLogf("handler: send msg: %s", string(buf))
+ logger(ctx).WithField("frame", string(buf)).Debug("send msg buf")
_, err := ws.Write(buf)
if err != nil {
- sess.debugLogf("handler: write {}: %s", err)
+ logger(ctx).WithError(err).Error("write failed")
stop <- err
break
}
continue
}
e := e.(*event)
+ log := logger(ctx).WithField("serial", e.Serial)
buf, err := sess.EventMessage(e)
if err != nil {
- sess.debugLogf("EventMessage %d: err %s", err)
+ log.WithError(err).Error("EventMessage failed")
stop <- err
break
} else if len(buf) == 0 {
- sess.debugLogf("EventMessage %d: skip", e.Serial)
+ log.Debug("skip")
continue
}
- sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+ log.WithField("frame", string(buf)).Debug("send event")
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- sess.debugLogf("handler: write: %s", err)
+ log.WithError(err).Error("write failed")
stop <- err
break
}
- sess.debugLogf("handler: sent event %d", e.Serial)
- stats.WriteDelay += time.Since(t0)
- stats.QueueDelay += t0.Sub(e.Received)
+ log.Debug("sent")
+
+ if e != nil {
+ stats.QueueDelayNs += t0.Sub(e.Received)
+ }
+ stats.WriteDelayNs += time.Since(t0)
stats.EventBytes += uint64(len(buf))
stats.EventCount++
}
diff --git a/services/ws/log.go b/services/ws/log.go
index 1511691..d3aa82d 100644
--- a/services/ws/log.go
+++ b/services/ws/log.go
@@ -1,41 +1,22 @@
package main
import (
- "encoding/json"
- "fmt"
- "log"
- "time"
-)
-
-func init() {
- log.SetFlags(0)
-}
+ "context"
-func errorLogf(f string, args ...interface{}) {
- log.Print(`{"error":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+ log "github.com/Sirupsen/logrus"
+)
-var debugLogf = func(f string, args ...interface{}) {
- log.Print(`{"debug":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
-}
+var loggerCtxKey = new(int)
-func mustMarshal(v interface{}) []byte {
- buf, err := json.Marshal(v)
- if err != nil {
- panic(err)
- }
- return buf
+func contextWithLogger(ctx context.Context, logger *log.Entry) context.Context {
+ return context.WithValue(ctx, loggerCtxKey, logger)
}
-func logj(args ...interface{}) {
- m := map[string]interface{}{"Time": time.Now().UTC()}
- for i := 0; i < len(args)-1; i += 2 {
- m[fmt.Sprintf("%s", args[i])] = args[i+1]
- }
- buf, err := json.Marshal(m)
- if err != nil {
- errorLogf("logj: %s", err)
- return
+func logger(ctx context.Context) *log.Entry {
+ if ctx != nil {
+ if logger, ok := ctx.Value(loggerCtxKey).(*log.Entry); ok {
+ return logger
+ }
}
- log.Print(string(buf))
+ return log.WithFields(nil)
}
diff --git a/services/ws/main.go b/services/ws/main.go
index 719128f..c83f8d9 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -3,11 +3,11 @@ package main
import (
"flag"
"fmt"
- "log"
"net/http"
"time"
"git.curoverse.com/arvados.git/sdk/go/config"
+ log "github.com/Sirupsen/logrus"
)
func main() {
@@ -20,8 +20,24 @@ func main() {
if err != nil {
log.Fatal(err)
}
- if !cfg.Debug {
- debugLogf = func(string, ...interface{}) {}
+
+ lvl, err := log.ParseLevel(cfg.LogLevel)
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.SetLevel(lvl)
+ switch cfg.LogFormat {
+ case "text":
+ log.SetFormatter(&log.TextFormatter{
+ FullTimestamp: true,
+ TimestampFormat: time.RFC3339Nano,
+ })
+ case "json":
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: time.RFC3339Nano,
+ })
+ default:
+ log.WithField("LogFormat", cfg.LogFormat).Fatal("unknown log format")
}
if *dumpConfig {
@@ -49,6 +65,6 @@ func main() {
}
eventSource.NewSink().Stop()
- log.Printf("listening at %s", srv.Addr)
+ log.WithField("Listen", srv.Addr).Info("listening")
log.Fatal(srv.ListenAndServe())
}
diff --git a/services/ws/permission.go b/services/ws/permission.go
index 1dc06b8..30276e4 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -43,10 +43,13 @@ func (pc *cachingPermChecker) SetToken(token string) {
}
func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+ logger := logger(nil).
+ WithField("token", pc.Client.AuthToken).
+ WithField("uuid", uuid)
pc.tidy()
now := time.Now()
if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
- debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
+ logger.WithField("allowed", perm.allowed).Debug("cache hit")
return perm.allowed, nil
}
var buf map[string]interface{}
@@ -61,13 +64,13 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
var allowed bool
if err == nil {
allowed = true
- } else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+ } else if txErr, ok := err.(*arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
allowed = false
} else {
- errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
+ logger.WithError(err).Error("lookup error")
return false, err
}
- debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
+ logger.WithField("allowed", allowed).Debug("cache miss")
pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
return allowed, nil
}
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 08fbee1..206cfeb 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -2,8 +2,6 @@ package main
import (
"database/sql"
- "fmt"
- "log"
"strconv"
"strings"
"sync"
@@ -45,10 +43,10 @@ func (ps *pgEventSource) setup() {
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
- log.Fatalf("sql.Open: %s", err)
+ logger(nil).WithError(err).Fatal("sql.Open failed")
}
if err = db.Ping(); err != nil {
- log.Fatalf("db.Ping: %s", err)
+ logger(nil).WithError(err).Fatal("db.Ping failed")
}
ps.db = db
@@ -58,14 +56,15 @@ func (ps *pgEventSource) setup() {
// on missed events, we cannot recover from a
// dropped connection without breaking our
// promises to clients.
- ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
+ logger(nil).WithError(err).Error("listener problem")
+ ps.shutdown <- err
}
})
err = ps.pqListener.Listen("logs")
if err != nil {
- log.Fatal(err)
+ logger(nil).WithError(err).Fatal("pq Listen failed")
}
- debugLogf("pgEventSource listening")
+ logger(nil).Debug("pgEventSource listening")
go ps.run()
}
@@ -81,7 +80,12 @@ func (ps *pgEventSource) run() {
// concurrent queries would be bounded by
// client_count X client_queue_size.
e.Detail()
- debugLogf("event %d detail %+v", e.Serial, e.Detail())
+
+ logger(nil).
+ WithField("serial", e.Serial).
+ WithField("detail", e.Detail()).
+ Debug("event ready")
+
ps.mtx.Lock()
for sink := range ps.sinks {
sink.channel <- e
@@ -97,13 +101,13 @@ func (ps *pgEventSource) run() {
select {
case err, ok := <-ps.shutdown:
if ok {
- debugLogf("shutdown on error: %s", err)
+ logger(nil).WithError(err).Info("shutdown")
}
close(eventQueue)
return
case <-ticker.C:
- debugLogf("pgEventSource listener ping")
+ logger(nil).Debug("listener ping")
ps.pqListener.Ping()
case pqEvent, ok := <-ps.pqListener.Notify:
@@ -116,7 +120,7 @@ func (ps *pgEventSource) run() {
}
logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
if err != nil {
- log.Printf("bad notify payload: %+v", pqEvent)
+ logger(nil).WithField("pqEvent", pqEvent).Error("bad notify payload")
continue
}
serial++
@@ -126,7 +130,7 @@ func (ps *pgEventSource) run() {
Serial: serial,
db: ps.db,
}
- debugLogf("event %d %+v", e.Serial, e)
+ logger(nil).WithField("event", e).Debug("incoming")
eventQueue <- e
go e.Detail()
}
diff --git a/services/ws/router.go b/services/ws/router.go
index 2a4e52e..34656ad 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -2,22 +2,33 @@ package main
import (
"database/sql"
- "encoding/json"
- "log"
+ "io"
"net/http"
+ "strconv"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
"golang.org/x/net/websocket"
)
+type wsConn interface {
+ io.ReadWriter
+ Request() *http.Request
+ SetReadDeadline(time.Time) error
+ SetWriteDeadline(time.Time) error
+}
+
type router struct {
Config *Config
eventSource eventSource
mux *http.ServeMux
setupOnce sync.Once
+
+ lastReqID int64
+ lastReqMtx sync.Mutex
}
func (rtr *router) setup() {
@@ -30,7 +41,7 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
handler := &handler{
PingTimeout: rtr.Config.PingTimeout.Duration(),
QueueSize: rtr.Config.ClientEventQueue,
- NewSession: func(ws wsConn) (session, error) {
+ NewSession: func(ws wsConn) (session, error) {
return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
},
}
@@ -39,17 +50,16 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
return nil
},
Handler: websocket.Handler(func(ws *websocket.Conn) {
- logj("Type", "connect",
- "RemoteAddr", ws.Request().RemoteAddr)
t0 := time.Now()
-
sink := rtr.eventSource.NewSink()
+ logger(ws.Request().Context()).Info("connected")
+
stats := handler.Handle(ws, sink.Channel())
- logj("Type", "disconnect",
- "RemoteAddr", ws.Request().RemoteAddr,
- "Elapsed", time.Now().Sub(t0).Seconds(),
- "Stats", stats)
+ logger(ws.Request().Context()).WithFields(log.Fields{
+ "Elapsed": time.Now().Sub(t0).Seconds(),
+ "Stats": stats,
+ }).Info("disconnect")
sink.Stop()
ws.Close()
@@ -57,18 +67,25 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (
}
}
+func (rtr *router) newReqID() string {
+ rtr.lastReqMtx.Lock()
+ defer rtr.lastReqMtx.Unlock()
+ id := time.Now().UnixNano()
+ if id <= rtr.lastReqID {
+ id = rtr.lastReqID + 1
+ }
+ return strconv.FormatInt(id, 36)
+}
+
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup)
- logj("Type", "request",
- "RemoteAddr", req.RemoteAddr,
- "X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
+ logger := logger(req.Context()).
+ WithField("RequestID", rtr.newReqID())
+ ctx := contextWithLogger(req.Context(), logger)
+ req = req.WithContext(ctx)
+ logger.WithFields(log.Fields{
+ "RemoteAddr": req.RemoteAddr,
+ "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+ }).Info("accept request")
rtr.mux.ServeHTTP(resp, req)
}
-
-func reqLog(m map[string]interface{}) {
- j, err := json.Marshal(m)
- if err != nil {
- log.Fatal(err)
- }
- log.Print(string(j))
-}
diff --git a/services/ws/session.go b/services/ws/session.go
index a0658d9..d148f59 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -22,6 +22,4 @@ type session interface {
// incoming events will be queued. If the event queue fills
// up, the connection will be dropped.
EventMessage(*event) ([]byte, error)
-
- debugLogf(string, ...interface{})
}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 33cdb2f..210c8c5 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -4,11 +4,11 @@ import (
"database/sql"
"encoding/json"
"errors"
- "log"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
)
var (
@@ -26,6 +26,7 @@ type v0session struct {
db *sql.DB
permChecker permChecker
subscriptions []v0subscribe
+ log *log.Entry
mtx sync.Mutex
setupOnce sync.Once
}
@@ -35,35 +36,31 @@ func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
ws: ws,
db: db,
permChecker: NewPermChecker(ac),
+ log: logger(ws.Request().Context()),
}
err := ws.Request().ParseForm()
if err != nil {
- log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+ sess.log.WithError(err).Error("ParseForm failed")
return nil, err
}
token := ws.Request().Form.Get("api_token")
sess.permChecker.SetToken(token)
- sess.debugLogf("token = %+q", token)
+ sess.log.WithField("token", token).Debug("set token")
return sess, nil
}
-func (sess *v0session) debugLogf(s string, args ...interface{}) {
- args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
- debugLogf("%s "+s, args...)
-}
-
func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
- sess.debugLogf("received message: %+v", msg)
+ sess.log.WithField("data", msg).Debug("received message")
var sub v0subscribe
if err := json.Unmarshal(buf, &sub); err != nil {
- sess.debugLogf("ignored unrecognized request: %s", err)
+ sess.log.WithError(err).Info("ignored invalid request")
return nil
}
if sub.Method == "subscribe" {
- sub.prepare()
- sess.debugLogf("subscription: %v", sub)
+ sub.prepare(sess)
+ sess.log.WithField("sub", sub).Debug("sub prepared")
sess.mtx.Lock()
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
@@ -118,7 +115,7 @@ func (sess *v0session) Filter(e *event) bool {
sess.mtx.Lock()
defer sess.mtx.Unlock()
for _, sub := range sess.subscriptions {
- if sub.match(e) {
+ if sub.match(sess, e) {
return true
}
}
@@ -129,7 +126,7 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
if sub.LastLogID == 0 {
return
}
- debugLogf("getOldEvents(%d)", sub.LastLogID)
+ sess.log.WithField("LastLogID", sub.LastLogID).Debug("getOldEvents")
// Here we do a "select id" query and queue an event for every
// log since the given ID, then use (*event)Detail() to
// retrieve the whole row and decide whether to send it. This
@@ -144,14 +141,14 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
sub.LastLogID,
time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
if err != nil {
- errorLogf("db.Query: %s", err)
+ sess.log.WithError(err).Error("db.Query failed")
return
}
for rows.Next() {
var id uint64
err := rows.Scan(&id)
if err != nil {
- errorLogf("Scan: %s", err)
+ sess.log.WithError(err).Error("row Scan failed")
continue
}
e := &event{
@@ -159,20 +156,20 @@ func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
Received: time.Now(),
db: sess.db,
}
- if !sub.match(e) {
- debugLogf("skip old event %+v", e)
+ if !sub.match(sess, e) {
+ sess.log.WithField("event", e).Debug("skip old event")
continue
}
msg, err := sess.EventMessage(e)
if err != nil {
- debugLogf("event marshal: %s", err)
+ sess.log.WithError(err).Error("event marshal failed")
continue
}
- debugLogf("old event: %s", string(msg))
+ sess.log.WithField("data", msg).Debug("will queue old event")
msgs = append(msgs, msg)
}
if err := rows.Err(); err != nil {
- errorLogf("db.Query: %s", err)
+ sess.log.WithError(err).Error("db.Query failed")
}
return
}
@@ -187,23 +184,25 @@ type v0subscribe struct {
type v0filter [3]interface{}
-func (sub *v0subscribe) match(e *event) bool {
+func (sub *v0subscribe) match(sess *v0session, e *event) bool {
+ log := sess.log.WithField("LogID", e.LogID)
detail := e.Detail()
if detail == nil {
- debugLogf("match(%d): failed on no detail", e.LogID)
+ log.Error("match failed, no detail")
return false
}
+ log = log.WithField("funcs", len(sub.funcs))
for i, f := range sub.funcs {
if !f(e) {
- debugLogf("match(%d): failed on func %d", e.LogID, i)
+ log.WithField("func", i).Debug("match failed")
return false
}
}
- debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
+ log.Debug("match passed")
return true
}
-func (sub *v0subscribe) prepare() {
+func (sub *v0subscribe) prepare(sess *v0session) {
for _, f := range sub.Filters {
if len(f) != 3 {
continue
@@ -224,7 +223,6 @@ func (sub *v0subscribe) prepare() {
}
}
sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
for _, s := range strs {
if s == e.Detail().EventType {
return true
@@ -243,36 +241,36 @@ func (sub *v0subscribe) prepare() {
}
t, err := time.Parse(time.RFC3339Nano, tstr)
if err != nil {
- debugLogf("time.Parse(%q): %s", tstr, err)
+ sess.log.WithField("data", tstr).WithError(err).Info("time.Parse failed")
continue
}
+ var fn func(*event) bool
switch op {
case ">=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return !e.Detail().CreatedAt.Before(t)
- })
+ }
case "<=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return !e.Detail().CreatedAt.After(t)
- })
+ }
case ">":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.After(t)
- })
+ }
case "<":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.Before(t)
- })
+ }
case "=":
- sub.funcs = append(sub.funcs, func(e *event) bool {
- debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+ fn = func(e *event) bool {
return e.Detail().CreatedAt.Equal(t)
- })
+ }
+ default:
+ sess.log.WithField("operator", op).Info("bogus operator")
+ continue
}
+ sub.funcs = append(sub.funcs, fn)
}
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list