[ARVADOS] updated: f675fb2c202516021b961b5aa2de4528ba9f0d1f
Git user
git at public.curoverse.com
Wed Nov 16 18:16:09 EST 2016
Summary of changes:
.gitignore | 3 +
build/run-tests.sh | 6 +-
doc/_config.yml | 83 +++--
doc/_layouts/default.html.liquid | 2 +-
doc/api/authentication.html.textile.liquid | 40 ---
doc/api/crunch-scripts.html.textile.liquid | 5 +-
doc/api/execution.html.textile.liquid | 27 ++
doc/api/index.html.textile.liquid | 44 +--
doc/api/methods.html.textile.liquid | 120 +++----
.../api_client_authorizations.html.textile.liquid | 54 +++-
doc/api/methods/api_clients.html.textile.liquid | 40 ++-
.../methods/authorized_keys.html.textile.liquid | 43 ++-
doc/api/methods/collections.html.textile.liquid | 55 +++-
.../methods/container_requests.html.textile.liquid | 100 ++++--
doc/api/methods/containers.html.textile.liquid | 76 ++++-
doc/api/methods/groups.html.textile.liquid | 47 +--
doc/api/methods/humans.html.textile.liquid | 39 ++-
doc/api/methods/job_tasks.html.textile.liquid | 55 +++-
doc/api/methods/jobs.html.textile.liquid | 101 ++++--
doc/api/methods/keep_disks.html.textile.liquid | 52 ++-
doc/api/methods/keep_services.html.textile.liquid | 52 +--
doc/api/methods/links.html.textile.liquid | 69 ++--
doc/api/methods/logs.html.textile.liquid | 49 ++-
doc/api/methods/nodes.html.textile.liquid | 50 ++-
.../methods/pipeline_instances.html.textile.liquid | 44 ++-
.../methods/pipeline_templates.html.textile.liquid | 182 ++++++++++-
doc/api/methods/repositories.html.textile.liquid | 45 ++-
doc/api/methods/specimens.html.textile.liquid | 41 ++-
doc/api/methods/traits.html.textile.liquid | 40 ++-
doc/api/methods/users.html.textile.liquid | 67 ++--
.../methods/virtual_machines.html.textile.liquid | 52 +--
doc/api/methods/workflows.html.textile.liquid | 44 ++-
doc/api/permission-model.html.textile.liquid | 143 +++------
doc/api/requests.html.textile.liquid | 349 +++++++++++++++++++++
doc/api/resources.html.textile.liquid | 46 ++-
doc/api/schema/ApiClient.html.textile.liquid | 24 --
.../ApiClientAuthorization.html.textile.liquid | 29 --
doc/api/schema/AuthorizedKey.html.textile.liquid | 24 --
doc/api/schema/Collection.html.textile.liquid | 39 ---
doc/api/schema/Container.html.textile.liquid | 59 ----
.../schema/ContainerRequest.html.textile.liquid | 70 -----
doc/api/schema/Group.html.textile.liquid | 25 --
doc/api/schema/Human.html.textile.liquid | 19 --
doc/api/schema/Job.html.textile.liquid | 69 ----
doc/api/schema/JobTask.html.textile.liquid | 47 ---
doc/api/schema/KeepDisk.html.textile.liquid | 31 --
doc/api/schema/KeepService.html.textile.liquid | 24 --
doc/api/schema/Link.html.textile.liquid | 83 -----
doc/api/schema/Log.html.textile.liquid | 33 --
doc/api/schema/Node.html.textile.liquid | 28 --
.../schema/PipelineInstance.html.textile.liquid | 26 --
.../schema/PipelineTemplate.html.textile.liquid | 161 ----------
doc/api/schema/Repository.html.textile.liquid | 25 --
doc/api/schema/Specimen.html.textile.liquid | 22 --
doc/api/schema/Trait.html.textile.liquid | 22 --
doc/api/schema/User.html.textile.liquid | 30 --
doc/api/schema/VirtualMachine.html.textile.liquid | 21 --
doc/api/schema/Workflow.html.textile.liquid | 23 --
doc/api/storage.html.textile.liquid | 170 ++++++++++
doc/api/tokens.html.textile.liquid | 63 ++++
doc/images/Arvados_Permissions.svg | 4 +
doc/images/Crunch_dispatch.svg | 4 +
doc/images/Keep_manifests.svg | 4 +
doc/images/Keep_reading_writing_block.svg | 4 +
doc/images/Keep_rendezvous_hashing.svg | 4 +
doc/images/Session_Establishment.svg | 4 +
.../install-prerequisites.html.textile.liquid | 4 +-
doc/sdk/cli/install.html.textile.liquid | 1 -
doc/sdk/go/example.html.textile.liquid | 76 +++++
doc/sdk/go/index.html.textile.liquid | 15 +-
doc/sdk/index.html.textile.liquid | 13 +-
doc/sdk/java/example.html.textile.liquid | 78 +++++
doc/sdk/java/index.html.textile.liquid | 11 +-
doc/sdk/perl/example.html.textile.liquid | 81 +++++
doc/sdk/perl/index.html.textile.liquid | 59 +---
doc/sdk/python/events.html.textile.liquid | 2 +-
doc/sdk/python/example.html.textile.liquid | 51 +++
doc/sdk/python/sdk-python.html.textile.liquid | 3 +-
doc/sdk/ruby/example.html.textile.liquid | 75 +++++
doc/sdk/ruby/index.html.textile.liquid | 74 +----
doc/user/cwl/cwl-runner.html.textile.liquid | 1 +
.../reference/job-pipeline-ref.html.textile.liquid | 2 +-
.../running-external-program.html.textile.liquid | 4 +-
.../tutorial-submit-job.html.textile.liquid | 4 +-
sdk/cwl/arvados_cwl/__init__.py | 23 +-
sdk/cwl/arvados_cwl/arvcontainer.py | 7 +-
sdk/cwl/arvados_cwl/arvjob.py | 6 +-
sdk/cwl/arvados_cwl/crunch_script.py | 7 +-
sdk/cwl/arvados_cwl/runner.py | 3 +-
sdk/cwl/tests/test_container.py | 23 +-
sdk/cwl/tests/test_make_output.py | 9 +-
sdk/cwl/tests/test_submit.py | 88 ++++++
sdk/go/arvados/container.go | 38 ++-
sdk/go/arvados/log.go | 8 +
sdk/python/tests/test_events.py | 29 +-
services/api/app/models/container.rb | 11 +-
services/api/app/models/container_request.rb | 20 +-
...43147_add_scheduling_parameters_to_container.rb | 6 +
services/api/db/structure.sql | 10 +-
services/api/test/unit/container_request_test.rb | 32 ++
services/arv-git-httpd/main.go | 2 +-
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 6 +-
.../crunch-dispatch-slurm_test.go | 2 +-
services/keep-web/main.go | 2 +-
services/keepproxy/keepproxy.go | 2 +-
services/keepstore/keepstore.go | 2 +-
services/ws/event.go | 1 +
services/ws/handler.go | 35 ++-
services/ws/log.go | 41 +++
services/ws/main.go | 6 +-
services/ws/permission.go | 36 ++-
services/ws/pg.go | 5 +
services/ws/router.go | 38 +--
services/ws/session.go | 23 +-
services/ws/session_v0.go | 285 ++++++++++++-----
services/ws/session_v1.go | 3 +-
tools/arvbox/lib/arvbox/docker/Dockerfile.base | 48 ++-
tools/arvbox/lib/arvbox/docker/common.sh | 6 +-
.../lib/arvbox/docker/service/ready/run-service | 4 +-
119 files changed, 2838 insertions(+), 2039 deletions(-)
delete mode 100644 doc/api/authentication.html.textile.liquid
create mode 100644 doc/api/execution.html.textile.liquid
create mode 100644 doc/api/requests.html.textile.liquid
delete mode 100644 doc/api/schema/ApiClient.html.textile.liquid
delete mode 100644 doc/api/schema/ApiClientAuthorization.html.textile.liquid
delete mode 100644 doc/api/schema/AuthorizedKey.html.textile.liquid
delete mode 100644 doc/api/schema/Collection.html.textile.liquid
delete mode 100644 doc/api/schema/Container.html.textile.liquid
delete mode 100644 doc/api/schema/ContainerRequest.html.textile.liquid
delete mode 100644 doc/api/schema/Group.html.textile.liquid
delete mode 100644 doc/api/schema/Human.html.textile.liquid
delete mode 100644 doc/api/schema/Job.html.textile.liquid
delete mode 100644 doc/api/schema/JobTask.html.textile.liquid
delete mode 100644 doc/api/schema/KeepDisk.html.textile.liquid
delete mode 100644 doc/api/schema/KeepService.html.textile.liquid
delete mode 100644 doc/api/schema/Link.html.textile.liquid
delete mode 100644 doc/api/schema/Log.html.textile.liquid
delete mode 100644 doc/api/schema/Node.html.textile.liquid
delete mode 100644 doc/api/schema/PipelineInstance.html.textile.liquid
delete mode 100644 doc/api/schema/PipelineTemplate.html.textile.liquid
delete mode 100644 doc/api/schema/Repository.html.textile.liquid
delete mode 100644 doc/api/schema/Specimen.html.textile.liquid
delete mode 100644 doc/api/schema/Trait.html.textile.liquid
delete mode 100644 doc/api/schema/User.html.textile.liquid
delete mode 100644 doc/api/schema/VirtualMachine.html.textile.liquid
delete mode 100644 doc/api/schema/Workflow.html.textile.liquid
create mode 100644 doc/api/storage.html.textile.liquid
create mode 100644 doc/api/tokens.html.textile.liquid
create mode 100644 doc/images/Arvados_Permissions.svg
create mode 100644 doc/images/Crunch_dispatch.svg
create mode 100644 doc/images/Keep_manifests.svg
create mode 100644 doc/images/Keep_reading_writing_block.svg
create mode 100644 doc/images/Keep_rendezvous_hashing.svg
create mode 100644 doc/images/Session_Establishment.svg
create mode 100644 doc/sdk/go/example.html.textile.liquid
create mode 100644 doc/sdk/java/example.html.textile.liquid
create mode 100644 doc/sdk/perl/example.html.textile.liquid
create mode 100644 doc/sdk/python/example.html.textile.liquid
create mode 100644 doc/sdk/ruby/example.html.textile.liquid
create mode 100644 services/api/db/migrate/20161111143147_add_scheduling_parameters_to_container.rb
create mode 100644 services/ws/log.go
via f675fb2c202516021b961b5aa2de4528ba9f0d1f (commit)
via dfb995ab9ea0f1d8808c812870db717164ac95f4 (commit)
via 567269bca35ce82045643f86d38992f45d75f435 (commit)
via 8e37378b2955346c2b4a3c1e38fcdfb2e74b7e07 (commit)
via bc4dd2d01cda24a71f95b62ae1fa72aa9fb1226d (commit)
via 5ffb79040668114c58bf35c3e18a8302b8d94445 (commit)
via d6b6b39bfe67926490506125c88f3567e45e7dcc (commit)
via 7ae5533a074d80882171b33f7b659c9bcace1bd3 (commit)
via 0c6cbd8a07e31decd703ef7fd9eca591e5661c32 (commit)
via f6f8feeada332a55d1a966e9f4a240d99dc58b55 (commit)
via d2f9e7809bd1f63638600c7fc8189182c0f327c0 (commit)
via 2251688e66191ff1169080f50868bf57e463659c (commit)
via c14246b9a21d038fc6fa850f4032659a98397784 (commit)
via 5d2ef6f7a2a8f93ec411c420287f30af92294520 (commit)
via 780d334ec4b2d47379d0330ace77b3821c880842 (commit)
via 5a420beeb6c64efc3ca0ef13d4ab9ac6c654c3ab (commit)
via eae48c31bb338689ec67fbc6a14a2e0b1fb5e3b6 (commit)
via 1d656f4f1ec1f890a7677e748bea43a08cfa0b6c (commit)
via 3af6db5dc4e2f08b2ebb49a82109c4325ad7fcc4 (commit)
via 38fae0458644b89322ddeac125971800b9e452e5 (commit)
via 02010431f52911a6ff908e673c534291beb929ac (commit)
via 82fa37ac01169178f6a9b1c142926de7b50e8841 (commit)
via 1129e9428dc1f3a300c4148bf12821eecf511ab3 (commit)
via 90c48f84391d6b4d6b8ed366d5a42d24bf6c696f (commit)
via f6a8bf41c9f038ece715ec2744c36160f9c6c591 (commit)
via 4225d058e0bc4380203fe5959e7e54febc91e83b (commit)
via ddeef8adc0cb7cdb55be644b9335ea51919ed513 (commit)
via c198274863bb5d72ef34dfc311c62bf50d6bd4f4 (commit)
via 1e6d7656fbfd1f954571157fc7e7e4f75ea5911e (commit)
via aa49b45a4d25cb1e4282e242a2502c8a591f8615 (commit)
via 79786a56410ef381499fb0bfdc5a18407ab33082 (commit)
via 364fed6e1d4036719e4c461cfe0bc24e7f52f144 (commit)
via 5fc627d22b47723289251e1e1d9dc45c0e1bd49e (commit)
via 26e1c10f963a586e40ea9dcb46a87b0107c97b7c (commit)
via 7e52fd153f2d16f94061ef1eabfe653d4a83852a (commit)
via b8610f34c21f1cf44b938802f37971b06af4361c (commit)
via 0f2ab548f96e8604a929e0636517f634b7dfb0ad (commit)
via e59d21d3f47cbee83a6dc389584bd7b17bec270c (commit)
via 051efbd3d843baa690b334e57fd09fad6a908bb9 (commit)
via 3c2fee34ad8f668f5cf9001d6b7d605965ec28bb (commit)
via c10e9e5f3398d40a3346c7d7c1f84bf50262b8ec (commit)
via e0903a2cff2df4e6169e95f7439c0fa361c60ea8 (commit)
via 94e52d9256cb17dddbc9c383d2ab90e713c25e3b (commit)
via 42b9e37cd53d63980d3fa4a238f9ff6adad9ccc4 (commit)
via becada3b24006cf39417335794cb46556d7aa605 (commit)
from 3e39ed33427a0c3000ed41b4826fcfa182934f71 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit f675fb2c202516021b961b5aa2de4528ba9f0d1f
Merge: dfb995a d2f9e78
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Nov 16 18:15:56 2016 -0500
8460: Merge branch 'master' into 8460-websocket-go
commit dfb995ab9ea0f1d8808c812870db717164ac95f4
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Nov 16 14:09:15 2016 -0500
8460: Return recent events if last_log_id given.
diff --git a/services/ws/event.go b/services/ws/event.go
index 09c9d0f..77acf44 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -17,6 +17,7 @@ type eventSink interface {
type eventSource interface {
NewSink() eventSink
+ DB() *sql.DB
}
type event struct {
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 59d690f..1470c66 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -21,7 +21,7 @@ type handler struct {
Client arvados.Client
PingTimeout time.Duration
QueueSize int
- NewSession func(wsConn, arvados.Client) (session, error)
+ NewSession func(wsConn) (session, error)
}
type handlerStats struct {
@@ -32,7 +32,7 @@ type handlerStats struct {
}
func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
- sess, err := h.NewSession(ws, h.Client)
+ sess, err := h.NewSession(ws)
if err != nil {
log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
return
@@ -72,6 +72,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
return
}
for _, buf := range sess.Receive(msg, buf[:n]) {
+ sess.debugLogf("handler: to queue: %s", string(buf))
queue <- buf
}
}
@@ -81,6 +82,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
for e := range queue {
if buf, ok := e.([]byte); ok {
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+ sess.debugLogf("handler: send msg: %s", string(buf))
_, err := ws.Write(buf)
if err != nil {
sess.debugLogf("handler: write {}: %s", err)
diff --git a/services/ws/permission.go b/services/ws/permission.go
index 090e5ff..1dc06b8 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -46,7 +46,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
pc.tidy()
now := time.Now()
if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
- debugLogf("perm (cached): %+q %+q => %v", pc.Client.AuthToken, uuid, perm.allowed)
+ debugLogf("perm (cached): %+q %+q ...%v", pc.Client.AuthToken, uuid, perm.allowed)
return perm.allowed, nil
}
var buf map[string]interface{}
@@ -64,10 +64,10 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
} else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
allowed = false
} else {
- errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
+ errorLogf("perm err: %+q %+q: %T %s", pc.Client.AuthToken, uuid, err, err)
return false, err
}
- debugLogf("perm: %+q %+q => %v", pc.Client.AuthToken, uuid, allowed)
+ debugLogf("perm: %+q %+q ...%v", pc.Client.AuthToken, uuid, allowed)
pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
return allowed, nil
}
diff --git a/services/ws/pg.go b/services/ws/pg.go
index a5af9f7..08fbee1 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -152,6 +152,11 @@ func (ps *pgEventSource) NewSink() eventSink {
return sink
}
+func (ps *pgEventSource) DB() *sql.DB {
+ ps.setupOnce.Do(ps.setup)
+ return ps.db
+}
+
type pgEventSink struct {
channel chan *event
source *pgEventSource
diff --git a/services/ws/router.go b/services/ws/router.go
index ba8b46b..2a4e52e 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -1,6 +1,7 @@
package main
import (
+ "database/sql"
"encoding/json"
"log"
"net/http"
@@ -25,12 +26,13 @@ func (rtr *router) setup() {
rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(NewSessionV1))
}
-func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session, error)) *websocket.Server {
+func (rtr *router) makeServer(newSession func(wsConn, arvados.Client, *sql.DB) (session, error)) *websocket.Server {
handler := &handler{
- Client: rtr.Config.Client,
PingTimeout: rtr.Config.PingTimeout.Duration(),
QueueSize: rtr.Config.ClientEventQueue,
- NewSession: newSession,
+ NewSession: func(ws wsConn) (session, error) {
+ return newSession(ws, rtr.Config.Client, rtr.eventSource.DB())
+ },
}
return &websocket.Server{
Handshake: func(c *websocket.Config, r *http.Request) error {
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 5622304..33cdb2f 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -1,11 +1,10 @@
package main
import (
+ "database/sql"
"encoding/json"
"errors"
- "fmt"
"log"
- "net/url"
"sync"
"time"
@@ -24,15 +23,17 @@ var (
type v0session struct {
ws wsConn
+ db *sql.DB
permChecker permChecker
subscriptions []v0subscribe
mtx sync.Mutex
setupOnce sync.Once
}
-func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
+func NewSessionV0(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
sess := &v0session{
ws: ws,
+ db: db,
permChecker: NewPermChecker(ac),
}
@@ -67,42 +68,7 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
- resp := [][]byte{v0subscribeOK}
-
- if sub.LastLogID > 0 {
- // Hack 1: use the permission checker's
- // Arvados client to retrieve old log IDs. Use
- // "created < 2 hours ago" to ensure the
- // database query doesn't get too expensive
- // when our client gives us last_log_id==1.
- ac := sess.permChecker.(*cachingPermChecker).Client
- var old arvados.LogList
- ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
- "limit": {"1000"},
- "filters": {fmt.Sprintf(
- `[["id",">",%d],["created_at",">","%s"]]`,
- sub.LastLogID,
- time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
- })
- for _, log := range old.Items {
- // Hack 2: populate the event's logRow
- // using the API response -- otherwise
- // Detail() would crash because e.db
- // is nil.
- e := &event{
- LogID: log.ID,
- Received: time.Now(),
- logRow: &log,
- }
- msg, err := sess.EventMessage(e)
- if err != nil {
- continue
- }
- resp = append(resp, msg)
- }
- }
-
- return [][]byte{v0subscribeOK}
+ return append([][]byte{v0subscribeOK}, sub.getOldEvents(sess)...)
}
return [][]byte{v0subscribeFail}
}
@@ -159,6 +125,58 @@ func (sess *v0session) Filter(e *event) bool {
return false
}
+func (sub *v0subscribe) getOldEvents(sess *v0session) (msgs [][]byte) {
+ if sub.LastLogID == 0 {
+ return
+ }
+ debugLogf("getOldEvents(%d)", sub.LastLogID)
+ // Here we do a "select id" query and queue an event for every
+ // log since the given ID, then use (*event)Detail() to
+ // retrieve the whole row and decide whether to send it. This
+ // approach is very inefficient if the subscriber asks for
+ // last_log_id==1, even if the filters end up matching very
+ // few events.
+ //
+ // To mitigate this, filter on "created > 10 minutes ago" when
+ // retrieving the list of old event IDs to consider.
+ rows, err := sess.db.Query(
+ `SELECT id FROM logs WHERE id > $1 AND created_at > $2 ORDER BY id`,
+ sub.LastLogID,
+ time.Now().UTC().Add(-10*time.Minute).Format(time.RFC3339Nano))
+ if err != nil {
+ errorLogf("db.Query: %s", err)
+ return
+ }
+ for rows.Next() {
+ var id uint64
+ err := rows.Scan(&id)
+ if err != nil {
+ errorLogf("Scan: %s", err)
+ continue
+ }
+ e := &event{
+ LogID: id,
+ Received: time.Now(),
+ db: sess.db,
+ }
+ if !sub.match(e) {
+ debugLogf("skip old event %+v", e)
+ continue
+ }
+ msg, err := sess.EventMessage(e)
+ if err != nil {
+ debugLogf("event marshal: %s", err)
+ continue
+ }
+ debugLogf("old event: %s", string(msg))
+ msgs = append(msgs, msg)
+ }
+ if err := rows.Err(); err != nil {
+ errorLogf("db.Query: %s", err)
+ }
+ return
+}
+
type v0subscribe struct {
Method string
Filters []v0filter
@@ -172,15 +190,16 @@ type v0filter [3]interface{}
func (sub *v0subscribe) match(e *event) bool {
detail := e.Detail()
if detail == nil {
+ debugLogf("match(%d): failed on no detail", e.LogID)
return false
}
- debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
for i, f := range sub.funcs {
if !f(e) {
- debugLogf("sub.match: failed on func %d", i)
+ debugLogf("match(%d): failed on func %d", e.LogID, i)
return false
}
}
+ debugLogf("match(%d): passed %d funcs", e.LogID, len(sub.funcs))
return true
}
diff --git a/services/ws/session_v1.go b/services/ws/session_v1.go
index bc09ed0..60d12c4 100644
--- a/services/ws/session_v1.go
+++ b/services/ws/session_v1.go
@@ -1,11 +1,12 @@
package main
import (
+ "database/sql"
"errors"
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
-func NewSessionV1(ws wsConn, ac arvados.Client) (session, error) {
+func NewSessionV1(ws wsConn, ac arvados.Client, db *sql.DB) (session, error) {
return nil, errors.New("Not implemented")
}
commit 567269bca35ce82045643f86d38992f45d75f435
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Nov 16 14:08:27 2016 -0500
8460: Pass tests even if websocket server sends events that do not match our filters.
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index 7cc35f9..7ce4dc9 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -51,21 +51,22 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
self.assertEqual(200, events.get(True, 5)['status'])
human = arvados.api('v1').humans().create(body={}).execute()
- log_object_uuids = []
- for i in range(0, expected):
- log_object_uuids.append(events.get(True, 5)['object_uuid'])
-
+ want_uuids = []
if expected > 0:
- self.assertIn(human['uuid'], log_object_uuids)
-
+ want_uuids.append(human['uuid'])
if expected > 1:
- self.assertIn(ancestor['uuid'], log_object_uuids)
+ want_uuids.append(ancestor['uuid'])
+ log_object_uuids = []
+ while set(want_uuids) - set(log_object_uuids):
+ log_object_uuids.append(events.get(True, 5)['object_uuid'])
- with self.assertRaises(Queue.Empty):
- # assertEqual just serves to show us what unexpected thing
- # comes out of the queue when the assertRaises fails; when
- # the test passes, this assertEqual doesn't get called.
- self.assertEqual(events.get(True, 2), None)
+ if expected < 2:
+ with self.assertRaises(Queue.Empty):
+ # assertEqual just serves to show us what unexpected
+ # thing comes out of the queue when the assertRaises
+ # fails; when the test passes, this assertEqual
+ # doesn't get called.
+ self.assertEqual(events.get(True, 2), None)
def test_subscribe_websocket(self):
self._test_subscribe(
commit 8e37378b2955346c2b4a3c1e38fcdfb2e74b7e07
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Nov 16 01:45:41 2016 -0500
8460: Retrieve recent logs and send old matching events if last_log_id given.
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
index caea04c..ef56e85 100644
--- a/sdk/go/arvados/log.go
+++ b/sdk/go/arvados/log.go
@@ -14,3 +14,11 @@ type Log struct {
Properties map[string]interface{} `json:"properties"`
CreatedAt *time.Time `json:"created_at,omitempty"`
}
+
+// LogList is an arvados#logList resource.
+type LogList struct {
+ Items []Log `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index c888a84..5622304 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -3,7 +3,9 @@ package main
import (
"encoding/json"
"errors"
+ "fmt"
"log"
+ "net/url"
"sync"
"time"
@@ -64,6 +66,42 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte
sess.mtx.Lock()
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
+
+ resp := [][]byte{v0subscribeOK}
+
+ if sub.LastLogID > 0 {
+ // Hack 1: use the permission checker's
+ // Arvados client to retrieve old log IDs. Use
+ // "created < 2 hours ago" to ensure the
+ // database query doesn't get too expensive
+ // when our client gives us last_log_id==1.
+ ac := sess.permChecker.(*cachingPermChecker).Client
+ var old arvados.LogList
+ ac.RequestAndDecode(&old, "GET", "arvados/v1/logs", nil, url.Values{
+ "limit": {"1000"},
+ "filters": {fmt.Sprintf(
+ `[["id",">",%d],["created_at",">","%s"]]`,
+ sub.LastLogID,
+ time.Now().UTC().Add(-2*time.Hour).Format(time.RFC3339Nano))},
+ })
+ for _, log := range old.Items {
+ // Hack 2: populate the event's logRow
+ // using the API response -- otherwise
+ // Detail() would crash because e.db
+ // is nil.
+ e := &event{
+ LogID: log.ID,
+ Received: time.Now(),
+ logRow: &log,
+ }
+ msg, err := sess.EventMessage(e)
+ if err != nil {
+ continue
+ }
+ resp = append(resp, msg)
+ }
+ }
+
return [][]byte{v0subscribeOK}
}
return [][]byte{v0subscribeFail}
@@ -122,9 +160,11 @@ func (sess *v0session) Filter(e *event) bool {
}
type v0subscribe struct {
- Method string
- Filters []v0filter
- funcs []func(*event) bool
+ Method string
+ Filters []v0filter
+ LastLogID int64 `json:"last_log_id"`
+
+ funcs []func(*event) bool
}
type v0filter [3]interface{}
commit bc4dd2d01cda24a71f95b62ae1fa72aa9fb1226d
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Nov 16 01:11:36 2016 -0500
8460: Allow session Receive handler to queue multiple messages.
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 721c36f..59d690f 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -71,9 +71,8 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
stop <- err
return
}
- e := sess.Receive(msg, buf[:n])
- if e != nil {
- queue <- e
+ for _, buf := range sess.Receive(msg, buf[:n]) {
+ queue <- buf
}
}
}()
diff --git a/services/ws/session.go b/services/ws/session.go
index 9111c6c..a0658d9 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -2,9 +2,9 @@ package main
type session interface {
// Receive processes a message received from the client. If
- // the returned response is non-nil, it will be queued and
- // sent the client.
- Receive(map[string]interface{}, []byte) []byte
+ // the returned list of messages is non-nil, they will be
+ // queued for sending to the client.
+ Receive(map[string]interface{}, []byte) [][]byte
// Filter returns true if the event should be queued for
// sending to the client. It should return as fast as
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 2035acb..c888a84 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -15,6 +15,9 @@ var (
errFrameTooBig = errors.New("frame too big")
sendObjectAttributes = []string{"state", "name"}
+
+ v0subscribeOK = []byte(`{"status":200}`)
+ v0subscribeFail = []byte(`{"status":400}`)
)
type v0session struct {
@@ -48,7 +51,7 @@ func (sess *v0session) debugLogf(s string, args ...interface{}) {
debugLogf("%s "+s, args...)
}
-func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) [][]byte {
sess.debugLogf("received message: %+v", msg)
var sub v0subscribe
if err := json.Unmarshal(buf, &sub); err != nil {
@@ -61,9 +64,9 @@ func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
sess.mtx.Lock()
sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
- return []byte(`{"status":200}`)
+ return [][]byte{v0subscribeOK}
}
- return []byte(`{"status":400}`)
+ return [][]byte{v0subscribeFail}
}
func (sess *v0session) EventMessage(e *event) ([]byte, error) {
commit 5ffb79040668114c58bf35c3e18a8302b8d94445
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Nov 16 00:56:46 2016 -0500
8460: Support created_at filters.
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 399de7e..2c28817 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -294,6 +294,8 @@ stop_services() {
if [[ -n "$ARVADOS_TEST_API_HOST" ]]; then
unset ARVADOS_TEST_API_HOST
cd "$WORKSPACE" \
+ && python sdk/python/tests/run_test_server.py stop_nginx \
+ && python sdk/python/tests/run_test_server.py stop_ws \
&& python sdk/python/tests/run_test_server.py stop
fi
}
diff --git a/sdk/python/tests/test_events.py b/sdk/python/tests/test_events.py
index f2cdba2..7cc35f9 100644
--- a/sdk/python/tests/test_events.py
+++ b/sdk/python/tests/test_events.py
@@ -143,8 +143,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
return time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(t)) + self.isotz(-time.timezone/60)
def isotz(self, offset):
- """Convert minutes-east-of-UTC to ISO8601 time zone designator"""
- return '{:+03d}{:02d}'.format(offset/60, offset%60)
+ """Convert minutes-east-of-UTC to RFC3339- and ISO-compatible time zone designator"""
+ return '{:+03d}:{:02d}'.format(offset/60, offset%60)
# Test websocket reconnection on (un)execpted close
def _test_websocket_reconnect(self, close_unexpected):
diff --git a/services/ws/permission.go b/services/ws/permission.go
index 3f16a89..090e5ff 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -46,7 +46,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
pc.tidy()
now := time.Now()
if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
- debugLogf("perm (cached): %+q %+q => %s", pc.Client.AuthToken, uuid, perm.allowed)
+ debugLogf("perm (cached): %+q %+q => %v", pc.Client.AuthToken, uuid, perm.allowed)
return perm.allowed, nil
}
var buf map[string]interface{}
@@ -67,7 +67,7 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
return false, err
}
- debugLogf("perm: %+q %+q => %s", pc.Client.AuthToken, uuid, allowed)
+ debugLogf("perm: %+q %+q => %v", pc.Client.AuthToken, uuid, allowed)
pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
return allowed, nil
}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 3a9cc31..2035acb 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -5,6 +5,7 @@ import (
"errors"
"log"
"sync"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
@@ -16,28 +17,18 @@ var (
sendObjectAttributes = []string{"state", "name"}
)
-type sessionV0 struct {
- ws wsConn
- permChecker permChecker
- subscribed map[string]bool
- eventTypes map[string]bool
- mtx sync.Mutex
- setupOnce sync.Once
+type v0session struct {
+ ws wsConn
+ permChecker permChecker
+ subscriptions []v0subscribe
+ mtx sync.Mutex
+ setupOnce sync.Once
}
-type v0subscribe struct {
- Method string
- Filters []v0filter
-}
-
-type v0filter []interface{}
-
func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
- sess := &sessionV0{
+ sess := &v0session{
ws: ws,
permChecker: NewPermChecker(ac),
- subscribed: make(map[string]bool),
- eventTypes: make(map[string]bool),
}
err := ws.Request().ParseForm()
@@ -52,56 +43,12 @@ func NewSessionV0(ws wsConn, ac arvados.Client) (session, error) {
return sess, nil
}
-func (sess *sessionV0) debugLogf(s string, args ...interface{}) {
+func (sess *v0session) debugLogf(s string, args ...interface{}) {
args = append([]interface{}{sess.ws.Request().RemoteAddr}, args...)
debugLogf("%s "+s, args...)
}
-// If every client subscription message includes filters consisting
-// only of [["event_type","in",...]] then send only the requested
-// event types. Otherwise, clear sess.eventTypes and send all event
-// types from now on.
-func (sess *sessionV0) checkFilters(filters []v0filter) {
- if sess.eventTypes == nil {
- // Already received a subscription request without
- // event_type filters.
- return
- }
- eventTypes := sess.eventTypes
- sess.eventTypes = nil
- if len(filters) == 0 {
- return
- }
- useFilters := false
- for _, f := range filters {
- col, ok := f[0].(string)
- if !ok || col != "event_type" {
- continue
- }
- op, ok := f[1].(string)
- if !ok || op != "in" {
- return
- }
- arr, ok := f[2].([]interface{})
- if !ok {
- return
- }
- useFilters = true
- for _, s := range arr {
- if s, ok := s.(string); ok {
- eventTypes[s] = true
- } else {
- return
- }
- }
- }
- if useFilters {
- sess.debugLogf("eventTypes %+v", eventTypes)
- sess.eventTypes = eventTypes
- }
-}
-
-func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
+func (sess *v0session) Receive(msg map[string]interface{}, buf []byte) []byte {
sess.debugLogf("received message: %+v", msg)
var sub v0subscribe
if err := json.Unmarshal(buf, &sub); err != nil {
@@ -109,17 +56,17 @@ func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
return nil
}
if sub.Method == "subscribe" {
- sess.debugLogf("subscribing to *")
+ sub.prepare()
+ sess.debugLogf("subscription: %v", sub)
sess.mtx.Lock()
- sess.checkFilters(sub.Filters)
- sess.subscribed["*"] = true
+ sess.subscriptions = append(sess.subscriptions, sub)
sess.mtx.Unlock()
return []byte(`{"status":200}`)
}
return []byte(`{"status":400}`)
}
-func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
+func (sess *v0session) EventMessage(e *event) ([]byte, error) {
detail := e.Detail()
if detail == nil {
return nil, nil
@@ -160,24 +107,110 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
return json.Marshal(msg)
}
-func (sess *sessionV0) Filter(e *event) bool {
- detail := e.Detail()
+func (sess *v0session) Filter(e *event) bool {
sess.mtx.Lock()
defer sess.mtx.Unlock()
- switch {
- case sess.eventTypes != nil && detail == nil:
- return false
- case sess.eventTypes != nil && !sess.eventTypes[detail.EventType]:
- return false
- case sess.subscribed["*"]:
- return true
- case detail == nil:
- return false
- case sess.subscribed[detail.ObjectUUID]:
- return true
- case sess.subscribed[detail.ObjectOwnerUUID]:
- return true
- default:
+ for _, sub := range sess.subscriptions {
+ if sub.match(e) {
+ return true
+ }
+ }
+ return false
+}
+
+type v0subscribe struct {
+ Method string
+ Filters []v0filter
+ funcs []func(*event) bool
+}
+
+type v0filter [3]interface{}
+
+func (sub *v0subscribe) match(e *event) bool {
+ detail := e.Detail()
+ if detail == nil {
return false
}
+ debugLogf("sub.match: len(funcs)==%d", len(sub.funcs))
+ for i, f := range sub.funcs {
+ if !f(e) {
+ debugLogf("sub.match: failed on func %d", i)
+ return false
+ }
+ }
+ return true
+}
+
+func (sub *v0subscribe) prepare() {
+ for _, f := range sub.Filters {
+ if len(f) != 3 {
+ continue
+ }
+ if col, ok := f[0].(string); ok && col == "event_type" {
+ op, ok := f[1].(string)
+ if !ok || op != "in" {
+ continue
+ }
+ arr, ok := f[2].([]interface{})
+ if !ok {
+ continue
+ }
+ var strs []string
+ for _, s := range arr {
+ if s, ok := s.(string); ok {
+ strs = append(strs, s)
+ }
+ }
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("event_type func: %v in %v", e.Detail().EventType, strs)
+ for _, s := range strs {
+ if s == e.Detail().EventType {
+ return true
+ }
+ }
+ return false
+ })
+ } else if ok && col == "created_at" {
+ op, ok := f[1].(string)
+ if !ok {
+ continue
+ }
+ tstr, ok := f[2].(string)
+ if !ok {
+ continue
+ }
+ t, err := time.Parse(time.RFC3339Nano, tstr)
+ if err != nil {
+ debugLogf("time.Parse(%q): %s", tstr, err)
+ continue
+ }
+ switch op {
+ case ">=":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v >= %v", e.Detail().CreatedAt, t)
+ return !e.Detail().CreatedAt.Before(t)
+ })
+ case "<=":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v <= %v", e.Detail().CreatedAt, t)
+ return !e.Detail().CreatedAt.After(t)
+ })
+ case ">":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v > %v", e.Detail().CreatedAt, t)
+ return e.Detail().CreatedAt.After(t)
+ })
+ case "<":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v < %v", e.Detail().CreatedAt, t)
+ return e.Detail().CreatedAt.Before(t)
+ })
+ case "=":
+ sub.funcs = append(sub.funcs, func(e *event) bool {
+ debugLogf("created_at func: %v = %v", e.Detail().CreatedAt, t)
+ return e.Detail().CreatedAt.Equal(t)
+ })
+ }
+ }
+ }
}
commit d6b6b39bfe67926490506125c88f3567e45e7dcc
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 15 23:26:57 2016 -0500
8460: Send {"status":200} messages. Bring up ws server for Python SDK tests.
diff --git a/build/run-tests.sh b/build/run-tests.sh
index c771bdc..399de7e 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -265,6 +265,8 @@ start_api() {
&& eval $(python sdk/python/tests/run_test_server.py start --auth admin) \
&& export ARVADOS_TEST_API_HOST="$ARVADOS_API_HOST" \
&& export ARVADOS_TEST_API_INSTALLED="$$" \
+ && python sdk/python/tests/run_test_server.py start_ws \
+ && python sdk/python/tests/run_test_server.py start_nginx \
&& (env | egrep ^ARVADOS)
}
@@ -273,8 +275,8 @@ start_nginx_proxy_services() {
cd "$WORKSPACE" \
&& python sdk/python/tests/run_test_server.py start_keep_proxy \
&& python sdk/python/tests/run_test_server.py start_keep-web \
- && python sdk/python/tests/run_test_server.py start_ws \
&& python sdk/python/tests/run_test_server.py start_arv-git-httpd \
+ && python sdk/python/tests/run_test_server.py start_ws \
&& python sdk/python/tests/run_test_server.py start_nginx \
&& export ARVADOS_TEST_PROXY_SERVICES=1
}
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 28b121f..721c36f 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -38,7 +38,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
return
}
- queue := make(chan *event, h.QueueSize)
+ queue := make(chan interface{}, h.QueueSize)
stopped := make(chan struct{})
stop := make(chan error, 5)
@@ -71,15 +71,18 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
stop <- err
return
}
- sess.Receive(msg, buf[:n])
+ e := sess.Receive(msg, buf[:n])
+ if e != nil {
+ queue <- e
+ }
}
}()
go func() {
for e := range queue {
- if e == nil {
+ if buf, ok := e.([]byte); ok {
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
- _, err := ws.Write([]byte("{}"))
+ _, err := ws.Write(buf)
if err != nil {
sess.debugLogf("handler: write {}: %s", err)
stop <- err
@@ -87,6 +90,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
}
continue
}
+ e := e.(*event)
buf, err := sess.EventMessage(e)
if err != nil {
@@ -148,7 +152,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
// socket, and prevent an idle socket
// from being closed.
if len(queue) == 0 {
- queue <- nil
+ queue <- []byte(`{}`)
}
continue
case e, ok = <-events:
diff --git a/services/ws/session.go b/services/ws/session.go
index 98164e3..9111c6c 100644
--- a/services/ws/session.go
+++ b/services/ws/session.go
@@ -1,8 +1,27 @@
package main
type session interface {
- Receive(map[string]interface{}, []byte)
- EventMessage(*event) ([]byte, error)
+ // Receive processes a message received from the client. If
+ // the returned response is non-nil, it will be queued and
+ // sent the client.
+ Receive(map[string]interface{}, []byte) []byte
+
+ // Filter returns true if the event should be queued for
+ // sending to the client. It should return as fast as
+ // possible, and must not block.
Filter(*event) bool
+
+ // EventMessage encodes the given event (from the front of the
+ // queue) into a form suitable to send to the client. If a
+ // non-nil error is returned, the connection is terminated. If
+ // the returned buffer is empty, nothing is sent to the client
+ // and the event is not counted in statistics.
+ //
+ // Unlike Filter, EventMessage can block without affecting
+ // other connections. If EventMessage is slow, additional
+ // incoming events will be queued. If the event queue fills
+ // up, the connection will be dropped.
+ EventMessage(*event) ([]byte, error)
+
debugLogf(string, ...interface{})
}
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 3894e30..3a9cc31 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -101,12 +101,12 @@ func (sess *sessionV0) checkFilters(filters []v0filter) {
}
}
-func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
+func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) []byte {
sess.debugLogf("received message: %+v", msg)
var sub v0subscribe
if err := json.Unmarshal(buf, &sub); err != nil {
sess.debugLogf("ignored unrecognized request: %s", err)
- return
+ return nil
}
if sub.Method == "subscribe" {
sess.debugLogf("subscribing to *")
@@ -114,7 +114,9 @@ func (sess *sessionV0) Receive(msg map[string]interface{}, buf []byte) {
sess.checkFilters(sub.Filters)
sess.subscribed["*"] = true
sess.mtx.Unlock()
+ return []byte(`{"status":200}`)
}
+ return []byte(`{"status":400}`)
}
func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
commit 7ae5533a074d80882171b33f7b659c9bcace1bd3
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 15 23:00:58 2016 -0500
8460: Log connection stats.
diff --git a/services/ws/handler.go b/services/ws/handler.go
index 1c9d5ba..28b121f 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -24,7 +24,14 @@ type handler struct {
NewSession func(wsConn, arvados.Client) (session, error)
}
-func (h *handler) Handle(ws wsConn, events <-chan *event) {
+type handlerStats struct {
+ QueueDelay time.Duration
+ WriteDelay time.Duration
+ EventBytes uint64
+ EventCount uint64
+}
+
+func (h *handler) Handle(ws wsConn, events <-chan *event) (stats handlerStats) {
sess, err := h.NewSession(ws, h.Client)
if err != nil {
log.Printf("%s NewSession: %s", ws.Request().RemoteAddr, err)
@@ -93,6 +100,7 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
sess.debugLogf("handler: send event %d: %q", e.Serial, buf)
ws.SetWriteDeadline(time.Now().Add(h.PingTimeout))
+ t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
sess.debugLogf("handler: write: %s", err)
@@ -100,6 +108,10 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
break
}
sess.debugLogf("handler: sent event %d", e.Serial)
+ stats.WriteDelay += time.Since(t0)
+ stats.QueueDelay += t0.Sub(e.Received)
+ stats.EventBytes += uint64(len(buf))
+ stats.EventCount++
}
for _ = range queue {
}
@@ -153,4 +165,6 @@ func (h *handler) Handle(ws wsConn, events <-chan *event) {
<-stop
close(stopped)
+
+ return
}
diff --git a/services/ws/log.go b/services/ws/log.go
new file mode 100644
index 0000000..1511691
--- /dev/null
+++ b/services/ws/log.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+ "encoding/json"
+ "fmt"
+ "log"
+ "time"
+)
+
+func init() {
+ log.SetFlags(0)
+}
+
+func errorLogf(f string, args ...interface{}) {
+ log.Print(`{"error":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
+}
+
+var debugLogf = func(f string, args ...interface{}) {
+ log.Print(`{"debug":`, string(mustMarshal(fmt.Sprintf(f, args...))), `}`)
+}
+
+func mustMarshal(v interface{}) []byte {
+ buf, err := json.Marshal(v)
+ if err != nil {
+ panic(err)
+ }
+ return buf
+}
+
+func logj(args ...interface{}) {
+ m := map[string]interface{}{"Time": time.Now().UTC()}
+ for i := 0; i < len(args)-1; i += 2 {
+ m[fmt.Sprintf("%s", args[i])] = args[i+1]
+ }
+ buf, err := json.Marshal(m)
+ if err != nil {
+ errorLogf("logj: %s", err)
+ return
+ }
+ log.Print(string(buf))
+}
diff --git a/services/ws/main.go b/services/ws/main.go
index a143ae9..719128f 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -10,8 +10,6 @@ import (
"git.curoverse.com/arvados.git/sdk/go/config"
)
-var debugLogf = func(string, ...interface{}) {}
-
func main() {
configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
@@ -22,8 +20,8 @@ func main() {
if err != nil {
log.Fatal(err)
}
- if cfg.Debug {
- debugLogf = log.Printf
+ if !cfg.Debug {
+ debugLogf = func(string, ...interface{}) {}
}
if *dumpConfig {
diff --git a/services/ws/router.go b/services/ws/router.go
index 69654cb..ba8b46b 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -37,8 +37,18 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session,
return nil
},
Handler: websocket.Handler(func(ws *websocket.Conn) {
+ logj("Type", "connect",
+ "RemoteAddr", ws.Request().RemoteAddr)
+ t0 := time.Now()
+
sink := rtr.eventSource.NewSink()
- handler.Handle(ws, sink.Channel())
+ stats := handler.Handle(ws, sink.Channel())
+
+ logj("Type", "disconnect",
+ "RemoteAddr", ws.Request().RemoteAddr,
+ "Elapsed", time.Now().Sub(t0).Seconds(),
+ "Stats", stats)
+
sink.Stop()
ws.Close()
}),
@@ -47,22 +57,10 @@ func (rtr *router) makeServer(newSession func(wsConn, arvados.Client) (session,
func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rtr.setupOnce.Do(rtr.setup)
- t0 := time.Now()
- reqLog(map[string]interface{}{
- "Connect": req.RemoteAddr,
- "RemoteAddr": req.RemoteAddr,
- "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
- "Time": t0.UTC(),
- })
+ logj("Type", "request",
+ "RemoteAddr", req.RemoteAddr,
+ "X-Forwarded-For", req.Header.Get("X-Forwarded-For"))
rtr.mux.ServeHTTP(resp, req)
- t1 := time.Now()
- reqLog(map[string]interface{}{
- "Disconnect": req.RemoteAddr,
- "RemoteAddr": req.RemoteAddr,
- "X-Forwarded-For": req.Header.Get("X-Forwarded-For"),
- "Time": t1.UTC(),
- "Elapsed": time.Now().Sub(t0).Seconds(),
- })
}
func reqLog(m map[string]interface{}) {
commit 0c6cbd8a07e31decd703ef7fd9eca591e5661c32
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 15 22:44:33 2016 -0500
8460: Cache negative permission.
diff --git a/services/ws/permission.go b/services/ws/permission.go
index b2b962c..3f16a89 100644
--- a/services/ws/permission.go
+++ b/services/ws/permission.go
@@ -22,14 +22,19 @@ func NewPermChecker(ac arvados.Client) permChecker {
ac.AuthToken = ""
return &cachingPermChecker{
Client: &ac,
- cache: make(map[string]time.Time),
+ cache: make(map[string]cacheEnt),
maxCurrent: 16,
}
}
+type cacheEnt struct {
+ time.Time
+ allowed bool
+}
+
type cachingPermChecker struct {
*arvados.Client
- cache map[string]time.Time
+ cache map[string]cacheEnt
maxCurrent int
}
@@ -39,9 +44,10 @@ func (pc *cachingPermChecker) SetToken(token string) {
func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
pc.tidy()
- if t, ok := pc.cache[uuid]; ok && time.Now().Sub(t) < maxPermCacheAge {
- debugLogf("perm ok (cached): %+q %+q", pc.Client.AuthToken, uuid)
- return true, nil
+ now := time.Now()
+ if perm, ok := pc.cache[uuid]; ok && now.Sub(perm.Time) < maxPermCacheAge {
+ debugLogf("perm (cached): %+q %+q => %s", pc.Client.AuthToken, uuid, perm.allowed)
+ return perm.allowed, nil
}
var buf map[string]interface{}
path, err := pc.PathForUUID("get", uuid)
@@ -51,17 +57,19 @@ func (pc *cachingPermChecker) Check(uuid string) (bool, error) {
err = pc.RequestAndDecode(&buf, "GET", path, nil, url.Values{
"select": {`["uuid"]`},
})
- if err, ok := err.(arvados.TransactionError); ok && err.StatusCode == http.StatusNotFound {
- debugLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
- return false, nil
- }
- if err != nil {
- debugLogf("perm !ok: %+q %+q", pc.Client.AuthToken, uuid)
+
+ var allowed bool
+ if err == nil {
+ allowed = true
+ } else if txErr, ok := err.(arvados.TransactionError); ok && txErr.StatusCode == http.StatusNotFound {
+ allowed = false
+ } else {
+ errorLogf("perm err: %+q %+q: %s", pc.Client.AuthToken, uuid, err)
return false, err
}
- debugLogf("perm ok: %+q %+q", pc.Client.AuthToken, uuid)
- pc.cache[uuid] = time.Now()
- return true, nil
+ debugLogf("perm: %+q %+q => %s", pc.Client.AuthToken, uuid, allowed)
+ pc.cache[uuid] = cacheEnt{Time: now, allowed: allowed}
+ return allowed, nil
}
func (pc *cachingPermChecker) tidy() {
commit f6f8feeada332a55d1a966e9f4a240d99dc58b55
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 15 16:30:01 2016 -0500
8460: Send selected old/new attributes with v0 events.
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 467d156..3894e30 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -12,6 +12,8 @@ import (
var (
errQueueFull = errors.New("client queue full")
errFrameTooBig = errors.New("frame too big")
+
+ sendObjectAttributes = []string{"state", "name"}
)
type sessionV0 struct {
@@ -136,6 +138,22 @@ func (sess *sessionV0) EventMessage(e *event) ([]byte, error) {
}
if detail.Properties != nil && detail.Properties["text"] != nil {
msg["properties"] = detail.Properties
+ } else {
+ msgProps := map[string]map[string]interface{}{}
+ for _, ak := range []string{"old_attributes", "new_attributes"} {
+ eventAttrs, ok := detail.Properties[ak].(map[string]interface{})
+ if !ok {
+ continue
+ }
+ msgAttrs := map[string]interface{}{}
+ for _, k := range sendObjectAttributes {
+ if v, ok := eventAttrs[k]; ok {
+ msgAttrs[k] = v
+ }
+ }
+ msgProps[ak] = msgAttrs
+ }
+ msg["properties"] = msgProps
}
return json.Marshal(msg)
}
commit 5a420beeb6c64efc3ca0ef13d4ab9ac6c654c3ab
Merge: 3e39ed3 3af6db5
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 15 13:25:03 2016 -0500
8460: Merge branch 'master' into 8460-websocket-go
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list