[ARVADOS] updated: 219daa3b227f147cb628a27b823b9b4e1f8d32b1

Git user git at public.curoverse.com
Sun Nov 13 20:59:46 EST 2016


Summary of changes:
 services/ws/event.go      |  5 +--
 services/ws/handler.go    |  8 ++++-
 services/ws/handler_v0.go | 91 +++++++++++++++++++++++++++++++++++++++++++++--
 services/ws/handler_v1.go |  3 +-
 services/ws/router.go     | 13 +++++--
 5 files changed, 111 insertions(+), 9 deletions(-)

       via  219daa3b227f147cb628a27b823b9b4e1f8d32b1 (commit)
       via  6e2b2d15e8c3b7926cd7b300660698fa23644efc (commit)
      from  7cb536fa58d8cc837b4cb59680c7355a1687648b (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 219daa3b227f147cb628a27b823b9b4e1f8d32b1
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 20:59:40 2016 -0500

    8460: Hide *websocket.Conn behind interface.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index 96219ed..cd3374e 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,9 +1,15 @@
 package main
 
 import (
-	"golang.org/x/net/websocket"
+	"io"
+	"net/http"
 )
 
 type handler interface {
-	Handle(*websocket.Conn, <-chan *event)
+	Handle(wsConn, <-chan *event)
+}
+
+type wsConn interface {
+	io.ReadWriter
+	Request() *http.Request
 }
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index 960698f..451af86 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -4,20 +4,18 @@ import (
 	"encoding/json"
 	"log"
 	"sync"
-
-	"golang.org/x/net/websocket"
 )
 
 type handlerV0 struct {
 	QueueSize int
 }
 
-func (h *handlerV0) debugLogf(ws *websocket.Conn, s string, args ...interface{}) {
+func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
 	args = append([]interface{}{ws.Request().RemoteAddr}, args...)
 	debugLogf("%s "+s, args...)
 }
 
-func (h *handlerV0) Handle(ws *websocket.Conn, events <-chan *event) {
+func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	done := make(chan struct{}, 3)
 	queue := make(chan *event, h.QueueSize)
 	mtx := sync.Mutex{}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index f7b3237..91f3d34 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,12 +1,11 @@
 package main
 
 import (
-	"golang.org/x/net/websocket"
 )
 
 type handlerV1 struct {
 	QueueSize int
 }
 
-func (h *handlerV1) Handle(ws *websocket.Conn, events <-chan *event) {
+func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
 }

commit 6e2b2d15e8c3b7926cd7b300660698fa23644efc
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 20:56:04 2016 -0500

    8460: Send events.

diff --git a/services/ws/event.go b/services/ws/event.go
index 1634a7a..7e27e09 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -34,12 +34,13 @@ type event struct {
 func (e *event) Detail(db *sql.DB) *arvados.Log {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
-	if e.logRow != nil || e.err != nil {
+	if e.logRow != nil || e.err != nil || db == nil {
 		return e.logRow
 	}
 	var logRow arvados.Log
 	var oldAttrs, newAttrs []byte
-	e.err = db.QueryRow(`SELECT uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+	e.err = db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+		&logRow.ID,
 		&logRow.UUID,
 		&logRow.ObjectUUID,
 		&logRow.ObjectOwnerUUID,
diff --git a/services/ws/handler.go b/services/ws/handler.go
index c768143..96219ed 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,9 +1,9 @@
 package main
 
 import (
-	"io"
+	"golang.org/x/net/websocket"
 )
 
 type handler interface {
-	Handle(io.ReadWriter, <-chan *event)
+	Handle(*websocket.Conn, <-chan *event)
 }
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index 40ab75b..960698f 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -1,12 +1,101 @@
 package main
 
 import (
-	"io"
+	"encoding/json"
+	"log"
+	"sync"
+
+	"golang.org/x/net/websocket"
 )
 
 type handlerV0 struct {
 	QueueSize int
 }
 
-func (h *handlerV0) Handle(ws io.ReadWriter, events <-chan *event) {
+func (h *handlerV0) debugLogf(ws *websocket.Conn, s string, args ...interface{}) {
+	args = append([]interface{}{ws.Request().RemoteAddr}, args...)
+	debugLogf("%s "+s, args...)
+}
+
+func (h *handlerV0) Handle(ws *websocket.Conn, events <-chan *event) {
+	done := make(chan struct{}, 3)
+	queue := make(chan *event, h.QueueSize)
+	mtx := sync.Mutex{}
+	subscribed := make(map[string]bool)
+	go func() {
+		buf := make([]byte, 2<<20)
+		for {
+			n, err := ws.Read(buf)
+			h.debugLogf(ws, "received frame: %q", buf[:n])
+			if err != nil || n == len(buf) {
+				break
+			}
+			msg := make(map[string]interface{})
+			err = json.Unmarshal(buf[:n], &msg)
+			if err != nil {
+				break
+			}
+			h.debugLogf(ws, "received message: %+v", msg)
+			h.debugLogf(ws, "subscribing to *")
+			subscribed["*"] = true
+		}
+		done <- struct{}{}
+	}()
+	go func(queue <-chan *event) {
+		for e := range queue {
+			detail := e.Detail(nil)
+			if detail == nil {
+				continue
+			}
+			// FIXME: check permission
+			buf, err := json.Marshal(map[string]interface{}{
+				"msgID":             e.Serial,
+				"id":                detail.ID,
+				"uuid":              detail.UUID,
+				"object_uuid":       detail.ObjectUUID,
+				"object_owner_uuid": detail.ObjectOwnerUUID,
+				"event_type":        detail.EventType,
+			})
+			if err != nil {
+				log.Printf("error encoding: ", err)
+				continue
+			}
+			_, err = ws.Write(append(buf, byte('\n')))
+			if  err != nil {
+				h.debugLogf(ws, "handlerV0: write: %s", err)
+				break
+			}
+		}
+		done <- struct{}{}
+	}(queue)
+	go func() {
+		send := func(e *event) {
+			if queue == nil {
+				return
+			}
+			select {
+			case queue <- e:
+			default:
+				close(queue)
+				queue = nil
+				done <- struct{}{}
+			}
+		}
+		for e := range events {
+			detail := e.Detail(nil)
+			mtx.Lock()
+			switch {
+			case subscribed["*"]:
+				send(e)
+			case detail == nil:
+			case subscribed[detail.ObjectUUID]:
+				send(e)
+			case subscribed[detail.ObjectOwnerUUID]:
+				send(e)
+			}
+			mtx.Unlock()
+		}
+		done <- struct{}{}
+	}()
+	<-done
 }
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 4b3f12b..f7b3237 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,12 +1,12 @@
 package main
 
 import (
-	"io"
+	"golang.org/x/net/websocket"
 )
 
 type handlerV1 struct {
 	QueueSize int
 }
 
-func (h *handlerV1) Handle(ws io.ReadWriter, events <-chan *event) {
+func (h *handlerV1) Handle(ws *websocket.Conn, events <-chan *event) {
 }
diff --git a/services/ws/router.go b/services/ws/router.go
index 01c1477..e4e102b 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"encoding/json"
+	"fmt"
 	"log"
 	"net/http"
 	"sync"
@@ -32,12 +34,12 @@ func (rtr *router) makeServer(handler handler) *websocket.Server {
 			return nil
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
-			log.Printf("socket request: %+v", ws.Request())
+			log.Printf("%v accepted", ws.Request().RemoteAddr)
 			sink := rtr.eventSource.NewSink(nil)
 			handler.Handle(ws, sink.Channel())
 			sink.Stop()
 			ws.Close()
-			log.Printf("socket disconnect: %+v", ws.Request().RemoteAddr)
+			log.Printf("%v disconnected", ws.Request().RemoteAddr)
 		}),
 	}
 }
@@ -45,4 +47,11 @@ func (rtr *router) makeServer(handler handler) *websocket.Server {
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	rtr.setupOnce.Do(rtr.setup)
 	rtr.mux.ServeHTTP(resp, req)
+	j, err := json.Marshal(map[string]interface{}{
+		"req": fmt.Sprintf("%+v", req),
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	log.Print(string(j))
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list