[ARVADOS] updated: b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be
Git user
git at public.curoverse.com
Wed Mar 1 02:37:04 EST 2017
Summary of changes:
sdk/go/arvados/client.go | 1 +
sdk/go/arvados/collection.go | 4 +-
sdk/go/arvados/log.go | 12 +--
sdk/go/arvados/workflow.go | 22 ++++
services/ws/session_v0_test.go | 225 ++++++++++++++++++++++++++++++++++-------
5 files changed, 221 insertions(+), 43 deletions(-)
create mode 100644 sdk/go/arvados/workflow.go
discards 784371c83a910f286d0a9123eb9d099ad476931e (commit)
via b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (784371c83a910f286d0a9123eb9d099ad476931e)
            \
             N -- N -- N (b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Mar 1 02:36:18 2017 -0500
10764: Test v0 session.
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 8319b5d..9691e7a 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -181,6 +181,7 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
if err != nil {
return err
}
+ req.Header.Set("Content-type", "application/x-www-form-urlencoded")
return c.DoAndDecode(dst, req)
}
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
index df7c224..bdd8e6e 100644
--- a/sdk/go/arvados/collection.go
+++ b/sdk/go/arvados/collection.go
@@ -3,9 +3,10 @@ package arvados
import (
"bufio"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/blockdigest"
)
// Collection is an arvados#collection resource.
@@ -14,6 +15,7 @@ type Collection struct {
TrashAt *time.Time `json:"trash_at,omitempty"`
ManifestText string `json:"manifest_text,omitempty"`
UnsignedManifestText string `json:"unsigned_manifest_text,omitempty"`
+ Name string `json:"name,omitempty"`
CreatedAt *time.Time `json:"created_at,omitempty"`
ModifiedAt *time.Time `json:"modified_at,omitempty"`
PortableDataHash string `json:"portable_data_hash,omitempty"`
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
index a48f1c6..5adc528 100644
--- a/sdk/go/arvados/log.go
+++ b/sdk/go/arvados/log.go
@@ -6,13 +6,13 @@ import (
// Log is an arvados#log record
type Log struct {
- ID uint64 `json:"id"`
- UUID string `json:"uuid"`
- ObjectUUID string `json:"object_uuid"`
- ObjectOwnerUUID string `json:"object_owner_uuid"`
- EventType string `json:"event_type"`
+ ID uint64 `json:"id,omitempty"`
+ UUID string `json:"uuid,omitempty"`
+ ObjectUUID string `json:"object_uuid,omitempty"`
+ ObjectOwnerUUID string `json:"object_owner_uuid,omitempty"`
+ EventType string `json:"event_type,omitempty"`
EventAt *time.Time `json:"event,omitempty"`
- Properties map[string]interface{} `json:"properties"`
+ Properties map[string]interface{} `json:"properties,omitempty"`
CreatedAt *time.Time `json:"created_at,omitempty"`
}
diff --git a/sdk/go/arvados/workflow.go b/sdk/go/arvados/workflow.go
new file mode 100644
index 0000000..42a851e
--- /dev/null
+++ b/sdk/go/arvados/workflow.go
@@ -0,0 +1,22 @@
+package arvados
+
+import "time"
+
+// Workflow is an arvados#workflow resource.
+type Workflow struct {
+ UUID string `json:"uuid,omitempty"`
+ OwnerUUID string `json:"owner_uuid,omitempty"`
+ Name string `json:"name,omitempty"`
+ Description string `json:"description,omitempty"`
+ Definition string `json:"definition,omitempty"`
+ CreatedAt *time.Time `json:"created_at,omitempty"`
+ ModifiedAt *time.Time `json:"modified_at,omitempty"`
+}
+
+// WorkflowList is an arvados#workflowList resource.
+type WorkflowList struct {
+ Items []Workflow `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Offset int `json:"offset"`
+ Limit int `json:"limit"`
+}
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index 622084c..ed1ac0d 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -43,6 +43,9 @@ type pgEventSource struct {
eventsOut uint64
cancel func()
+
+ setupOnce sync.Once
+ ready chan bool
}
var _ debugStatuser = (*pgEventSource)(nil)
@@ -63,12 +66,25 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
ps.cancel()
}
+func (ps *pgEventSource) setup() {
+ ps.ready = make(chan bool)
+}
+
+// waitReady returns when private fields (cancel, db) are available
+// for tests to use.
+func (ps *pgEventSource) waitReady() {
+ ps.setupOnce.Do(ps.setup)
+ <-ps.ready
+}
+
// Run listens for event notifications on the "logs" channel and sends
// them to all subscribers.
func (ps *pgEventSource) Run() {
logger(nil).Debug("pgEventSource Run starting")
defer logger(nil).Debug("pgEventSource Run finished")
+ ps.setupOnce.Do(ps.setup)
+
ctx, cancel := context.WithCancel(context.Background())
ps.cancel = cancel
defer cancel()
@@ -102,6 +118,8 @@ func (ps *pgEventSource) Run() {
defer ps.pqListener.Close()
logger(nil).Debug("pq Listen setup done")
+ close(ps.ready)
+
ps.queue = make(chan *event, ps.QueueSize)
defer close(ps.queue)
diff --git a/services/ws/event_source_test.go b/services/ws/event_source_test.go
index ee1da08..675ac90 100644
--- a/services/ws/event_source_test.go
+++ b/services/ws/event_source_test.go
@@ -14,7 +14,7 @@ var _ = check.Suite(&eventSourceSuite{})
type eventSourceSuite struct{}
-func testDBConfig() (pgConfig, error) {
+func testDBConfig() pgConfig {
var railsDB struct {
Test struct {
Database string
@@ -25,7 +25,7 @@ func testDBConfig() (pgConfig, error) {
}
err := config.LoadFile(&railsDB, "../api/config/database.yml")
if err != nil {
- return nil, err
+ panic(err)
}
cfg := pgConfig{
"dbname": railsDB.Test.Database,
@@ -33,22 +33,20 @@ func testDBConfig() (pgConfig, error) {
"password": railsDB.Test.Password,
"user": railsDB.Test.Username,
}
- return cfg, nil
+ return cfg
}
-func testDB() (*sql.DB, error) {
- cfg, err := testDBConfig()
+func testDB() *sql.DB {
+ db, err := sql.Open("postgres", testDBConfig().ConnectionString())
if err != nil {
- return nil, err
+ panic(err)
}
- return sql.Open("postgres", cfg.ConnectionString())
+ return db
}
func (*eventSourceSuite) TestEventSource(c *check.C) {
- cfg, err := testDBConfig()
- if err != nil {
- c.Fatal(err)
- }
+ cfg := testDBConfig()
+ db := testDB()
pges := &pgEventSource{
DataSource: cfg.ConnectionString(),
QueueSize: 4,
@@ -59,16 +57,9 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
sinks[i] = pges.NewSink()
}
- // wait for listener to start, as evidenced by queue channel
- // appearing (relying on internal implementation detail here)
- for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) {
- c.Assert(time.Now().After(deadline), check.Equals, false)
- }
+ pges.waitReady()
defer pges.cancel()
- db, err := testDB()
- c.Assert(err, check.IsNil)
-
done := make(chan bool, 1)
go func() {
diff --git a/services/ws/event_test.go b/services/ws/event_test.go
index eb67a34..f095372 100644
--- a/services/ws/event_test.go
+++ b/services/ws/event_test.go
@@ -1,38 +1,15 @@
package main
-import (
- "database/sql"
-
- "git.curoverse.com/arvados.git/sdk/go/config"
- check "gopkg.in/check.v1"
-)
+import check "gopkg.in/check.v1"
var _ = check.Suite(&eventSuite{})
type eventSuite struct{}
func (*eventSuite) TestDetail(c *check.C) {
- var railsDB struct {
- Test struct {
- Database string
- Username string
- Password string
- Host string
- }
- }
- err := config.LoadFile(&railsDB, "../api/config/database.yml")
- c.Assert(err, check.IsNil)
- cfg := pgConfig{
- "dbname": railsDB.Test.Database,
- "host": railsDB.Test.Host,
- "password": railsDB.Test.Password,
- "user": railsDB.Test.Username,
- }
- db, err := sql.Open("postgres", cfg.ConnectionString())
- c.Assert(err, check.IsNil)
e := &event{
LogID: 17,
- db: db,
+ db: testDB(),
}
logRow := e.Detail()
c.Assert(logRow, check.NotNil)
diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go
new file mode 100644
index 0000000..f4dc23f
--- /dev/null
+++ b/services/ws/session_v0_test.go
@@ -0,0 +1,286 @@
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "os"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "golang.org/x/net/websocket"
+ check "gopkg.in/check.v1"
+)
+
+func init() {
+ if os.Getenv("ARVADOS_DEBUG") != "" {
+ ctxlog.SetLevel("debug")
+ }
+}
+
+var _ = check.Suite(&v0Suite{})
+
+type v0Suite struct {
+ token string
+ toDelete []string
+}
+
+func (s *v0Suite) SetUpTest(c *check.C) {
+ s.token = arvadostest.ActiveToken
+}
+
+func (s *v0Suite) TearDownSuite(c *check.C) {
+ ac := arvados.NewClientFromEnv()
+ ac.AuthToken = arvadostest.AdminToken
+ for _, path := range s.toDelete {
+ err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
+ if err != nil {
+ panic(err)
+ }
+ }
+}
+
+func (s *v0Suite) TestFilters(c *check.C) {
+ srv, conn, r, w := s.testClient()
+ defer srv.Close()
+ defer conn.Close()
+
+ c.Check(w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ "filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
+ }), check.IsNil)
+ s.expectStatus(c, r, 200)
+
+ go s.emitEvents(nil)
+ lg := s.expectLog(c, r)
+ c.Check(lg.EventType, check.Equals, "update")
+}
+
+func (s *v0Suite) TestLastLogID(c *check.C) {
+ var lastID uint64
+ c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+
+ srv, conn, r, w := s.testClient()
+ defer srv.Close()
+ defer conn.Close()
+
+ uuidChan := make(chan string, 2)
+ s.emitEvents(uuidChan)
+
+ c.Check(w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ "last_log_id": lastID,
+ }), check.IsNil)
+ s.expectStatus(c, r, 200)
+
+ go func() {
+ s.emitEvents(uuidChan)
+ close(uuidChan)
+ }()
+
+ done := make(chan bool)
+ go func() {
+ for uuid := range uuidChan {
+ for _, etype := range []string{"create", "blip", "update"} {
+ lg := s.expectLog(c, r)
+ c.Check(lg.ObjectUUID, check.Equals, uuid)
+ c.Check(lg.EventType, check.Equals, etype)
+ }
+ }
+ close(done)
+ }()
+
+ select {
+ case <-time.After(10 * time.Second):
+ c.Fatal("timeout")
+ case <-done:
+ }
+}
+
+func (s *v0Suite) TestPermission(c *check.C) {
+ srv, conn, r, w := s.testClient()
+ defer srv.Close()
+ defer conn.Close()
+
+ c.Check(w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ }), check.IsNil)
+ s.expectStatus(c, r, 200)
+
+ uuidChan := make(chan string, 1)
+ go func() {
+ s.token = arvadostest.AdminToken
+ s.emitEvents(nil)
+ s.token = arvadostest.ActiveToken
+ s.emitEvents(uuidChan)
+ }()
+
+ lg := s.expectLog(c, r)
+ c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
+}
+
+func (s *v0Suite) TestSendBadJSON(c *check.C) {
+ srv, conn, r, w := s.testClient()
+ defer srv.Close()
+ defer conn.Close()
+
+ c.Check(w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ }), check.IsNil)
+ s.expectStatus(c, r, 200)
+
+ _, err := fmt.Fprint(conn, "^]beep\n")
+ c.Check(err, check.IsNil)
+ s.expectStatus(c, r, 400)
+
+ c.Check(w.Encode(map[string]interface{}{
+ "method": "subscribe",
+ }), check.IsNil)
+ s.expectStatus(c, r, 200)
+}
+
+func (s *v0Suite) TestSubscribe(c *check.C) {
+ srv, conn, r, w := s.testClient()
+ defer srv.Close()
+ defer conn.Close()
+
+ s.emitEvents(nil)
+
+ err := w.Encode(map[string]interface{}{"21": 12})
+ c.Check(err, check.IsNil)
+ s.expectStatus(c, r, 400)
+
+ err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
+ c.Check(err, check.IsNil)
+ s.expectStatus(c, r, 200)
+
+ uuidChan := make(chan string, 1)
+ go s.emitEvents(uuidChan)
+ uuid := <-uuidChan
+
+ for _, etype := range []string{"create", "blip", "update"} {
+ lg := s.expectLog(c, r)
+ c.Check(lg.ObjectUUID, check.Equals, uuid)
+ c.Check(lg.EventType, check.Equals, etype)
+ }
+}
+
+// Generate some events by creating and updating a workflow object,
+// and creating a custom log entry (event_type="blip") about the newly
+// created workflow. If uuidChan is not nil, send the new workflow
+// UUID to uuidChan as soon as it's known.
+func (s *v0Suite) emitEvents(uuidChan chan<- string) {
+ ac := arvados.NewClientFromEnv()
+ ac.AuthToken = s.token
+ wf := &arvados.Workflow{
+ Name: "ws_test",
+ }
+ err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
+ if err != nil {
+ panic(err)
+ }
+ if uuidChan != nil {
+ uuidChan <- wf.UUID
+ }
+ lg := &arvados.Log{}
+ err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
+ ObjectUUID: wf.UUID,
+ EventType: "blip",
+ Properties: map[string]interface{}{
+ "beep": "boop",
+ },
+ }), nil)
+ if err != nil {
+ panic(err)
+ }
+ err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
+ if err != nil {
+ panic(err)
+ }
+ s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
+}
+
+func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
+ j, err := json.Marshal(ob)
+ if err != nil {
+ panic(err)
+ }
+ v := url.Values{}
+ v[rscName] = []string{string(j)}
+ return bytes.NewBufferString(v.Encode())
+}
+
+func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
+ msg := map[string]interface{}{}
+ c.Check(r.Decode(&msg), check.IsNil)
+ c.Check(int(msg["status"].(float64)), check.Equals, status)
+}
+
+func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
+ lg := &arvados.Log{}
+ c.Check(r.Decode(lg), check.IsNil)
+ return lg
+}
+
+func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
+ srv := newTestServer()
+ conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+ if err != nil {
+ panic(err)
+ }
+ w := json.NewEncoder(conn)
+ r := json.NewDecoder(conn)
+ return srv, conn, r, w
+}
+
+type testServer struct {
+ http.Server
+ addr string
+ stop chan bool
+}
+
+func (srv *testServer) Close() {
+ close(srv.stop)
+}
+
+func newTestServer() *testServer {
+ ln, err := net.Listen("tcp", ":")
+ if err != nil {
+ panic(err)
+ }
+ cfg := defaultConfig()
+ cfg.Client = *(arvados.NewClientFromEnv())
+ es := &pgEventSource{
+ DataSource: testDBConfig().ConnectionString(),
+ QueueSize: 4,
+ }
+ srv := &testServer{
+ Server: http.Server{
+ Addr: ":",
+ ReadTimeout: 10 * time.Second,
+ WriteTimeout: 10 * time.Second,
+ Handler: &router{
+ Config: &cfg,
+ eventSource: es,
+ newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
+ },
+ },
+ addr: ln.Addr().String(),
+ stop: make(chan bool),
+ }
+ go es.Run()
+ go srv.Serve(ln)
+ go func() {
+ <-srv.stop
+ ln.Close()
+ es.cancel()
+ }()
+ es.waitReady()
+ return srv
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list