[ARVADOS] updated: f675fb2c202516021b961b5aa2de4528ba9f0d1f

Git user git at public.curoverse.com
Wed Nov 16 18:16:09 EST 2016


Summary of changes:
 .gitignore                                         |   3 +
 build/run-tests.sh                                 |   6 +-
 doc/_config.yml                                    |  83 +++--
 doc/_layouts/default.html.liquid                   |   2 +-
 doc/api/authentication.html.textile.liquid         |  40 ---
 doc/api/crunch-scripts.html.textile.liquid         |   5 +-
 doc/api/execution.html.textile.liquid              |  27 ++
 doc/api/index.html.textile.liquid                  |  44 +--
 doc/api/methods.html.textile.liquid                | 120 +++----
 .../api_client_authorizations.html.textile.liquid  |  54 +++-
 doc/api/methods/api_clients.html.textile.liquid    |  40 ++-
 .../methods/authorized_keys.html.textile.liquid    |  43 ++-
 doc/api/methods/collections.html.textile.liquid    |  55 +++-
 .../methods/container_requests.html.textile.liquid | 100 ++++--
 doc/api/methods/containers.html.textile.liquid     |  76 ++++-
 doc/api/methods/groups.html.textile.liquid         |  47 +--
 doc/api/methods/humans.html.textile.liquid         |  39 ++-
 doc/api/methods/job_tasks.html.textile.liquid      |  55 +++-
 doc/api/methods/jobs.html.textile.liquid           | 101 ++++--
 doc/api/methods/keep_disks.html.textile.liquid     |  52 ++-
 doc/api/methods/keep_services.html.textile.liquid  |  52 +--
 doc/api/methods/links.html.textile.liquid          |  69 ++--
 doc/api/methods/logs.html.textile.liquid           |  49 ++-
 doc/api/methods/nodes.html.textile.liquid          |  50 ++-
 .../methods/pipeline_instances.html.textile.liquid |  44 ++-
 .../methods/pipeline_templates.html.textile.liquid | 182 ++++++++++-
 doc/api/methods/repositories.html.textile.liquid   |  45 ++-
 doc/api/methods/specimens.html.textile.liquid      |  41 ++-
 doc/api/methods/traits.html.textile.liquid         |  40 ++-
 doc/api/methods/users.html.textile.liquid          |  67 ++--
 .../methods/virtual_machines.html.textile.liquid   |  52 +--
 doc/api/methods/workflows.html.textile.liquid      |  44 ++-
 doc/api/permission-model.html.textile.liquid       | 143 +++------
 doc/api/requests.html.textile.liquid               | 349 +++++++++++++++++++++
 doc/api/resources.html.textile.liquid              |  46 ++-
 doc/api/schema/ApiClient.html.textile.liquid       |  24 --
 .../ApiClientAuthorization.html.textile.liquid     |  29 --
 doc/api/schema/AuthorizedKey.html.textile.liquid   |  24 --
 doc/api/schema/Collection.html.textile.liquid      |  39 ---
 doc/api/schema/Container.html.textile.liquid       |  59 ----
 .../schema/ContainerRequest.html.textile.liquid    |  70 -----
 doc/api/schema/Group.html.textile.liquid           |  25 --
 doc/api/schema/Human.html.textile.liquid           |  19 --
 doc/api/schema/Job.html.textile.liquid             |  69 ----
 doc/api/schema/JobTask.html.textile.liquid         |  47 ---
 doc/api/schema/KeepDisk.html.textile.liquid        |  31 --
 doc/api/schema/KeepService.html.textile.liquid     |  24 --
 doc/api/schema/Link.html.textile.liquid            |  83 -----
 doc/api/schema/Log.html.textile.liquid             |  33 --
 doc/api/schema/Node.html.textile.liquid            |  28 --
 .../schema/PipelineInstance.html.textile.liquid    |  26 --
 .../schema/PipelineTemplate.html.textile.liquid    | 161 ----------
 doc/api/schema/Repository.html.textile.liquid      |  25 --
 doc/api/schema/Specimen.html.textile.liquid        |  22 --
 doc/api/schema/Trait.html.textile.liquid           |  22 --
 doc/api/schema/User.html.textile.liquid            |  30 --
 doc/api/schema/VirtualMachine.html.textile.liquid  |  21 --
 doc/api/schema/Workflow.html.textile.liquid        |  23 --
 doc/api/storage.html.textile.liquid                | 170 ++++++++++
 doc/api/tokens.html.textile.liquid                 |  63 ++++
 doc/images/Arvados_Permissions.svg                 |   4 +
 doc/images/Crunch_dispatch.svg                     |   4 +
 doc/images/Keep_manifests.svg                      |   4 +
 doc/images/Keep_reading_writing_block.svg          |   4 +
 doc/images/Keep_rendezvous_hashing.svg             |   4 +
 doc/images/Session_Establishment.svg               |   4 +
 .../install-prerequisites.html.textile.liquid      |   4 +-
 doc/sdk/cli/install.html.textile.liquid            |   1 -
 doc/sdk/go/example.html.textile.liquid             |  76 +++++
 doc/sdk/go/index.html.textile.liquid               |  15 +-
 doc/sdk/index.html.textile.liquid                  |  13 +-
 doc/sdk/java/example.html.textile.liquid           |  78 +++++
 doc/sdk/java/index.html.textile.liquid             |  11 +-
 doc/sdk/perl/example.html.textile.liquid           |  81 +++++
 doc/sdk/perl/index.html.textile.liquid             |  59 +---
 doc/sdk/python/events.html.textile.liquid          |   2 +-
 doc/sdk/python/example.html.textile.liquid         |  51 +++
 doc/sdk/python/sdk-python.html.textile.liquid      |   3 +-
 doc/sdk/ruby/example.html.textile.liquid           |  75 +++++
 doc/sdk/ruby/index.html.textile.liquid             |  74 +----
 doc/user/cwl/cwl-runner.html.textile.liquid        |   1 +
 .../reference/job-pipeline-ref.html.textile.liquid |   2 +-
 .../running-external-program.html.textile.liquid   |   4 +-
 .../tutorial-submit-job.html.textile.liquid        |   4 +-
 sdk/cwl/arvados_cwl/__init__.py                    |  23 +-
 sdk/cwl/arvados_cwl/arvcontainer.py                |   7 +-
 sdk/cwl/arvados_cwl/arvjob.py                      |   6 +-
 sdk/cwl/arvados_cwl/crunch_script.py               |   7 +-
 sdk/cwl/arvados_cwl/runner.py                      |   3 +-
 sdk/cwl/tests/test_container.py                    |  23 +-
 sdk/cwl/tests/test_make_output.py                  |   9 +-
 sdk/cwl/tests/test_submit.py                       |  88 ++++++
 sdk/go/arvados/container.go                        |  38 ++-
 sdk/go/arvados/log.go                              |   8 +
 sdk/python/tests/test_events.py                    |  29 +-
 services/api/app/models/container.rb               |  11 +-
 services/api/app/models/container_request.rb       |  20 +-
 ...43147_add_scheduling_parameters_to_container.rb |   6 +
 services/api/db/structure.sql                      |  10 +-
 services/api/test/unit/container_request_test.rb   |  32 ++
 services/arv-git-httpd/main.go                     |   2 +-
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |   6 +-
 .../crunch-dispatch-slurm_test.go                  |   2 +-
 services/keep-web/main.go                          |   2 +-
 services/keepproxy/keepproxy.go                    |   2 +-
 services/keepstore/keepstore.go                    |   2 +-
 services/ws/event.go                               |   1 +
 services/ws/handler.go                             |  35 ++-
 services/ws/log.go                                 |  41 +++
 services/ws/main.go                                |   6 +-
 services/ws/permission.go                          |  36 ++-
 services/ws/pg.go                                  |   5 +
 services/ws/router.go                              |  38 +--
 services/ws/session.go                             |  23 +-
 services/ws/session_v0.go                          | 285 ++++++++++++-----
 services/ws/session_v1.go                          |   3 +-
 tools/arvbox/lib/arvbox/docker/Dockerfile.base     |  48 ++-
 tools/arvbox/lib/arvbox/docker/common.sh           |   6 +-
 .../lib/arvbox/docker/service/ready/run-service    |   4 +-
 119 files changed, 2838 insertions(+), 2039 deletions(-)
 delete mode 100644 doc/api/authentication.html.textile.liquid
 create mode 100644 doc/api/execution.html.textile.liquid
 create mode 100644 doc/api/requests.html.textile.liquid
 delete mode 100644 doc/api/schema/ApiClient.html.textile.liquid
 delete mode 100644 doc/api/schema/ApiClientAuthorization.html.textile.liquid
 delete mode 100644 doc/api/schema/AuthorizedKey.html.textile.liquid
 delete mode 100644 doc/api/schema/Collection.html.textile.liquid
 delete mode 100644 doc/api/schema/Container.html.textile.liquid
 delete mode 100644 doc/api/schema/ContainerRequest.html.textile.liquid
 delete mode 100644 doc/api/schema/Group.html.textile.liquid
 delete mode 100644 doc/api/schema/Human.html.textile.liquid
 delete mode 100644 doc/api/schema/Job.html.textile.liquid
 delete mode 100644 doc/api/schema/JobTask.html.textile.liquid
 delete mode 100644 doc/api/schema/KeepDisk.html.textile.liquid
 delete mode 100644 doc/api/schema/KeepService.html.textile.liquid
 delete mode 100644 doc/api/schema/Link.html.textile.liquid
 delete mode 100644 doc/api/schema/Log.html.textile.liquid
 delete mode 100644 doc/api/schema/Node.html.textile.liquid
 delete mode 100644 doc/api/schema/PipelineInstance.html.textile.liquid
 delete mode 100644 doc/api/schema/PipelineTemplate.html.textile.liquid
 delete mode 100644 doc/api/schema/Repository.html.textile.liquid
 delete mode 100644 doc/api/schema/Specimen.html.textile.liquid
 delete mode 100644 doc/api/schema/Trait.html.textile.liquid
 delete mode 100644 doc/api/schema/User.html.textile.liquid
 delete mode 100644 doc/api/schema/VirtualMachine.html.textile.liquid
 delete mode 100644 doc/api/schema/Workflow.html.textile.liquid
 create mode 100644 doc/api/storage.html.textile.liquid
 create mode 100644 doc/api/tokens.html.textile.liquid
 create mode 100644 doc/images/Arvados_Permissions.svg
 create mode 100644 doc/images/Crunch_dispatch.svg
 create mode 100644 doc/images/Keep_manifests.svg
 create mode 100644 doc/images/Keep_reading_writing_block.svg
 create mode 100644 doc/images/Keep_rendezvous_hashing.svg
 create mode 100644 doc/images/Session_Establishment.svg
 create mode 100644 doc/sdk/go/example.html.textile.liquid
 create mode 100644 doc/sdk/java/example.html.textile.liquid
 create mode 100644 doc/sdk/perl/example.html.textile.liquid
 create mode 100644 doc/sdk/python/example.html.textile.liquid
 create mode 100644 doc/sdk/ruby/example.html.textile.liquid
 create mode 100644 services/api/db/migrate/20161111143147_add_scheduling_parameters_to_container.rb
 create mode 100644 services/ws/log.go

       via  f675fb2c202516021b961b5aa2de4528ba9f0d1f (commit)
       via  dfb995ab9ea0f1d8808c812870db717164ac95f4 (commit)
       via  567269bca35ce82045643f86d38992f45d75f435 (commit)
       via  8e37378b2955346c2b4a3c1e38fcdfb2e74b7e07 (commit)
       via  bc4dd2d01cda24a71f95b62ae1fa72aa9fb1226d (commit)
       via  5ffb79040668114c58bf35c3e18a8302b8d94445 (commit)
       via  d6b6b39bfe67926490506125c88f3567e45e7dcc (commit)
       via  7ae5533a074d80882171b33f7b659c9bcace1bd3 (commit)
       via  0c6cbd8a07e31decd703ef7fd9eca591e5661c32 (commit)
       via  f6f8feeada332a55d1a966e9f4a240d99dc58b55 (commit)
       via  d2f9e7809bd1f63638600c7fc8189182c0f327c0 (commit)
       via  2251688e66191ff1169080f50868bf57e463659c (commit)
       via  c14246b9a21d038fc6fa850f4032659a98397784 (commit)
       via  5d2ef6f7a2a8f93ec411c420287f30af92294520 (commit)
       via  780d334ec4b2d47379d0330ace77b3821c880842 (commit)
       via  5a420beeb6c64efc3ca0ef13d4ab9ac6c654c3ab (commit)
       via  eae48c31bb338689ec67fbc6a14a2e0b1fb5e3b6 (commit)
       via  1d656f4f1ec1f890a7677e748bea43a08cfa0b6c (commit)
       via  3af6db5dc4e2f08b2ebb49a82109c4325ad7fcc4 (commit)
       via  38fae0458644b89322ddeac125971800b9e452e5 (commit)
       via  02010431f52911a6ff908e673c534291beb929ac (commit)
       via  82fa37ac01169178f6a9b1c142926de7b50e8841 (commit)
       via  1129e9428dc1f3a300c4148bf12821eecf511ab3 (commit)
       via  90c48f84391d6b4d6b8ed366d5a42d24bf6c696f (commit)
       via  f6a8bf41c9f038ece715ec2744c36160f9c6c591 (commit)
       via  4225d058e0bc4380203fe5959e7e54febc91e83b (commit)
       via  ddeef8adc0cb7cdb55be644b9335ea51919ed513 (commit)
       via  c198274863bb5d72ef34dfc311c62bf50d6bd4f4 (commit)
       via  1e6d7656fbfd1f954571157fc7e7e4f75ea5911e (commit)
       via  aa49b45a4d25cb1e4282e242a2502c8a591f8615 (commit)
       via  79786a56410ef381499fb0bfdc5a18407ab33082 (commit)
       via  364fed6e1d4036719e4c461cfe0bc24e7f52f144 (commit)
       via  5fc627d22b47723289251e1e1d9dc45c0e1bd49e (commit)
       via  26e1c10f963a586e40ea9dcb46a87b0107c97b7c (commit)
       via  7e52fd153f2d16f94061ef1eabfe653d4a83852a (commit)
       via  b8610f34c21f1cf44b938802f37971b06af4361c (commit)
       via  0f2ab548f96e8604a929e0636517f634b7dfb0ad (commit)
       via  e59d21d3f47cbee83a6dc389584bd7b17bec270c (commit)
       via  051efbd3d843baa690b334e57fd09fad6a908bb9 (commit)
       via  3c2fee34ad8f668f5cf9001d6b7d605965ec28bb (commit)
       via  c10e9e5f3398d40a3346c7d7c1f84bf50262b8ec (commit)
       via  e0903a2cff2df4e6169e95f7439c0fa361c60ea8 (commit)
       via  94e52d9256cb17dddbc9c383d2ab90e713c25e3b (commit)
       via  42b9e37cd53d63980d3fa4a238f9ff6adad9ccc4 (commit)
       via  becada3b24006cf39417335794cb46556d7aa605 (commit)
      from  3e39ed33427a0c3000ed41b4826fcfa182934f71 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit f675fb2c202516021b961b5aa2de4528ba9f0d1f
Merge: dfb995a d2f9e78
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Nov 16 18:15:56 2016 -0500

    8460: Merge branch 'master' into 8460-websocket-go


commit dfb995ab9ea0f1d8808c812870db717164ac95f4
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Nov 16 14:09:15 2016 -0500

    8460: Return recent events if last_log_id given.

diff --git a/services/ws/event.go b/services/ws/event.go
index 09c9d0f..77acf44 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -17,6 +17,7 @@ type eventSink interface {
 
 type eventSource interface {
 	NewSink() eventSink
+	DB() *sql.DB
 }
 
 type event struct {
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 59d690f..1470c66 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -21,7 +21,7 @@ type handler struct {
 	Client      arvados.Client
 	PingTimeout time.Duration
 	QueueSize   int
-	NewSession  func(wsConn, arvados.Client) (session, error)
+	NewSession  func(wsConn) (session, error)
 }
 
 type handlerStats struct {
@@ -32,7 +32,7 @@ type handlerStats struct {
 }
 
 func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
-	sess, err := h.NewSession(ws, h.Client)
+	sess, err := h.NewSession(ws)
 	if err != nil {
 		log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
 		return
@@ -72,6 +72,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 				return
 			}
 			for _, buf := range sess.Receive(msg, buf[:n]) {
+				sess.debugLogf("handler: to queue: %s", string(buf))
 				queue <- buf
 			}
 		}
@@ -81,6 +82,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 		for e := range queue {
 			if buf, ok := e.([]byte); ok {
 				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+				sess.debugLogf("handler: send msg: %s", string(buf))
 				_, err := ws.Write(buf)
 				if err != nil {
 					sess.debugLogf("handler: write {}: %s", err)
diff --git a/services/ws/permission.go b/services/ws/permission.go
index 090e5ff..1dc06b8 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -46,7 +46,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 	pc.tidy()
 	now := time.Now()
 	if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
-		debugLogf("perm (cached): %+q %+q => %v", pc.Client.AuthToken, uuid, perm.allowed)
+		debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
 		return perm.allowed, nil
 	}
 	var buf map[string]interface{}
@@ -64,10 +64,10 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 	} else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
 		allowed = false
 	} else {
-		errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
+		errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
 		return false, err
 	}
-	debugLogf("perm: %+q %+q => %v", pc.Client.AuthToken, uuid, allowed)
+	debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
 	pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
 	return allowed, nil
 }
diff --git a/services/ws/pg.go b/services/ws/pg.go
index a5af9f7..08fbee1 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -152,6 +152,11 @@ func (ps *pgEventSource) NewSink() eventSink {
 	return sink
 }
 
+func (ps *pgEventSource) DB() *sql.DB {
+	ps.setupOnce.Do(ps.setup)
+	return ps.db
+}
+
 type pgEventSink struct {
 	channel chan *event
 	source  *pgEventSource
diff --git a/services/ws/router.go b/services/ws/router.go
index ba8b46b..2a4e52e 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"database/sql"
 	"encoding/json"
 	"log"
 	"net/http"
@@ -25,12 +26,13 @@ func (rtr *router) setup() {
 	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
 }
 
-func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
 	handler := &handler{
-		Client:      rtr.Config.Client,
 		PingTimeout: rtr.Config.PingTimeout.Duration(),
 		QueueSize:   rtr.Config.ClientEventQueue,
-		NewSession:  newSession,
+		NewSession:  func(ws wsConn) (session, error) {
+			return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+		},
 	}
 	return &websocket.Server{
 		Handshake: func(c *websocket.Config, r *http.Request) error {
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 5622304..33cdb2f 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -1,11 +1,10 @@
 package main
 
 import (
+	"database/sql"
 	"encoding/json"
 	"errors"
-	"fmt"
 	"log"
-	"net/url"
 	"sync"
 	"time"
 
@@ -24,15 +23,17 @@ var (
 
 type v0session struct {
 	ws            wsConn
+	db            *sql.DB
 	permChecker   permChecker
 	subscriptions []v0subscribe
 	mtx           sync.Mutex
 	setupOnce     sync.Once
 }
 
-func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
 	sess := &v0session{
 		ws:          ws,
+		db:          db,
 		permChecker: NewPermChecker(ac),
 	}
 
@@ -67,42 +68,7 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte
 		sess.subscriptions = append(sess.subscriptions, sub)
 		sess.mtx.Unlock()
 
-		resp := [][]byte{v0subscribeOK}
-
-		if sub.LastLogID > 0 {
-			// Hack 1: use the permission checker's
-			// Arvados client to retrieve old log IDs. Use
-			// "created < 2 hours ago" to ensure the
-			// database query doesn't get too expensive
-			// when our client gives us last_log_id==1.
-			ac := sess.permChecker.(*cachingPermChecker).Client
-			var old arvados.LogList
-			ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
-				"limit": {"1000"},
-				"filters": {fmt.Sprintf(
-					`[["id",">",%d],["created_at",">","%s"]]`,
-					sub.LastLogID,
-					time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
-			})
-			for _, log := range old.Items {
-				// Hack 2: populate the event's logRow
-				// using the API response -- otherwise
-				// Detail() would crash because e.db
-				// is nil.
-				e := &event{
-					LogID:    log.ID,
-					Received: time.Now(),
-					logRow:   &log,
-				}
-				msg, err := sess.EventMessage(e)
-				if err != nil {
-					continue
-				}
-				resp = append(resp, msg)
-			}
-		}
-
-		return [][]byte{v0subscribeOK}
+		return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
 	}
 	return [][]byte{v0subscribeFail}
 }
@@ -159,6 +125,58 @@ func (sess *v0session) Filter(e *event) bool {
 	return false
 }
 
+func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+	if sub.LastLogID == 0 {
+		return
+	}
+	debugLogf("getOldEvents(%d)", sub.LastLogID)
+	// Here we do a "select id" query and queue an event for every
+	// log since the given ID, then use (*event)Detail() to
+	// retrieve the whole row and decide whether to send it. This
+	// approach is very inefficient if the subscriber asks for
+	// last_log_id==1, even if the filters end up matching very
+	// few events.
+	//
+	// To mitigate this, filter on "created > 10 minutes ago" when
+	// retrieving the list of old event IDs to consider.
+	rows, err := sess.db.Query(
+		`SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+		sub.LastLogID,
+		time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+	if err != nil {
+		errorLogf("db.Query: %s", err)
+		return
+	}
+	for rows.Next() {
+		var id uint64
+		err := rows.Scan(&id)
+		if err != nil {
+			errorLogf("Scan: %s", err)
+			continue
+		}
+		e := &event{
+			LogID:    id,
+			Received: time.Now(),
+			db:       sess.db,
+		}
+		if !sub.match(e) {
+			debugLogf("skip old event %+v", e)
+			continue
+		}
+		msg, err := sess.EventMessage(e)
+		if err != nil {
+			debugLogf("event marshal: %s", err)
+			continue
+		}
+		debugLogf("old event: %s", string(msg))
+		msgs = append(msgs, msg)
+	}
+	if err := rows.Err(); err != nil {
+		errorLogf("db.Query: %s", err)
+	}
+	return
+}
+
 type v0subscribe struct {
 	Method    string
 	Filters   []v0filter
@@ -172,15 +190,16 @@ type v0filter [3]interface{}
 func (sub *v0subscribe) match(e *event) bool {
 	detail := e.Detail()
 	if detail == nil {
+		debugLogf("match(%d): failed on no detail", e.LogID)
 		return false
 	}
-	debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
 	for i, f := range sub.funcs {
 		if !f(e) {
-			debugLogf("sub.match: failed on func %d", i)
+			debugLogf("match(%d): failed on func %d", e.LogID, i)
 			return false
 		}
 	}
+	debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
 	return true
 }
 
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
index bc09ed0..60d12c4 100644
--- a/services/ws/session_v1.go
+++ b/services/ws/session_v1.go
@@ -1,11 +1,12 @@
 package main
 
 import (
+	"database/sql"
 	"errors"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) {
+func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
 	return nil, errors.New("Not implemented")
 }

commit 567269bca35ce82045643f86d38992f45d75f435
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Nov 16 14:08:27 2016 -0500

    8460: Pass tests even if websocket server sends events that do not match our filters.

diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 7cc35f9..7ce4dc9 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -51,21 +51,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         self.assertEqual(200, events.get(True, 5)['status'])
         human = arvados.api('v1').humans().create(body={}).execute()
 
-        log_object_uuids = []
-        for i in range(0, expected):
-            log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+        want_uuids = []
         if expected > 0:
-            self.assertIn(human['uuid'], log_object_uuids)
-
+            want_uuids.append(human['uuid'])
         if expected > 1:
-            self.assertIn(ancestor['uuid'], log_object_uuids)
+            want_uuids.append(ancestor['uuid'])
+        log_object_uuids = []
+        while set(want_uuids) - set(log_object_uuids):
+            log_object_uuids.append(events.get(True, 5)['object_uuid'])
 
-        with self.assertRaises(Queue.Empty):
-            # assertEqual just serves to show us what unexpected thing
-            # comes out of the queue when the assertRaises fails; when
-            # the test passes, this assertEqual doesn't get called.
-            self.assertEqual(events.get(True, 2), None)
+        if expected < 2:
+            with self.assertRaises(Queue.Empty):
+                # assertEqual just serves to show us what unexpected
+                # thing comes out of the queue when the assertRaises
+                # fails; when the test passes, this assertEqual
+                # doesn't get called.
+                self.assertEqual(events.get(True, 2), None)
 
     def test_subscribe_websocket(self):
         self._test_subscribe(

commit 8e37378b2955346c2b4a3c1e38fcdfb2e74b7e07
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Nov 16 01:45:41 2016 -0500

    8460: Retrieve recent logs and send old matching events if last_log_id given.

diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
index caea04c..ef56e85 100644
--- a/sdk/go/arvados/log.go
+++ b/sdk/go/arvados/log.go
@@ -14,3 +14,11 @@ type Log struct {
 	Properties      map[string]interface{} `json:"properties"`
 	CreatedAt       *time.Time             `json:"created_at,omitempty"`
 }
+
+// LogList is an arvados#logList resource.
+type LogList struct {
+	Items          []Log `json:"items"`
+	ItemsAvailable int   `json:"items_available"`
+	Offset         int   `json:"offset"`
+	Limit          int   `json:"limit"`
+}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index c888a84..5622304 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -3,7 +3,9 @@ package main
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"log"
+	"net/url"
 	"sync"
 	"time"
 
@@ -64,6 +66,42 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte
 		sess.mtx.Lock()
 		sess.subscriptions = append(sess.subscriptions, sub)
 		sess.mtx.Unlock()
+
+		resp := [][]byte{v0subscribeOK}
+
+		if sub.LastLogID > 0 {
+			// Hack 1: use the permission checker's
+			// Arvados client to retrieve old log IDs. Use
+			// "created < 2 hours ago" to ensure the
+			// database query doesn't get too expensive
+			// when our client gives us last_log_id==1.
+			ac := sess.permChecker.(*cachingPermChecker).Client
+			var old arvados.LogList
+			ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
+				"limit": {"1000"},
+				"filters": {fmt.Sprintf(
+					`[["id",">",%d],["created_at",">","%s"]]`,
+					sub.LastLogID,
+					time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
+			})
+			for _, log := range old.Items {
+				// Hack 2: populate the event's logRow
+				// using the API response -- otherwise
+				// Detail() would crash because e.db
+				// is nil.
+				e := &event{
+					LogID:    log.ID,
+					Received: time.Now(),
+					logRow:   &log,
+				}
+				msg, err := sess.EventMessage(e)
+				if err != nil {
+					continue
+				}
+				resp = append(resp, msg)
+			}
+		}
+
 		return [][]byte{v0subscribeOK}
 	}
 	return [][]byte{v0subscribeFail}
@@ -122,9 +160,11 @@ func (sess *v0session) Filter(e *event) bool {
 }
 
 type v0subscribe struct {
-	Method  string
-	Filters []v0filter
-	funcs   []func(*event) bool
+	Method    string
+	Filters   []v0filter
+	LastLogID int64 `json:"last_log_id"`
+
+	funcs []func(*event) bool
 }
 
 type v0filter [3]interface{}

commit bc4dd2d01cda24a71f95b62ae1fa72aa9fb1226d
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Nov 16 01:11:36 2016 -0500

    8460: Allow session Receive handler to queue multiple messages.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index 721c36f..59d690f 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -71,9 +71,8 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 				stop <- err
 				return
 			}
-			e := sess.Receive(msg, buf[:n])
-			if e != nil {
-				queue <- e
+			for _, buf := range sess.Receive(msg, buf[:n]) {
+				queue <- buf
 			}
 		}
 	}()
diff --git a/services/ws/session.go b/services/ws/session.go
index 9111c6c..a0658d9 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -2,9 +2,9 @@ package main
 
 type session interface {
 	// Receive processes a message received from the client. If
-	// the returned response is non-nil, it will be queued and
-	// sent the client.
-	Receive(map[string]interface{}, []byte) []byte
+	// the returned list of messages is non-nil, they will be
+	// queued for sending to the client.
+	Receive(map[string]interface{}, []byte) [][]byte
 
 	// Filter returns true if the event should be queued for
 	// sending to the client. It should return as fast as
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 2035acb..c888a84 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -15,6 +15,9 @@ var (
 	errFrameTooBig = errors.New("frame too big")
 
 	sendObjectAttributes = []string{"state", "name"}
+
+	v0subscribeOK   = []byte(`{"status":200}`)
+	v0subscribeFail = []byte(`{"status":400}`)
 )
 
 type v0session struct {
@@ -48,7 +51,7 @@ func (sess *v0session) debugLogf(s string, args ...interface{}) {
 	debugLogf("%s "+s, args...)
 }
 
-func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
 	sess.debugLogf("received message: %+v", msg)
 	var sub v0subscribe
 	if err := json.Unmarshal(buf, &sub); err != nil {
@@ -61,9 +64,9 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
 		sess.mtx.Lock()
 		sess.subscriptions = append(sess.subscriptions, sub)
 		sess.mtx.Unlock()
-		return []byte(`{"status":200}`)
+		return [][]byte{v0subscribeOK}
 	}
-	return []byte(`{"status":400}`)
+	return [][]byte{v0subscribeFail}
 }
 
 func (sess *v0session) EventMessage(e *event) ([]byte, error) {

commit 5ffb79040668114c58bf35c3e18a8302b8d94445
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Nov 16 00:56:46 2016 -0500

    8460: Support created_at filters.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 399de7e..2c28817 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -294,6 +294,8 @@ stop_services() {
     if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
         unset ARVADOS_TEST_API_HOST
         cd "$WORKSPACE" \
+            && python sdk/python/tests/run_test_server.py stop_nginx \
+            && python sdk/python/tests/run_test_server.py stop_ws \
             && python sdk/python/tests/run_test_server.py stop
     fi
 }
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index f2cdba2..7cc35f9 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -143,8 +143,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
         return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
 
     def isotz(self, offset):
-        """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
-        return '{:+03d}{:02d}'.format(offset/60, offset%60)
+        """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+        return '{:+03d}:{:02d}'.format(offset/60, offset%60)
 
     # Test websocket reconnection on (un)execpted close
     def _test_websocket_reconnect(self, close_unexpected):
diff --git a/services/ws/permission.go b/services/ws/permission.go
index 3f16a89..090e5ff 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -46,7 +46,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 	pc.tidy()
 	now := time.Now()
 	if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
-		debugLogf("perm (cached): %+q %+q => %s", pc.Client.AuthToken, uuid, perm.allowed)
+		debugLogf("perm (cached): %+q %+q => %v", pc.Client.AuthToken, uuid, perm.allowed)
 		return perm.allowed, nil
 	}
 	var buf map[string]interface{}
@@ -67,7 +67,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 		errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
 		return false, err
 	}
-	debugLogf("perm: %+q %+q => %s", pc.Client.AuthToken, uuid, allowed)
+	debugLogf("perm: %+q %+q => %v", pc.Client.AuthToken, uuid, allowed)
 	pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
 	return allowed, nil
 }
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 3a9cc31..2035acb 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"log"
 	"sync"
+	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
@@ -16,28 +17,18 @@ var (
 	sendObjectAttributes = []string{"state", "name"}
 )
 
-type sessionV0 struct {
-	ws          wsConn
-	permChecker permChecker
-	subscribed  map[string]bool
-	eventTypes  map[string]bool
-	mtx         sync.Mutex
-	setupOnce   sync.Once
+type v0session struct {
+	ws            wsConn
+	permChecker   permChecker
+	subscriptions []v0subscribe
+	mtx           sync.Mutex
+	setupOnce     sync.Once
 }
 
-type v0subscribe struct {
-	Method  string
-	Filters []v0filter
-}
-
-type v0filter []interface{}
-
 func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
-	sess := &sessionV0{
+	sess := &v0session{
 		ws:          ws,
 		permChecker: NewPermChecker(ac),
-		subscribed:  make(map[string]bool),
-		eventTypes:  make(map[string]bool),
 	}
 
 	err := ws.Request().ParseForm()
@@ -52,56 +43,12 @@ func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
 	return sess, nil
 }
 
-func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
+func (sess *v0session) debugLogf(s string, args ...interface{}) {
 	args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
 	debugLogf("%s "+s, args...)
 }
 
-// If every client subscription message includes filters consisting
-// only of [["event_type","in",...]] then send only the requested
-// event types. Otherwise, clear sess.eventTypes and send all event
-// types from now on.
-func (sess *sessionV0) checkFilters(filters []v0filter) {
-	if sess.eventTypes == nil {
-		// Already received a subscription request without
-		// event_type filters.
-		return
-	}
-	eventTypes := sess.eventTypes
-	sess.eventTypes = nil
-	if len(filters) == 0 {
-		return
-	}
-	useFilters := false
-	for _, f := range filters {
-		col, ok := f[0].(string)
-		if !ok || col != "event_type" {
-			continue
-		}
-		op, ok := f[1].(string)
-		if !ok || op != "in" {
-			return
-		}
-		arr, ok := f[2].([]interface{})
-		if !ok {
-			return
-		}
-		useFilters = true
-		for _, s := range arr {
-			if s, ok := s.(string); ok {
-				eventTypes[s] = true
-			} else {
-				return
-			}
-		}
-	}
-	if useFilters {
-		sess.debugLogf("eventTypes %+v", eventTypes)
-		sess.eventTypes = eventTypes
-	}
-}
-
-func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
 	sess.debugLogf("received message: %+v", msg)
 	var sub v0subscribe
 	if err := json.Unmarshal(buf, &sub); err != nil {
@@ -109,17 +56,17 @@ func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
 		return nil
 	}
 	if sub.Method == "subscribe" {
-		sess.debugLogf("subscribing to *")
+		sub.prepare()
+		sess.debugLogf("subscription: %v", sub)
 		sess.mtx.Lock()
-		sess.checkFilters(sub.Filters)
-		sess.subscribed["*"] = true
+		sess.subscriptions = append(sess.subscriptions, sub)
 		sess.mtx.Unlock()
 		return []byte(`{"status":200}`)
 	}
 	return []byte(`{"status":400}`)
 }
 
-func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
+func (sess *v0session) EventMessage(e *event) ([]byte, error) {
 	detail := e.Detail()
 	if detail == nil {
 		return nil, nil
@@ -160,24 +107,110 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
 	return json.Marshal(msg)
 }
 
-func (sess *sessionV0) Filter(e *event) bool {
-	detail := e.Detail()
+func (sess *v0session) Filter(e *event) bool {
 	sess.mtx.Lock()
 	defer sess.mtx.Unlock()
-	switch {
-	case sess.eventTypes != nil && detail == nil:
-		return false
-	case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
-		return false
-	case sess.subscribed["*"]:
-		return true
-	case detail == nil:
-		return false
-	case sess.subscribed[detail.ObjectUUID]:
-		return true
-	case sess.subscribed[detail.ObjectOwnerUUID]:
-		return true
-	default:
+	for _, sub := range sess.subscriptions {
+		if sub.match(e) {
+			return true
+		}
+	}
+	return false
+}
+
+type v0subscribe struct {
+	Method  string
+	Filters []v0filter
+	funcs   []func(*event) bool
+}
+
+type v0filter [3]interface{}
+
+func (sub *v0subscribe) match(e *event) bool {
+	detail := e.Detail()
+	if detail == nil {
 		return false
 	}
+	debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
+	for i, f := range sub.funcs {
+		if !f(e) {
+			debugLogf("sub.match: failed on func %d", i)
+			return false
+		}
+	}
+	return true
+}
+
+func (sub *v0subscribe) prepare() {
+	for _, f := range sub.Filters {
+		if len(f) != 3 {
+			continue
+		}
+		if col, ok := f[0].(string); ok && col == "event_type" {
+			op, ok := f[1].(string)
+			if !ok || op != "in" {
+				continue
+			}
+			arr, ok := f[2].([]interface{})
+			if !ok {
+				continue
+			}
+			var strs []string
+			for _, s := range arr {
+				if s, ok := s.(string); ok {
+					strs = append(strs, s)
+				}
+			}
+			sub.funcs = append(sub.funcs, func(e *event) bool {
+				debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
+				for _, s := range strs {
+					if s == e.Detail().EventType {
+						return true
+					}
+				}
+				return false
+			})
+		} else if ok && col == "created_at" {
+			op, ok := f[1].(string)
+			if !ok {
+				continue
+			}
+			tstr, ok := f[2].(string)
+			if !ok {
+				continue
+			}
+			t, err := time.Parse(time.RFC3339Nano, tstr)
+			if err != nil {
+				debugLogf("time.Parse(%q): %s", tstr, err)
+				continue
+			}
+			switch op {
+			case ">=":
+				sub.funcs = append(sub.funcs, func(e *event) bool {
+					debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+					return !e.Detail().CreatedAt.Before(t)
+				})
+			case "<=":
+				sub.funcs = append(sub.funcs, func(e *event) bool {
+					debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+					return !e.Detail().CreatedAt.After(t)
+				})
+			case ">":
+				sub.funcs = append(sub.funcs, func(e *event) bool {
+					debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+					return e.Detail().CreatedAt.After(t)
+				})
+			case "<":
+				sub.funcs = append(sub.funcs, func(e *event) bool {
+					debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+					return e.Detail().CreatedAt.Before(t)
+				})
+			case "=":
+				sub.funcs = append(sub.funcs, func(e *event) bool {
+					debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+					return e.Detail().CreatedAt.Equal(t)
+				})
+			}
+		}
+	}
 }

commit d6b6b39bfe67926490506125c88f3567e45e7dcc
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 23:26:57 2016 -0500

    8460: Send {"status":200} messages. Bring up ws server for Python SDK tests.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index c771bdc..399de7e 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -265,6 +265,8 @@ start_api() {
         && eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
         && export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
         && export ARVADOS_TEST_API_INSTALLED="$$" \
+        && python sdk/python/tests/run_test_server.py start_ws \
+        && python sdk/python/tests/run_test_server.py start_nginx \
         && (env | egrep ^ARVADOS)
 }
 
@@ -273,8 +275,8 @@ start_nginx_proxy_services() {
     cd "$WORKSPACE" \
         && python sdk/python/tests/run_test_server.py start_keep_proxy \
         && python sdk/python/tests/run_test_server.py start_keep-web \
-        && python sdk/python/tests/run_test_server.py start_ws \
         && python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+        && python sdk/python/tests/run_test_server.py start_ws \
         && python sdk/python/tests/run_test_server.py start_nginx \
         && export ARVADOS_TEST_PROXY_SERVICES=1
 }
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 28b121f..721c36f 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -38,7 +38,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 		return
 	}
 
-	queue := make(chan *event, h.QueueSize)
+	queue := make(chan interface{}, h.QueueSize)
 
 	stopped := make(chan struct{})
 	stop := make(chan error, 5)
@@ -71,15 +71,18 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 				stop <- err
 				return
 			}
-			sess.Receive(msg, buf[:n])
+			e := sess.Receive(msg, buf[:n])
+			if e != nil {
+				queue <- e
+			}
 		}
 	}()
 
 	go func() {
 		for e := range queue {
-			if e == nil {
+			if buf, ok := e.([]byte); ok {
 				ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
-				_, err := ws.Write([]byte("{}"))
+				_, err := ws.Write(buf)
 				if err != nil {
 					sess.debugLogf("handler: write {}: %s", err)
 					stop <- err
@@ -87,6 +90,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 				}
 				continue
 			}
+			e := e.(*event)
 
 			buf, err := sess.EventMessage(e)
 			if err != nil {
@@ -148,7 +152,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 				// socket, and prevent an idle socket
 				// from being closed.
 				if len(queue) == 0 {
-					queue <- nil
+					queue <- []byte(`{}`)
 				}
 				continue
 			case e, ok = <-events:
diff --git a/services/ws/session.go b/services/ws/session.go
index 98164e3..9111c6c 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -1,8 +1,27 @@
 package main
 
 type session interface {
-	Receive(map[string]interface{}, []byte)
-	EventMessage(*event) ([]byte, error)
+	// Receive processes a message received from the client. If
+	// the returned response is non-nil, it will be queued and
+	// sent to the client.
+	Receive(map[string]interface{}, []byte) []byte
+
+	// Filter returns true if the event should be queued for
+	// sending to the client. It should return as fast as
+	// possible, and must not block.
 	Filter(*event) bool
+
+	// EventMessage encodes the given event (from the front of the
+	// queue) into a form suitable to send to the client. If a
+	// non-nil error is returned, the connection is terminated. If
+	// the returned buffer is empty, nothing is sent to the client
+	// and the event is not counted in statistics.
+	//
+	// Unlike Filter, EventMessage can block without affecting
+	// other connections. If EventMessage is slow, additional
+	// incoming events will be queued. If the event queue fills
+	// up, the connection will be dropped.
+	EventMessage(*event) ([]byte, error)
+
 	debugLogf(string, ...interface{})
 }
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 3894e30..3a9cc31 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -101,12 +101,12 @@ func (sess *sessionV0) checkFilters(filters []v0filter) {
 	}
 }
 
-func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
+func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
 	sess.debugLogf("received message: %+v", msg)
 	var sub v0subscribe
 	if err := json.Unmarshal(buf, &sub); err != nil {
 		sess.debugLogf("ignored unrecognized request: %s", err)
-		return
+		return nil
 	}
 	if sub.Method == "subscribe" {
 		sess.debugLogf("subscribing to *")
@@ -114,7 +114,9 @@ func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
 		sess.checkFilters(sub.Filters)
 		sess.subscribed["*"] = true
 		sess.mtx.Unlock()
+		return []byte(`{"status":200}`)
 	}
+	return []byte(`{"status":400}`)
 }
 
 func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {

commit 7ae5533a074d80882171b33f7b659c9bcace1bd3
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 23:00:58 2016 -0500

    8460: Log connection stats.

diff --git a/services/ws/handler.go b/services/ws/handler.go
index 1c9d5ba..28b121f 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -24,7 +24,14 @@ type handler struct {
 	NewSession  func(wsConn, arvados.Client) (session, error)
 }
 
-func (h *handler) Handle(ws wsConn, events <-chan *event) {
+type handlerStats struct {
+	QueueDelay time.Duration
+	WriteDelay time.Duration
+	EventBytes uint64
+	EventCount uint64
+}
+
+func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
 	sess, err := h.NewSession(ws, h.Client)
 	if err != nil {
 		log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
@@ -93,6 +100,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
 
 			sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
 			ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+			t0 := time.Now()
 			_, err = ws.Write(buf)
 			if err != nil {
 				sess.debugLogf("handler: write: %s", err)
@@ -100,6 +108,10 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
 				break
 			}
 			sess.debugLogf("handler: sent event %d", e.Serial)
+			stats.WriteDelay += time.Since(t0)
+			stats.QueueDelay += t0.Sub(e.Received)
+			stats.EventBytes += uint64(len(buf))
+			stats.EventCount++
 		}
 		for _ = range queue {
 		}
@@ -153,4 +165,6 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
 
 	<-stop
 	close(stopped)
+
+	return
 }
diff --git a/services/ws/log.go b/services/ws/log.go
new file mode 100644
index 0000000..1511691
--- /dev/null
+++ b/services/ws/log.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"time"
+)
+
+func init() {
+	log.SetFlags(0)
+}
+
+func errorLogf(f string, args ...interface{}) {
+	log.Print(`{"error":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
+}
+
+var debugLogf = func(f string, args ...interface{}) {
+	log.Print(`{"debug":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
+}
+
+func mustMarshal(v interface{}) []byte {
+	buf, err := json.Marshal(v)
+	if err != nil {
+		panic(err)
+	}
+	return buf
+}
+
+func logj(args ...interface{}) {
+	m := map[string]interface{}{"Time": time.Now().UTC()}
+	for i := 0; i < len(args)-1; i += 2 {
+		m[fmt.Sprintf("%s", args[i])] = args[i+1]
+	}
+	buf, err := json.Marshal(m)
+	if err != nil {
+		errorLogf("logj: %s", err)
+		return
+	}
+	log.Print(string(buf))
+}
diff --git a/services/ws/main.go b/services/ws/main.go
index a143ae9..719128f 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -10,8 +10,6 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/config"
 )
 
-var debugLogf = func(string, ...interface{}) {}
-
 func main() {
 	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
 	dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
@@ -22,8 +20,8 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
-	if cfg.Debug {
-		debugLogf = log.Printf
+	if !cfg.Debug {
+		debugLogf = func(string, ...interface{}) {}
 	}
 
 	if *dumpConfig {
diff --git a/services/ws/router.go b/services/ws/router.go
index 69654cb..ba8b46b 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -37,8 +37,18 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session,
 			return nil
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
+			logj("Type", "connect",
+				"RemoteAddr", ws.Request().RemoteAddr)
+			t0 := time.Now()
+
 			sink := rtr.eventSource.NewSink()
-			handler.Handle(ws, sink.Channel())
+			stats := handler.Handle(ws, sink.Channel())
+
+			logj("Type", "disconnect",
+				"RemoteAddr", ws.Request().RemoteAddr,
+				"Elapsed", time.Now().Sub(t0).Seconds(),
+				"Stats", stats)
+
 			sink.Stop()
 			ws.Close()
 		}),
@@ -47,22 +57,10 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session,
 
 func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	rtr.setupOnce.Do(rtr.setup)
-	t0 := time.Now()
-	reqLog(map[string]interface{}{
-		"Connect":         req.RemoteAddr,
-		"RemoteAddr":      req.RemoteAddr,
-		"X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
-		"Time":            t0.UTC(),
-	})
+	logj("Type", "request",
+		"RemoteAddr", req.RemoteAddr,
+		"X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
 	rtr.mux.ServeHTTP(resp, req)
-	t1 := time.Now()
-	reqLog(map[string]interface{}{
-		"Disconnect":      req.RemoteAddr,
-		"RemoteAddr":      req.RemoteAddr,
-		"X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
-		"Time":            t1.UTC(),
-		"Elapsed":         time.Now().Sub(t0).Seconds(),
-	})
 }
 
 func reqLog(m map[string]interface{}) {

commit 0c6cbd8a07e31decd703ef7fd9eca591e5661c32
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 22:44:33 2016 -0500

    8460: Cache negative permission.

diff --git a/services/ws/permission.go b/services/ws/permission.go
index b2b962c..3f16a89 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -22,14 +22,19 @@ func NewPermChecker(ac arvados.Client) permChecker {
 	ac.AuthToken = ""
 	return &cachingPermChecker{
 		Client:     &ac,
-		cache:      make(map[string]time.Time),
+		cache:      make(map[string]cacheEnt),
 		maxCurrent: 16,
 	}
 }
 
+type cacheEnt struct {
+	time.Time
+	allowed bool
+}
+
 type cachingPermChecker struct {
 	*arvados.Client
-	cache      map[string]time.Time
+	cache      map[string]cacheEnt
 	maxCurrent int
 }
 
@@ -39,9 +44,10 @@ func (pc *cachingPermChecker) SetToken(token string) {
 
 func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 	pc.tidy()
-	if t, ok := pc.cache[uuid]; ok && time.Now().Sub(t) < maxPermCacheAge {
-		debugLogf("perm ok (cached): %+q %+q", pc.Client.AuthToken, uuid)
-		return true, nil
+	now := time.Now()
+	if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
+		debugLogf("perm (cached): %+q %+q => %s", pc.Client.AuthToken, uuid, perm.allowed)
+		return perm.allowed, nil
 	}
 	var buf map[string]interface{}
 	path, err := pc.PathForUUID("get", uuid)
@@ -51,17 +57,19 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
 	err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
 		"select": {`["uuid"]`},
 	})
-	if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound {
-		debugLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
-		return false, nil
-	}
-	if err != nil {
-		debugLogf("perm !ok: %+q %+q", pc.Client.AuthToken, uuid)
+
+	var allowed bool
+	if err == nil {
+		allowed = true
+	} else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+		allowed = false
+	} else {
+		errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
 		return false, err
 	}
-	debugLogf("perm ok: %+q %+q", pc.Client.AuthToken, uuid)
-	pc.cache[uuid] = time.Now()
-	return true, nil
+	debugLogf("perm: %+q %+q => %s", pc.Client.AuthToken, uuid, allowed)
+	pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+	return allowed, nil
 }
 
 func (pc *cachingPermChecker) tidy() {

commit f6f8feeada332a55d1a966e9f4a240d99dc58b55
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 16:30:01 2016 -0500

    8460: Send selected old/new attributes with v0 events.

diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 467d156..3894e30 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -12,6 +12,8 @@ import (
 var (
 	errQueueFull   = errors.New("client queue full")
 	errFrameTooBig = errors.New("frame too big")
+
+	sendObjectAttributes = []string{"state", "name"}
 )
 
 type sessionV0 struct {
@@ -136,6 +138,22 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
 	}
 	if detail.Properties != nil && detail.Properties["text"] != nil {
 		msg["properties"] = detail.Properties
+	} else {
+		msgProps := map[string]map[string]interface{}{}
+		for _, ak := range []string{"old_attributes", "new_attributes"} {
+			eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
+			if !ok {
+				continue
+			}
+			msgAttrs := map[string]interface{}{}
+			for _, k := range sendObjectAttributes {
+				if v, ok := eventAttrs[k]; ok {
+					msgAttrs[k] = v
+				}
+			}
+			msgProps[ak] = msgAttrs
+		}
+		msg["properties"] = msgProps
 	}
 	return json.Marshal(msg)
 }

commit 5a420beeb6c64efc3ca0ef13d4ab9ac6c654c3ab
Merge: 3e39ed3 3af6db5
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 15 13:25:03 2016 -0500

    8460: Merge branch 'master' into 8460-websocket-go


-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list