[ARVADOS] updated: b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be

Git user git at public.curoverse.com
Wed Mar 1 02:37:04 EST 2017


Summary of changes:
 sdk/go/arvados/client.go       |   1 +
 sdk/go/arvados/collection.go   |   4 +-
 sdk/go/arvados/log.go          |  12 +--
 sdk/go/arvados/workflow.go     |  22 ++++
 services/ws/session_v0_test.go | 225 ++++++++++++++++++++++++++++++++++-------
 5 files changed, 221 insertions(+), 43 deletions(-)
 create mode 100644 sdk/go/arvados/workflow.go

  discards  784371c83a910f286d0a9123eb9d099ad476931e (commit)
       via  b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (784371c83a910f286d0a9123eb9d099ad476931e)
            \
             N -- N -- N (b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be)

When this happens we assume that you've already received alert emails for
all of the O revisions, so this email reports only the revisions on the N
branch, starting from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit b3fa9983ac0b7b38a5b3787af56a7bb1502ae3be
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Mar 1 02:36:18 2017 -0500

    10764: Test v0 session.

diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 8319b5d..9691e7a 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -181,6 +181,7 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
 	if err != nil {
 		return err
 	}
+	req.Header.Set("Content-type", "application/x-www-form-urlencoded")
 	return c.DoAndDecode(dst, req)
 }
 
diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go
index df7c224..bdd8e6e 100644
--- a/sdk/go/arvados/collection.go
+++ b/sdk/go/arvados/collection.go
@@ -3,9 +3,10 @@ package arvados
 import (
 	"bufio"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/blockdigest"
 	"strings"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/blockdigest"
 )
 
 // Collection is an arvados#collection resource.
@@ -14,6 +15,7 @@ type Collection struct {
 	TrashAt                *time.Time `json:"trash_at,omitempty"`
 	ManifestText           string     `json:"manifest_text,omitempty"`
 	UnsignedManifestText   string     `json:"unsigned_manifest_text,omitempty"`
+	Name                   string     `json:"name,omitempty"`
 	CreatedAt              *time.Time `json:"created_at,omitempty"`
 	ModifiedAt             *time.Time `json:"modified_at,omitempty"`
 	PortableDataHash       string     `json:"portable_data_hash,omitempty"`
diff --git a/sdk/go/arvados/log.go b/sdk/go/arvados/log.go
index a48f1c6..5adc528 100644
--- a/sdk/go/arvados/log.go
+++ b/sdk/go/arvados/log.go
@@ -6,13 +6,13 @@ import (
 
 // Log is an arvados#log record
 type Log struct {
-	ID              uint64                 `json:"id"`
-	UUID            string                 `json:"uuid"`
-	ObjectUUID      string                 `json:"object_uuid"`
-	ObjectOwnerUUID string                 `json:"object_owner_uuid"`
-	EventType       string                 `json:"event_type"`
+	ID              uint64                 `json:"id,omitempty"`
+	UUID            string                 `json:"uuid,omitempty"`
+	ObjectUUID      string                 `json:"object_uuid,omitempty"`
+	ObjectOwnerUUID string                 `json:"object_owner_uuid,omitempty"`
+	EventType       string                 `json:"event_type,omitempty"`
 	EventAt         *time.Time             `json:"event,omitempty"`
-	Properties      map[string]interface{} `json:"properties"`
+	Properties      map[string]interface{} `json:"properties,omitempty"`
 	CreatedAt       *time.Time             `json:"created_at,omitempty"`
 }
 
diff --git a/sdk/go/arvados/workflow.go b/sdk/go/arvados/workflow.go
new file mode 100644
index 0000000..42a851e
--- /dev/null
+++ b/sdk/go/arvados/workflow.go
@@ -0,0 +1,22 @@
+package arvados
+
+import "time"
+
+// Workflow is an arvados#workflow resource.
+type Workflow struct {
+	UUID        string     `json:"uuid,omitempty"`
+	OwnerUUID   string     `json:"owner_uuid,omitempty"`
+	Name        string     `json:"name,omitempty"`
+	Description string     `json:"description,omitempty"`
+	Definition  string     `json:"definition,omitempty"`
+	CreatedAt   *time.Time `json:"created_at,omitempty"`
+	ModifiedAt  *time.Time `json:"modified_at,omitempty"`
+}
+
+// WorkflowList is an arvados#workflowList resource.
+type WorkflowList struct {
+	Items          []Workflow `json:"items"`
+	ItemsAvailable int        `json:"items_available"`
+	Offset         int        `json:"offset"`
+	Limit          int        `json:"limit"`
+}
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index 622084c..ed1ac0d 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -43,6 +43,9 @@ type pgEventSource struct {
 	eventsOut  uint64
 
 	cancel func()
+
+	setupOnce sync.Once
+	ready     chan bool
 }
 
 var _ debugStatuser = (*pgEventSource)(nil)
@@ -63,12 +66,25 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
 	ps.cancel()
 }
 
+func (ps *pgEventSource) setup() {
+	ps.ready = make(chan bool)
+}
+
+// waitReady returns when private fields (cancel, db) are available
+// for tests to use.
+func (ps *pgEventSource) waitReady() {
+	ps.setupOnce.Do(ps.setup)
+	<-ps.ready
+}
+
 // Run listens for event notifications on the "logs" channel and sends
 // them to all subscribers.
 func (ps *pgEventSource) Run() {
 	logger(nil).Debug("pgEventSource Run starting")
 	defer logger(nil).Debug("pgEventSource Run finished")
 
+	ps.setupOnce.Do(ps.setup)
+
 	ctx, cancel := context.WithCancel(context.Background())
 	ps.cancel = cancel
 	defer cancel()
@@ -102,6 +118,8 @@ func (ps *pgEventSource) Run() {
 	defer ps.pqListener.Close()
 	logger(nil).Debug("pq Listen setup done")
 
+	close(ps.ready)
+
 	ps.queue = make(chan *event, ps.QueueSize)
 	defer close(ps.queue)
 
diff --git a/services/ws/event_source_test.go b/services/ws/event_source_test.go
index ee1da08..675ac90 100644
--- a/services/ws/event_source_test.go
+++ b/services/ws/event_source_test.go
@@ -14,7 +14,7 @@ var _ = check.Suite(&eventSourceSuite{})
 
 type eventSourceSuite struct{}
 
-func testDBConfig() (pgConfig, error) {
+func testDBConfig() pgConfig {
 	var railsDB struct {
 		Test struct {
 			Database string
@@ -25,7 +25,7 @@ func testDBConfig() (pgConfig, error) {
 	}
 	err := config.LoadFile(&railsDB, "../api/config/database.yml")
 	if err != nil {
-		return nil, err
+		panic(err)
 	}
 	cfg := pgConfig{
 		"dbname":   railsDB.Test.Database,
@@ -33,22 +33,20 @@ func testDBConfig() (pgConfig, error) {
 		"password": railsDB.Test.Password,
 		"user":     railsDB.Test.Username,
 	}
-	return cfg, nil
+	return cfg
 }
 
-func testDB() (*sql.DB, error) {
-	cfg, err := testDBConfig()
+func testDB() *sql.DB {
+	db, err := sql.Open("postgres", testDBConfig().ConnectionString())
 	if err != nil {
-		return nil, err
+		panic(err)
 	}
-	return sql.Open("postgres", cfg.ConnectionString())
+	return db
 }
 
 func (*eventSourceSuite) TestEventSource(c *check.C) {
-	cfg, err := testDBConfig()
-	if err != nil {
-		c.Fatal(err)
-	}
+	cfg := testDBConfig()
+	db := testDB()
 	pges := &pgEventSource{
 		DataSource: cfg.ConnectionString(),
 		QueueSize:  4,
@@ -59,16 +57,9 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
 		sinks[i] = pges.NewSink()
 	}
 
-	// wait for listener to start, as evidenced by queue channel
-	// appearing (relying on internal implementation detail here)
-	for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) {
-		c.Assert(time.Now().After(deadline), check.Equals, false)
-	}
+	pges.waitReady()
 	defer pges.cancel()
 
-	db, err := testDB()
-	c.Assert(err, check.IsNil)
-
 	done := make(chan bool, 1)
 
 	go func() {
diff --git a/services/ws/event_test.go b/services/ws/event_test.go
index eb67a34..f095372 100644
--- a/services/ws/event_test.go
+++ b/services/ws/event_test.go
@@ -1,38 +1,15 @@
 package main
 
-import (
-	"database/sql"
-
-	"git.curoverse.com/arvados.git/sdk/go/config"
-	check "gopkg.in/check.v1"
-)
+import check "gopkg.in/check.v1"
 
 var _ = check.Suite(&eventSuite{})
 
 type eventSuite struct{}
 
 func (*eventSuite) TestDetail(c *check.C) {
-	var railsDB struct {
-		Test struct {
-			Database string
-			Username string
-			Password string
-			Host     string
-		}
-	}
-	err := config.LoadFile(&railsDB, "../api/config/database.yml")
-	c.Assert(err, check.IsNil)
-	cfg := pgConfig{
-		"dbname":   railsDB.Test.Database,
-		"host":     railsDB.Test.Host,
-		"password": railsDB.Test.Password,
-		"user":     railsDB.Test.Username,
-	}
-	db, err := sql.Open("postgres", cfg.ConnectionString())
-	c.Assert(err, check.IsNil)
 	e := &event{
 		LogID: 17,
-		db:    db,
+		db:    testDB(),
 	}
 	logRow := e.Detail()
 	c.Assert(logRow, check.NotNil)
diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go
new file mode 100644
index 0000000..f4dc23f
--- /dev/null
+++ b/services/ws/session_v0_test.go
@@ -0,0 +1,286 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
+	"golang.org/x/net/websocket"
+	check "gopkg.in/check.v1"
+)
+
+func init() {
+	if os.Getenv("ARVADOS_DEBUG") != "" {
+		ctxlog.SetLevel("debug")
+	}
+}
+
+var _ = check.Suite(&v0Suite{})
+
+type v0Suite struct {
+	token    string
+	toDelete []string
+}
+
+func (s *v0Suite) SetUpTest(c *check.C) {
+	s.token = arvadostest.ActiveToken
+}
+
+func (s *v0Suite) TearDownSuite(c *check.C) {
+	ac := arvados.NewClientFromEnv()
+	ac.AuthToken = arvadostest.AdminToken
+	for _, path := range s.toDelete {
+		err := ac.RequestAndDecode(nil, "DELETE", path, nil, nil)
+		if err != nil {
+			panic(err)
+		}
+	}
+}
+
+func (s *v0Suite) TestFilters(c *check.C) {
+	srv, conn, r, w := s.testClient()
+	defer srv.Close()
+	defer conn.Close()
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method":  "subscribe",
+		"filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
+	}), check.IsNil)
+	s.expectStatus(c, r, 200)
+
+	go s.emitEvents(nil)
+	lg := s.expectLog(c, r)
+	c.Check(lg.EventType, check.Equals, "update")
+}
+
+func (s *v0Suite) TestLastLogID(c *check.C) {
+	var lastID uint64
+	c.Assert(testDB().QueryRow(`SELECT MAX(id) FROM logs`).Scan(&lastID), check.IsNil)
+
+	srv, conn, r, w := s.testClient()
+	defer srv.Close()
+	defer conn.Close()
+
+	uuidChan := make(chan string, 2)
+	s.emitEvents(uuidChan)
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method":      "subscribe",
+		"last_log_id": lastID,
+	}), check.IsNil)
+	s.expectStatus(c, r, 200)
+
+	go func() {
+		s.emitEvents(uuidChan)
+		close(uuidChan)
+	}()
+
+	done := make(chan bool)
+	go func() {
+		for uuid := range uuidChan {
+			for _, etype := range []string{"create", "blip", "update"} {
+				lg := s.expectLog(c, r)
+				c.Check(lg.ObjectUUID, check.Equals, uuid)
+				c.Check(lg.EventType, check.Equals, etype)
+			}
+		}
+		close(done)
+	}()
+
+	select {
+	case <-time.After(10 * time.Second):
+		c.Fatal("timeout")
+	case <-done:
+	}
+}
+
+func (s *v0Suite) TestPermission(c *check.C) {
+	srv, conn, r, w := s.testClient()
+	defer srv.Close()
+	defer conn.Close()
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method": "subscribe",
+	}), check.IsNil)
+	s.expectStatus(c, r, 200)
+
+	uuidChan := make(chan string, 1)
+	go func() {
+		s.token = arvadostest.AdminToken
+		s.emitEvents(nil)
+		s.token = arvadostest.ActiveToken
+		s.emitEvents(uuidChan)
+	}()
+
+	lg := s.expectLog(c, r)
+	c.Check(lg.ObjectUUID, check.Equals, <-uuidChan)
+}
+
+func (s *v0Suite) TestSendBadJSON(c *check.C) {
+	srv, conn, r, w := s.testClient()
+	defer srv.Close()
+	defer conn.Close()
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method": "subscribe",
+	}), check.IsNil)
+	s.expectStatus(c, r, 200)
+
+	_, err := fmt.Fprint(conn, "^]beep\n")
+	c.Check(err, check.IsNil)
+	s.expectStatus(c, r, 400)
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method": "subscribe",
+	}), check.IsNil)
+	s.expectStatus(c, r, 200)
+}
+
+func (s *v0Suite) TestSubscribe(c *check.C) {
+	srv, conn, r, w := s.testClient()
+	defer srv.Close()
+	defer conn.Close()
+
+	s.emitEvents(nil)
+
+	err := w.Encode(map[string]interface{}{"21": 12})
+	c.Check(err, check.IsNil)
+	s.expectStatus(c, r, 400)
+
+	err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
+	c.Check(err, check.IsNil)
+	s.expectStatus(c, r, 200)
+
+	uuidChan := make(chan string, 1)
+	go s.emitEvents(uuidChan)
+	uuid := <-uuidChan
+
+	for _, etype := range []string{"create", "blip", "update"} {
+		lg := s.expectLog(c, r)
+		c.Check(lg.ObjectUUID, check.Equals, uuid)
+		c.Check(lg.EventType, check.Equals, etype)
+	}
+}
+
+// Generate some events by creating and updating a workflow object,
+// and creating a custom log entry (event_type="blip") about the newly
+// created workflow. If uuidChan is not nil, send the new workflow
+// UUID to uuidChan as soon as it's known.
+func (s *v0Suite) emitEvents(uuidChan chan<- string) {
+	ac := arvados.NewClientFromEnv()
+	ac.AuthToken = s.token
+	wf := &arvados.Workflow{
+		Name: "ws_test",
+	}
+	err := ac.RequestAndDecode(wf, "POST", "arvados/v1/workflows", s.jsonBody("workflow", wf), map[string]interface{}{"ensure_unique_name": true})
+	if err != nil {
+		panic(err)
+	}
+	if uuidChan != nil {
+		uuidChan <- wf.UUID
+	}
+	lg := &arvados.Log{}
+	err = ac.RequestAndDecode(lg, "POST", "arvados/v1/logs", s.jsonBody("log", &arvados.Log{
+		ObjectUUID: wf.UUID,
+		EventType:  "blip",
+		Properties: map[string]interface{}{
+			"beep": "boop",
+		},
+	}), nil)
+	if err != nil {
+		panic(err)
+	}
+	err = ac.RequestAndDecode(wf, "PUT", "arvados/v1/workflows/"+wf.UUID, s.jsonBody("workflow", wf), nil)
+	if err != nil {
+		panic(err)
+	}
+	s.toDelete = append(s.toDelete, "arvados/v1/workflows/"+wf.UUID, "arvados/v1/logs/"+lg.UUID)
+}
+
+func (s *v0Suite) jsonBody(rscName string, ob interface{}) io.Reader {
+	j, err := json.Marshal(ob)
+	if err != nil {
+		panic(err)
+	}
+	v := url.Values{}
+	v[rscName] = []string{string(j)}
+	return bytes.NewBufferString(v.Encode())
+}
+
+func (s *v0Suite) expectStatus(c *check.C, r *json.Decoder, status int) {
+	msg := map[string]interface{}{}
+	c.Check(r.Decode(&msg), check.IsNil)
+	c.Check(int(msg["status"].(float64)), check.Equals, status)
+}
+
+func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
+	lg := &arvados.Log{}
+	c.Check(r.Decode(lg), check.IsNil)
+	return lg
+}
+
+func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
+	srv := newTestServer()
+	conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+	if err != nil {
+		panic(err)
+	}
+	w := json.NewEncoder(conn)
+	r := json.NewDecoder(conn)
+	return srv, conn, r, w
+}
+
+type testServer struct {
+	http.Server
+	addr string
+	stop chan bool
+}
+
+func (srv *testServer) Close() {
+	close(srv.stop)
+}
+
+func newTestServer() *testServer {
+	ln, err := net.Listen("tcp", ":")
+	if err != nil {
+		panic(err)
+	}
+	cfg := defaultConfig()
+	cfg.Client = *(arvados.NewClientFromEnv())
+	es := &pgEventSource{
+		DataSource: testDBConfig().ConnectionString(),
+		QueueSize:  4,
+	}
+	srv := &testServer{
+		Server: http.Server{
+			Addr:         ":",
+			ReadTimeout:  10 * time.Second,
+			WriteTimeout: 10 * time.Second,
+			Handler: &router{
+				Config:         &cfg,
+				eventSource:    es,
+				newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
+			},
+		},
+		addr: ln.Addr().String(),
+		stop: make(chan bool),
+	}
+	go es.Run()
+	go srv.Serve(ln)
+	go func() {
+		<-srv.stop
+		ln.Close()
+		es.cancel()
+	}()
+	es.waitReady()
+	return srv
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list