[ARVADOS] updated: 2565719b84461df57ec411f7973b88419f0e6c2c
Git user
git at public.curoverse.com
Wed Feb 8 16:34:27 EST 2017
Summary of changes:
services/ws/event_source.go | 31 +++++++++++++++++++------------
1 file changed, 19 insertions(+), 12 deletions(-)
discards c99fd72a6db093ef0f9a80ed66f0ac0807c74933 (commit)
via 2565719b84461df57ec411f7973b88419f0e6c2c (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (c99fd72a6db093ef0f9a80ed66f0ac0807c74933)
\
N -- N -- N (2565719b84461df57ec411f7973b88419f0e6c2c)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 2565719b84461df57ec411f7973b88419f0e6c2c
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Feb 8 16:34:18 2017 -0500
11070: Add some logs, ensure shutdown after a pq connection problem.
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ea90ec7..97c58b7 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -59,27 +59,42 @@ func (ps *pgEventSource) setup() {
}
ps.db = db
- ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
- if err != nil {
- // Until we have a mechanism for catching up
- // on missed events, we cannot recover from a
- // dropped connection without breaking our
- // promises to clients.
- logger(nil).WithError(err).Error("listener problem")
- ps.shutdown <- err
- }
- })
+ ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
err = ps.pqListener.Listen("logs")
if err != nil {
logger(nil).WithError(err).Fatal("pq Listen failed")
}
- logger(nil).Debug("pgEventSource listening")
-
go ps.run()
+ logger(nil).Debug("pgEventSource setup")
+}
+
+func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
+ if et == pq.ListenerEventConnected {
+ logger(nil).Debug("pgEventSource connected")
+ return
+ }
+
+ // Until we have a mechanism for catching up on missed events,
+ // we cannot recover from a dropped connection without
+ // breaking our promises to clients.
+ logger(nil).
+ WithField("eventType", et).
+ WithError(err).
+ Error("listener problem")
+
+ select {
+ case ps.shutdown <- err:
+ default:
+ // If there's already an err in the shutdown channel,
+ // shutdown will happen (or has already happened) so
+ // there's no need to block here.
+ }
}
func (ps *pgEventSource) run() {
ps.queue = make(chan *event, ps.QueueSize)
+ defer close(ps.queue)
+ defer ps.pqListener.Close()
go func() {
for e := range ps.queue {
@@ -115,7 +130,6 @@ func (ps *pgEventSource) run() {
if ok {
logger(nil).WithError(err).Info("shutdown")
}
- close(ps.queue)
return
case <-ticker.C:
@@ -124,10 +138,19 @@ func (ps *pgEventSource) run() {
case pqEvent, ok := <-ps.pqListener.Notify:
if !ok {
- close(ps.queue)
+ logger(nil).Debug("pqListener Notify chan closed")
return
}
+ if pqEvent == nil {
+ // pq should call listenerProblem
+ // itself in addition to sending us a
+ // nil event, so this might be
+ // superfluous:
+ ps.listenerProblem(-1, nil)
+ continue
+ }
if pqEvent.Channel != "logs" {
+ logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
continue
}
logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list