[ARVADOS] updated: 219daa3b227f147cb628a27b823b9b4e1f8d32b1
Git user
git at public.curoverse.com
Sun Nov 13 20:59:46 EST 2016
Summary of changes:
services/ws/event.go | 5 +--
services/ws/handler.go | 8 ++++-
services/ws/handler_v0.go | 91 +++++++++++++++++++++++++++++++++++++++++++++--
services/ws/handler_v1.go | 3 +-
services/ws/router.go | 13 +++++--
5 files changed, 111 insertions(+), 9 deletions(-)
via 219daa3b227f147cb628a27b823b9b4e1f8d32b1 (commit)
via 6e2b2d15e8c3b7926cd7b300660698fa23644efc (commit)
from 7cb536fa58d8cc837b4cb59680c7355a1687648b (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 219daa3b227f147cb628a27b823b9b4e1f8d32b1
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 20:59:40 2016 -0500
8460: Hide *websocket.Conn behind interface.
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 96219ed..cd3374e 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,9 +1,15 @@
package main
import (
- "golang.org/x/net/websocket"
+ "io"
+ "net/http"
)
type handler interface {
- Handle(*websocket.Conn, <-chan *event)
+ Handle(wsConn, <-chan *event)
+}
+
+type wsConn interface {
+ io.ReadWriter
+ Request() *http.Request
}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index 960698f..451af86 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -4,20 +4,18 @@ import (
"encoding/json"
"log"
"sync"
-
- "golang.org/x/net/websocket"
)
type handlerV0 struct {
QueueSize int
}
-func (h *handlerV0) debugLogf(ws *websocket.Conn, s string, args ...interface{}) {
+func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
args = append([]interface{}{ws.Request().RemoteAddr}, args...)
debugLogf("%s "+s, args...)
}
-func (h *handlerV0) Handle(ws *websocket.Conn, events <-chan *event) {
+func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
done := make(chan struct{}, 3)
queue := make(chan *event, h.QueueSize)
mtx := sync.Mutex{}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index f7b3237..91f3d34 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,12 +1,11 @@
package main
import (
- "golang.org/x/net/websocket"
)
type handlerV1 struct {
QueueSize int
}
-func (h *handlerV1) Handle(ws *websocket.Conn, events <-chan *event) {
+func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
}
commit 6e2b2d15e8c3b7926cd7b300660698fa23644efc
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 20:56:04 2016 -0500
8460: Send events.
diff --git a/services/ws/event.go b/services/ws/event.go
index 1634a7a..7e27e09 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -34,12 +34,13 @@ type event struct {
func (e *event) Detail(db *sql.DB) *arvados.Log {
e.mtx.Lock()
defer e.mtx.Unlock()
- if e.logRow != nil || e.err != nil {
+ if e.logRow != nil || e.err != nil || db == nil {
return e.logRow
}
var logRow arvados.Log
var oldAttrs, newAttrs []byte
- e.err = db.QueryRow(`SELECT uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+ e.err = db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+ &logRow.ID,
&logRow.UUID,
&logRow.ObjectUUID,
&logRow.ObjectOwnerUUID,
diff --git a/services/ws/handler.go b/services/ws/handler.go
index c768143..96219ed 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,9 +1,9 @@
package main
import (
- "io"
+ "golang.org/x/net/websocket"
)
type handler interface {
- Handle(io.ReadWriter, <-chan *event)
+ Handle(*websocket.Conn, <-chan *event)
}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index 40ab75b..960698f 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -1,12 +1,101 @@
package main
import (
- "io"
+ "encoding/json"
+ "log"
+ "sync"
+
+ "golang.org/x/net/websocket"
)
type handlerV0 struct {
QueueSize int
}
-func (h *handlerV0) Handle(ws io.ReadWriter, events <-chan *event) {
+func (h *handlerV0) debugLogf(ws *websocket.Conn, s string, args ...interface{}) {
+ args = append([]interface{}{ws.Request().RemoteAddr}, args...)
+ debugLogf("%s "+s, args...)
+}
+
+func (h *handlerV0) Handle(ws *websocket.Conn, events <-chan *event) {
+ done := make(chan struct{}, 3)
+ queue := make(chan *event, h.QueueSize)
+ mtx := sync.Mutex{}
+ subscribed := make(map[string]bool)
+ go func() {
+ buf := make([]byte, 2<<20)
+ for {
+ n, err := ws.Read(buf)
+ h.debugLogf(ws, "received frame: %q", buf[:n])
+ if err != nil || n == len(buf) {
+ break
+ }
+ msg := make(map[string]interface{})
+ err = json.Unmarshal(buf[:n], &msg)
+ if err != nil {
+ break
+ }
+ h.debugLogf(ws, "received message: %+v", msg)
+ h.debugLogf(ws, "subscribing to *")
+ subscribed["*"] = true
+ }
+ done <- struct{}{}
+ }()
+ go func(queue <-chan *event) {
+ for e := range queue {
+ detail := e.Detail(nil)
+ if detail == nil {
+ continue
+ }
+ // FIXME: check permission
+ buf, err := json.Marshal(map[string]interface{}{
+ "msgID": e.Serial,
+ "id": detail.ID,
+ "uuid": detail.UUID,
+ "object_uuid": detail.ObjectUUID,
+ "object_owner_uuid": detail.ObjectOwnerUUID,
+ "event_type": detail.EventType,
+ })
+ if err != nil {
+ log.Printf("error encoding: ", err)
+ continue
+ }
+ _, err = ws.Write(append(buf, byte('\n')))
+ if err != nil {
+ h.debugLogf(ws, "handlerV0: write: %s", err)
+ break
+ }
+ }
+ done <- struct{}{}
+ }(queue)
+ go func() {
+ send := func(e *event) {
+ if queue == nil {
+ return
+ }
+ select {
+ case queue <- e:
+ default:
+ close(queue)
+ queue = nil
+ done <- struct{}{}
+ }
+ }
+ for e := range events {
+ detail := e.Detail(nil)
+ mtx.Lock()
+ switch {
+ case subscribed["*"]:
+ send(e)
+ case detail == nil:
+ case subscribed[detail.ObjectUUID]:
+ send(e)
+ case subscribed[detail.ObjectOwnerUUID]:
+ send(e)
+ }
+ mtx.Unlock()
+ }
+ done <- struct{}{}
+ }()
+ <-done
}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 4b3f12b..f7b3237 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -1,12 +1,12 @@
package main
import (
- "io"
+ "golang.org/x/net/websocket"
)
type handlerV1 struct {
QueueSize int
}
-func (h *handlerV1) Handle(ws io.ReadWriter, events <-chan *event) {
+func (h *handlerV1) Handle(ws *websocket.Conn, events <-chan *event) {
}
diff --git a/services/ws/router.go b/services/ws/router.go
index 01c1477..e4e102b 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -1,6 +1,8 @@
package main
import (
+ "encoding/json"
+ "fmt"
"log"
"net/http"
"sync"
@@ -32,12 +34,12 @@ func (rtr *router) makeServer(handler handler) *websocket.Server {
return nil
},
Handler: websocket.Handler(func(ws *websocket.Conn) {
- log.Printf("socket request: %+v", ws.Request())
+ log.Printf("%v accepted", ws.Request().RemoteAddr)
sink := rtr.eventSource.NewSink(nil)
handler.Handle(ws, sink.Channel())
sink.Stop()
ws.Close()
- log.Printf("socket disconnect: %+v", ws.Request().RemoteAddr)
+ log.Printf("%v disconnected", ws.Request().RemoteAddr)
}),
}
}
@@ -45,4 +47,11 @@ func (rtr *router) makeServer(handler handler) *websocket.Server {
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup)
rtr.mux.ServeHTTP(resp, req)
+ j, err := json.Marshal(map[string]interface{}{
+ "req": fmt.Sprintf("%+v", req),
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.Print(string(j))
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list