[ARVADOS] created: c99fd72a6db093ef0f9a80ed66f0ac0807c74933
Git user
git at public.curoverse.com
Wed Feb 8 15:56:53 EST 2017
at c99fd72a6db093ef0f9a80ed66f0ac0807c74933 (commit)
commit c99fd72a6db093ef0f9a80ed66f0ac0807c74933
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Feb 8 15:54:47 2017 -0500
11070: Add some logs, ensure shutdown after a pq connection problem.
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ea90ec7..d1150cc 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -59,16 +59,7 @@ func (ps *pgEventSource) setup() {
}
ps.db = db
- ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
- if err != nil {
- // Until we have a mechanism for catching up
- // on missed events, we cannot recover from a
- // dropped connection without breaking our
- // promises to clients.
- logger(nil).WithError(err).Error("listener problem")
- ps.shutdown <- err
- }
- })
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
err = ps.pqListener.Listen("logs")
if err != nil {
logger(nil).WithError(err).Fatal("pq Listen failed")
@@ -78,8 +69,25 @@ func (ps *pgEventSource) setup() {
go ps.run()
}
+func (ps *pgEventSource) listenerProblem(ev pq.ListenerEventType, err error) {
+ // Until we have a mechanism for catching up on missed events,
+ // we cannot recover from a dropped connection without
+ // breaking our promises to clients.
+ logger(nil).WithError(err).Error("listener problem")
+
+ // avoid blocking here if we get multiple errors (only one err
+ // will ever be received by run())
+ if len(ps.shutdown) < 1 {
+ ps.shutdown <- err
+ }
+
+ ps.pqListener.Close()
+}
+
func (ps *pgEventSource) run() {
ps.queue = make(chan *event, ps.QueueSize)
+ defer close(ps.queue)
+ defer ps.pqListener.Close()
go func() {
for e := range ps.queue {
@@ -115,7 +123,6 @@ func (ps *pgEventSource) run() {
if ok {
logger(nil).WithError(err).Info("shutdown")
}
- close(ps.queue)
return
case <-ticker.C:
@@ -124,10 +131,19 @@ func (ps *pgEventSource) run() {
case pqEvent, ok := <-ps.pqListener.Notify:
if !ok {
- close(ps.queue)
+ logger(nil).Debug("pqListener Notify chan closed")
return
}
+ if pqEvent == nil {
+ // pq should call listenerProblem
+ // itself in addition to sending us a
+ // nil event, so this might be
+ // superfluous:
+ ps.listenerProblem(pqEvent, nil)
+ continue
+ }
if pqEvent.Channel != "logs" {
+ logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
continue
}
logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list