[ARVADOS] updated: d07dbb66a0e06d07a1b6159d6121d924a06dbb58

Git user git at public.curoverse.com
Mon Nov 14 14:50:19 EST 2016


Summary of changes:
 build/run-build-packages.sh       |   2 +
 build/run-tests.sh                |   2 +
 sdk/go/arvados/client.go          |  77 +++++++++++++++++-
 sdk/go/arvados/log.go             |  16 ++++
 services/keep-web/handler_test.go |   2 +-
 services/ws/config.go             |   4 +-
 services/ws/event.go              |  21 +++--
 services/ws/handler.go            | 147 ++++++++++++++++++++++++++++++++--
 services/ws/handler_v0.go         | 162 --------------------------------------
 services/ws/handler_v1.go         |  13 ---
 services/ws/main.go               |  12 +--
 services/ws/pg.go                 |  91 ++++++++++++---------
 services/ws/proxy_client.go       |  41 ++++++++++
 services/ws/router.go             |  21 ++---
 services/ws/session.go            |   8 ++
 services/ws/session_v0.go         |  91 +++++++++++++++++++++
 services/ws/session_v1.go         |  11 +++
 17 files changed, 475 insertions(+), 246 deletions(-)
 create mode 100644 sdk/go/arvados/log.go
 delete mode 100644 services/ws/handler_v0.go
 delete mode 100644 services/ws/handler_v1.go
 create mode 100644 services/ws/proxy_client.go
 create mode 100644 services/ws/session.go
 create mode 100644 services/ws/session_v0.go
 create mode 100644 services/ws/session_v1.go

       via  d07dbb66a0e06d07a1b6159d6121d924a06dbb58 (commit)
       via  2ec0fa427f0cd0008c43d0312d338fc6eab07b74 (commit)
       via  81bf02bfe738c29d4d8315b12802d55f4ad5951b (commit)
       via  e2a5da6ba8b3c4633bca0bcc347c8b7649bb338c (commit)
       via  f6f89d77bd90207e79ea20c5bbc4c479db8ef1de (commit)
       via  d7e1efb7a5ce12175a3eaaedba5725e09e37363a (commit)
       via  dcb82d1101219a76df671a4f61738b764841d8dd (commit)
      from  f9eb135e4420352198729534f115f233cda8c261 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit d07dbb66a0e06d07a1b6159d6121d924a06dbb58
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 14:50:14 2016 -0500

    8460: Refactor session logic (subscription protocol) out of handler (queueing and delivery).

diff --git a/services/ws/handler.go b/services/ws/handler.go
index fd5c7f8..d42b137c 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,14 +1,14 @@
 package main
 
 import (
+	"encoding/json"
 	"io"
+	"log"
 	"net/http"
 	"time"
-)
 
-type handler interface {
-	Handle(wsConn, <-chan *event)
-}
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
 
 type wsConn interface {
 	io.ReadWriter
@@ -16,3 +16,141 @@ type wsConn interface {
 	SetReadDeadline(time.Time) error
 	SetWriteDeadline(time.Time) error
 }
+
+type handler struct {
+	Client      arvados.Client
+	PingTimeout time.Duration
+	QueueSize   int
+	NewSession  func(wsConn, arvados.Client) (session, error)
+}
+
+func (h *handler) Handle(ws wsConn, events <-chan *event) {
+	sess, err := h.NewSession(ws, h.Client)
+	if err != nil {
+		log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+		return
+	}
+
+	queue := make(chan *event, h.QueueSize)
+
+	stopped := make(chan struct{})
+	stop := make(chan error, 5)
+
+	go func() {
+		buf := make([]byte, 2<<20)
+		for {
+			select {
+			case <-stopped:
+				return
+			default:
+			}
+			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+			n, err := ws.Read(buf)
+			sess.debugLogf("received frame: %q", buf[:n])
+			if err == nil && n == len(buf) {
+				err = errFrameTooBig
+			}
+			if err != nil {
+				if err != io.EOF {
+					sess.debugLogf("handler: read: %s", err)
+				}
+				stop <- err
+				return
+			}
+			msg := make(map[string]interface{})
+			err = json.Unmarshal(buf[:n], &msg)
+			if err != nil {
+				sess.debugLogf("handler: unmarshal: %s", err)
+				stop <- err
+				return
+			}
+			sess.Receive(msg)
+		}
+	}()
+
+	go func() {
+		for e := range queue {
+			if e == nil {
+				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+				_, err := ws.Write([]byte("{}\n"))
+				if err != nil {
+					sess.debugLogf("handler: write {}: %s", err)
+					stop <- err
+					break
+				}
+				continue
+			}
+
+			buf, err := sess.EventMessage(e)
+			if err != nil {
+				sess.debugLogf("EventMessage %d: err %s", err)
+				stop <- err
+				break
+			} else if len(buf) == 0 {
+				sess.debugLogf("EventMessage %d: skip", e.Serial)
+				continue
+			}
+
+			sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+			_, err = ws.Write(buf)
+			if err != nil {
+				sess.debugLogf("handler: write: %s", err)
+				stop <- err
+				break
+			}
+			sess.debugLogf("handler: sent event %d", e.Serial)
+		}
+		for _ = range queue {
+		}
+	}()
+
+	// Filter incoming events against the current subscription
+	// list, and forward matching events to the outgoing message
+	// queue. Close the queue and return when the "stopped"
+	// channel closes or the incoming event stream ends. Shut down
+	// the handler if the outgoing queue fills up.
+	go func() {
+		send := func(e *event) {
+			select {
+			case queue <- e:
+			default:
+				stop <- errQueueFull
+			}
+		}
+
+		ticker := time.NewTicker(h.PingTimeout)
+		defer ticker.Stop()
+
+		for {
+			var e *event
+			var ok bool
+			select {
+			case <-stopped:
+				close(queue)
+				return
+			case <-ticker.C:
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- nil
+				}
+				continue
+			case e, ok = <-events:
+				if !ok {
+					close(queue)
+					return
+				}
+			}
+			if sess.Filter(e) {
+				send(e)
+			}
+		}
+	}()
+
+	<-stop
+	close(stopped)
+}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
deleted file mode 100644
index 3f9a211..0000000
--- a/services/ws/handler_v0.go
+++ /dev/null
@@ -1,195 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"errors"
-	"io"
-	"log"
-	"sync"
-	"time"
-
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-)
-
-var (
-	errQueueFull   = errors.New("client queue full")
-	errFrameTooBig = errors.New("frame too big")
-)
-
-type handlerV0 struct {
-	Client      arvados.Client
-	PingTimeout time.Duration
-	QueueSize   int
-}
-
-func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
-	args = append([]interface{}{ws.Request().RemoteAddr}, args...)
-	debugLogf("%s "+s, args...)
-}
-
-func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
-	queue := make(chan *event, h.QueueSize)
-	mtx := sync.Mutex{}
-	subscribed := make(map[string]bool)
-
-	proxyClient := NewProxyClient(h.Client)
-	{
-		err := ws.Request().ParseForm()
-		if err != nil {
-			log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
-			return
-		}
-		token := ws.Request().Form.Get("api_token")
-		h.debugLogf(ws, "handlerV0: token = %+q", token)
-		proxyClient.SetToken(token)
-	}
-
-	stopped := make(chan struct{})
-	stop := make(chan error, 5)
-
-	go func() {
-		buf := make([]byte, 2<<20)
-		for {
-			select {
-			case <-stopped:
-				return
-			default:
-			}
-			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
-			n, err := ws.Read(buf)
-			h.debugLogf(ws, "received frame: %q", buf[:n])
-			if err == nil && n == len(buf) {
-				err = errFrameTooBig
-			}
-			if err != nil {
-				if err != io.EOF {
-					h.debugLogf(ws, "handlerV0: read: %s", err)
-				}
-				stop <- err
-				return
-			}
-			msg := make(map[string]interface{})
-			err = json.Unmarshal(buf[:n], &msg)
-			if err != nil {
-				h.debugLogf(ws, "handlerV0: unmarshal: %s", err)
-				stop <- err
-				return
-			}
-			h.debugLogf(ws, "received message: %+v", msg)
-			h.debugLogf(ws, "subscribing to *")
-			subscribed["*"] = true
-		}
-	}()
-
-	go func() {
-		for e := range queue {
-			if e == nil {
-				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-				_, err := ws.Write([]byte("{}\n"))
-				if err != nil {
-					h.debugLogf(ws, "handlerV0: write: %s", err)
-					stop <- err
-					break
-				}
-				continue
-			}
-			detail := e.Detail()
-			if detail == nil {
-				continue
-			}
-
-			ok, err := proxyClient.CheckReadPermission(detail.UUID)
-			if err != nil {
-				log.Printf("CheckReadPermission: %s", err)
-				stop <- err
-				break
-			}
-			if !ok {
-				h.debugLogf(ws, "handlerV0: skip event %d", e.Serial)
-				continue
-			}
-
-			buf, err := json.Marshal(map[string]interface{}{
-				"msgID":             e.Serial,
-				"id":                detail.ID,
-				"uuid":              detail.UUID,
-				"object_uuid":       detail.ObjectUUID,
-				"object_owner_uuid": detail.ObjectOwnerUUID,
-				"event_type":        detail.EventType,
-			})
-			if err != nil {
-				log.Printf("error encoding: ", err)
-				continue
-			}
-			h.debugLogf(ws, "handlerV0: send event %d: %q", e.Serial, buf)
-			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-			_, err = ws.Write(append(buf, byte('\n')))
-			if err != nil {
-				h.debugLogf(ws, "handlerV0: write: %s", err)
-				stop <- err
-				break
-			}
-			h.debugLogf(ws, "handlerV0: sent event %d", e.Serial)
-		}
-		for _ = range queue {
-		}
-	}()
-
-	// Filter incoming events against the current subscription
-	// list, and forward matching events to the outgoing message
-	// queue. Close the queue and return when the "stopped"
-	// channel closes or the incoming event stream ends. Shut down
-	// the handler if the outgoing queue fills up.
-	go func() {
-		send := func(e *event) {
-			select {
-			case queue <- e:
-			default:
-				stop <- errQueueFull
-			}
-		}
-
-		ticker := time.NewTicker(h.PingTimeout)
-		defer ticker.Stop()
-
-		for {
-			var e *event
-			var ok bool
-			select {
-			case <-stopped:
-				close(queue)
-				return
-			case <-ticker.C:
-				// If the outgoing queue is empty,
-				// send an empty message. This can
-				// help detect a disconnected network
-				// socket, and prevent an idle socket
-				// from being closed.
-				if len(queue) == 0 {
-					queue <- nil
-				}
-				continue
-			case e, ok = <-events:
-				if !ok {
-					close(queue)
-					return
-				}
-			}
-			detail := e.Detail()
-			mtx.Lock()
-			switch {
-			case subscribed["*"]:
-				send(e)
-			case detail == nil:
-			case subscribed[detail.ObjectUUID]:
-				send(e)
-			case subscribed[detail.ObjectOwnerUUID]:
-				send(e)
-			}
-			mtx.Unlock()
-		}
-	}()
-
-	<-stop
-	close(stopped)
-}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
deleted file mode 100644
index 1b8549e..0000000
--- a/services/ws/handler_v1.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package main
-
-import (
-	"time"
-
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-)
-
-type handlerV1 struct {
-	Client      arvados.Client
-	PingTimeout time.Duration
-	QueueSize   int
-}
-
-func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
-}
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 8d5c604..e766f6c 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -28,7 +28,7 @@ func (c pgConfig) ConnectionString() string {
 
 type pgEventSource struct {
 	DataSource string
-	QueueSize int
+	QueueSize  int
 
 	pqListener *pq.Listener
 	sinks      map[*pgEventSink]bool
diff --git a/services/ws/router.go b/services/ws/router.go
index 30f93ea..e829ce8 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"sync"
 
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"golang.org/x/net/websocket"
 )
 
@@ -20,19 +21,17 @@ type router struct {
 
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
-	rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
-		Client:      rtr.Config.Client,
-		PingTimeout: rtr.Config.PingTimeout.Duration(),
-		QueueSize:   rtr.Config.ClientEventQueue,
-	}))
-	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
+	rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
+	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+}
+
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+	handler := &handler{
 		Client:      rtr.Config.Client,
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
-	}))
-}
-
-func (rtr *router) makeServer(handler handler) *websocket.Server {
+		NewSession:  newSession,
+	}
 	return &websocket.Server{
 		Handshake: func(c *websocket.Config, r *http.Request) error {
 			return nil
diff --git a/services/ws/session.go b/services/ws/session.go
new file mode 100644
index 0000000..db437b5
--- /dev/null
+++ b/services/ws/session.go
@@ -0,0 +1,8 @@
+package main
+
+type session interface {
+	Receive(map[string]interface{})
+	EventMessage(*event) ([]byte, error)
+	Filter(*event) bool
+	debugLogf(string, ...interface{})
+}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
new file mode 100644
index 0000000..15efc1d
--- /dev/null
+++ b/services/ws/session_v0.go
@@ -0,0 +1,91 @@
+package main
+
+import (
+	"encoding/json"
+	"errors"
+	"log"
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+	errQueueFull   = errors.New("client queue full")
+	errFrameTooBig = errors.New("frame too big")
+)
+
+type sessionV0 struct {
+	ws          wsConn
+	proxyClient *proxyClient
+	subscribed  map[string]bool
+	mtx         sync.Mutex
+	setupOnce   sync.Once
+}
+
+func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+	sess := &sessionV0{
+		ws:          ws,
+		proxyClient: NewProxyClient(ac),
+		subscribed:  make(map[string]bool),
+	}
+
+	err := ws.Request().ParseForm()
+	if err != nil {
+		log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+		return nil, err
+	}
+	token := ws.Request().Form.Get("api_token")
+	sess.proxyClient.SetToken(token)
+	sess.debugLogf("handlerV0: token = %+q", token)
+
+	return sess, nil
+}
+
+func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
+	args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
+	debugLogf("%s "+s, args...)
+}
+
+func (sess *sessionV0) Receive(msg map[string]interface{}) {
+	sess.debugLogf("received message: %+v", msg)
+	sess.debugLogf("subscribing to *")
+	sess.subscribed["*"] = true
+}
+
+func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
+	detail := e.Detail()
+	if detail == nil {
+		return nil, nil
+	}
+	ok, err := sess.proxyClient.CheckReadPermission(detail.UUID)
+	if err != nil || !ok {
+		return nil, err
+	}
+
+	return json.Marshal(map[string]interface{}{
+		"msgID":             e.Serial,
+		"id":                detail.ID,
+		"uuid":              detail.UUID,
+		"object_uuid":       detail.ObjectUUID,
+		"object_owner_uuid": detail.ObjectOwnerUUID,
+		"event_type":        detail.EventType,
+	})
+}
+
+func (sess *sessionV0) Filter(e *event) bool {
+	detail := e.Detail()
+	sess.mtx.Lock()
+	defer sess.mtx.Unlock()
+	switch {
+	case sess.subscribed["*"]:
+		return true
+	case detail == nil:
+		return false
+	case sess.subscribed[detail.ObjectUUID]:
+		return true
+	case sess.subscribed[detail.ObjectOwnerUUID]:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
new file mode 100644
index 0000000..bc09ed0
--- /dev/null
+++ b/services/ws/session_v1.go
@@ -0,0 +1,11 @@
+package main
+
+import (
+	"errors"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) {
+	return nil, errors.New("Not implemented")
+}

commit 2ec0fa427f0cd0008c43d0312d338fc6eab07b74
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 13:47:48 2016 -0500

    8460: Drop unnecessary read timeout handling.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index ba8f945..fd5c7f8 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -16,7 +16,3 @@ type wsConn interface {
 	SetReadDeadline(time.Time) error
 	SetWriteDeadline(time.Time) error
 }
-
-type timeouter interface {
-	Timeout() bool
-}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index eb076b5..3f9a211 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -61,9 +61,6 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			if err == nil && n == len(buf) {
 				err = errFrameTooBig
 			}
-			if err, ok := err.(timeouter); ok && err.Timeout() {
-				continue
-			}
 			if err != nil {
 				if err != io.EOF {
 					h.debugLogf(ws, "handlerV0: read: %s", err)

commit 81bf02bfe738c29d4d8315b12802d55f4ad5951b
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 13:47:32 2016 -0500

    8460: Pass datasource in Go style.

diff --git a/services/ws/main.go b/services/ws/main.go
index 2866244..c0f4dd5 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -36,8 +36,8 @@ func main() {
 	}
 
 	eventSource := &pgEventSource{
-		PgConfig:  cfg.Postgres,
-		QueueSize: cfg.ServerEventQueue,
+		DataSource: cfg.Postgres.ConnectionString(),
+		QueueSize:  cfg.ServerEventQueue,
 	}
 	srv := &http.Server{
 		Addr:           cfg.Listen,
diff --git a/services/ws/pg.go b/services/ws/pg.go
index f89fd07..8d5c604 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -27,7 +27,7 @@ func (c pgConfig) ConnectionString() string {
 }
 
 type pgEventSource struct {
-	PgConfig  pgConfig
+	DataSource string
 	QueueSize int
 
 	pqListener *pq.Listener
@@ -42,12 +42,15 @@ func (ps *pgEventSource) setup() {
 }
 
 func (ps *pgEventSource) run() {
-	db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
+	db, err := sql.Open("postgres", ps.DataSource)
 	if err != nil {
-		log.Fatal(err)
+		log.Fatalf("sql.Open: %s", err)
+	}
+	if err = db.Ping(); err != nil {
+		log.Fatalf("db.Ping: %s", err)
 	}
 
-	listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+	listener := pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
 		if err != nil {
 			// Until we have a mechanism for catching up
 			// on missed events, we cannot recover from a

commit e2a5da6ba8b3c4633bca0bcc347c8b7649bb338c
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 10:45:26 2016 -0500

    8460: Combine ping and notify goroutines.

diff --git a/services/ws/pg.go b/services/ws/pg.go
index 5e8e63e..f89fd07 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -60,15 +60,11 @@ func (ps *pgEventSource) run() {
 	if err != nil {
 		log.Fatal(err)
 	}
+
 	debugLogf("pgEventSource listening")
-	go func() {
-		for _ = range time.NewTicker(time.Minute).C {
-			debugLogf("pgEventSource listener ping")
-			listener.Ping()
-		}
-	}()
 
 	eventQueue := make(chan *event, ps.QueueSize)
+
 	go func() {
 		for e := range eventQueue {
 			// Wait for the "select ... from logs" call to
@@ -87,25 +83,38 @@ func (ps *pgEventSource) run() {
 	}()
 
 	var serial uint64
-	for pqEvent := range listener.Notify {
-		if pqEvent.Channel != "logs" {
-			continue
-		}
-		logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
-		if err != nil {
-			log.Printf("bad notify payload: %+v", pqEvent)
-			continue
-		}
-		serial++
-		e := &event{
-			LogID:    logID,
-			Received: time.Now(),
-			Serial:   serial,
-			db:       db,
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			debugLogf("pgEventSource listener ping")
+			listener.Ping()
+
+		case pqEvent, ok := <-listener.Notify:
+			if !ok {
+				close(eventQueue)
+				return
+			}
+			if pqEvent.Channel != "logs" {
+				continue
+			}
+			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+			if err != nil {
+				log.Printf("bad notify payload: %+v", pqEvent)
+				continue
+			}
+			serial++
+			e := &event{
+				LogID:    logID,
+				Received: time.Now(),
+				Serial:   serial,
+				db:       db,
+			}
+			debugLogf("event %d %+v", e.Serial, e)
+			eventQueue <- e
+			go e.Detail()
 		}
-		debugLogf("event %d %+v", e.Serial, e)
-		eventQueue <- e
-		go e.Detail()
 	}
 }
 

commit f6f89d77bd90207e79ea20c5bbc4c479db8ef1de
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 10:38:46 2016 -0500

    8460: Add Log type.

diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
new file mode 100644
index 0000000..caea04c
--- /dev/null
+++ b/sdk/go/arvados/log.go
@@ -0,0 +1,16 @@
+package arvados
+
+import (
+	"time"
+)
+
+// Log is an arvados#log record
+type Log struct {
+	ID              uint64                 `json:"id"`
+	UUID            string                 `json:"uuid"`
+	ObjectUUID      string                 `json:"object_uuid"`
+	ObjectOwnerUUID string                 `json:"object_owner_uuid"`
+	EventType       string                 `json:"event_type"`
+	Properties      map[string]interface{} `json:"properties"`
+	CreatedAt       *time.Time             `json:"created_at,omitempty"`
+}

commit d7e1efb7a5ce12175a3eaaedba5725e09e37363a
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 10:38:14 2016 -0500

    8460: Check permissions.

diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 36f4eb5..0c18d38 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -41,6 +41,8 @@ type Client struct {
 	// callers who use a Client to initialize an
 	// arvadosclient.ArvadosClient.)
 	KeepServiceURIs []string `json:",omitempty"`
+
+	dd *DiscoveryDocument
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -198,14 +200,83 @@ func (c *Client) apiURL(path string) string {
 
 // DiscoveryDocument is the Arvados server's description of itself.
 type DiscoveryDocument struct {
-	DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
-	BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+	BasePath                     string              `json:"basePath"`
+	DefaultCollectionReplication int                 `json:"defaultCollectionReplication"`
+	BlobSignatureTTL             int64               `json:"blobSignatureTtl"`
+	Schemas                      map[string]Schema   `json:"schemas"`
+	Resources                    map[string]Resource `json:"resources"`
+}
+
+type Resource struct {
+	Methods map[string]ResourceMethod `json:"methods"`
+}
+
+type ResourceMethod struct {
+	HTTPMethod string         `json:"httpMethod"`
+	Path       string         `json:"path"`
+	Response   MethodResponse `json:"response"`
+}
+
+type MethodResponse struct {
+	Ref string `json:"$ref"`
+}
+
+type Schema struct {
+	UUIDPrefix string `json:"uuidPrefix"`
 }
 
 // DiscoveryDocument returns a *DiscoveryDocument. The returned object
 // should not be modified: the same object may be returned by
 // subsequent calls.
 func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+	if c.dd != nil {
+		return c.dd, nil
+	}
 	var dd DiscoveryDocument
-	return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+	err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+	if err != nil {
+		return nil, err
+	}
+	c.dd = &dd
+	return c.dd, nil
+}
+
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+	if len(uuid) != 27 {
+		return "", fmt.Errorf("invalid UUID: %q", uuid)
+	}
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return "", err
+	}
+	infix := uuid[6:11]
+	var model string
+	for m, s := range dd.Schemas {
+		if s.UUIDPrefix == infix {
+			model = m
+			break
+		}
+	}
+	if model == "" {
+		return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+	}
+	var resource string
+	for r, rsc := range dd.Resources {
+		if rsc.Methods["get"].Response.Ref == model {
+			resource = r
+			break
+		}
+	}
+	if resource == "" {
+		return "", fmt.Errorf("no resource for model: %q", model)
+	}
+	m, ok := dd.Resources[resource].Methods[method]
+	if !ok {
+		return "", fmt.Errorf("no method %q for resource %q", method, resource)
+	}
+	path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+	if path[0] == '/' {
+		path = path[1:]
+	}
+	return path, nil
 }
diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index b3e17e8..cb61b52 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go
@@ -18,7 +18,7 @@ var _ = check.Suite(&UnitSuite{})
 
 type UnitSuite struct{}
 
-func mustParseURL(s string) *url.URL {
+func mustParseURL(s string ) *url.URL {
 	r, err := url.Parse(s)
 	if err != nil {
 		panic("parse URL: " + s)
diff --git a/services/ws/config.go b/services/ws/config.go
index 3e3d91f..9c2e80a 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -23,12 +23,12 @@ func DefaultConfig() Config {
 			APIHost: "localhost:443",
 		},
 		Postgres: pgConfig{
-			"dbname":          "arvados_test",
+			"dbname":          "arvados_production",
 			"user":            "arvados",
 			"password":        "xyzzy",
 			"host":            "localhost",
 			"connect_timeout": "30",
-			"sslmode":         "disable",
+			"sslmode":         "require",
 		},
 		PingTimeout:      arvados.Duration(time.Minute),
 		ClientEventQueue: 64,
diff --git a/services/ws/event.go b/services/ws/event.go
index b6dda49..e34b6b4 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/ghodss/yaml"
 )
 
 type eventSink interface {
@@ -15,11 +16,11 @@ type eventSink interface {
 }
 
 type eventSource interface {
-	NewSink(chan *event) eventSink
+	NewSink() eventSink
 }
 
 type event struct {
-	LogUUID  string
+	LogID    uint64
 	Received time.Time
 	Serial   uint64
 
@@ -39,18 +40,24 @@ func (e *event) Detail() *arvados.Log {
 		return e.logRow
 	}
 	var logRow arvados.Log
-	var oldAttrs, newAttrs []byte
-	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+	var propYAML []byte
+	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
 		&logRow.ID,
 		&logRow.UUID,
 		&logRow.ObjectUUID,
 		&logRow.ObjectOwnerUUID,
 		&logRow.EventType,
 		&logRow.CreatedAt,
-		&oldAttrs,
-		&newAttrs)
+		&propYAML)
 	if e.err != nil {
-		log.Printf("retrieving log row %s: %s", e.LogUUID, e.err)
+		log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+		return nil
 	}
+	e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
+	if e.err != nil {
+		log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+		return nil
+	}
+	e.logRow = &logRow
 	return e.logRow
 }
diff --git a/services/ws/handler.go b/services/ws/handler.go
index fe47a62..ba8f945 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -14,6 +14,7 @@ type wsConn interface {
 	io.ReadWriter
 	Request() *http.Request
 	SetReadDeadline(time.Time) error
+	SetWriteDeadline(time.Time) error
 }
 
 type timeouter interface {
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index c728d12..eb076b5 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -7,6 +7,8 @@ import (
 	"log"
 	"sync"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 var (
@@ -15,6 +17,7 @@ var (
 )
 
 type handlerV0 struct {
+	Client      arvados.Client
 	PingTimeout time.Duration
 	QueueSize   int
 }
@@ -29,6 +32,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	mtx := sync.Mutex{}
 	subscribed := make(map[string]bool)
 
+	proxyClient := NewProxyClient(h.Client)
+	{
+		err := ws.Request().ParseForm()
+		if err != nil {
+			log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+			return
+		}
+		token := ws.Request().Form.Get("api_token")
+		h.debugLogf(ws, "handlerV0: token = %+q", token)
+		proxyClient.SetToken(token)
+	}
+
 	stopped := make(chan struct{})
 	stop := make(chan error, 5)
 
@@ -40,21 +55,13 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 				return
 			default:
 			}
-			ws.SetReadDeadline(time.Now().Add(h.PingTimeout))
+			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
 			n, err := ws.Read(buf)
 			h.debugLogf(ws, "received frame: %q", buf[:n])
 			if err == nil && n == len(buf) {
 				err = errFrameTooBig
 			}
 			if err, ok := err.(timeouter); ok && err.Timeout() {
-				// If the outgoing queue is empty,
-				// send an empty message. This can
-				// help detect a disconnected network
-				// socket, and prevent an idle socket
-				// from being closed.
-				if len(queue) == 0 {
-					queue <- nil
-				}
 				continue
 			}
 			if err != nil {
@@ -80,6 +87,7 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	go func() {
 		for e := range queue {
 			if e == nil {
+				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
 				_, err := ws.Write([]byte("{}\n"))
 				if err != nil {
 					h.debugLogf(ws, "handlerV0: write: %s", err)
@@ -92,7 +100,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			if detail == nil {
 				continue
 			}
-			// FIXME: check permission
+
+			ok, err := proxyClient.CheckReadPermission(detail.UUID)
+			if err != nil {
+				log.Printf("CheckReadPermission: %s", err)
+				stop <- err
+				break
+			}
+			if !ok {
+				h.debugLogf(ws, "handlerV0: skip event %d", e.Serial)
+				continue
+			}
+
 			buf, err := json.Marshal(map[string]interface{}{
 				"msgID":             e.Serial,
 				"id":                detail.ID,
@@ -105,14 +124,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 				log.Printf("error encoding: ", err)
 				continue
 			}
+			h.debugLogf(ws, "handlerV0: send event %d: %q", e.Serial, buf)
+			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
 			_, err = ws.Write(append(buf, byte('\n')))
 			if err != nil {
 				h.debugLogf(ws, "handlerV0: write: %s", err)
 				stop <- err
 				break
 			}
+			h.debugLogf(ws, "handlerV0: sent event %d", e.Serial)
+		}
+		for _ = range queue {
 		}
-		for _ = range queue {}
 	}()
 
 	// Filter incoming events against the current subscription
@@ -129,6 +152,9 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			}
 		}
 
+		ticker := time.NewTicker(h.PingTimeout)
+		defer ticker.Stop()
+
 		for {
 			var e *event
 			var ok bool
@@ -136,6 +162,16 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			case <-stopped:
 				close(queue)
 				return
+			case <-ticker.C:
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- nil
+				}
+				continue
 			case e, ok = <-events:
 				if !ok {
 					close(queue)
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 4160d86..1b8549e 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -2,9 +2,12 @@ package main
 
 import (
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 type handlerV1 struct {
+	Client      arvados.Client
 	PingTimeout time.Duration
 	QueueSize   int
 }
diff --git a/services/ws/main.go b/services/ws/main.go
index 0f97823..2866244 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -35,18 +35,20 @@ func main() {
 		return
 	}
 
+	eventSource := &pgEventSource{
+		PgConfig:  cfg.Postgres,
+		QueueSize: cfg.ServerEventQueue,
+	}
 	srv := &http.Server{
 		Addr:           cfg.Listen,
 		ReadTimeout:    time.Minute,
 		WriteTimeout:   time.Minute,
 		MaxHeaderBytes: 1 << 20,
 		Handler: &router{
-			Config: &cfg,
-			eventSource: &pgEventSource{
-				PgConfig:  cfg.Postgres,
-				QueueSize: cfg.ServerEventQueue,
-			},
+			Config:      &cfg,
+			eventSource: eventSource,
 		},
 	}
+	eventSource.NewSink().Stop()
 	log.Fatal(srv.ListenAndServe())
 }
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 51bc92c..5e8e63e 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -3,6 +3,7 @@ package main
 import (
 	"database/sql"
 	"log"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -52,15 +53,17 @@ func (ps *pgEventSource) run() {
 			// on missed events, we cannot recover from a
 			// dropped connection without breaking our
 			// promises to clients.
-			log.Fatal(err)
+			log.Fatalf("pgEventSource listener problem: %s", err)
 		}
 	})
 	err = listener.Listen("logs")
 	if err != nil {
 		log.Fatal(err)
 	}
+	debugLogf("pgEventSource listening")
 	go func() {
 		for _ = range time.NewTicker(time.Minute).C {
+			debugLogf("pgEventSource listener ping")
 			listener.Ping()
 		}
 	}()
@@ -74,7 +77,7 @@ func (ps *pgEventSource) run() {
 			// concurrent queries would be bounded by
 			// client_count X client_queue_size.
 			e.Detail()
-			debugLogf("%+v", e)
+			debugLogf("event %d detail %+v", e.Serial, e.Detail())
 			ps.mtx.Lock()
 			for sink := range ps.sinks {
 				sink.channel <- e
@@ -88,33 +91,35 @@ func (ps *pgEventSource) run() {
 		if pqEvent.Channel != "logs" {
 			continue
 		}
+		logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+		if err != nil {
+			log.Printf("bad notify payload: %+v", pqEvent)
+			continue
+		}
 		serial++
 		e := &event{
-			LogUUID:  pqEvent.Extra,
+			LogID:    logID,
 			Received: time.Now(),
 			Serial:   serial,
 			db:       db,
 		}
-		debugLogf("%+v", e)
+		debugLogf("event %d %+v", e.Serial, e)
 		eventQueue <- e
 		go e.Detail()
 	}
 }
 
-// NewSink subscribes to the event source. If c is not nil, it will be
-// used as the event channel. Otherwise, a new channel will be
-// created. Either way, the sink channel will be returned by the
-// Channel() method of the returned eventSink. All subsequent events
-// will be sent to the sink channel. The caller must ensure events are
-// received from the sink channel as quickly as possible: when one
-// sink blocks, all other sinks also block.
-func (ps *pgEventSource) NewSink(c chan *event) eventSink {
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
 	ps.setupOnce.Do(ps.setup)
-	if c == nil {
-		c = make(chan *event, 1)
-	}
 	sink := &pgEventSink{
-		channel: c,
+		channel: make(chan *event, 1),
 		source:  ps,
 	}
 	ps.mtx.Lock()
diff --git a/services/ws/proxy_client.go b/services/ws/proxy_client.go
new file mode 100644
index 0000000..28be2e2
--- /dev/null
+++ b/services/ws/proxy_client.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"net/http"
+	"net/url"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type proxyClient struct {
+	*arvados.Client
+}
+
+func NewProxyClient(ac arvados.Client) *proxyClient {
+	ac.AuthToken = ""
+	return &proxyClient{
+		Client: &ac,
+	}
+}
+
+func (pc *proxyClient) SetToken(token string) {
+	pc.Client.AuthToken = token
+}
+
+func (pc *proxyClient) CheckReadPermission(uuid string) (bool, error) {
+	var buf map[string]interface{}
+	path, err := pc.PathForUUID("get", uuid)
+	if err != nil {
+		return false, err
+	}
+	err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+		"select": {`["uuid"]`},
+	})
+	if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound {
+		return false, nil
+	}
+	if err != nil {
+		return false, err
+	}
+	return true, nil
+}
diff --git a/services/ws/router.go b/services/ws/router.go
index 685b613..30f93ea 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -21,10 +21,12 @@ type router struct {
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
 	rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
+		Client:      rtr.Config.Client,
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
+		Client:      rtr.Config.Client,
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
@@ -37,7 +39,7 @@ func (rtr *router) makeServer(handler handler) *websocket.Server {
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
 			log.Printf("%v accepted", ws.Request().RemoteAddr)
-			sink := rtr.eventSource.NewSink(nil)
+			sink := rtr.eventSource.NewSink()
 			handler.Handle(ws, sink.Channel())
 			sink.Stop()
 			ws.Close()

commit dcb82d1101219a76df671a4f61738b764841d8dd
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 02:18:42 2016 -0500

    8460: Test and package services/ws as arvados-ws.

diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index 320f9d4..0a4559f 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -427,6 +427,8 @@ package_go_binary services/keepstore keepstore \
     "Keep storage daemon, accessible to clients on the LAN"
 package_go_binary services/keep-web keep-web \
     "Static web hosting service for user data stored in Arvados Keep"
+package_go_binary services/ws arvados-ws \
+    "Arvados Websocket server"
 package_go_binary tools/keep-block-check keep-block-check \
     "Verify that all data from one set of Keep servers to another was copied"
 package_go_binary tools/keep-rsync keep-rsync \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 8959cfb..d993f47 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -79,6 +79,7 @@ services/nodemanager
 services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
+services/ws
 sdk/cli
 sdk/pam
 sdk/python
@@ -765,6 +766,7 @@ gostuff=(
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
+    services/ws
     tools/keep-block-check
     tools/keep-exercise
     tools/keep-rsync

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list