[ARVADOS] updated: 0fc6eaead0bf7691e99d19e74ec33636909001a7
Git user
git at public.curoverse.com
Thu Feb 9 04:21:37 EST 2017
Summary of changes:
services/ws/event_source.go | 87 ++++++++++++++++++++++++---------------------
services/ws/main.go | 8 +++--
2 files changed, 52 insertions(+), 43 deletions(-)
discards 2565719b84461df57ec411f7973b88419f0e6c2c (commit)
via 0fc6eaead0bf7691e99d19e74ec33636909001a7 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (2565719b84461df57ec411f7973b88419f0e6c2c)
           \
            N -- N -- N (0fc6eaead0bf7691e99d19e74ec33636909001a7)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0fc6eaead0bf7691e99d19e74ec33636909001a7
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Feb 9 04:04:57 2017 -0500
11070: Add some logs, ensure shutdown after a pq connection problem.
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ea90ec7..4de1d55 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -1,6 +1,7 @@
package main
import (
+ "context"
"database/sql"
"strconv"
"strings"
@@ -35,51 +36,73 @@ type pgEventSource struct {
pqListener *pq.Listener
queue chan *event
sinks map[*pgEventSink]bool
- setupOnce sync.Once
mtx sync.Mutex
- shutdown chan error
lastQDelay time.Duration
eventsIn uint64
eventsOut uint64
+
+ cancel func()
}
var _ debugStatuser = (*pgEventSource)(nil)
-func (ps *pgEventSource) setup() {
- ps.shutdown = make(chan error, 1)
+func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
+ if et == pq.ListenerEventConnected {
+ logger(nil).Debug("pgEventSource connected")
+ return
+ }
+
+ // Until we have a mechanism for catching up on missed events,
+ // we cannot recover from a dropped connection without
+ // breaking our promises to clients.
+ logger(nil).
+ WithField("eventType", et).
+ WithError(err).
+ Error("listener problem")
+ ps.cancel()
+}
+
+func (ps *pgEventSource) Run() {
+ logger(nil).Debug("pgEventSource Run starting")
+ defer logger(nil).Debug("pgEventSource Run finished")
+
+ ctx, cancel := context.WithCancel(context.Background())
+ ps.cancel = cancel
+ defer cancel()
+
ps.sinks = make(map[*pgEventSink]bool)
+ defer func() {
+ // Disconnect all clients
+ ps.mtx.Lock()
+ for sink := range ps.sinks {
+ close(sink.channel)
+ }
+ ps.sinks = nil
+ ps.mtx.Unlock()
+ }()
db, err := sql.Open("postgres", ps.DataSource)
if err != nil {
logger(nil).WithError(err).Fatal("sql.Open failed")
+ return
}
if err = db.Ping(); err != nil {
logger(nil).WithError(err).Fatal("db.Ping failed")
+ return
}
ps.db = db
- ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
- if err != nil {
- // Until we have a mechanism for catching up
- // on missed events, we cannot recover from a
- // dropped connection without breaking our
- // promises to clients.
- logger(nil).WithError(err).Error("listener problem")
- ps.shutdown <- err
- }
- })
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
err = ps.pqListener.Listen("logs")
if err != nil {
logger(nil).WithError(err).Fatal("pq Listen failed")
}
- logger(nil).Debug("pgEventSource listening")
-
- go ps.run()
-}
+ defer ps.pqListener.Close()
+ logger(nil).Debug("pq Listen setup done")
-func (ps *pgEventSource) run() {
ps.queue = make(chan *event, ps.QueueSize)
+ defer close(ps.queue)
go func() {
for e := range ps.queue {
@@ -111,11 +134,8 @@ func (ps *pgEventSource) run() {
defer ticker.Stop()
for {
select {
- case err, ok := <-ps.shutdown:
- if ok {
- logger(nil).WithError(err).Info("shutdown")
- }
- close(ps.queue)
+ case <-ctx.Done():
+ logger(nil).Debug("ctx done")
return
case <-ticker.C:
@@ -124,10 +144,19 @@ func (ps *pgEventSource) run() {
case pqEvent, ok := <-ps.pqListener.Notify:
if !ok {
- close(ps.queue)
+ logger(nil).Debug("pqListener Notify chan closed")
return
}
+ if pqEvent == nil {
+ // pq should call listenerProblem
+ // itself in addition to sending us a
+ // nil event, so this might be
+ // superfluous:
+ ps.listenerProblem(-1, nil)
+ continue
+ }
if pqEvent.Channel != "logs" {
+ logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
continue
}
logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
@@ -158,7 +187,6 @@ func (ps *pgEventSource) run() {
// quickly as possible because when one sink stops being ready, all
// other sinks block.
func (ps *pgEventSource) NewSink() eventSink {
- ps.setupOnce.Do(ps.setup)
sink := &pgEventSink{
channel: make(chan *event, 1),
source: ps,
@@ -170,7 +198,6 @@ func (ps *pgEventSource) NewSink() eventSink {
}
func (ps *pgEventSource) DB() *sql.DB {
- ps.setupOnce.Do(ps.setup)
return ps.db
}
@@ -201,6 +228,7 @@ func (sink *pgEventSink) Channel() <-chan *event {
return sink.channel
}
+// Stop sending events to the sink's channel.
func (sink *pgEventSink) Stop() {
go func() {
// Ensure this sink cannot fill up and block the
@@ -210,7 +238,9 @@ func (sink *pgEventSink) Stop() {
}
}()
sink.source.mtx.Lock()
- delete(sink.source.sinks, sink)
+ if _, ok := sink.source.sinks[sink]; ok {
+ delete(sink.source.sinks, sink)
+ close(sink.channel)
+ }
sink.source.mtx.Unlock()
- close(sink.channel)
}
diff --git a/services/ws/main.go b/services/ws/main.go
index 7c3625b..9eee813 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -54,9 +54,11 @@ func main() {
newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
},
}
- // Bootstrap the eventSource by attaching a dummy subscriber
- // and hanging up.
- eventSource.NewSink().Stop()
+
+ go func() {
+ eventSource.Run()
+ log.Fatal("event source stopped")
+ }()
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.WithError(err).Warn("error notifying init daemon")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list