[ARVADOS] updated: 784371c83a910f286d0a9123eb9d099ad476931e

Git user git at public.curoverse.com
Tue Feb 28 01:10:09 EST 2017


Summary of changes:
 services/ws/event_source.go      |  18 ++++++
 services/ws/event_source_test.go |  29 +++------
 services/ws/event_test.go        |  27 +-------
 services/ws/session_v0_test.go   | 133 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 163 insertions(+), 44 deletions(-)
 create mode 100644 services/ws/session_v0_test.go

       via  784371c83a910f286d0a9123eb9d099ad476931e (commit)
      from  aea8596a74dabdc894fc1139c4d8dc195e6c86b2 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 784371c83a910f286d0a9123eb9d099ad476931e
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Feb 28 01:09:21 2017 -0500

    10764: Test v0 session. (WIP)

diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index 622084c..ed1ac0d 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -43,6 +43,9 @@ type pgEventSource struct {
 	eventsOut  uint64
 
 	cancel func()
+
+	setupOnce sync.Once
+	ready     chan bool
 }
 
 var _ debugStatuser = (*pgEventSource)(nil)
@@ -63,12 +66,25 @@ func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
 	ps.cancel()
 }
 
+func (ps *pgEventSource) setup() {
+	ps.ready = make(chan bool)
+}
+
+// waitReady returns when private fields (cancel, db) are available
+// for tests to use.
+func (ps *pgEventSource) waitReady() {
+	ps.setupOnce.Do(ps.setup)
+	<-ps.ready
+}
+
 // Run listens for event notifications on the "logs" channel and sends
 // them to all subscribers.
 func (ps *pgEventSource) Run() {
 	logger(nil).Debug("pgEventSource Run starting")
 	defer logger(nil).Debug("pgEventSource Run finished")
 
+	ps.setupOnce.Do(ps.setup)
+
 	ctx, cancel := context.WithCancel(context.Background())
 	ps.cancel = cancel
 	defer cancel()
@@ -102,6 +118,8 @@ func (ps *pgEventSource) Run() {
 	defer ps.pqListener.Close()
 	logger(nil).Debug("pq Listen setup done")
 
+	close(ps.ready)
+
 	ps.queue = make(chan *event, ps.QueueSize)
 	defer close(ps.queue)
 
diff --git a/services/ws/event_source_test.go b/services/ws/event_source_test.go
index ee1da08..675ac90 100644
--- a/services/ws/event_source_test.go
+++ b/services/ws/event_source_test.go
@@ -14,7 +14,7 @@ var _ = check.Suite(&eventSourceSuite{})
 
 type eventSourceSuite struct{}
 
-func testDBConfig() (pgConfig, error) {
+func testDBConfig() pgConfig {
 	var railsDB struct {
 		Test struct {
 			Database string
@@ -25,7 +25,7 @@ func testDBConfig() (pgConfig, error) {
 	}
 	err := config.LoadFile(&railsDB, "../api/config/database.yml")
 	if err != nil {
-		return nil, err
+		panic(err)
 	}
 	cfg := pgConfig{
 		"dbname":   railsDB.Test.Database,
@@ -33,22 +33,20 @@ func testDBConfig() (pgConfig, error) {
 		"password": railsDB.Test.Password,
 		"user":     railsDB.Test.Username,
 	}
-	return cfg, nil
+	return cfg
 }
 
-func testDB() (*sql.DB, error) {
-	cfg, err := testDBConfig()
+func testDB() *sql.DB {
+	db, err := sql.Open("postgres", testDBConfig().ConnectionString())
 	if err != nil {
-		return nil, err
+		panic(err)
 	}
-	return sql.Open("postgres", cfg.ConnectionString())
+	return db
 }
 
 func (*eventSourceSuite) TestEventSource(c *check.C) {
-	cfg, err := testDBConfig()
-	if err != nil {
-		c.Fatal(err)
-	}
+	cfg := testDBConfig()
+	db := testDB()
 	pges := &pgEventSource{
 		DataSource: cfg.ConnectionString(),
 		QueueSize:  4,
@@ -59,16 +57,9 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
 		sinks[i] = pges.NewSink()
 	}
 
-	// wait for listener to start, as evidenced by queue channel
-	// appearing (relying on internal implementation detail here)
-	for deadline := time.Now().Add(10 * time.Second); pges.queue == nil; time.Sleep(10 * time.Millisecond) {
-		c.Assert(time.Now().After(deadline), check.Equals, false)
-	}
+	pges.waitReady()
 	defer pges.cancel()
 
-	db, err := testDB()
-	c.Assert(err, check.IsNil)
-
 	done := make(chan bool, 1)
 
 	go func() {
diff --git a/services/ws/event_test.go b/services/ws/event_test.go
index eb67a34..f095372 100644
--- a/services/ws/event_test.go
+++ b/services/ws/event_test.go
@@ -1,38 +1,15 @@
 package main
 
-import (
-	"database/sql"
-
-	"git.curoverse.com/arvados.git/sdk/go/config"
-	check "gopkg.in/check.v1"
-)
+import check "gopkg.in/check.v1"
 
 var _ = check.Suite(&eventSuite{})
 
 type eventSuite struct{}
 
 func (*eventSuite) TestDetail(c *check.C) {
-	var railsDB struct {
-		Test struct {
-			Database string
-			Username string
-			Password string
-			Host     string
-		}
-	}
-	err := config.LoadFile(&railsDB, "../api/config/database.yml")
-	c.Assert(err, check.IsNil)
-	cfg := pgConfig{
-		"dbname":   railsDB.Test.Database,
-		"host":     railsDB.Test.Host,
-		"password": railsDB.Test.Password,
-		"user":     railsDB.Test.Username,
-	}
-	db, err := sql.Open("postgres", cfg.ConnectionString())
-	c.Assert(err, check.IsNil)
 	e := &event{
 		LogID: 17,
-		db:    db,
+		db:    testDB(),
 	}
 	logRow := e.Detail()
 	c.Assert(logRow, check.NotNil)
diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go
new file mode 100644
index 0000000..6fca38a
--- /dev/null
+++ b/services/ws/session_v0_test.go
@@ -0,0 +1,133 @@
+package main
+
+import (
+	"encoding/json"
+	"net"
+	"net/http"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"golang.org/x/net/websocket"
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&v0Suite{})
+
+type v0Suite struct{}
+
+func (s *v0Suite) TestSubscribe(c *check.C) {
+	conn, r, w := s.testClient()
+	defer conn.Close()
+
+	var ack struct {
+		Status int
+	}
+	err := w.Encode(map[string]interface{}{"21": 12})
+	c.Check(err, check.IsNil)
+	c.Check(r.Decode(&ack), check.IsNil)
+	c.Check(ack.Status >= 400, check.Equals, true)
+
+	err = w.Encode(map[string]interface{}{"method": "subscribe", "filters": []string{}})
+	c.Check(err, check.IsNil)
+	c.Check(r.Decode(&ack), check.IsNil)
+	c.Check(ack.Status, check.Equals, 200)
+
+	c.Fatal("TODO: receive some event messages")
+}
+
+func (s *v0Suite) TestFilters(c *check.C) {
+	conn, r, w := s.testClient()
+	defer conn.Close()
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method":  "subscribe",
+		"filters": [][]interface{}{{"event_type", "in", []string{"update"}}},
+	}), check.IsNil)
+	msg := map[string]interface{}{}
+	c.Check(r.Decode(&msg), check.IsNil)
+	c.Check(int(msg["status"].(float64)), check.Equals, 200)
+
+	c.Fatal("TODO")
+}
+
+func (s *v0Suite) TestPermission(c *check.C) {
+	conn, r, w := s.testClient()
+	defer conn.Close()
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method": "subscribe",
+	}), check.IsNil)
+	msg := map[string]interface{}{}
+	c.Check(r.Decode(&msg), check.IsNil)
+	c.Check(int(msg["status"].(float64)), check.Equals, 200)
+
+	c.Fatal("TODO")
+}
+
+func (s *v0Suite) TestLastLogID(c *check.C) {
+	conn, r, w := s.testClient()
+	defer conn.Close()
+
+	c.Check(w.Encode(map[string]interface{}{
+		"method":      "subscribe",
+		"last_log_id": 12,
+	}), check.IsNil)
+	msg := map[string]interface{}{}
+	c.Check(r.Decode(&msg), check.IsNil)
+	c.Check(int(msg["status"].(float64)), check.Equals, 200)
+
+	c.Fatal("TODO")
+}
+
+func (s *v0Suite) testClient() (*websocket.Conn, *json.Decoder, *json.Encoder) {
+	srv := newTestServer()
+	conn, err := websocket.Dial("ws://"+srv.addr+"/websocket", "", "http://"+srv.addr)
+	if err != nil {
+		panic(err)
+	}
+	w := json.NewEncoder(conn)
+	r := json.NewDecoder(conn)
+	return conn, r, w
+}
+
+type testServer struct {
+	http.Server
+	addr string
+	stop chan bool
+}
+
+func newTestServer() *testServer {
+	ln, err := net.Listen("tcp", ":")
+	if err != nil {
+		panic(err)
+	}
+	cfg := defaultConfig()
+	cfg.Client = *(arvados.NewClientFromEnv())
+	es := &pgEventSource{
+		DataSource: testDBConfig().ConnectionString(),
+		QueueSize:  4,
+	}
+	srv := &testServer{
+		Server: http.Server{
+			Addr:         ":",
+			ReadTimeout:  10 * time.Second,
+			WriteTimeout: 10 * time.Second,
+			Handler: &router{
+				Config:         &cfg,
+				eventSource:    es,
+				newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
+			},
+		},
+		addr: ln.Addr().String(),
+		stop: make(chan bool),
+	}
+	go es.Run()
+	go srv.Serve(ln)
+	go func() {
+		<-srv.stop
+		ln.Close()
+		es.cancel()
+	}()
+	es.waitReady()
+	return srv
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list