[ARVADOS] created: c99fd72a6db093ef0f9a80ed66f0ac0807c74933

Git user git at public.curoverse.com
Wed Feb 8 15:56:53 EST 2017


        at  c99fd72a6db093ef0f9a80ed66f0ac0807c74933 (commit)


commit c99fd72a6db093ef0f9a80ed66f0ac0807c74933
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Feb 8 15:54:47 2017 -0500

    11070: Add some logs, ensure shutdown after a pq connection problem.

diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ea90ec7..d1150cc 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -59,16 +59,7 @@ func (ps *pgEventSource) setup() {
 	}
 	ps.db = db
 
-	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
-		if err != nil {
-			// Until we have a mechanism for catching up
-			// on missed events, we cannot recover from a
-			// dropped connection without breaking our
-			// promises to clients.
-			logger(nil).WithError(err).Error("listener problem")
-			ps.shutdown <- err
-		}
-	})
+	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
 	err = ps.pqListener.Listen("logs")
 	if err != nil {
 		logger(nil).WithError(err).Fatal("pq Listen failed")
@@ -78,8 +69,25 @@ func (ps *pgEventSource) setup() {
 	go ps.run()
 }
 
+func (ps *pgEventSource) listenerProblem(ev pq.ListenerEventType, err error) {
+	// Until we have a mechanism for catching up on missed events,
+	// we cannot recover from a dropped connection without
+	// breaking our promises to clients.
+	logger(nil).WithError(err).Error("listener problem")
+
+	// avoid blocking here if we get multiple errors (only one err
+	// will ever be received by run())
+	if len(ps.shutdown) < 1 {
+		ps.shutdown <- err
+	}
+
+	ps.pqListener.Close()
+}
+
 func (ps *pgEventSource) run() {
 	ps.queue = make(chan *event, ps.QueueSize)
+	defer close(ps.queue)
+	defer ps.pqListener.Close()
 
 	go func() {
 		for e := range ps.queue {
@@ -115,7 +123,6 @@ func (ps *pgEventSource) run() {
 			if ok {
 				logger(nil).WithError(err).Info("shutdown")
 			}
-			close(ps.queue)
 			return
 
 		case <-ticker.C:
@@ -124,10 +131,19 @@ func (ps *pgEventSource) run() {
 
 		case pqEvent, ok := <-ps.pqListener.Notify:
 			if !ok {
-				close(ps.queue)
+				logger(nil).Debug("pqListener Notify chan closed")
 				return
 			}
+			if pqEvent == nil {
+				// pq should call listenerProblem
+				// itself in addition to sending us a
+				// nil event, so this might be
+				// superfluous:
+				ps.listenerProblem(pqEvent, nil)
+				continue
+			}
 			if pqEvent.Channel != "logs" {
+				logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
 				continue
 			}
 			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list