[ARVADOS] updated: 07ab65aebb290f77b26ce2814f4ebc35b8339af6

Git user git at public.curoverse.com
Mon Nov 14 17:03:45 EST 2016


Summary of changes:
 services/ws/handler.go    |  2 +-
 services/ws/pg.go         | 32 +++++++++++++------
 services/ws/session.go    |  2 +-
 services/ws/session_v0.go | 79 ++++++++++++++++++++++++++++++++++++++++++++---
 4 files changed, 98 insertions(+), 17 deletions(-)

       via  07ab65aebb290f77b26ce2814f4ebc35b8339af6 (commit)
       via  120892a45ffa5a39ca253deb78a2eb352de408b4 (commit)
      from  748e1bba296d4d05252d0fbd9f75764234d9166d (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 07ab65aebb290f77b26ce2814f4ebc35b8339af6
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 17:03:40 2016 -0500

    8460: Obey event_type filters if given in all subscription requests.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index e2aa6ca..1c9d5ba 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -64,7 +64,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
 				stop <- err
 				return
 			}
-			sess.Receive(msg)
+			sess.Receive(msg, buf[:n])
 		}
 	}()
 
diff --git a/services/ws/session.go b/services/ws/session.go
index db437b5..98164e3 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -1,7 +1,7 @@
 package main
 
 type session interface {
-	Receive(map[string]interface{})
+	Receive(map[string]interface{}, []byte)
 	EventMessage(*event) ([]byte, error)
 	Filter(*event) bool
 	debugLogf(string, ...interface{})
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 3b24a7f..122767b 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -18,15 +18,24 @@ type sessionV0 struct {
 	ws          wsConn
 	permChecker permChecker
 	subscribed  map[string]bool
+	eventTypes  map[string]bool
 	mtx         sync.Mutex
 	setupOnce   sync.Once
 }
 
+type v0subscribe struct {
+	Method  string
+	Filters []v0filter
+}
+
+type v0filter []interface{}
+
 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
 	sess := &sessionV0{
 		ws:          ws,
 		permChecker: NewPermChecker(ac),
 		subscribed:  make(map[string]bool),
+		eventTypes:  make(map[string]bool),
 	}
 
 	err := ws.Request().ParseForm()
@@ -46,10 +55,64 @@ func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
 	debugLogf("%s "+s, args...)
 }
 
-func (sess *sessionV0) Receive(msg map[string]interface{}) {
+// If every client subscription message includes filters consisting
+// only of [["event_type","in",...]] then send only the requested
+// event types. Otherwise, clear sess.eventTypes and send all event
+// types from now on.
+func (sess *sessionV0) checkFilters(filters []v0filter) {
+	if sess.eventTypes == nil {
+		// Already received a subscription request without
+		// event_type filters.
+		return
+	}
+	eventTypes := sess.eventTypes
+	sess.eventTypes = nil
+	if len(filters) == 0 {
+		return
+	}
+	useFilters := false
+	for _, f := range filters {
+		col, ok := f[0].(string)
+		if !ok || col != "event_type" {
+			continue
+		}
+		op, ok := f[1].(string)
+		if !ok || op != "in" {
+			return
+		}
+		arr, ok := f[2].([]interface{})
+		if !ok {
+			return
+		}
+		useFilters = true
+		for _, s := range arr {
+			if s, ok := s.(string); ok {
+				eventTypes[s] = true
+			} else {
+				return
+			}
+		}
+	}
+	if useFilters {
+		sess.debugLogf("eventTypes %+v", eventTypes)
+		sess.eventTypes = eventTypes
+	}
+}
+
+func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
 	sess.debugLogf("received message: %+v", msg)
-	sess.debugLogf("subscribing to *")
-	sess.subscribed["*"] = true
+	var sub v0subscribe
+	if err := json.Unmarshal(buf, &sub); err != nil {
+		sess.debugLogf("ignored unrecognized request: %s", err)
+		return
+	}
+	if sub.Method == "subscribe" {
+		sess.debugLogf("subscribing to *")
+		sess.mtx.Lock()
+		sess.checkFilters(sub.Filters)
+		sess.subscribed["*"] = true
+		sess.mtx.Unlock()
+	}
 }
 
 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
@@ -63,14 +126,18 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
 		return nil, err
 	}
 
-	return json.Marshal(map[string]interface{}{
+	msg := map[string]interface{}{
 		"msgID":             e.Serial,
 		"id":                detail.ID,
 		"uuid":              detail.UUID,
 		"object_uuid":       detail.ObjectUUID,
 		"object_owner_uuid": detail.ObjectOwnerUUID,
 		"event_type":        detail.EventType,
-	})
+	}
+	if detail.Properties != nil && detail.Properties["text"] != nil {
+		msg["properties"] = detail.Properties
+	}
+	return json.Marshal(msg)
 }
 
 func (sess *sessionV0) Filter(e *event) bool {
@@ -78,6 +145,8 @@ func (sess *sessionV0) Filter(e *event) bool {
 	sess.mtx.Lock()
 	defer sess.mtx.Unlock()
 	switch {
+	case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
+		return false
 	case sess.subscribed["*"]:
 		return true
 	case detail == nil:

commit 120892a45ffa5a39ca253deb78a2eb352de408b4
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 16:10:32 2016 -0500

    8460: Avoid log.Fatal once started.

diff --git a/services/ws/pg.go b/services/ws/pg.go
index e766f6c..a5af9f7 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"database/sql"
+	"fmt"
 	"log"
 	"strconv"
 	"strings"
@@ -30,18 +31,18 @@ type pgEventSource struct {
 	DataSource string
 	QueueSize  int
 
+	db         *sql.DB
 	pqListener *pq.Listener
 	sinks      map[*pgEventSink]bool
 	setupOnce  sync.Once
 	mtx        sync.Mutex
+	shutdown   chan error
 }
 
 func (ps *pgEventSource) setup() {
+	ps.shutdown = make(chan error, 1)
 	ps.sinks = make(map[*pgEventSink]bool)
-	go ps.run()
-}
 
-func (ps *pgEventSource) run() {
 	db, err := sql.Open("postgres", ps.DataSource)
 	if err != nil {
 		log.Fatalf("sql.Open: %s", err)
@@ -49,23 +50,27 @@ func (ps *pgEventSource) run() {
 	if err = db.Ping(); err != nil {
 		log.Fatalf("db.Ping: %s", err)
 	}
+	ps.db = db
 
-	listener := pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
 		if err != nil {
 			// Until we have a mechanism for catching up
 			// on missed events, we cannot recover from a
 			// dropped connection without breaking our
 			// promises to clients.
-			log.Fatalf("pgEventSource listener problem: %s", err)
+			ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
 		}
 	})
-	err = listener.Listen("logs")
+	err = ps.pqListener.Listen("logs")
 	if err != nil {
 		log.Fatal(err)
 	}
-
 	debugLogf("pgEventSource listening")
 
+	go ps.run()
+}
+
+func (ps *pgEventSource) run() {
 	eventQueue := make(chan *event, ps.QueueSize)
 
 	go func() {
@@ -90,11 +95,18 @@ func (ps *pgEventSource) run() {
 	defer ticker.Stop()
 	for {
 		select {
+		case err, ok := <-ps.shutdown:
+			if ok {
+				debugLogf("shutdown on error: %s", err)
+			}
+			close(eventQueue)
+			return
+
 		case <-ticker.C:
 			debugLogf("pgEventSource listener ping")
-			listener.Ping()
+			ps.pqListener.Ping()
 
-		case pqEvent, ok := <-listener.Notify:
+		case pqEvent, ok := <-ps.pqListener.Notify:
 			if !ok {
 				close(eventQueue)
 				return
@@ -112,7 +124,7 @@ func (ps *pgEventSource) run() {
 				LogID:    logID,
 				Received: time.Now(),
 				Serial:   serial,
-				db:       db,
+				db:       ps.db,
 			}
 			debugLogf("event %d %+v", e.Serial, e)
 			eventQueue <- e

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list