[ARVADOS] updated: 1.3.0-375-g9559f6df3

Git user git at public.curoverse.com
Thu Feb 21 01:04:04 EST 2019


Summary of changes:
 lib/dispatchcloud/scheduler/run_queue.go | 85 +++++++++++++++++++------------
 lib/dispatchcloud/scheduler/scheduler.go |  9 ++--
 lib/dispatchcloud/scheduler/sync.go      | 86 +++++++++++++++++---------------
 3 files changed, 106 insertions(+), 74 deletions(-)

       via  9559f6df3d943cff3cd946c406535971fdf35fb8 (commit)
      from  d97388bdbfeb6a43cb86996012a1db0ba4a8871f (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 9559f6df3d943cff3cd946c406535971fdf35fb8
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Feb 21 01:00:00 2019 -0500

    14807: Avoid doing concurrent update requests per container.
    
    The previous code avoided concurrent Lock requests, but would
    sometimes generate lots of overlapping Cancel requests instead of
    waiting for the first one to finish.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index ecdae7f87..d102d2fd2 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -6,6 +6,7 @@ package scheduler
 
 import (
 	"sort"
+	"time"
 
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -50,7 +51,7 @@ tryrun:
 				overquota = sorted[i:]
 				break tryrun
 			}
-			sch.bgLock(logger, ctr.UUID)
+			go sch.lockContainer(logger, ctr.UUID)
 			unalloc[it]--
 		case arvados.ContainerStateLocked:
 			if unalloc[it] > 0 {
@@ -120,22 +121,16 @@ tryrun:
 	}
 }
 
-// Start an API call to lock the given container, and return
-// immediately while waiting for the response in a new goroutine. Do
-// nothing if a lock request is already in progress for this
-// container.
-func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
-	logger.Debug("locking")
-	sch.mtx.Lock()
-	defer sch.mtx.Unlock()
-	if sch.locking[uuid] {
-		logger.Debug("locking in progress, doing nothing")
+// Lock the given container. Should be called in a new goroutine.
+func (sch *Scheduler) lockContainer(logger logrus.FieldLogger, uuid string) {
+	if !sch.uuidLock(uuid, "lock") {
 		return
 	}
+	defer sch.uuidUnlock(uuid)
 	if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
 		// This happens if the container has been cancelled or
 		// locked since runQueue called sch.queue.Entries(),
-		// possibly by a bgLock() call from a previous
+		// possibly by a lockContainer() call from a previous
 		// runQueue iteration. In any case, we will respond
 		// appropriately on the next runQueue iteration, which
 		// will have already been triggered by the queue
@@ -143,24 +138,50 @@ func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
 		logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
 		return
 	}
-	sch.locking[uuid] = true
-	go func() {
-		defer func() {
-			sch.mtx.Lock()
-			defer sch.mtx.Unlock()
-			delete(sch.locking, uuid)
-		}()
-		err := sch.queue.Lock(uuid)
-		if err != nil {
-			logger.WithError(err).Warn("error locking container")
-			return
-		}
-		logger.Debug("lock succeeded")
-		ctr, ok := sch.queue.Get(uuid)
-		if !ok {
-			logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
-		} else if ctr.State != arvados.ContainerStateLocked {
-			logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
-		}
-	}()
+	err := sch.queue.Lock(uuid)
+	if err != nil {
+		logger.WithError(err).Warn("error locking container")
+		return
+	}
+	logger.Debug("lock succeeded")
+	ctr, ok := sch.queue.Get(uuid)
+	if !ok {
+		logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+	} else if ctr.State != arvados.ContainerStateLocked {
+		logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+	}
+}
+
+// Acquire a non-blocking lock for specified UUID, returning true if
+// successful.  The op argument is used only for debug logs.
+//
+// If the lock is not available, uuidLock arranges to wake up the
+// scheduler after a short delay, so it can retry whatever operation
+// is trying to get the lock (if that operation is still worth doing).
+//
+// This mechanism helps avoid spamming the controller/database with
+// concurrent updates for any single container, even when the
+// scheduler loop is running frequently.
+func (sch *Scheduler) uuidLock(uuid, op string) bool {
+	sch.mtx.Lock()
+	defer sch.mtx.Unlock()
+	logger := sch.logger.WithFields(logrus.Fields{
+		"ContainerUUID": uuid,
+		"Op":            op,
+	})
+	if op, locked := sch.uuidOp[uuid]; locked {
+		logger.Debugf("uuidLock not available, Op=%s in progress", op)
+		// Make sure the scheduler loop wakes up to retry.
+		sch.wakeup.Reset(time.Second / 4)
+		return false
+	}
+	logger.Debug("uuidLock acquired")
+	sch.uuidOp[uuid] = op
+	return true
+}
+
+func (sch *Scheduler) uuidUnlock(uuid string) {
+	sch.mtx.Lock()
+	defer sch.mtx.Unlock()
+	delete(sch.uuidOp, uuid)
 }
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 97cacee08..eb82c4883 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -34,8 +34,9 @@ type Scheduler struct {
 	staleLockTimeout    time.Duration
 	queueUpdateInterval time.Duration
 
-	locking map[string]bool
-	mtx     sync.Mutex
+	uuidOp map[string]string // operation in progress: "lock", "cancel", ...
+	mtx    sync.Mutex
+	wakeup *time.Timer
 
 	runOnce sync.Once
 	stop    chan struct{}
@@ -53,9 +54,10 @@ func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTi
 		pool:                pool,
 		staleLockTimeout:    staleLockTimeout,
 		queueUpdateInterval: queueUpdateInterval,
+		wakeup:              time.NewTimer(time.Second),
 		stop:                make(chan struct{}),
 		stopped:             make(chan struct{}),
-		locking:             map[string]bool{},
+		uuidOp:              map[string]string{},
 	}
 }
 
@@ -116,6 +118,7 @@ func (sch *Scheduler) run() {
 			return
 		case <-queueNotify:
 		case <-poolNotify:
+		case <-sch.wakeup.C:
 		}
 	}
 }
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index ed78a8dae..23fc621de 100644
--- a/lib/dispatchcloud/scheduler/sync.go
+++ b/lib/dispatchcloud/scheduler/sync.go
@@ -6,7 +6,6 @@ package scheduler
 
 import (
 	"fmt"
-	"time"
 
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -25,32 +24,17 @@ import (
 // cancelled.
 func (sch *Scheduler) sync() {
 	running := sch.pool.Running()
-	cancel := func(ent container.QueueEnt, reason string) {
-		uuid := ent.Container.UUID
-		logger := sch.logger.WithField("ContainerUUID", uuid)
-		logger.Infof("cancelling container because %s", reason)
-		err := sch.queue.Cancel(uuid)
-		if err != nil {
-			logger.WithError(err).Print("error cancelling container")
-		}
-	}
-	kill := func(ent container.QueueEnt, reason string) {
-		uuid := ent.Container.UUID
-		logger := sch.logger.WithField("ContainerUUID", uuid)
-		logger.Debugf("killing crunch-run process because %s", reason)
-		sch.pool.KillContainer(uuid)
-	}
 	qEntries, qUpdated := sch.queue.Entries()
 	for uuid, ent := range qEntries {
 		exited, running := running[uuid]
 		switch ent.Container.State {
 		case arvados.ContainerStateRunning:
 			if !running {
-				go cancel(ent, "not running on any worker")
+				go sch.cancel(ent, "not running on any worker")
 			} else if !exited.IsZero() && qUpdated.After(exited) {
-				go cancel(ent, "state=\"Running\" after crunch-run exited")
+				go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
 			} else if ent.Container.Priority == 0 {
-				go kill(ent, "priority=0")
+				go sch.kill(ent, "priority=0")
 			}
 		case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
 			if running {
@@ -62,7 +46,7 @@ func (sch *Scheduler) sync() {
 				// of kill() will be to make the
 				// worker available for the next
 				// container.
-				go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+				go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
 			} else {
 				sch.logger.WithFields(logrus.Fields{
 					"ContainerUUID": uuid,
@@ -76,30 +60,15 @@ func (sch *Scheduler) sync() {
 				// a network outage and is still
 				// preparing to run a container that
 				// has already been unlocked/requeued.
-				go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+				go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
 			}
 		case arvados.ContainerStateLocked:
 			if running && !exited.IsZero() && qUpdated.After(exited) {
-				logger := sch.logger.WithFields(logrus.Fields{
-					"ContainerUUID": uuid,
-					"State":         ent.Container.State,
-					"Priority":      ent.Container.Priority,
-					"Exited":        time.Since(exited).Seconds(),
-				})
-				logger.Info("requeueing locked container after crunch-run exited")
-				err := sch.queue.Unlock(uuid)
-				if err != nil {
-					logger.WithError(err).Error("error requeueing container")
-				}
+				go sch.requeue(ent, "crunch-run exited")
 			} else if running && exited.IsZero() && ent.Container.Priority == 0 {
-				go kill(ent, "priority=0")
+				go sch.kill(ent, "priority=0")
 			} else if !running && ent.Container.Priority == 0 {
-				logger := sch.logger.WithField("ContainerUUID", uuid)
-				logger.Info("unlocking container because priority=0")
-				err := sch.queue.Unlock(uuid)
-				if err != nil {
-					logger.WithError(err).Error("error requeueing container")
-				}
+				go sch.requeue(ent, "priority=0")
 			}
 		default:
 			sch.logger.WithFields(logrus.Fields{
@@ -109,3 +78,42 @@ func (sch *Scheduler) sync() {
 		}
 	}
 }
+
+func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
+	uuid := ent.Container.UUID
+	if !sch.uuidLock(uuid, "cancel") {
+		return
+	}
+	defer sch.uuidUnlock(uuid)
+	logger := sch.logger.WithField("ContainerUUID", uuid)
+	logger.Infof("cancelling container because %s", reason)
+	err := sch.queue.Cancel(uuid)
+	if err != nil {
+		logger.WithError(err).Print("error cancelling container")
+	}
+}
+
+func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
+	uuid := ent.Container.UUID
+	logger := sch.logger.WithField("ContainerUUID", uuid)
+	logger.Debugf("killing crunch-run process because %s", reason)
+	sch.pool.KillContainer(uuid)
+}
+
+func (sch *Scheduler) requeue(ent container.QueueEnt, reason string) {
+	uuid := ent.Container.UUID
+	if !sch.uuidLock(uuid, "cancel") {
+		return
+	}
+	defer sch.uuidUnlock(uuid)
+	logger := sch.logger.WithFields(logrus.Fields{
+		"ContainerUUID": uuid,
+		"State":         ent.Container.State,
+		"Priority":      ent.Container.Priority,
+	})
+	logger.Infof("requeueing locked container because %s", reason)
+	err := sch.queue.Unlock(uuid)
+	if err != nil {
+		logger.WithError(err).Error("error requeueing container")
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list