[ARVADOS] updated: 07ab65aebb290f77b26ce2814f4ebc35b8339af6
Git user
git at public.curoverse.com
Mon Nov 14 17:03:45 EST 2016
Summary of changes:
services/ws/handler.go | 2 +-
services/ws/pg.go | 32 +++++++++++++------
services/ws/session.go | 2 +-
services/ws/session_v0.go | 79 ++++++++++++++++++++++++++++++++++++++++++++---
4 files changed, 98 insertions(+), 17 deletions(-)
via 07ab65aebb290f77b26ce2814f4ebc35b8339af6 (commit)
via 120892a45ffa5a39ca253deb78a2eb352de408b4 (commit)
from 748e1bba296d4d05252d0fbd9f75764234d9166d (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 07ab65aebb290f77b26ce2814f4ebc35b8339af6
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Nov 14 17:03:40 2016 -0500
8460: Obey event_type filters if given in all subscription requests.
diff --git a/services/ws/handler.go b/services/ws/handler.go
index e2aa6ca..1c9d5ba 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -64,7 +64,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
stop <- err
return
}
- sess.Receive(msg)
+ sess.Receive(msg, buf[:n])
}
}()
diff --git a/services/ws/session.go b/services/ws/session.go
index db437b5..98164e3 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -1,7 +1,7 @@
package main
type session interface {
- Receive(map[string]interface{})
+ Receive(map[string]interface{}, []byte)
EventMessage(*event) ([]byte, error)
Filter(*event) bool
debugLogf(string, ...interface{})
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 3b24a7f..122767b 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -18,15 +18,24 @@ type sessionV0 struct {
ws wsConn
permChecker permChecker
subscribed map[string]bool
+ eventTypes map[string]bool
mtx sync.Mutex
setupOnce sync.Once
}
+type v0subscribe struct {
+ Method string
+ Filters []v0filter
+}
+
+type v0filter []interface{}
+
func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
sess := &sessionV0{
ws: ws,
permChecker: NewPermChecker(ac),
subscribed: make(map[string]bool),
+ eventTypes: make(map[string]bool),
}
err := ws.Request().ParseForm()
@@ -46,10 +55,64 @@ func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
debugLogf("%s "+s, args...)
}
-func (sess *sessionV0) Receive(msg map[string]interface{}) {
+// If every client subscription message includes filters consisting
+// only of [["event_type","in",...]] then send only the requested
+// event types. Otherwise, clear sess.eventTypes and send all event
+// types from now on.
+func (sess *sessionV0) checkFilters(filters []v0filter) {
+ if sess.eventTypes == nil {
+ // Already received a subscription request without
+ // event_type filters.
+ return
+ }
+ eventTypes := sess.eventTypes
+ sess.eventTypes = nil
+ if len(filters) == 0 {
+ return
+ }
+ useFilters := false
+ for _, f := range filters {
+ col, ok := f[0].(string)
+ if !ok || col != "event_type" {
+ continue
+ }
+ op, ok := f[1].(string)
+ if !ok || op != "in" {
+ return
+ }
+ arr, ok := f[2].([]interface{})
+ if !ok {
+ return
+ }
+ useFilters = true
+ for _, s := range arr {
+ if s, ok := s.(string); ok {
+ eventTypes[s] = true
+ } else {
+ return
+ }
+ }
+ }
+ if useFilters {
+ sess.debugLogf("eventTypes %+v", eventTypes)
+ sess.eventTypes = eventTypes
+ }
+}
+
+func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
sess.debugLogf("received message: %+v", msg)
- sess.debugLogf("subscribing to *")
- sess.subscribed["*"] = true
+ var sub v0subscribe
+ if err := json.Unmarshal(buf, &sub); err != nil {
+ sess.debugLogf("ignored unrecognized request: %s", err)
+ return
+ }
+ if sub.Method == "subscribe" {
+ sess.debugLogf("subscribing to *")
+ sess.mtx.Lock()
+ sess.checkFilters(sub.Filters)
+ sess.subscribed["*"] = true
+ sess.mtx.Unlock()
+ }
}
func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
@@ -63,14 +126,18 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
return nil, err
}
- return json.Marshal(map[string]interface{}{
+ msg := map[string]interface{}{
"msgID": e.Serial,
"id": detail.ID,
"uuid": detail.UUID,
"object_uuid": detail.ObjectUUID,
"object_owner_uuid": detail.ObjectOwnerUUID,
"event_type": detail.EventType,
- })
+ }
+ if detail.Properties != nil && detail.Properties["text"] != nil {
+ msg["properties"] = detail.Properties
+ }
+ return json.Marshal(msg)
}
func (sess *sessionV0) Filter(e *event) bool {
@@ -78,6 +145,8 @@ func (sess *sessionV0) Filter(e *event) bool {
sess.mtx.Lock()
defer sess.mtx.Unlock()
switch {
+ case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
+ return false
case sess.subscribed["*"]:
return true
case detail == nil:
commit 120892a45ffa5a39ca253deb78a2eb352de408b4
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Nov 14 16:10:32 2016 -0500
8460: Avoid log.Fatal once started.
diff --git a/services/ws/pg.go b/services/ws/pg.go
index e766f6c..a5af9f7 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -2,6 +2,7 @@ package main
import (
"database/sql"
+ "fmt"
"log"
"strconv"
"strings"
@@ -30,18 +31,18 @@ type pgEventSource struct {
DataSource string
QueueSize int
+ db *sql.DB
pqListener *pq.Listener
sinks map[*pgEventSink]bool
setupOnce sync.Once
mtx sync.Mutex
+ shutdown chan error
}
func (ps *pgEventSource) setup() {
+ ps.shutdown = make(chan error, 1)
ps.sinks = make(map[*pgEventSink]bool)
- go ps.run()
-}
-func (ps *pgEventSource) run() {
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
log.Fatalf("sql.Open: %s", err)
@@ -49,23 +50,27 @@ func (ps *pgEventSource) run() {
if err = db.Ping(); err != nil {
log.Fatalf("db.Ping: %s", err)
}
+ ps.db = db
- listener := pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
if err != nil {
// Until we have a mechanism for catching up
// on missed events, we cannot recover from a
// dropped connection without breaking our
// promises to clients.
- log.Fatalf("pgEventSource listener problem: %s", err)
+ ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
}
})
- err = listener.Listen("logs")
+ err = ps.pqListener.Listen("logs")
if err != nil {
log.Fatal(err)
}
-
debugLogf("pgEventSource listening")
+ go ps.run()
+}
+
+func (ps *pgEventSource) run() {
eventQueue := make(chan *event, ps.QueueSize)
go func() {
@@ -90,11 +95,18 @@ func (ps *pgEventSource) run() {
defer ticker.Stop()
for {
select {
+ case err, ok := <-ps.shutdown:
+ if ok {
+ debugLogf("shutdown on error: %s", err)
+ }
+ close(eventQueue)
+ return
+
case <-ticker.C:
debugLogf("pgEventSource listener ping")
- listener.Ping()
+ ps.pqListener.Ping()
- case pqEvent, ok := <-listener.Notify:
+ case pqEvent, ok := <-ps.pqListener.Notify:
if !ok {
close(eventQueue)
return
@@ -112,7 +124,7 @@ func (ps *pgEventSource) run() {
LogID: logID,
Received: time.Now(),
Serial: serial,
- db: db,
+ db: ps.db,
}
debugLogf("event %d %+v", e.Serial, e)
eventQueue <- e
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list