[ARVADOS] updated: a58e9ca25aaa0963545f256985eb75b2f840e80f

Git user git at public.curoverse.com
Wed Mar 1 11:04:28 EST 2017


Summary of changes:
 services/ws/event_source.go      | 28 ++++++++++++----
 services/ws/event_source_test.go |  2 +-
 services/ws/main.go              | 32 ++----------------
 services/ws/server.go            | 71 ++++++++++++++++++++++++++++++++++++++++
 services/ws/server_test.go       | 61 ++++++++++++++++++++++++++++++++++
 services/ws/session_v0_test.go   | 50 ++--------------------------
 6 files changed, 158 insertions(+), 86 deletions(-)
 create mode 100644 services/ws/server.go
 create mode 100644 services/ws/server_test.go

       via  a58e9ca25aaa0963545f256985eb75b2f840e80f (commit)
      from  f36162457a771824059fefa098a3ffb89c59263f (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit a58e9ca25aaa0963545f256985eb75b2f840e80f
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Mar 1 10:58:16 2017 -0500

    10764: De-duplicate real/test server startup. Add test for broken config.

diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ed1ac0d..fe1876c 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -70,9 +70,14 @@ func (ps *pgEventSource) setup() {
 	ps.ready = make(chan bool)
 }
 
-// waitReady returns when private fields (cancel, db) are available
-// for tests to use.
-func (ps *pgEventSource) waitReady() {
+// Close stops listening for new events and disconnects all clients.
+func (ps *pgEventSource) Close() {
+	ps.WaitReady()
+	ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+func (ps *pgEventSource) WaitReady() {
 	ps.setupOnce.Do(ps.setup)
 	<-ps.ready
 }
@@ -84,6 +89,12 @@ func (ps *pgEventSource) Run() {
 	defer logger(nil).Debug("pgEventSource Run finished")
 
 	ps.setupOnce.Do(ps.setup)
+	ready := ps.ready
+	defer func() {
+		if ready != nil {
+			close(ready)
+		}
+	}()
 
 	ctx, cancel := context.WithCancel(context.Background())
 	ps.cancel = cancel
@@ -101,11 +112,11 @@ func (ps *pgEventSource) Run() {
 
 	db, err := sql.Open("postgres", ps.DataSource)
 	if err != nil {
-		logger(nil).WithError(err).Fatal("sql.Open failed")
+		logger(nil).WithError(err).Error("sql.Open failed")
 		return
 	}
 	if err = db.Ping(); err != nil {
-		logger(nil).WithError(err).Fatal("db.Ping failed")
+		logger(nil).WithError(err).Error("db.Ping failed")
 		return
 	}
 	ps.db = db
@@ -113,12 +124,15 @@ func (ps *pgEventSource) Run() {
 	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
 	err = ps.pqListener.Listen("logs")
 	if err != nil {
-		logger(nil).WithError(err).Fatal("pq Listen failed")
+		logger(nil).WithError(err).Error("pq Listen failed")
+		return
 	}
 	defer ps.pqListener.Close()
 	logger(nil).Debug("pq Listen setup done")
 
-	close(ps.ready)
+	close(ready)
+	// Avoid double-close in deferred func
+	ready = nil
 
 	ps.queue = make(chan *event, ps.QueueSize)
 	defer close(ps.queue)
diff --git a/services/ws/event_source_test.go b/services/ws/event_source_test.go
index 675ac90..b157cfa 100644
--- a/services/ws/event_source_test.go
+++ b/services/ws/event_source_test.go
@@ -57,7 +57,7 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
 		sinks[i] = pges.NewSink()
 	}
 
-	pges.waitReady()
+	pges.WaitReady()
 	defer pges.cancel()
 
 	done := make(chan bool, 1)
diff --git a/services/ws/main.go b/services/ws/main.go
index 9eee813..b2b8670 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -3,12 +3,9 @@ package main
 import (
 	"flag"
 	"fmt"
-	"net/http"
-	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/config"
 	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
-	"github.com/coreos/go-systemd/daemon"
 )
 
 var logger = ctxlog.FromContext
@@ -39,31 +36,6 @@ func main() {
 	}
 
 	log.Info("started")
-	eventSource := &pgEventSource{
-		DataSource: cfg.Postgres.ConnectionString(),
-		QueueSize:  cfg.ServerEventQueue,
-	}
-	srv := &http.Server{
-		Addr:           cfg.Listen,
-		ReadTimeout:    time.Minute,
-		WriteTimeout:   time.Minute,
-		MaxHeaderBytes: 1 << 20,
-		Handler: &router{
-			Config:         &cfg,
-			eventSource:    eventSource,
-			newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
-		},
-	}
-
-	go func() {
-		eventSource.Run()
-		log.Fatal("event source stopped")
-	}()
-
-	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
-		log.WithError(err).Warn("error notifying init daemon")
-	}
-
-	log.WithField("Listen", srv.Addr).Info("listening")
-	log.Fatal(srv.ListenAndServe())
+	srv := &server{wsConfig: &cfg}
+	log.Fatal(srv.Run())
 }
diff --git a/services/ws/server.go b/services/ws/server.go
new file mode 100644
index 0000000..8870ca1
--- /dev/null
+++ b/services/ws/server.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+	"net"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/coreos/go-systemd/daemon"
+)
+
+type server struct {
+	httpServer  *http.Server
+	listener    net.Listener
+	wsConfig    *wsConfig
+	eventSource *pgEventSource
+	setupOnce   sync.Once
+}
+
+func (srv *server) Close() {
+	srv.WaitReady()
+	srv.eventSource.Close()
+	srv.listener.Close()
+}
+
+func (srv *server) WaitReady() {
+	srv.setupOnce.Do(srv.setup)
+	srv.eventSource.WaitReady()
+}
+
+func (srv *server) Run() error {
+	srv.setupOnce.Do(srv.setup)
+	return srv.httpServer.Serve(srv.listener)
+}
+
+func (srv *server) setup() {
+	log := logger(nil)
+
+	ln, err := net.Listen("tcp", srv.wsConfig.Listen)
+	if err != nil {
+		log.WithField("Listen", srv.wsConfig.Listen).Fatal(err)
+	}
+	log.WithField("Listen", ln.Addr().String()).Info("listening")
+
+	srv.listener = ln
+	srv.eventSource = &pgEventSource{
+		DataSource: srv.wsConfig.Postgres.ConnectionString(),
+		QueueSize:  srv.wsConfig.ServerEventQueue,
+	}
+	srv.httpServer = &http.Server{
+		Addr:           srv.wsConfig.Listen,
+		ReadTimeout:    time.Minute,
+		WriteTimeout:   time.Minute,
+		MaxHeaderBytes: 1 << 20,
+		Handler: &router{
+			Config:         srv.wsConfig,
+			eventSource:    srv.eventSource,
+			newPermChecker: func() permChecker { return newPermChecker(srv.wsConfig.Client) },
+		},
+	}
+
+	go func() {
+		srv.eventSource.Run()
+		log.Info("event source stopped")
+		srv.Close()
+	}()
+
+	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+		log.WithError(err).Warn("error notifying init daemon")
+	}
+}
diff --git a/services/ws/server_test.go b/services/ws/server_test.go
new file mode 100644
index 0000000..d74f7df
--- /dev/null
+++ b/services/ws/server_test.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&serverSuite{})
+
+type serverSuite struct {
+}
+
+func testConfig() *wsConfig {
+	cfg := defaultConfig()
+	cfg.Client = *(arvados.NewClientFromEnv())
+	cfg.Postgres = testDBConfig()
+	cfg.Listen = ":"
+	return &cfg
+}
+
+// TestBadDB ensures Run() returns an error (instead of panicking or
+// deadlocking) if it can't connect to the database server at startup.
+func (s *serverSuite) TestBadDB(c *check.C) {
+	cfg := testConfig()
+	cfg.Postgres["password"] = "1234"
+	srv := &server{wsConfig: cfg}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		err := srv.Run()
+		c.Check(err, check.NotNil)
+		wg.Done()
+	}()
+	wg.Add(1)
+	go func() {
+		srv.WaitReady()
+		wg.Done()
+	}()
+
+	done := make(chan bool)
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		c.Fatal("timeout")
+	}
+}
+
+func newTestServer() *server {
+	srv := &server{wsConfig: testConfig()}
+	go srv.Run()
+	srv.WaitReady()
+	return srv
+}
diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go
index d8a2b69..85e3656 100644
--- a/services/ws/session_v0_test.go
+++ b/services/ws/session_v0_test.go
@@ -5,8 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"net"
-	"net/http"
 	"net/url"
 	"os"
 	"time"
@@ -228,9 +226,9 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
 	return lg
 }
 
-func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
+func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
 	srv := newTestServer()
-	conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+	conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
 	if err != nil {
 		panic(err)
 	}
@@ -238,47 +236,3 @@ func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *js
 	r := json.NewDecoder(conn)
 	return srv, conn, r, w
 }
-
-type testServer struct {
-	http.Server
-	addr string
-	ln   net.Listener
-	pges *pgEventSource
-}
-
-func (srv *testServer) Close() {
-	srv.ln.Close()
-	srv.pges.cancel()
-}
-
-func newTestServer() *testServer {
-	ln, err := net.Listen("tcp", ":")
-	if err != nil {
-		panic(err)
-	}
-	cfg := defaultConfig()
-	cfg.Client = *(arvados.NewClientFromEnv())
-	pges := &pgEventSource{
-		DataSource: testDBConfig().ConnectionString(),
-		QueueSize:  4,
-	}
-	srv := &testServer{
-		Server: http.Server{
-			Addr:         ":",
-			ReadTimeout:  10 * time.Second,
-			WriteTimeout: 10 * time.Second,
-			Handler: &router{
-				Config:         &cfg,
-				eventSource:    pges,
-				newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
-			},
-		},
-		addr: ln.Addr().String(),
-		ln:   ln,
-		pges: pges,
-	}
-	go pges.Run()
-	go srv.Serve(ln)
-	pges.waitReady()
-	return srv
-}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list