[ARVADOS] updated: a58e9ca25aaa0963545f256985eb75b2f840e80f
Git user
git at public.curoverse.com
Wed Mar 1 11:04:28 EST 2017
Summary of changes:
services/ws/event_source.go | 28 ++++++++++++----
services/ws/event_source_test.go | 2 +-
services/ws/main.go | 32 ++----------------
services/ws/server.go | 71 ++++++++++++++++++++++++++++++++++++++++
services/ws/server_test.go | 61 ++++++++++++++++++++++++++++++++++
services/ws/session_v0_test.go | 50 ++--------------------------
6 files changed, 158 insertions(+), 86 deletions(-)
create mode 100644 services/ws/server.go
create mode 100644 services/ws/server_test.go
via a58e9ca25aaa0963545f256985eb75b2f840e80f (commit)
from f36162457a771824059fefa098a3ffb89c59263f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a58e9ca25aaa0963545f256985eb75b2f840e80f
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Mar 1 10:58:16 2017 -0500
10764: De-duplicate real/test server startup. Add test for broken config.
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ed1ac0d..fe1876c 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -70,9 +70,14 @@ func (ps *pgEventSource) setup() {
ps.ready = make(chan bool)
}
-// waitReady returns when private fields (cancel, db) are available
-// for tests to use.
-func (ps *pgEventSource) waitReady() {
+// Close stops listening for new events and disconnects all clients.
+func (ps *pgEventSource) Close() {
+ ps.WaitReady()
+ ps.cancel()
+}
+
+// WaitReady returns when the event listener is connected.
+func (ps *pgEventSource) WaitReady() {
ps.setupOnce.Do(ps.setup)
<-ps.ready
}
@@ -84,6 +89,12 @@ func (ps *pgEventSource) Run() {
defer logger(nil).Debug("pgEventSource Run finished")
ps.setupOnce.Do(ps.setup)
+ ready := ps.ready
+ defer func() {
+ if ready != nil {
+ close(ready)
+ }
+ }()
ctx, cancel := context.WithCancel(context.Background())
ps.cancel = cancel
@@ -101,11 +112,11 @@ func (ps *pgEventSource) Run() {
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
- logger(nil).WithError(err).Fatal("sql.Open failed")
+ logger(nil).WithError(err).Error("sql.Open failed")
return
}
if err = db.Ping(); err != nil {
- logger(nil).WithError(err).Fatal("db.Ping failed")
+ logger(nil).WithError(err).Error("db.Ping failed")
return
}
ps.db = db
@@ -113,12 +124,15 @@ func (ps *pgEventSource) Run() {
ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
err = ps.pqListener.Listen("logs")
if err != nil {
- logger(nil).WithError(err).Fatal("pq Listen failed")
+ logger(nil).WithError(err).Error("pq Listen failed")
+ return
}
defer ps.pqListener.Close()
logger(nil).Debug("pq Listen setup done")
- close(ps.ready)
+ close(ready)
+ // Avoid double-close in deferred func
+ ready = nil
ps.queue = make(chan *event, ps.QueueSize)
defer close(ps.queue)
diff --git a/services/ws/event_source_test.go b/services/ws/event_source_test.go
index 675ac90..b157cfa 100644
--- a/services/ws/event_source_test.go
+++ b/services/ws/event_source_test.go
@@ -57,7 +57,7 @@ func (*eventSourceSuite) TestEventSource(c *check.C) {
sinks[i] = pges.NewSink()
}
- pges.waitReady()
+ pges.WaitReady()
defer pges.cancel()
done := make(chan bool, 1)
diff --git a/services/ws/main.go b/services/ws/main.go
index 9eee813..b2b8670 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -3,12 +3,9 @@ package main
import (
"flag"
"fmt"
- "net/http"
- "time"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/ctxlog"
- "github.com/coreos/go-systemd/daemon"
)
var logger = ctxlog.FromContext
@@ -39,31 +36,6 @@ func main() {
}
log.Info("started")
- eventSource := &pgEventSource{
- DataSource: cfg.Postgres.ConnectionString(),
- QueueSize: cfg.ServerEventQueue,
- }
- srv := &http.Server{
- Addr: cfg.Listen,
- ReadTimeout: time.Minute,
- WriteTimeout: time.Minute,
- MaxHeaderBytes: 1 << 20,
- Handler: &router{
- Config: &cfg,
- eventSource: eventSource,
- newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
- },
- }
-
- go func() {
- eventSource.Run()
- log.Fatal("event source stopped")
- }()
-
- if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
- log.WithError(err).Warn("error notifying init daemon")
- }
-
- log.WithField("Listen", srv.Addr).Info("listening")
- log.Fatal(srv.ListenAndServe())
+ srv := &server{wsConfig: &cfg}
+ log.Fatal(srv.Run())
}
diff --git a/services/ws/server.go b/services/ws/server.go
new file mode 100644
index 0000000..8870ca1
--- /dev/null
+++ b/services/ws/server.go
@@ -0,0 +1,71 @@
+package main
+
+import (
+ "net"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/coreos/go-systemd/daemon"
+)
+
+type server struct {
+ httpServer *http.Server
+ listener net.Listener
+ wsConfig *wsConfig
+ eventSource *pgEventSource
+ setupOnce sync.Once
+}
+
+func (srv *server) Close() {
+ srv.WaitReady()
+ srv.eventSource.Close()
+ srv.listener.Close()
+}
+
+func (srv *server) WaitReady() {
+ srv.setupOnce.Do(srv.setup)
+ srv.eventSource.WaitReady()
+}
+
+func (srv *server) Run() error {
+ srv.setupOnce.Do(srv.setup)
+ return srv.httpServer.Serve(srv.listener)
+}
+
+func (srv *server) setup() {
+ log := logger(nil)
+
+ ln, err := net.Listen("tcp", srv.wsConfig.Listen)
+ if err != nil {
+ log.WithField("Listen", srv.wsConfig.Listen).Fatal(err)
+ }
+ log.WithField("Listen", ln.Addr().String()).Info("listening")
+
+ srv.listener = ln
+ srv.eventSource = &pgEventSource{
+ DataSource: srv.wsConfig.Postgres.ConnectionString(),
+ QueueSize: srv.wsConfig.ServerEventQueue,
+ }
+ srv.httpServer = &http.Server{
+ Addr: srv.wsConfig.Listen,
+ ReadTimeout: time.Minute,
+ WriteTimeout: time.Minute,
+ MaxHeaderBytes: 1 << 20,
+ Handler: &router{
+ Config: srv.wsConfig,
+ eventSource: srv.eventSource,
+ newPermChecker: func() permChecker { return newPermChecker(srv.wsConfig.Client) },
+ },
+ }
+
+ go func() {
+ srv.eventSource.Run()
+ log.Info("event source stopped")
+ srv.Close()
+ }()
+
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+ log.WithError(err).Warn("error notifying init daemon")
+ }
+}
diff --git a/services/ws/server_test.go b/services/ws/server_test.go
new file mode 100644
index 0000000..d74f7df
--- /dev/null
+++ b/services/ws/server_test.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&serverSuite{})
+
+type serverSuite struct {
+}
+
+func testConfig() *wsConfig {
+ cfg := defaultConfig()
+ cfg.Client = *(arvados.NewClientFromEnv())
+ cfg.Postgres = testDBConfig()
+ cfg.Listen = ":"
+ return &cfg
+}
+
+// TestBadDB ensures Run() returns an error (instead of panicking or
+// deadlocking) if it can't connect to the database server at startup.
+func (s *serverSuite) TestBadDB(c *check.C) {
+ cfg := testConfig()
+ cfg.Postgres["password"] = "1234"
+ srv := &server{wsConfig: cfg}
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ err := srv.Run()
+ c.Check(err, check.NotNil)
+ wg.Done()
+ }()
+ wg.Add(1)
+ go func() {
+ srv.WaitReady()
+ wg.Done()
+ }()
+
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(10 * time.Second):
+ c.Fatal("timeout")
+ }
+}
+
+func newTestServer() *server {
+ srv := &server{wsConfig: testConfig()}
+ go srv.Run()
+ srv.WaitReady()
+ return srv
+}
diff --git a/services/ws/session_v0_test.go b/services/ws/session_v0_test.go
index d8a2b69..85e3656 100644
--- a/services/ws/session_v0_test.go
+++ b/services/ws/session_v0_test.go
@@ -5,8 +5,6 @@ import (
"encoding/json"
"fmt"
"io"
- "net"
- "net/http"
"net/url"
"os"
"time"
@@ -228,9 +226,9 @@ func (s *v0Suite) expectLog(c *check.C, r *json.Decoder) *arvados.Log {
return lg
}
-func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *json.Encoder) {
+func (s *v0Suite) testClient() (*server, *websocket.Conn, *json.Decoder, *json.Encoder) {
srv := newTestServer()
- conn, err := websocket.Dial("ws://"+srv.addr+"/websocket?api_token="+s.token, "", "http://"+srv.addr)
+ conn, err := websocket.Dial("ws://"+srv.listener.Addr().String()+"/websocket?api_token="+s.token, "", "http://"+srv.listener.Addr().String())
if err != nil {
panic(err)
}
@@ -238,47 +236,3 @@ func (s *v0Suite) testClient() (*testServer, *websocket.Conn, *json.Decoder, *js
r := json.NewDecoder(conn)
return srv, conn, r, w
}
-
-type testServer struct {
- http.Server
- addr string
- ln net.Listener
- pges *pgEventSource
-}
-
-func (srv *testServer) Close() {
- srv.ln.Close()
- srv.pges.cancel()
-}
-
-func newTestServer() *testServer {
- ln, err := net.Listen("tcp", ":")
- if err != nil {
- panic(err)
- }
- cfg := defaultConfig()
- cfg.Client = *(arvados.NewClientFromEnv())
- pges := &pgEventSource{
- DataSource: testDBConfig().ConnectionString(),
- QueueSize: 4,
- }
- srv := &testServer{
- Server: http.Server{
- Addr: ":",
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- Handler: &router{
- Config: &cfg,
- eventSource: pges,
- newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
- },
- },
- addr: ln.Addr().String(),
- ln: ln,
- pges: pges,
- }
- go pges.Run()
- go srv.Serve(ln)
- pges.waitReady()
- return srv
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list