[ARVADOS] created: 1.3.0-441-ge49978c5d

Git user git at public.curoverse.com
Wed Mar 6 20:57:10 EST 2019


        at  e49978c5d9bece2a1db646f36cdf346414dd8813 (commit)


commit e49978c5d9bece2a1db646f36cdf346414dd8813
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Wed Mar 6 20:51:59 2019 -0500

    14920: Fix state==unknown during create/list race.
    
    Recognize a new/booting node by its "instance secret" tag instead of
    relying on Create() to return the new instance's ID quickly.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index fe1c6ecc0..f95e4a1b4 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -140,7 +140,7 @@ type Pool struct {
 
 	// private state
 	subscribers  map[<-chan struct{}]chan<- struct{}
-	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls
 	workers      map[cloud.InstanceID]*worker
 	loaded       bool                 // loaded list of instances from InstanceSet at least once
 	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
@@ -160,6 +160,11 @@ type Pool struct {
 	mMemory            *prometheus.GaugeVec
 }
 
+type createCall struct {
+	time         time.Time
+	instanceType arvados.InstanceType
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -205,8 +210,13 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 	defer wp.mtx.RUnlock()
 	unalloc := map[arvados.InstanceType]int{}
 	creating := map[arvados.InstanceType]int{}
-	for it, times := range wp.creating {
-		creating[it] = len(times)
+	oldestCreate := map[arvados.InstanceType]time.Time{}
+	for _, cc := range wp.creating {
+		it := cc.instanceType
+		creating[it]++
+		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+			oldestCreate[it] = cc.time
+		}
 	}
 	for _, wkr := range wp.workers {
 		// Skip workers that are not expected to become
@@ -221,7 +231,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 		}
 		it := wkr.instType
 		unalloc[it]++
-		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
 			// If up to N new workers appear in
 			// Instances() while we are waiting for N
 			// Create() calls to complete, we assume we're
@@ -260,10 +270,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 		return false
 	}
 	now := time.Now()
-	wp.creating[it] = append(wp.creating[it], now)
+	secret := randomHex(instanceSecretLength)
+	wp.creating[secret] = createCall{time: now, instanceType: it}
 	go func() {
 		defer wp.notify()
-		secret := randomHex(instanceSecretLength)
 		tags := cloud.InstanceTags{
 			tagKeyInstanceType:   it.Name,
 			tagKeyIdleBehavior:   string(IdleBehaviorRun),
@@ -273,14 +283,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
-		// Remove our timestamp marker from wp.creating
-		for i, t := range wp.creating[it] {
-			if t == now {
-				copy(wp.creating[it][i:], wp.creating[it][i+1:])
-				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
-				break
-			}
-		}
+		// delete() is deferred so the updateWorker() call
+		// below knows to use StateBooting when adding a new
+		// worker.
+		defer delete(wp.creating, secret)
 		if err != nil {
 			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
 				wp.atQuotaErr = err
@@ -291,7 +297,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
 			return
 		}
-		wp.updateWorker(inst, it, StateBooting)
+		wp.updateWorker(inst, it)
 	}()
 	return true
 }
@@ -319,26 +325,30 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 	return nil
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
 //
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
 // Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
 	inst = tagVerifier{inst}
 	id := inst.ID()
 	if wkr := wp.workers[id]; wkr != nil {
 		wkr.executor.SetTarget(inst)
 		wkr.instance = inst
 		wkr.updated = time.Now()
-		if initialState == StateBooting && wkr.state == StateUnknown {
-			wkr.state = StateBooting
-		}
 		wkr.saveTags()
 		return wkr, false
 	}
 
+	state := StateUnknown
+	if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+		state = StateBooting
+	}
+
 	// If an instance has a valid IdleBehavior tag when it first
 	// appears, initialize the new worker accordingly (this is how
 	// we restore IdleBehavior that was set by a prior dispatch
@@ -356,7 +366,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		"Address":      inst.Address(),
 	})
 	logger.WithFields(logrus.Fields{
-		"State":        initialState,
+		"State":        state,
 		"IdleBehavior": idleBehavior,
 	}).Infof("instance appeared in cloud")
 	now := time.Now()
@@ -365,7 +375,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		wp:           wp,
 		logger:       logger,
 		executor:     wp.newExecutor(inst),
-		state:        initialState,
+		state:        state,
 		idleBehavior: idleBehavior,
 		instance:     inst,
 		instType:     it,
@@ -703,7 +713,7 @@ func (wp *Pool) Instances() []InstanceView {
 }
 
 func (wp *Pool) setup() {
-	wp.creating = map[arvados.InstanceType][]time.Time{}
+	wp.creating = map[string]createCall{}
 	wp.exited = map[string]time.Time{}
 	wp.workers = map[cloud.InstanceID]*worker{}
 	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -753,7 +763,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 			wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
 			continue
 		}
-		if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+		if wkr, isNew := wp.updateWorker(inst, it); isNew {
 			notify = true
 		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
 			wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list