[ARVADOS] updated: 0fc6eaead0bf7691e99d19e74ec33636909001a7

Git user git at public.curoverse.com
Thu Feb 9 04:21:37 EST 2017


Summary of changes:
 services/ws/event_source.go | 87 ++++++++++++++++++++++++---------------------
 services/ws/main.go         |  8 +++--
 2 files changed, 52 insertions(+), 43 deletions(-)

  discards  2565719b84461df57ec411f7973b88419f0e6c2c (commit)
       via  0fc6eaead0bf7691e99d19e74ec33636909001a7 (commit)

This update added new revisions after undoing existing revisions. That is
to say, the history of the old revision is not a strict subset of the
history of the new revision. This situation occurs when you push a change
with --force, producing a repository history like this:

 * -- * -- B -- O -- O -- O (2565719b84461df57ec411f7973b88419f0e6c2c)
            \
             N -- N -- N (0fc6eaead0bf7691e99d19e74ec33636909001a7)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the N
branch since the common base, B.

Those revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full below.


commit 0fc6eaead0bf7691e99d19e74ec33636909001a7
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Feb 9 04:04:57 2017 -0500

    11070: Add some logs, ensure shutdown after a pq connection problem.

diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index ea90ec7..4de1d55 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"database/sql"
 	"strconv"
 	"strings"
@@ -35,51 +36,73 @@ type pgEventSource struct {
 	pqListener *pq.Listener
 	queue      chan *event
 	sinks      map[*pgEventSink]bool
-	setupOnce  sync.Once
 	mtx        sync.Mutex
-	shutdown   chan error
 
 	lastQDelay time.Duration
 	eventsIn   uint64
 	eventsOut  uint64
+
+	cancel func()
 }
 
 var _ debugStatuser = (*pgEventSource)(nil)
 
-func (ps *pgEventSource) setup() {
-	ps.shutdown = make(chan error, 1)
+func (ps *pgEventSource) listenerProblem(et pq.ListenerEventType, err error) {
+	if et == pq.ListenerEventConnected {
+		logger(nil).Debug("pgEventSource connected")
+		return
+	}
+
+	// Until we have a mechanism for catching up on missed events,
+	// we cannot recover from a dropped connection without
+	// breaking our promises to clients.
+	logger(nil).
+		WithField("eventType", et).
+		WithError(err).
+		Error("listener problem")
+	ps.cancel()
+}
+
+func (ps *pgEventSource) Run() {
+	logger(nil).Debug("pgEventSource Run starting")
+	defer logger(nil).Debug("pgEventSource Run finished")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	ps.cancel = cancel
+	defer cancel()
+
 	ps.sinks = make(map[*pgEventSink]bool)
+	defer func() {
+		// Disconnect all clients
+		ps.mtx.Lock()
+		for sink := range ps.sinks {
+			close(sink.channel)
+		}
+		ps.sinks = nil
+		ps.mtx.Unlock()
+	}()
 
 	db, err := sql.Open("postgres", ps.DataSource)
 	if err != nil {
 		logger(nil).WithError(err).Fatal("sql.Open failed")
+		return
 	}
 	if err = db.Ping(); err != nil {
 		logger(nil).WithError(err).Fatal("db.Ping failed")
+		return
 	}
 	ps.db = db
 
-	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
-		if err != nil {
-			// Until we have a mechanism for catching up
-			// on missed events, we cannot recover from a
-			// dropped connection without breaking our
-			// promises to clients.
-			logger(nil).WithError(err).Error("listener problem")
-			ps.shutdown <- err
-		}
-	})
+	ps.pqListener = pq.NewListener(ps.DataSource, time.Second, time.Minute, ps.listenerProblem)
 	err = ps.pqListener.Listen("logs")
 	if err != nil {
 		logger(nil).WithError(err).Fatal("pq Listen failed")
 	}
-	logger(nil).Debug("pgEventSource listening")
-
-	go ps.run()
-}
+	defer ps.pqListener.Close()
+	logger(nil).Debug("pq Listen setup done")
 
-func (ps *pgEventSource) run() {
 	ps.queue = make(chan *event, ps.QueueSize)
+	defer close(ps.queue)
 
 	go func() {
 		for e := range ps.queue {
@@ -111,11 +134,8 @@ func (ps *pgEventSource) run() {
 	defer ticker.Stop()
 	for {
 		select {
-		case err, ok := <-ps.shutdown:
-			if ok {
-				logger(nil).WithError(err).Info("shutdown")
-			}
-			close(ps.queue)
+		case <-ctx.Done():
+			logger(nil).Debug("ctx done")
 			return
 
 		case <-ticker.C:
@@ -124,10 +144,19 @@ func (ps *pgEventSource) run() {
 
 		case pqEvent, ok := <-ps.pqListener.Notify:
 			if !ok {
-				close(ps.queue)
+				logger(nil).Debug("pqListener Notify chan closed")
 				return
 			}
+			if pqEvent == nil {
+				// pq should call listenerProblem
+				// itself in addition to sending us a
+				// nil event, so this might be
+				// superfluous:
+				ps.listenerProblem(-1, nil)
+				continue
+			}
 			if pqEvent.Channel != "logs" {
+				logger(nil).WithField("pqEvent", pqEvent).Error("unexpected notify from wrong channel")
 				continue
 			}
 			logID, err := strconv.ParseUint(pqEvent.Extra, 10, 64)
@@ -158,7 +187,6 @@ func (ps *pgEventSource) run() {
 // quickly as possible because when one sink stops being ready, all
 // other sinks block.
 func (ps *pgEventSource) NewSink() eventSink {
-	ps.setupOnce.Do(ps.setup)
 	sink := &pgEventSink{
 		channel: make(chan *event, 1),
 		source:  ps,
@@ -170,7 +198,6 @@ func (ps *pgEventSource) NewSink() eventSink {
 }
 
 func (ps *pgEventSource) DB() *sql.DB {
-	ps.setupOnce.Do(ps.setup)
 	return ps.db
 }
 
@@ -201,6 +228,7 @@ func (sink *pgEventSink) Channel() <-chan *event {
 	return sink.channel
 }
 
+// Stop sending events to the sink's channel.
 func (sink *pgEventSink) Stop() {
 	go func() {
 		// Ensure this sink cannot fill up and block the
@@ -210,7 +238,9 @@ func (sink *pgEventSink) Stop() {
 		}
 	}()
 	sink.source.mtx.Lock()
-	delete(sink.source.sinks, sink)
+	if _, ok := sink.source.sinks[sink]; ok {
+		delete(sink.source.sinks, sink)
+		close(sink.channel)
+	}
 	sink.source.mtx.Unlock()
-	close(sink.channel)
 }
diff --git a/services/ws/main.go b/services/ws/main.go
index 7c3625b..9eee813 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -54,9 +54,11 @@ func main() {
 			newPermChecker: func() permChecker { return newPermChecker(cfg.Client) },
 		},
 	}
-	// Bootstrap the eventSource by attaching a dummy subscriber
-	// and hanging up.
-	eventSource.NewSink().Stop()
+
+	go func() {
+		eventSource.Run()
+		log.Fatal("event source stopped")
+	}()
 
 	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
 		log.WithError(err).Warn("error notifying init daemon")

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list