[ARVADOS] updated: 3e39ed33427a0c3000ed41b4826fcfa182934f71

Git user git at public.curoverse.com
Tue Nov 15 02:58:34 EST 2016


Summary of changes:
 build/run-tests.sh                          |  4 +-
 sdk/python/tests/nginx.conf                 | 16 +++++++
 sdk/python/tests/run_test_server.py         | 65 ++++++++++++++++++++++++++++-
 services/api/config/application.default.yml |  1 +
 services/keep-web/handler_test.go           |  2 +-
 services/ws/event.go                        |  2 +-
 services/ws/main.go                         |  2 +
 services/ws/router.go                       | 24 ++++++++---
 services/ws/session_v0.go                   |  2 +
 9 files changed, 109 insertions(+), 9 deletions(-)

  discards  07ab65aebb290f77b26ce2814f4ebc35b8339af6 (commit)
  discards  120892a45ffa5a39ca253deb78a2eb352de408b4 (commit)
  discards  748e1bba296d4d05252d0fbd9f75764234d9166d (commit)
  discards  0770b0ece2046cb33f598ff67168f4d08e2d0c87 (commit)
  discards  d07dbb66a0e06d07a1b6159d6121d924a06dbb58 (commit)
  discards  2ec0fa427f0cd0008c43d0312d338fc6eab07b74 (commit)
  discards  81bf02bfe738c29d4d8315b12802d55f4ad5951b (commit)
  discards  e2a5da6ba8b3c4633bca0bcc347c8b7649bb338c (commit)
  discards  f6f89d77bd90207e79ea20c5bbc4c479db8ef1de (commit)
  discards  d7e1efb7a5ce12175a3eaaedba5725e09e37363a (commit)
       via  3e39ed33427a0c3000ed41b4826fcfa182934f71 (commit)
       via  e12c0414039f7e818e31aa4765a5e69666333069 (commit)
       via  f224b7ff4c38027309292a01516cd1df9f158a3e (commit)
       via  53b116a7dac30b01f4080ea2832a68e7b885db51 (commit)
       via  2a4aae5d6b865e959b97386b84386358e56ebd50 (commit)
       via  bbf8780255b9c04d0aaa95ef055cb8e777df11be (commit)
       via  87363153b74c9137b6113a5c62da475e02908d9a (commit)
       via  54aa46e52ff46fa667ec333e6bbaa00e0382f6f4 (commit)
       via  0f2a62815c97a47b610b76a748a7d092c89e3e7b (commit)
       via  83f085a0c405695d9840a0542eb9746e7b5d3f6f (commit)
       via  f1c316086c32a0c19cfbf4ac7bb122d3e8cd049b (commit)
       via  0040a5a5b2e82f20641278584ecae835c1c537d3 (commit)
       via  da44904bb64303db2ae01aa9978188c20d453445 (commit)
       via  f517492fccb7cec138621cd0e710668877be57ce (commit)
       via  104b2b5a46844e94a37c332b4ddd5a861dd8d63d (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (07ab65aebb290f77b26ce2814f4ebc35b8339af6)
            \
             N -- N -- N (3e39ed33427a0c3000ed41b4826fcfa182934f71)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so this message reports only the revisions on
the N branch, starting from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 3e39ed33427a0c3000ed41b4826fcfa182934f71
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 02:47:51 2016 -0500

    8460: Use websocket server for integration tests.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index d993f47..c771bdc 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -269,10 +269,11 @@ start_api() {
 }
 
 start_nginx_proxy_services() {
-    echo 'Starting keepproxy, keep-web, arv-git-httpd, and nginx ssl proxy...'
+    echo 'Starting keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py start_keep_proxy \
         && python sdk/python/tests/run_test_server.py start_keep-web \
+        && python sdk/python/tests/run_test_server.py start_ws \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
         && python sdk/python/tests/run_test_server.py start_nginx \
         && export ARVADOS_TEST_PROXY_SERVICES=1
@@ -284,6 +285,7 @@ stop_services() {
         cd "$WORKSPACE" \
             && python sdk/python/tests/run_test_server.py stop_nginx \
             && python sdk/python/tests/run_test_server.py stop_arv-git-httpd \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop_keep-web \
             && python sdk/python/tests/run_test_server.py stop_keep_proxy
     fi
diff --git a/sdk/python/tests/nginx.conf b/sdk/python/tests/nginx.conf
index 2b8b6ca..0066040 100644
--- a/sdk/python/tests/nginx.conf
+++ b/sdk/python/tests/nginx.conf
@@ -54,4 +54,20 @@ http {
       proxy_redirect //download:{{KEEPWEBPORT}}/ https://$host:{{KEEPWEBDLSSLPORT}}/;
     }
   }
+  upstream ws {
+    server localhost:{{WSPORT}};
+  }
+  server {
+    listen *:{{WSSPORT}} ssl default_server;
+    server_name ~^(?<request_host>.*)$;
+    ssl_certificate {{SSLCERT}};
+    ssl_certificate_key {{SSLKEY}};
+    location  / {
+      proxy_pass http://ws;
+      proxy_set_header Upgrade $http_upgrade;
+      proxy_set_header Connection "upgrade";
+      proxy_set_header Host $request_host:{{WSPORT}};
+      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+    }
+  }
 }
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index 642b7cc..5ef5e2a 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -44,6 +44,7 @@ if not os.path.exists(TEST_TMPDIR):
 
 my_api_host = None
 _cached_config = {}
+_cached_db_config = {}
 
 def find_server_pid(PID_PATH, wait=10):
     now = time.time()
@@ -284,10 +285,16 @@ def run(leave_running_atexit=False):
         os.makedirs(gitdir)
     subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
 
+    # The nginx proxy isn't listening here yet, but we need to choose
+    # the wss:// port now so we can write the API server config file.
+    wss_port = find_available_port()
+    _setport('wss', wss_port)
+
     port = find_available_port()
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
-    env['ARVADOS_WEBSOCKETS'] = 'yes'
+    env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+    env.pop('ARVADOS_WEBSOCKETS', None)
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
@@ -360,6 +367,45 @@ def stop(force=False):
         kill_server_pid(_pidfile('api'))
         my_api_host = None
 
+def run_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
 def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     port = find_available_port()
@@ -545,6 +591,8 @@ def run_nginx():
     nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
     nginxconf['GITPORT'] = _getport('arv-git-httpd')
     nginxconf['GITSSLPORT'] = find_available_port()
+    nginxconf['WSPORT'] = _getport('ws')
+    nginxconf['WSSPORT'] = _getport('wss')
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
     nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
@@ -593,7 +641,15 @@ def _getport(program):
     except IOError:
         return 9
 
+def _dbconfig(key):
+    global _cached_db_config
+    if not _cached_db_config:
+        _cached_db_config = yaml.load(open(os.path.join(
+            SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+    return _cached_db_config['test'][key]
+
 def _apiconfig(key):
+    global _cached_config
     if _cached_config:
         return _cached_config[key]
     def _load(f, required=True):
@@ -647,6 +703,7 @@ class TestCaseWithServers(unittest.TestCase):
     original environment.
     """
     MAIN_SERVER = None
+    WS_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
     KEEP_WEB_SERVER = None
@@ -667,6 +724,7 @@ class TestCaseWithServers(unittest.TestCase):
         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
+                (cls.WS_SERVER, run_ws, stop_ws),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
                 (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
@@ -693,6 +751,7 @@ class TestCaseWithServers(unittest.TestCase):
 if __name__ == "__main__":
     actions = [
         'start', 'stop',
+        'start_ws', 'stop_ws',
         'start_keep', 'stop_keep',
         'start_keep_proxy', 'stop_keep_proxy',
         'start_keep-web', 'stop_keep-web',
@@ -725,6 +784,10 @@ if __name__ == "__main__":
             print(host)
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+    elif args.action == 'start_ws':
+        run_ws()
+    elif args.action == 'stop_ws':
+        stop_ws()
     elif args.action == 'start_keep':
         run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
diff --git a/services/api/config/application.default.yml b/services/api/config/application.default.yml
index a9aa953..ab560d7 100644
--- a/services/api/config/application.default.yml
+++ b/services/api/config/application.default.yml
@@ -444,3 +444,4 @@ test:
   workbench_address: https://localhost:3001/
   git_repositories_dir: <%= Rails.root.join 'tmp', 'git', 'test' %>
   git_internal_dir: <%= Rails.root.join 'tmp', 'internal.git' %>
+  websocket_address: "wss://0.0.0.0:<%= ENV['ARVADOS_TEST_WSS_PORT'] %>/websocket"

commit e12c0414039f7e818e31aa4765a5e69666333069
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 02:42:25 2016 -0500

    8460: JSON request log.

diff --git a/services/ws/router.go b/services/ws/router.go
index e829ce8..69654cb 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -2,10 +2,10 @@ package main
 
 import (
 	"encoding/json"
-	"fmt"
 	"log"
 	"net/http"
 	"sync"
+	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"golang.org/x/net/websocket"
@@ -37,22 +37,36 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session,
 			return nil
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
-			log.Printf("%v accepted", ws.Request().RemoteAddr)
 			sink := rtr.eventSource.NewSink()
 			handler.Handle(ws, sink.Channel())
 			sink.Stop()
 			ws.Close()
-			log.Printf("%v disconnected", ws.Request().RemoteAddr)
 		}),
 	}
 }
 
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	rtr.setupOnce.Do(rtr.setup)
+	t0 := time.Now()
+	reqLog(map[string]interface{}{
+		"Connect":         req.RemoteAddr,
+		"RemoteAddr":      req.RemoteAddr,
+		"X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+		"Time":            t0.UTC(),
+	})
 	rtr.mux.ServeHTTP(resp, req)
-	j, err := json.Marshal(map[string]interface{}{
-		"req": fmt.Sprintf("%+v", req),
+	t1 := time.Now()
+	reqLog(map[string]interface{}{
+		"Disconnect":      req.RemoteAddr,
+		"RemoteAddr":      req.RemoteAddr,
+		"X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
+		"Time":            t1.UTC(),
+		"Elapsed":         time.Now().Sub(t0).Seconds(),
 	})
+}
+
+func reqLog(m map[string]interface{}) {
+	j, err := json.Marshal(m)
 	if err != nil {
 		log.Fatal(err)
 	}

commit f224b7ff4c38027309292a01516cd1df9f158a3e
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 02:12:25 2016 -0500

    8460: Skip non-log events if filtering by event_type.

diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 122767b..467d156 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -145,6 +145,8 @@ func (sess *sessionV0) Filter(e *event) bool {
 	sess.mtx.Lock()
 	defer sess.mtx.Unlock()
 	switch {
+	case sess.eventTypes != nil && detail == nil:
+		return false
 	case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
 		return false
 	case sess.subscribed["*"]:

commit 53b116a7dac30b01f4080ea2832a68e7b885db51
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 02:11:45 2016 -0500

    8460: Print listening address at startup.

diff --git a/services/ws/main.go b/services/ws/main.go
index c0f4dd5..a143ae9 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -50,5 +50,7 @@ func main() {
 		},
 	}
 	eventSource.NewSink().Stop()
+
+	log.Printf("listening at %s", srv.Addr)
 	log.Fatal(srv.ListenAndServe())
 }

commit 2a4aae5d6b865e959b97386b84386358e56ebd50
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 02:11:27 2016 -0500

    8460: Accept nulls in some columns.

diff --git a/services/ws/event.go b/services/ws/event.go
index e34b6b4..09c9d0f 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -41,7 +41,7 @@ func (e *event) Detail() *arvados.Log {
 	}
 	var logRow arvados.Log
 	var propYAML []byte
-	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
+	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, COALESCE(object_owner_uuid,''), COALESCE(event_type,''), created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
 		&logRow.ID,
 		&logRow.UUID,
 		&logRow.ObjectUUID,

commit bbf8780255b9c04d0aaa95ef055cb8e777df11be
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 17:03:40 2016 -0500

    8460: Obey event_type filters if given in all subscription requests.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index e2aa6ca..1c9d5ba 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -64,7 +64,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
 				stop <- err
 				return
 			}
-			sess.Receive(msg)
+			sess.Receive(msg, buf[:n])
 		}
 	}()
 
diff --git a/services/ws/session.go b/services/ws/session.go
index db437b5..98164e3 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -1,7 +1,7 @@
 package main
 
 type session interface {
-	Receive(map[string]interface{})
+	Receive(map[string]interface{}, []byte)
 	EventMessage(*event) ([]byte, error)
 	Filter(*event) bool
 	debugLogf(string, ...interface{})
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 3b24a7f..122767b 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -18,15 +18,24 @@ type sessionV0 struct {
 	ws          wsConn
 	permChecker permChecker
 	subscribed  map[string]bool
+	eventTypes  map[string]bool
 	mtx         sync.Mutex
 	setupOnce   sync.Once
 }
 
+type v0subscribe struct {
+	Method  string
+	Filters []v0filter
+}
+
+type v0filter []interface{}
+
 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
 	sess := &sessionV0{
 		ws:          ws,
 		permChecker: NewPermChecker(ac),
 		subscribed:  make(map[string]bool),
+		eventTypes:  make(map[string]bool),
 	}
 
 	err := ws.Request().ParseForm()
@@ -46,10 +55,64 @@ func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
 	debugLogf("%s "+s, args...)
 }
 
-func (sess *sessionV0) Receive(msg map[string]interface{}) {
+// If every client subscription message includes filters consisting
+// only of [["event_type","in",...]] then send only the requested
+// event types. Otherwise, clear sess.eventTypes and send all event
+// types from now on.
+func (sess *sessionV0) checkFilters(filters []v0filter) {
+	if sess.eventTypes == nil {
+		// Already received a subscription request without
+		// event_type filters.
+		return
+	}
+	eventTypes := sess.eventTypes
+	sess.eventTypes = nil
+	if len(filters) == 0 {
+		return
+	}
+	useFilters := false
+	for _, f := range filters {
+		col, ok := f[0].(string)
+		if !ok || col != "event_type" {
+			continue
+		}
+		op, ok := f[1].(string)
+		if !ok || op != "in" {
+			return
+		}
+		arr, ok := f[2].([]interface{})
+		if !ok {
+			return
+		}
+		useFilters = true
+		for _, s := range arr {
+			if s, ok := s.(string); ok {
+				eventTypes[s] = true
+			} else {
+				return
+			}
+		}
+	}
+	if useFilters {
+		sess.debugLogf("eventTypes %+v", eventTypes)
+		sess.eventTypes = eventTypes
+	}
+}
+
+func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
 	sess.debugLogf("received message: %+v", msg)
-	sess.debugLogf("subscribing to *")
-	sess.subscribed["*"] = true
+	var sub v0subscribe
+	if err := json.Unmarshal(buf, &sub); err != nil {
+		sess.debugLogf("ignored unrecognized request: %s", err)
+		return
+	}
+	if sub.Method == "subscribe" {
+		sess.debugLogf("subscribing to *")
+		sess.mtx.Lock()
+		sess.checkFilters(sub.Filters)
+		sess.subscribed["*"] = true
+		sess.mtx.Unlock()
+	}
 }
 
 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
@@ -63,14 +126,18 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
 		return nil, err
 	}
 
-	return json.Marshal(map[string]interface{}{
+	msg := map[string]interface{}{
 		"msgID":             e.Serial,
 		"id":                detail.ID,
 		"uuid":              detail.UUID,
 		"object_uuid":       detail.ObjectUUID,
 		"object_owner_uuid": detail.ObjectOwnerUUID,
 		"event_type":        detail.EventType,
-	})
+	}
+	if detail.Properties != nil && detail.Properties["text"] != nil {
+		msg["properties"] = detail.Properties
+	}
+	return json.Marshal(msg)
 }
 
 func (sess *sessionV0) Filter(e *event) bool {
@@ -78,6 +145,8 @@ func (sess *sessionV0) Filter(e *event) bool {
 	sess.mtx.Lock()
 	defer sess.mtx.Unlock()
 	switch {
+	case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
+		return false
 	case sess.subscribed["*"]:
 		return true
 	case detail == nil:

commit 87363153b74c9137b6113a5c62da475e02908d9a
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 16:10:32 2016 -0500

    8460: Avoid log.Fatal once started.

diff --git a/services/ws/pg.go b/services/ws/pg.go
index e766f6c..a5af9f7 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"database/sql"
+	"fmt"
 	"log"
 	"strconv"
 	"strings"
@@ -30,18 +31,18 @@ type pgEventSource struct {
 	DataSource string
 	QueueSize  int
 
+	db         *sql.DB
 	pqListener *pq.Listener
 	sinks      map[*pgEventSink]bool
 	setupOnce  sync.Once
 	mtx        sync.Mutex
+	shutdown   chan error
 }
 
 func (ps *pgEventSource) setup() {
+	ps.shutdown = make(chan error, 1)
 	ps.sinks = make(map[*pgEventSink]bool)
-	go ps.run()
-}
 
-func (ps *pgEventSource) run() {
 	db, err := sql.Open("postgres", ps.DataSource)
 	if err != nil {
 		log.Fatalf("sql.Open: %s", err)
@@ -49,23 +50,27 @@ func (ps *pgEventSource) run() {
 	if err = db.Ping(); err != nil {
 		log.Fatalf("db.Ping: %s", err)
 	}
+	ps.db = db
 
-	listener := pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
 		if err != nil {
 			// Until we have a mechanism for catching up
 			// on missed events, we cannot recover from a
 			// dropped connection without breaking our
 			// promises to clients.
-			log.Fatalf("pgEventSource listener problem: %s", err)
+			ps.shutdown <- fmt.Errorf("pgEventSource listener problem: %s", err)
 		}
 	})
-	err = listener.Listen("logs")
+	err = ps.pqListener.Listen("logs")
 	if err != nil {
 		log.Fatal(err)
 	}
-
 	debugLogf("pgEventSource listening")
 
+	go ps.run()
+}
+
+func (ps *pgEventSource) run() {
 	eventQueue := make(chan *event, ps.QueueSize)
 
 	go func() {
@@ -90,11 +95,18 @@ func (ps *pgEventSource) run() {
 	defer ticker.Stop()
 	for {
 		select {
+		case err, ok := <-ps.shutdown:
+			if ok {
+				debugLogf("shutdown on error: %s", err)
+			}
+			close(eventQueue)
+			return
+
 		case <-ticker.C:
 			debugLogf("pgEventSource listener ping")
-			listener.Ping()
+			ps.pqListener.Ping()
 
-		case pqEvent, ok := <-listener.Notify:
+		case pqEvent, ok := <-ps.pqListener.Notify:
 			if !ok {
 				close(eventQueue)
 				return
@@ -112,7 +124,7 @@ func (ps *pgEventSource) run() {
 				LogID:    logID,
 				Received: time.Now(),
 				Serial:   serial,
-				db:       db,
+				db:       ps.db,
 			}
 			debugLogf("event %d %+v", e.Serial, e)
 			eventQueue <- e

commit 54aa46e52ff46fa667ec333e6bbaa00e0382f6f4
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 15:34:40 2016 -0500

    8460: Check permissions on event target instead of log entry.

diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 257e985..3b24a7f 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -58,7 +58,7 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
 		return nil, nil
 	}
 
-	ok, err := sess.permChecker.Check(detail.UUID)
+	ok, err := sess.permChecker.Check(detail.ObjectUUID)
 	if err != nil || !ok {
 		return nil, err
 	}

commit 0f2a62815c97a47b610b76a748a7d092c89e3e7b
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 15:30:47 2016 -0500

    8460: Cache permissions.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index d42b137c..e2aa6ca 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -72,7 +72,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
 		for e := range queue {
 			if e == nil {
 				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-				_, err := ws.Write([]byte("{}\n"))
+				_, err := ws.Write([]byte("{}"))
 				if err != nil {
 					sess.debugLogf("handler: write {}: %s", err)
 					stop <- err
diff --git a/services/ws/permission.go b/services/ws/permission.go
new file mode 100644
index 0000000..b2b962c
--- /dev/null
+++ b/services/ws/permission.go
@@ -0,0 +1,78 @@
+package main
+
+import (
+	"net/http"
+	"net/url"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+const (
+	maxPermCacheAge = time.Hour
+	minPermCacheAge = 5 * time.Minute
+)
+
+type permChecker interface {
+	SetToken(token string)
+	Check(uuid string) (bool, error)
+}
+
+func NewPermChecker(ac arvados.Client) permChecker {
+	ac.AuthToken = ""
+	return &cachingPermChecker{
+		Client:     &ac,
+		cache:      make(map[string]time.Time),
+		maxCurrent: 16,
+	}
+}
+
+type cachingPermChecker struct {
+	*arvados.Client
+	cache      map[string]time.Time
+	maxCurrent int
+}
+
+func (pc *cachingPermChecker) SetToken(token string) {
+	pc.Client.AuthToken = token
+}
+
+func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
+	pc.tidy()
+	if t, ok := pc.cache[uuid]; ok && time.Now().Sub(t) < maxPermCacheAge {
+		debugLogf("perm ok (cached): %+q %+q", pc.Client.AuthToken, uuid)
+		return true, nil
+	}
+	var buf map[string]interface{}
+	path, err := pc.PathForUUID("get", uuid)
+	if err != nil {
+		return false, err
+	}
+	err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
+		"select": {`["uuid"]`},
+	})
+	if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound {
+		debugLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
+		return false, nil
+	}
+	if err != nil {
+		debugLogf("perm !ok: %+q %+q", pc.Client.AuthToken, uuid)
+		return false, err
+	}
+	debugLogf("perm ok: %+q %+q", pc.Client.AuthToken, uuid)
+	pc.cache[uuid] = time.Now()
+	return true, nil
+}
+
+func (pc *cachingPermChecker) tidy() {
+	if len(pc.cache) <= pc.maxCurrent*2 {
+		return
+	}
+	tooOld := time.Now().Add(-minPermCacheAge)
+	for uuid, t := range pc.cache {
+		if t.Before(tooOld) {
+			delete(pc.cache, uuid)
+		}
+	}
+	pc.maxCurrent = len(pc.cache)
+}
diff --git a/services/ws/proxy_client.go b/services/ws/proxy_client.go
deleted file mode 100644
index 28be2e2..0000000
--- a/services/ws/proxy_client.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package main
-
-import (
-	"net/http"
-	"net/url"
-
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-)
-
-type proxyClient struct {
-	*arvados.Client
-}
-
-func NewProxyClient(ac arvados.Client) *proxyClient {
-	ac.AuthToken = ""
-	return &proxyClient{
-		Client: &ac,
-	}
-}
-
-func (pc *proxyClient) SetToken(token string) {
-	pc.Client.AuthToken = token
-}
-
-func (pc *proxyClient) CheckReadPermission(uuid string) (bool, error) {
-	var buf map[string]interface{}
-	path, err := pc.PathForUUID("get", uuid)
-	if err != nil {
-		return false, err
-	}
-	err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
-		"select": {`["uuid"]`},
-	})
-	if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound {
-		return false, nil
-	}
-	if err != nil {
-		return false, err
-	}
-	return true, nil
-}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 15efc1d..257e985 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -16,7 +16,7 @@ var (
 
 type sessionV0 struct {
 	ws          wsConn
-	proxyClient *proxyClient
+	permChecker permChecker
 	subscribed  map[string]bool
 	mtx         sync.Mutex
 	setupOnce   sync.Once
@@ -25,7 +25,7 @@ type sessionV0 struct {
 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
 	sess := &sessionV0{
 		ws:          ws,
-		proxyClient: NewProxyClient(ac),
+		permChecker: NewPermChecker(ac),
 		subscribed:  make(map[string]bool),
 	}
 
@@ -35,8 +35,8 @@ func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
 		return nil, err
 	}
 	token := ws.Request().Form.Get("api_token")
-	sess.proxyClient.SetToken(token)
-	sess.debugLogf("handlerV0: token = %+q", token)
+	sess.permChecker.SetToken(token)
+	sess.debugLogf("token = %+q", token)
 
 	return sess, nil
 }
@@ -57,7 +57,8 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
 	if detail == nil {
 		return nil, nil
 	}
-	ok, err := sess.proxyClient.CheckReadPermission(detail.UUID)
+
+	ok, err := sess.permChecker.Check(detail.UUID)
 	if err != nil || !ok {
 		return nil, err
 	}

commit 83f085a0c405695d9840a0542eb9746e7b5d3f6f
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 14:50:14 2016 -0500

    8460: Refactor session logic (subscription protocol) out of handler (queueing and delivery).

diff --git a/services/ws/handler.go b/services/ws/handler.go
index fd5c7f8..d42b137c 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -1,14 +1,14 @@
 package main
 
 import (
+	"encoding/json"
 	"io"
+	"log"
 	"net/http"
 	"time"
-)
 
-type handler interface {
-	Handle(wsConn, <-chan *event)
-}
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
 
 type wsConn interface {
 	io.ReadWriter
@@ -16,3 +16,141 @@ type wsConn interface {
 	SetReadDeadline(time.Time) error
 	SetWriteDeadline(time.Time) error
 }
+
+type handler struct {
+	Client      arvados.Client
+	PingTimeout time.Duration
+	QueueSize   int
+	NewSession  func(wsConn, arvados.Client) (session, error)
+}
+
+func (h *handler) Handle(ws wsConn, events <-chan *event) {
+	sess, err := h.NewSession(ws, h.Client)
+	if err != nil {
+		log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
+		return
+	}
+
+	queue := make(chan *event, h.QueueSize)
+
+	stopped := make(chan struct{})
+	stop := make(chan error, 5)
+
+	go func() {
+		buf := make([]byte, 2<<20)
+		for {
+			select {
+			case <-stopped:
+				return
+			default:
+			}
+			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
+			n, err := ws.Read(buf)
+			sess.debugLogf("received frame: %q", buf[:n])
+			if err == nil && n == len(buf) {
+				err = errFrameTooBig
+			}
+			if err != nil {
+				if err != io.EOF {
+					sess.debugLogf("handler: read: %s", err)
+				}
+				stop <- err
+				return
+			}
+			msg := make(map[string]interface{})
+			err = json.Unmarshal(buf[:n], &msg)
+			if err != nil {
+				sess.debugLogf("handler: unmarshal: %s", err)
+				stop <- err
+				return
+			}
+			sess.Receive(msg)
+		}
+	}()
+
+	go func() {
+		for e := range queue {
+			if e == nil {
+				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+				_, err := ws.Write([]byte("{}\n"))
+				if err != nil {
+					sess.debugLogf("handler: write {}: %s", err)
+					stop <- err
+					break
+				}
+				continue
+			}
+
+			buf, err := sess.EventMessage(e)
+			if err != nil {
+				sess.debugLogf("EventMessage %d: err %s", err)
+				stop <- err
+				break
+			} else if len(buf) == 0 {
+				sess.debugLogf("EventMessage %d: skip", e.Serial)
+				continue
+			}
+
+			sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
+			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+			_, err = ws.Write(buf)
+			if err != nil {
+				sess.debugLogf("handler: write: %s", err)
+				stop <- err
+				break
+			}
+			sess.debugLogf("handler: sent event %d", e.Serial)
+		}
+		for _ = range queue {
+		}
+	}()
+
+	// Filter incoming events against the current subscription
+	// list, and forward matching events to the outgoing message
+	// queue. Close the queue and return when the "stopped"
+	// channel closes or the incoming event stream ends. Shut down
+	// the handler if the outgoing queue fills up.
+	go func() {
+		send := func(e *event) {
+			select {
+			case queue <- e:
+			default:
+				stop <- errQueueFull
+			}
+		}
+
+		ticker := time.NewTicker(h.PingTimeout)
+		defer ticker.Stop()
+
+		for {
+			var e *event
+			var ok bool
+			select {
+			case <-stopped:
+				close(queue)
+				return
+			case <-ticker.C:
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- nil
+				}
+				continue
+			case e, ok = <-events:
+				if !ok {
+					close(queue)
+					return
+				}
+			}
+			if sess.Filter(e) {
+				send(e)
+			}
+		}
+	}()
+
+	<-stop
+	close(stopped)
+}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
deleted file mode 100644
index 3f9a211..0000000
--- a/services/ws/handler_v0.go
+++ /dev/null
@@ -1,195 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"errors"
-	"io"
-	"log"
-	"sync"
-	"time"
-
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-)
-
-var (
-	errQueueFull   = errors.New("client queue full")
-	errFrameTooBig = errors.New("frame too big")
-)
-
-type handlerV0 struct {
-	Client      arvados.Client
-	PingTimeout time.Duration
-	QueueSize   int
-}
-
-func (h *handlerV0) debugLogf(ws wsConn, s string, args ...interface{}) {
-	args = append([]interface{}{ws.Request().RemoteAddr}, args...)
-	debugLogf("%s "+s, args...)
-}
-
-func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
-	queue := make(chan *event, h.QueueSize)
-	mtx := sync.Mutex{}
-	subscribed := make(map[string]bool)
-
-	proxyClient := NewProxyClient(h.Client)
-	{
-		err := ws.Request().ParseForm()
-		if err != nil {
-			log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
-			return
-		}
-		token := ws.Request().Form.Get("api_token")
-		h.debugLogf(ws, "handlerV0: token = %+q", token)
-		proxyClient.SetToken(token)
-	}
-
-	stopped := make(chan struct{})
-	stop := make(chan error, 5)
-
-	go func() {
-		buf := make([]byte, 2<<20)
-		for {
-			select {
-			case <-stopped:
-				return
-			default:
-			}
-			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
-			n, err := ws.Read(buf)
-			h.debugLogf(ws, "received frame: %q", buf[:n])
-			if err == nil && n == len(buf) {
-				err = errFrameTooBig
-			}
-			if err != nil {
-				if err != io.EOF {
-					h.debugLogf(ws, "handlerV0: read: %s", err)
-				}
-				stop <- err
-				return
-			}
-			msg := make(map[string]interface{})
-			err = json.Unmarshal(buf[:n], &msg)
-			if err != nil {
-				h.debugLogf(ws, "handlerV0: unmarshal: %s", err)
-				stop <- err
-				return
-			}
-			h.debugLogf(ws, "received message: %+v", msg)
-			h.debugLogf(ws, "subscribing to *")
-			subscribed["*"] = true
-		}
-	}()
-
-	go func() {
-		for e := range queue {
-			if e == nil {
-				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-				_, err := ws.Write([]byte("{}\n"))
-				if err != nil {
-					h.debugLogf(ws, "handlerV0: write: %s", err)
-					stop <- err
-					break
-				}
-				continue
-			}
-			detail := e.Detail()
-			if detail == nil {
-				continue
-			}
-
-			ok, err := proxyClient.CheckReadPermission(detail.UUID)
-			if err != nil {
-				log.Printf("CheckReadPermission: %s", err)
-				stop <- err
-				break
-			}
-			if !ok {
-				h.debugLogf(ws, "handlerV0: skip event %d", e.Serial)
-				continue
-			}
-
-			buf, err := json.Marshal(map[string]interface{}{
-				"msgID":             e.Serial,
-				"id":                detail.ID,
-				"uuid":              detail.UUID,
-				"object_uuid":       detail.ObjectUUID,
-				"object_owner_uuid": detail.ObjectOwnerUUID,
-				"event_type":        detail.EventType,
-			})
-			if err != nil {
-				log.Printf("error encoding: ", err)
-				continue
-			}
-			h.debugLogf(ws, "handlerV0: send event %d: %q", e.Serial, buf)
-			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-			_, err = ws.Write(append(buf, byte('\n')))
-			if err != nil {
-				h.debugLogf(ws, "handlerV0: write: %s", err)
-				stop <- err
-				break
-			}
-			h.debugLogf(ws, "handlerV0: sent event %d", e.Serial)
-		}
-		for _ = range queue {
-		}
-	}()
-
-	// Filter incoming events against the current subscription
-	// list, and forward matching events to the outgoing message
-	// queue. Close the queue and return when the "stopped"
-	// channel closes or the incoming event stream ends. Shut down
-	// the handler if the outgoing queue fills up.
-	go func() {
-		send := func(e *event) {
-			select {
-			case queue <- e:
-			default:
-				stop <- errQueueFull
-			}
-		}
-
-		ticker := time.NewTicker(h.PingTimeout)
-		defer ticker.Stop()
-
-		for {
-			var e *event
-			var ok bool
-			select {
-			case <-stopped:
-				close(queue)
-				return
-			case <-ticker.C:
-				// If the outgoing queue is empty,
-				// send an empty message. This can
-				// help detect a disconnected network
-				// socket, and prevent an idle socket
-				// from being closed.
-				if len(queue) == 0 {
-					queue <- nil
-				}
-				continue
-			case e, ok = <-events:
-				if !ok {
-					close(queue)
-					return
-				}
-			}
-			detail := e.Detail()
-			mtx.Lock()
-			switch {
-			case subscribed["*"]:
-				send(e)
-			case detail == nil:
-			case subscribed[detail.ObjectUUID]:
-				send(e)
-			case subscribed[detail.ObjectOwnerUUID]:
-				send(e)
-			}
-			mtx.Unlock()
-		}
-	}()
-
-	<-stop
-	close(stopped)
-}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
deleted file mode 100644
index 1b8549e..0000000
--- a/services/ws/handler_v1.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package main
-
-import (
-	"time"
-
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-)
-
-type handlerV1 struct {
-	Client      arvados.Client
-	PingTimeout time.Duration
-	QueueSize   int
-}
-
-func (h *handlerV1) Handle(ws wsConn, events <-chan *event) {
-}
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 8d5c604..e766f6c 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -28,7 +28,7 @@ func (c pgConfig) ConnectionString() string {
 
 type pgEventSource struct {
 	DataSource string
-	QueueSize int
+	QueueSize  int
 
 	pqListener *pq.Listener
 	sinks      map[*pgEventSink]bool
diff --git a/services/ws/router.go b/services/ws/router.go
index 30f93ea..e829ce8 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"sync"
 
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"golang.org/x/net/websocket"
 )
 
@@ -20,19 +21,17 @@ type router struct {
 
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
-	rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
-		Client:      rtr.Config.Client,
-		PingTimeout: rtr.Config.PingTimeout.Duration(),
-		QueueSize:   rtr.Config.ClientEventQueue,
-	}))
-	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
+	rtr.mux.Handle("/websocket", rtr.makeServer(NewSessionV0))
+	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
+}
+
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+	handler := &handler{
 		Client:      rtr.Config.Client,
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
-	}))
-}
-
-func (rtr *router) makeServer(handler handler) *websocket.Server {
+		NewSession:  newSession,
+	}
 	return &websocket.Server{
 		Handshake: func(c *websocket.Config, r *http.Request) error {
 			return nil
diff --git a/services/ws/session.go b/services/ws/session.go
new file mode 100644
index 0000000..db437b5
--- /dev/null
+++ b/services/ws/session.go
@@ -0,0 +1,8 @@
+package main
+
+// session is the protocol-specific part of a websocket connection.
+// The handler owns the socket and the event queue, and delegates
+// message interpretation, event filtering, and event encoding to a
+// session.
+type session interface {
+	// Receive processes one message from the client, passed as a
+	// generic key/value map (presumably the decoded JSON frame --
+	// confirm against the handler's read loop).
+	Receive(map[string]interface{})
+	// EventMessage returns the bytes to send to the client for the
+	// given event. The handler skips the event if the returned
+	// buffer is empty, and shuts down the connection if the
+	// returned error is non-nil.
+	EventMessage(*event) ([]byte, error)
+	// Filter reports whether the given event should be placed on
+	// the client's outgoing queue.
+	Filter(*event) bool
+	// debugLogf logs a printf-style debug message for this
+	// session.
+	debugLogf(string, ...interface{})
+}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
new file mode 100644
index 0000000..15efc1d
--- /dev/null
+++ b/services/ws/session_v0.go
@@ -0,0 +1,91 @@
+package main
+
+import (
+	"encoding/json"
+	"errors"
+	"log"
+	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Sentinel errors used to shut down a client connection.
+var (
+	// errQueueFull is reported when an event cannot be added to a
+	// client's outgoing queue because the queue has no room.
+	errQueueFull   = errors.New("client queue full")
+	// errFrameTooBig is reported for an oversized incoming frame.
+	errFrameTooBig = errors.New("frame too big")
+)
+
+// sessionV0 is the session implementation returned by NewSessionV0.
+type sessionV0 struct {
+	// ws is the client connection; its request supplies the
+	// api_token form value and the remote address used in debug
+	// logs.
+	ws          wsConn
+	// proxyClient makes API calls using the connecting client's
+	// token, to check whether the client may read a given log
+	// entry.
+	proxyClient *proxyClient
+	// subscribed is the set of subscriptions: "*" matches every
+	// event; other keys are compared with an event's object UUID
+	// and object owner UUID.
+	subscribed  map[string]bool
+	// mtx is held by Filter while it reads subscribed.
+	mtx         sync.Mutex
+	// NOTE(review): setupOnce is not used by any method in this
+	// file -- confirm whether it is still needed.
+	setupOnce   sync.Once
+}
+
+// NewSessionV0 builds a sessionV0 for the given connection. The
+// supplied client is copied into a proxy client, and the api_token
+// form value of the websocket request becomes that proxy client's
+// token.
+//
+// If the request's form cannot be parsed, the error is logged and
+// returned, and the returned session is nil.
+func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+	s0 := &sessionV0{
+		ws:          ws,
+		proxyClient: NewProxyClient(ac),
+		subscribed:  make(map[string]bool),
+	}
+
+	if parseErr := ws.Request().ParseForm(); parseErr != nil {
+		log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, parseErr)
+		return nil, parseErr
+	}
+
+	apiToken := ws.Request().Form.Get("api_token")
+	s0.proxyClient.SetToken(apiToken)
+	s0.debugLogf("handlerV0: token = %+q", apiToken)
+
+	return s0, nil
+}
+
+// debugLogf writes a debug log message, prefixed with the remote
+// address of the session's websocket request.
+func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
+	withAddr := make([]interface{}, 0, len(args)+1)
+	withAddr = append(withAddr, sess.ws.Request().RemoteAddr)
+	withAddr = append(withAddr, args...)
+	debugLogf("%s "+s, withAddr...)
+}
+
+// Receive handles one message from the client. The message content
+// is only logged: every message currently results in a subscription
+// to all events ("*").
+func (sess *sessionV0) Receive(msg map[string]interface{}) {
+	sess.debugLogf("received message: %+v", msg)
+	sess.debugLogf("subscribing to *")
+	// Filter reads the subscription map while holding mtx, and may
+	// run in a different goroutine, so the write must take the same
+	// lock: unsynchronized concurrent map access is a data race.
+	sess.mtx.Lock()
+	sess.subscribed["*"] = true
+	sess.mtx.Unlock()
+}
+
+// EventMessage returns the JSON encoding of the given event for
+// delivery to the client.
+//
+// It returns a nil buffer and nil error if the event has no detail
+// available or if the permission check says the client cannot read
+// the log entry. It returns a nil buffer and a non-nil error if the
+// permission check itself fails.
+func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
+	logRow := e.Detail()
+	if logRow == nil {
+		return nil, nil
+	}
+
+	readable, err := sess.proxyClient.CheckReadPermission(logRow.UUID)
+	if err != nil {
+		return nil, err
+	}
+	if !readable {
+		return nil, nil
+	}
+
+	msg := map[string]interface{}{
+		"msgID":             e.Serial,
+		"id":                logRow.ID,
+		"uuid":              logRow.UUID,
+		"object_uuid":       logRow.ObjectUUID,
+		"object_owner_uuid": logRow.ObjectOwnerUUID,
+		"event_type":        logRow.EventType,
+	}
+	return json.Marshal(msg)
+}
+
+// Filter reports whether the given event matches the session's
+// current subscriptions: a "*" subscription matches everything;
+// otherwise the event must have detail available and its object
+// UUID or object owner UUID must be subscribed.
+func (sess *sessionV0) Filter(e *event) bool {
+	logRow := e.Detail()
+
+	sess.mtx.Lock()
+	defer sess.mtx.Unlock()
+
+	if sess.subscribed["*"] {
+		return true
+	}
+	if logRow == nil {
+		return false
+	}
+	return sess.subscribed[logRow.ObjectUUID] || sess.subscribed[logRow.ObjectOwnerUUID]
+}
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
new file mode 100644
index 0000000..bc09ed0
--- /dev/null
+++ b/services/ws/session_v1.go
@@ -0,0 +1,11 @@
+package main
+
+import (
+	"errors"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// NewSessionV1 is a placeholder: it always returns a nil session and
+// a "Not implemented" error.
+func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) {
+	errNotImplemented := errors.New("Not implemented")
+	return nil, errNotImplemented
+}

commit f1c316086c32a0c19cfbf4ac7bb122d3e8cd049b
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 13:47:48 2016 -0500

    8460: Drop unnecessary read timeout handling.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index ba8f945..fd5c7f8 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -16,7 +16,3 @@ type wsConn interface {
 	SetReadDeadline(time.Time) error
 	SetWriteDeadline(time.Time) error
 }
-
-type timeouter interface {
-	Timeout() bool
-}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index eb076b5..3f9a211 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -61,9 +61,6 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			if err == nil && n == len(buf) {
 				err = errFrameTooBig
 			}
-			if err, ok := err.(timeouter); ok && err.Timeout() {
-				continue
-			}
 			if err != nil {
 				if err != io.EOF {
 					h.debugLogf(ws, "handlerV0: read: %s", err)

commit 0040a5a5b2e82f20641278584ecae835c1c537d3
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 13:47:32 2016 -0500

    8460: Pass datasource in Go style.

diff --git a/services/ws/main.go b/services/ws/main.go
index 2866244..c0f4dd5 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -36,8 +36,8 @@ func main() {
 	}
 
 	eventSource := &pgEventSource{
-		PgConfig:  cfg.Postgres,
-		QueueSize: cfg.ServerEventQueue,
+		DataSource: cfg.Postgres.ConnectionString(),
+		QueueSize:  cfg.ServerEventQueue,
 	}
 	srv := &http.Server{
 		Addr:           cfg.Listen,
diff --git a/services/ws/pg.go b/services/ws/pg.go
index f89fd07..8d5c604 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -27,7 +27,7 @@ func (c pgConfig) ConnectionString() string {
 }
 
 type pgEventSource struct {
-	PgConfig  pgConfig
+	DataSource string
 	QueueSize int
 
 	pqListener *pq.Listener
@@ -42,12 +42,15 @@ func (ps *pgEventSource) setup() {
 }
 
 func (ps *pgEventSource) run() {
-	db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
+	db, err := sql.Open("postgres", ps.DataSource)
 	if err != nil {
-		log.Fatal(err)
+		log.Fatalf("sql.Open: %s", err)
+	}
+	if err = db.Ping(); err != nil {
+		log.Fatalf("db.Ping: %s", err)
 	}
 
-	listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+	listener := pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
 		if err != nil {
 			// Until we have a mechanism for catching up
 			// on missed events, we cannot recover from a

commit da44904bb64303db2ae01aa9978188c20d453445
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 10:45:26 2016 -0500

    8460: Combine ping and notify goroutines.

diff --git a/services/ws/pg.go b/services/ws/pg.go
index 5e8e63e..f89fd07 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -60,15 +60,11 @@ func (ps *pgEventSource) run() {
 	if err != nil {
 		log.Fatal(err)
 	}
+
 	debugLogf("pgEventSource listening")
-	go func() {
-		for _ = range time.NewTicker(time.Minute).C {
-			debugLogf("pgEventSource listener ping")
-			listener.Ping()
-		}
-	}()
 
 	eventQueue := make(chan *event, ps.QueueSize)
+
 	go func() {
 		for e := range eventQueue {
 			// Wait for the "select ... from logs" call to
@@ -87,25 +83,38 @@ func (ps *pgEventSource) run() {
 	}()
 
 	var serial uint64
-	for pqEvent := range listener.Notify {
-		if pqEvent.Channel != "logs" {
-			continue
-		}
-		logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
-		if err != nil {
-			log.Printf("bad notify payload: %+v", pqEvent)
-			continue
-		}
-		serial++
-		e := &event{
-			LogID:    logID,
-			Received: time.Now(),
-			Serial:   serial,
-			db:       db,
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			debugLogf("pgEventSource listener ping")
+			listener.Ping()
+
+		case pqEvent, ok := <-listener.Notify:
+			if !ok {
+				close(eventQueue)
+				return
+			}
+			if pqEvent.Channel != "logs" {
+				continue
+			}
+			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+			if err != nil {
+				log.Printf("bad notify payload: %+v", pqEvent)
+				continue
+			}
+			serial++
+			e := &event{
+				LogID:    logID,
+				Received: time.Now(),
+				Serial:   serial,
+				db:       db,
+			}
+			debugLogf("event %d %+v", e.Serial, e)
+			eventQueue <- e
+			go e.Detail()
 		}
-		debugLogf("event %d %+v", e.Serial, e)
-		eventQueue <- e
-		go e.Detail()
 	}
 }
 

commit f517492fccb7cec138621cd0e710668877be57ce
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 10:38:46 2016 -0500

    8460: Add Log type.

diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
new file mode 100644
index 0000000..caea04c
--- /dev/null
+++ b/sdk/go/arvados/log.go
@@ -0,0 +1,16 @@
+package arvados
+
+import (
+	"time"
+)
+
+// Log is an arvados#log record.
+//
+// Each field is encoded and decoded under the JSON key named in its
+// struct tag.
+type Log struct {
+	ID              uint64                 `json:"id"`
+	UUID            string                 `json:"uuid"`
+	ObjectUUID      string                 `json:"object_uuid"`
+	ObjectOwnerUUID string                 `json:"object_owner_uuid"`
+	EventType       string                 `json:"event_type"`
+	Properties      map[string]interface{} `json:"properties"`
+	// CreatedAt is a pointer so that a nil value is omitted from
+	// the JSON encoding (omitempty).
+	CreatedAt       *time.Time             `json:"created_at,omitempty"`
+}

commit 104b2b5a46844e94a37c332b4ddd5a861dd8d63d
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 14 10:38:14 2016 -0500

    8460: Check permissions.

diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 36f4eb5..0c18d38 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -41,6 +41,8 @@ type Client struct {
 	// callers who use a Client to initialize an
 	// arvadosclient.ArvadosClient.)
 	KeepServiceURIs []string `json:",omitempty"`
+
+	dd *DiscoveryDocument
 }
 
 // The default http.Client used by a Client with Insecure==true and
@@ -198,14 +200,83 @@ func (c *Client) apiURL(path string) string {
 
 // DiscoveryDocument is the Arvados server's description of itself.
 type DiscoveryDocument struct {
-	DefaultCollectionReplication int   `json:"defaultCollectionReplication"`
-	BlobSignatureTTL             int64 `json:"blobSignatureTtl"`
+	BasePath                     string              `json:"basePath"`
+	DefaultCollectionReplication int                 `json:"defaultCollectionReplication"`
+	BlobSignatureTTL             int64               `json:"blobSignatureTtl"`
+	Schemas                      map[string]Schema   `json:"schemas"`
+	Resources                    map[string]Resource `json:"resources"`
+}
+
+// Resource is one entry of a DiscoveryDocument's "resources" map: the
+// set of API methods available for that resource, keyed by method
+// name (e.g., "get").
+type Resource struct {
+	Methods map[string]ResourceMethod `json:"methods"`
+}
+
+// ResourceMethod describes a single API method of a Resource: its
+// HTTP method, its request path (which may contain a "{uuid}"
+// placeholder), and its response type.
+type ResourceMethod struct {
+	HTTPMethod string         `json:"httpMethod"`
+	Path       string         `json:"path"`
+	Response   MethodResponse `json:"response"`
+}
+
+// MethodResponse identifies the response type of a ResourceMethod.
+// Ref is the discovery document's "$ref" value, which PathForUUID
+// compares with schema names.
+type MethodResponse struct {
+	Ref string `json:"$ref"`
+}
+
+type Schema struct {
+	UUIDPrefix string `json:"uuidPrefix"`
 }
 
 // DiscoveryDocument returns a *DiscoveryDocument. The returned object
 // should not be modified: the same object may be returned by
 // subsequent calls.
 func (c *Client) DiscoveryDocument() (*DiscoveryDocument, error) {
+	if c.dd != nil {
+		return c.dd, nil
+	}
 	var dd DiscoveryDocument
-	return &dd, c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+	err := c.RequestAndDecode(&dd, "GET", "discovery/v1/apis/arvados/v1/rest", nil, nil)
+	if err != nil {
+		return nil, err
+	}
+	c.dd = &dd
+	return c.dd, nil
+}
+
+// PathForUUID returns the API request path for invoking the named
+// method (e.g., "get") on the object with the given UUID, using the
+// server's discovery document.
+//
+// The UUID must be 27 characters long. Characters 6 through 10 (the
+// "infix") are matched against the UUIDPrefix of each schema to find
+// the model; the resource is the one whose "get" method responds
+// with that model. The returned path is the discovery document's
+// base path followed by the method's path with "{uuid}" replaced by
+// the given UUID, and with one leading slash (if any) removed.
+//
+// An error is returned if the UUID has the wrong length, the
+// discovery document cannot be retrieved, the infix or model is not
+// recognized, or the resource has no such method.
+func (c *Client) PathForUUID(method, uuid string) (string, error) {
+	if len(uuid) != 27 {
+		return "", fmt.Errorf("invalid UUID: %q", uuid)
+	}
+	dd, err := c.DiscoveryDocument()
+	if err != nil {
+		return "", err
+	}
+	infix := uuid[6:11]
+	var model string
+	for m, s := range dd.Schemas {
+		if s.UUIDPrefix == infix {
+			model = m
+			break
+		}
+	}
+	if model == "" {
+		return "", fmt.Errorf("unrecognized UUID infix: %q", infix)
+	}
+	var resource string
+	for r, rsc := range dd.Resources {
+		if rsc.Methods["get"].Response.Ref == model {
+			resource = r
+			break
+		}
+	}
+	if resource == "" {
+		return "", fmt.Errorf("no resource for model: %q", model)
+	}
+	m, ok := dd.Resources[resource].Methods[method]
+	if !ok {
+		return "", fmt.Errorf("no method %q for resource %q", method, resource)
+	}
+	path := dd.BasePath + strings.Replace(m.Path, "{uuid}", uuid, -1)
+	// Use TrimPrefix rather than indexing path[0], so an empty path
+	// (empty base path and empty method path) cannot cause an
+	// index-out-of-range panic.
+	return strings.TrimPrefix(path, "/"), nil
 }
diff --git a/services/ws/config.go b/services/ws/config.go
index 3e3d91f..9c2e80a 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -23,12 +23,12 @@ func DefaultConfig() Config {
 			APIHost: "localhost:443",
 		},
 		Postgres: pgConfig{
-			"dbname":          "arvados_test",
+			"dbname":          "arvados_production",
 			"user":            "arvados",
 			"password":        "xyzzy",
 			"host":            "localhost",
 			"connect_timeout": "30",
-			"sslmode":         "disable",
+			"sslmode":         "require",
 		},
 		PingTimeout:      arvados.Duration(time.Minute),
 		ClientEventQueue: 64,
diff --git a/services/ws/event.go b/services/ws/event.go
index b6dda49..e34b6b4 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/ghodss/yaml"
 )
 
 type eventSink interface {
@@ -15,11 +16,11 @@ type eventSink interface {
 }
 
 type eventSource interface {
-	NewSink(chan *event) eventSink
+	NewSink() eventSink
 }
 
 type event struct {
-	LogUUID  string
+	LogID    uint64
 	Received time.Time
 	Serial   uint64
 
@@ -39,18 +40,24 @@ func (e *event) Detail() *arvados.Log {
 		return e.logRow
 	}
 	var logRow arvados.Log
-	var oldAttrs, newAttrs []byte
-	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+	var propYAML []byte
+	e.err = e.db.QueryRow(`SELECT id, uuid, object_uuid, object_owner_uuid, event_type, created_at, properties FROM logs WHERE id = $1`, e.LogID).Scan(
 		&logRow.ID,
 		&logRow.UUID,
 		&logRow.ObjectUUID,
 		&logRow.ObjectOwnerUUID,
 		&logRow.EventType,
 		&logRow.CreatedAt,
-		&oldAttrs,
-		&newAttrs)
+		&propYAML)
 	if e.err != nil {
-		log.Printf("retrieving log row %s: %s", e.LogUUID, e.err)
+		log.Printf("retrieving log row %d: %s", e.LogID, e.err)
+		return nil
 	}
+	e.err = yaml.Unmarshal(propYAML, &logRow.Properties)
+	if e.err != nil {
+		log.Printf("decoding yaml for log row %d: %s", e.LogID, e.err)
+		return nil
+	}
+	e.logRow = &logRow
 	return e.logRow
 }
diff --git a/services/ws/handler.go b/services/ws/handler.go
index fe47a62..ba8f945 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -14,6 +14,7 @@ type wsConn interface {
 	io.ReadWriter
 	Request() *http.Request
 	SetReadDeadline(time.Time) error
+	SetWriteDeadline(time.Time) error
 }
 
 type timeouter interface {
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index c728d12..eb076b5 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -7,6 +7,8 @@ import (
 	"log"
 	"sync"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 var (
@@ -15,6 +17,7 @@ var (
 )
 
 type handlerV0 struct {
+	Client      arvados.Client
 	PingTimeout time.Duration
 	QueueSize   int
 }
@@ -29,6 +32,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	mtx := sync.Mutex{}
 	subscribed := make(map[string]bool)
 
+	proxyClient := NewProxyClient(h.Client)
+	{
+		err := ws.Request().ParseForm()
+		if err != nil {
+			log.Printf("%s ParseForm: %s", ws.Request().RemoteAddr, err)
+			return
+		}
+		token := ws.Request().Form.Get("api_token")
+		h.debugLogf(ws, "handlerV0: token = %+q", token)
+		proxyClient.SetToken(token)
+	}
+
 	stopped := make(chan struct{})
 	stop := make(chan error, 5)
 
@@ -40,21 +55,13 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 				return
 			default:
 			}
-			ws.SetReadDeadline(time.Now().Add(h.PingTimeout))
+			ws.SetReadDeadline(time.Now().Add(24 * 365 * time.Hour))
 			n, err := ws.Read(buf)
 			h.debugLogf(ws, "received frame: %q", buf[:n])
 			if err == nil && n == len(buf) {
 				err = errFrameTooBig
 			}
 			if err, ok := err.(timeouter); ok && err.Timeout() {
-				// If the outgoing queue is empty,
-				// send an empty message. This can
-				// help detect a disconnected network
-				// socket, and prevent an idle socket
-				// from being closed.
-				if len(queue) == 0 {
-					queue <- nil
-				}
 				continue
 			}
 			if err != nil {
@@ -80,6 +87,7 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 	go func() {
 		for e := range queue {
 			if e == nil {
+				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
 				_, err := ws.Write([]byte("{}\n"))
 				if err != nil {
 					h.debugLogf(ws, "handlerV0: write: %s", err)
@@ -92,7 +100,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			if detail == nil {
 				continue
 			}
-			// FIXME: check permission
+
+			ok, err := proxyClient.CheckReadPermission(detail.UUID)
+			if err != nil {
+				log.Printf("CheckReadPermission: %s", err)
+				stop <- err
+				break
+			}
+			if !ok {
+				h.debugLogf(ws, "handlerV0: skip event %d", e.Serial)
+				continue
+			}
+
 			buf, err := json.Marshal(map[string]interface{}{
 				"msgID":             e.Serial,
 				"id":                detail.ID,
@@ -105,14 +124,18 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 				log.Printf("error encoding: ", err)
 				continue
 			}
+			h.debugLogf(ws, "handlerV0: send event %d: %q", e.Serial, buf)
+			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
 			_, err = ws.Write(append(buf, byte('\n')))
 			if err != nil {
 				h.debugLogf(ws, "handlerV0: write: %s", err)
 				stop <- err
 				break
 			}
+			h.debugLogf(ws, "handlerV0: sent event %d", e.Serial)
+		}
+		for _ = range queue {
 		}
-		for _ = range queue {}
 	}()
 
 	// Filter incoming events against the current subscription
@@ -129,6 +152,9 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			}
 		}
 
+		ticker := time.NewTicker(h.PingTimeout)
+		defer ticker.Stop()
+
 		for {
 			var e *event
 			var ok bool
@@ -136,6 +162,16 @@ func (h *handlerV0) Handle(ws wsConn, events <-chan *event) {
 			case <-stopped:
 				close(queue)
 				return
+			case <-ticker.C:
+				// If the outgoing queue is empty,
+				// send an empty message. This can
+				// help detect a disconnected network
+				// socket, and prevent an idle socket
+				// from being closed.
+				if len(queue) == 0 {
+					queue <- nil
+				}
+				continue
 			case e, ok = <-events:
 				if !ok {
 					close(queue)
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index 4160d86..1b8549e 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -2,9 +2,12 @@ package main
 
 import (
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 type handlerV1 struct {
+	Client      arvados.Client
 	PingTimeout time.Duration
 	QueueSize   int
 }
diff --git a/services/ws/main.go b/services/ws/main.go
index 0f97823..2866244 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -35,18 +35,20 @@ func main() {
 		return
 	}
 
+	eventSource := &pgEventSource{
+		PgConfig:  cfg.Postgres,
+		QueueSize: cfg.ServerEventQueue,
+	}
 	srv := &http.Server{
 		Addr:           cfg.Listen,
 		ReadTimeout:    time.Minute,
 		WriteTimeout:   time.Minute,
 		MaxHeaderBytes: 1 << 20,
 		Handler: &router{
-			Config: &cfg,
-			eventSource: &pgEventSource{
-				PgConfig:  cfg.Postgres,
-				QueueSize: cfg.ServerEventQueue,
-			},
+			Config:      &cfg,
+			eventSource: eventSource,
 		},
 	}
+	eventSource.NewSink().Stop()
 	log.Fatal(srv.ListenAndServe())
 }
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 51bc92c..5e8e63e 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -3,6 +3,7 @@ package main
 import (
 	"database/sql"
 	"log"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -52,15 +53,17 @@ func (ps *pgEventSource) run() {
 			// on missed events, we cannot recover from a
 			// dropped connection without breaking our
 			// promises to clients.
-			log.Fatal(err)
+			log.Fatalf("pgEventSource listener problem: %s", err)
 		}
 	})
 	err = listener.Listen("logs")
 	if err != nil {
 		log.Fatal(err)
 	}
+	debugLogf("pgEventSource listening")
 	go func() {
 		for _ = range time.NewTicker(time.Minute).C {
+			debugLogf("pgEventSource listener ping")
 			listener.Ping()
 		}
 	}()
@@ -74,7 +77,7 @@ func (ps *pgEventSource) run() {
 			// concurrent queries would be bounded by
 			// client_count X client_queue_size.
 			e.Detail()
-			debugLogf("%+v", e)
+			debugLogf("event %d detail %+v", e.Serial, e.Detail())
 			ps.mtx.Lock()
 			for sink := range ps.sinks {
 				sink.channel <- e
@@ -88,33 +91,35 @@ func (ps *pgEventSource) run() {
 		if pqEvent.Channel != "logs" {
 			continue
 		}
+		logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
+		if err != nil {
+			log.Printf("bad notify payload: %+v", pqEvent)
+			continue
+		}
 		serial++
 		e := &event{
-			LogUUID:  pqEvent.Extra,
+			LogID:    logID,
 			Received: time.Now(),
 			Serial:   serial,
 			db:       db,
 		}
-		debugLogf("%+v", e)
+		debugLogf("event %d %+v", e.Serial, e)
 		eventQueue <- e
 		go e.Detail()
 	}
 }
 
-// NewSink subscribes to the event source. If c is not nil, it will be
-// used as the event channel. Otherwise, a new channel will be
-// created. Either way, the sink channel will be returned by the
-// Channel() method of the returned eventSink. All subsequent events
-// will be sent to the sink channel. The caller must ensure events are
-// received from the sink channel as quickly as possible: when one
-// sink blocks, all other sinks also block.
-func (ps *pgEventSource) NewSink(c chan *event) eventSink {
+// NewSink subscribes to the event source. NewSink returns an
+// eventSink, whose Channel() method returns a channel: a pointer to
+// each subsequent event will be sent to that channel.
+//
+// The caller must ensure events are received from the sink channel as
+// quickly as possible because when one sink stops being ready, all
+// other sinks block.
+func (ps *pgEventSource) NewSink() eventSink {
 	ps.setupOnce.Do(ps.setup)
-	if c == nil {
-		c = make(chan *event, 1)
-	}
 	sink := &pgEventSink{
-		channel: c,
+		channel: make(chan *event, 1),
 		source:  ps,
 	}
 	ps.mtx.Lock()
diff --git a/services/ws/proxy_client.go b/services/ws/proxy_client.go
new file mode 100644
index 0000000..28be2e2
--- /dev/null
+++ b/services/ws/proxy_client.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"net/http"
+	"net/url"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// proxyClient wraps an arvados.Client so API requests can be made
+// with a token supplied via SetToken.
+type proxyClient struct {
+	*arvados.Client
+}
+
+// NewProxyClient returns a proxyClient backed by a copy of the given
+// client, with the copy's AuthToken cleared. The caller's client is
+// passed by value and is not modified.
+func NewProxyClient(ac arvados.Client) *proxyClient {
+	pc := &proxyClient{Client: &ac}
+	pc.Client.AuthToken = ""
+	return pc
+}
+
+// SetToken sets the auth token of the underlying client.
+func (pc *proxyClient) SetToken(token string) {
+	client := pc.Client
+	client.AuthToken = token
+}
+
+// CheckReadPermission reports whether the object with the given UUID
+// can be retrieved with the proxy client's current token. It issues
+// a "get" request selecting only the uuid attribute.
+//
+// A 404 response from the API server yields (false, nil). Any other
+// failure, including failure to determine the request path, yields
+// (false, err).
+func (pc *proxyClient) CheckReadPermission(uuid string) (bool, error) {
+	getPath, err := pc.PathForUUID("get", uuid)
+	if err != nil {
+		return false, err
+	}
+
+	var resp map[string]interface{}
+	params := url.Values{
+		"select": {`["uuid"]`},
+	}
+	err = pc.RequestAndDecode(&resp, "GET", getPath, nil, params)
+
+	// "Not found" is an answer, not a failure.
+	if txErr, isTxErr := err.(arvados.TransactionError); isTxErr && txErr.StatusCode == http.StatusNotFound {
+		return false, nil
+	}
+	return err == nil, err
+}
diff --git a/services/ws/router.go b/services/ws/router.go
index 685b613..30f93ea 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -21,10 +21,12 @@ type router struct {
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
 	rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
+		Client:      rtr.Config.Client,
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
+		Client:      rtr.Config.Client,
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
 	}))
@@ -37,7 +39,7 @@ func (rtr *router) makeServer(handler handler) *websocket.Server {
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
 			log.Printf("%v accepted", ws.Request().RemoteAddr)
-			sink := rtr.eventSource.NewSink(nil)
+			sink := rtr.eventSource.NewSink()
 			handler.Handle(ws, sink.Channel())
 			sink.Stop()
 			ws.Close()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list