[ARVADOS] updated: 2565719b84461df57ec411f7973b88419f0e6c2c

Git user git at public.curoverse.com
Wed Feb 8 16:34:27 EST 2017


Summary of changes:
 services/ws/event_source.go | 31 +++++++++++++++++++------------
 1 file changed, 19 insertions(+), 12 deletions(-)

  discards  c99fd72a6db093ef0f9a80ed66f0ac0807c74933 (commit)
       via  2565719b84461df57ec411f7973b88419f0e6c2c (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (c99fd72a6db093ef0f9a80ed66f0ac0807c74933)
            \
             N -- N -- N (2565719b84461df57ec411f7973b88419f0e6c2c)

When this happens we assume that you've already received alert emails for
all of the O revisions, so here we report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 2565719b84461df57ec411f7973b88419f0e6c2c
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Feb 8 16:34:18 2017 -0500

    11070: Add some logs, ensure shutdown after a pq connection problem.

diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ea90ec7..97c58b7 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -59,27 +59,42 @@ func (ps *pgEventSource) setup() {
 	}
 	ps.db = db
 
-	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
-		if err != nil {
-			// Until we have a mechanism for catching up
-			// on missed events, we cannot recover from a
-			// dropped connection without breaking our
-			// promises to clients.
-			logger(nil).WithError(err).Error("listener problem")
-			ps.shutdown <- err
-		}
-	})
+	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
 	err = ps.pqListener.Listen("logs")
 	if err != nil {
 		logger(nil).WithError(err).Fatal("pq Listen failed")
 	}
-	logger(nil).Debug("pgEventSource listening")
-
 	go ps.run()
+	logger(nil).Debug("pgEventSource setup")
+}
+
+func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
+	if et == pq.ListenerEventConnected {
+		logger(nil).Debug("pgEventSource connected")
+		return
+	}
+
+	// Until we have a mechanism for catching up on missed events,
+	// we cannot recover from a dropped connection without
+	// breaking our promises to clients.
+	logger(nil).
+		WithField("eventType", et).
+		WithError(err).
+		Error("listener problem")
+
+	select {
+	case ps.shutdown <- err:
+	default:
+		// If there's already an err in the shutdown channel,
+		// shutdown will happen (or has already happened) so
+		// there's no need to block here.
+	}
 }
 
 func (ps *pgEventSource) run() {
 	ps.queue = make(chan *event, ps.QueueSize)
+	defer close(ps.queue)
+	defer ps.pqListener.Close()
 
 	go func() {
 		for e := range ps.queue {
@@ -115,7 +130,6 @@ func (ps *pgEventSource) run() {
 			if ok {
 				logger(nil).WithError(err).Info("shutdown")
 			}
-			close(ps.queue)
 			return
 
 		case <-ticker.C:
@@ -124,10 +138,19 @@ func (ps *pgEventSource) run() {
 
 		case pqEvent, ok := <-ps.pqListener.Notify:
 			if !ok {
-				close(ps.queue)
+				logger(nil).Debug("pqListener Notify chan closed")
 				return
 			}
+			if pqEvent == nil {
+				// pq should call listenerProblem
+				// itself in addition to sending us a
+				// nil event, so this might be
+				// superfluous:
+				ps.listenerProblem(-1, nil)
+				continue
+			}
 			if pqEvent.Channel != "logs" {
+				logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
 				continue
 			}
 			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list