[ARVADOS] created: 1.3.0-526-g2a748e79c

Git user git at public.curoverse.com
Fri Mar 15 20:35:13 UTC 2019

        at  2a748e79c3a72454d70e40f39fcad9dabf4943cc (commit)

commit 2a748e79c3a72454d70e40f39fcad9dabf4943cc
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Mar 15 16:19:51 2019 -0400

    14977: Don't try fixStaleLocks until worker pool state is loaded.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index e81c2c091..e90935e2a 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -419,8 +419,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 }
 
 // CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
 func (wp *Pool) CountWorkers() map[State]int {
 	wp.setupOnce.Do(wp.setup)
+	wp.waitUntilLoaded()
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
 	r := map[State]int{}
@@ -786,6 +790,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 	}
 
 	if !wp.loaded {
+		notify = true
 		wp.loaded = true
 		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
 	}
@@ -795,6 +800,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
 	}
 }
 
+func (wp *Pool) waitUntilLoaded() {
+	ch := wp.Subscribe()
+	wp.mtx.RLock()
+	defer wp.mtx.RUnlock()
+	for !wp.loaded {
+		wp.mtx.RUnlock()
+		<-ch
+		wp.mtx.RLock()
+	}
+}
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {
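
Note: waitUntilLoaded() above leans on the pool's existing Subscribe()
and notify machinery, which this diff does not show. Below is a
minimal, self-contained sketch of that pattern (hypothetical names and
internals, not the actual worker.Pool code; only waitUntilLoaded
mirrors the committed change): each subscriber gets a channel with a
one-slot buffer, and the pool does a non-blocking send on every state
change, so a waiter can re-check the flag under the lock after each
wakeup without losing a notification or stalling the pool.

package main

import (
	"fmt"
	"sync"
	"time"
)

// pool is a stripped-down stand-in for worker.Pool, showing only the
// subscribe/notify/waitUntilLoaded interaction.
type pool struct {
	mtx    sync.RWMutex
	loaded bool
	subs   map[chan struct{}]bool
}

// Subscribe returns a buffered channel that receives a token after
// each state change (here, just the "loaded" transition).
func (p *pool) Subscribe() <-chan struct{} {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	if p.subs == nil {
		p.subs = map[chan struct{}]bool{}
	}
	ch := make(chan struct{}, 1)
	p.subs[ch] = true
	return ch
}

// notifyLocked does a non-blocking send to every subscriber so a slow
// consumer never blocks the pool. Caller must hold p.mtx.
func (p *pool) notifyLocked() {
	for ch := range p.subs {
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}

// waitUntilLoaded mirrors the committed pattern: subscribe first,
// then re-check the flag under the lock after every notification.
func (p *pool) waitUntilLoaded() {
	ch := p.Subscribe()
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	for !p.loaded {
		p.mtx.RUnlock()
		<-ch
		p.mtx.RLock()
	}
}

func main() {
	p := &pool{}
	go func() {
		// Simulate the first sync() finishing a moment later.
		time.Sleep(10 * time.Millisecond)
		p.mtx.Lock()
		p.loaded = true
		p.notifyLocked()
		p.mtx.Unlock()
	}()
	p.waitUntilLoaded()
	fmt.Println("initial instance list loaded")
}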

commit b268e5520ceed74300b01cc80857e020a8e4ba05
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Mar 15 16:11:48 2019 -0400

    14977: Kill containers that don't exist according to controller.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go
index 148b653c2..1f9338f7b 100644
--- a/lib/dispatchcloud/scheduler/fix_stale_locks.go
+++ b/lib/dispatchcloud/scheduler/fix_stale_locks.go
@@ -47,7 +47,6 @@ waiting:
 			// Give up.
 			break waiting
 		}
-
 	}
 
 	for _, uuid := range stale {
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 4296a1364..9f26877f5 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -335,3 +335,40 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 	}
 	c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
 }
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+	ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+	pool := stubPool{
+		unalloc: map[arvados.InstanceType]int{
+			test.InstanceType(2): 0,
+		},
+		idle: map[arvados.InstanceType]int{
+			test.InstanceType(2): 0,
+		},
+		running: map[string]time.Time{
+			test.ContainerUUID(2): time.Time{},
+		},
+	}
+	queue := test.Queue{
+		ChooseType: chooseType,
+		Containers: []arvados.Container{
+			{
+				// create a new worker
+				UUID:     test.ContainerUUID(1),
+				Priority: 1,
+				State:    arvados.ContainerStateLocked,
+				RuntimeConstraints: arvados.RuntimeConstraints{
+					VCPUs: 1,
+					RAM:   1 << 30,
+				},
+			},
+		},
+	}
+	queue.Update()
+	sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+	c.Check(pool.running, check.HasLen, 1)
+	sch.sync()
+	for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+	}
+	c.Check(pool.Running(), check.HasLen, 0)
+}
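
Note: the empty-bodied for loop near the end of
TestKillNonexistentContainer is a compact poll-until-deadline idiom:
sync() launches kill() in a goroutine, so the test has to wait for the
stub pool to observe the KillContainer call rather than assert
immediately. The same idea written as a reusable helper (a sketch, not
part of the test suite):

package main

import (
	"fmt"
	"time"
)

// waitFor polls cond every interval until it returns true or the
// timeout expires, and reports whether cond ever became true.
func waitFor(cond func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return cond()
}

func main() {
	done := make(chan struct{})
	go func() {
		// Simulate asynchronous work finishing shortly.
		time.Sleep(20 * time.Millisecond)
		close(done)
	}()
	ok := waitFor(func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	}, time.Second, time.Millisecond)
	fmt.Println("condition met:", ok)
}
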
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index 23fc621de..28b9fd338 100644
--- a/lib/dispatchcloud/scheduler/sync.go
+++ b/lib/dispatchcloud/scheduler/sync.go
@@ -30,11 +30,11 @@ func (sch *Scheduler) sync() {
 		switch ent.Container.State {
 		case arvados.ContainerStateRunning:
 			if !running {
-				go sch.cancel(ent, "not running on any worker")
+				go sch.cancel(uuid, "not running on any worker")
 			} else if !exited.IsZero() && qUpdated.After(exited) {
-				go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
+				go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
 			} else if ent.Container.Priority == 0 {
-				go sch.kill(ent, "priority=0")
+				go sch.kill(uuid, "priority=0")
 			}
 		case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
 			if running {
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
 				// of kill() will be to make the
 				// worker available for the next
 				// container.
-				go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+				go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
 			} else {
 				sch.logger.WithFields(logrus.Fields{
 					"ContainerUUID": uuid,
@@ -60,13 +60,13 @@ func (sch *Scheduler) sync() {
 				// a network outage and is still
 				// preparing to run a container that
 				// has already been unlocked/requeued.
-				go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+				go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
 			}
 		case arvados.ContainerStateLocked:
 			if running && !exited.IsZero() && qUpdated.After(exited) {
 				go sch.requeue(ent, "crunch-run exited")
 			} else if running && exited.IsZero() && ent.Container.Priority == 0 {
-				go sch.kill(ent, "priority=0")
+				go sch.kill(uuid, "priority=0")
 			} else if !running && ent.Container.Priority == 0 {
 				go sch.requeue(ent, "priority=0")
 			}
@@ -77,10 +77,14 @@ func (sch *Scheduler) sync() {
 			}).Error("BUG: unexpected state")
 		}
 	}
+	for uuid := range running {
+		if _, known := qEntries[uuid]; !known {
+			go sch.kill(uuid, "not in queue")
+		}
+	}
 }
 
-func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
-	uuid := ent.Container.UUID
+func (sch *Scheduler) cancel(uuid string, reason string) {
 	if !sch.uuidLock(uuid, "cancel") {
 		return
 	}
@@ -93,8 +97,7 @@ func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
 	}
 }
 
-func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
-	uuid := ent.Container.UUID
+func (sch *Scheduler) kill(uuid string, reason string) {
 	logger := sch.logger.WithField("ContainerUUID", uuid)
 	logger.Debugf("killing crunch-run process because %s", reason)
 	sch.pool.KillContainer(uuid)
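
Note: switching cancel() and kill() from a container.QueueEnt argument
to a bare UUID string is what makes the new "not in queue" branch
possible: a container the controller no longer reports has no queue
entry to pass, but its UUID is still known from the pool's running
map. cancel() still serializes work per container via sch.uuidLock,
whose body is not shown in this diff; the general per-key try-lock
shape it suggests looks roughly like the following (hypothetical
sketch, not the scheduler's actual implementation):

package main

import (
	"fmt"
	"sync"
)

// uuidOps records which operation currently "owns" each UUID, so two
// goroutines don't act on the same container at the same time.
type uuidOps struct {
	mtx sync.Mutex
	ops map[string]string
}

// tryLock claims uuid for op and returns true, or returns false if
// some other operation already holds it.
func (u *uuidOps) tryLock(uuid, op string) bool {
	u.mtx.Lock()
	defer u.mtx.Unlock()
	if u.ops == nil {
		u.ops = map[string]string{}
	}
	if _, busy := u.ops[uuid]; busy {
		return false
	}
	u.ops[uuid] = op
	return true
}

// unlock releases uuid so a later operation can claim it.
func (u *uuidOps) unlock(uuid string) {
	u.mtx.Lock()
	defer u.mtx.Unlock()
	delete(u.ops, uuid)
}

func main() {
	var u uuidOps
	uuid := "zzzzz-dz642-000000000000000" // made-up example UUID
	fmt.Println(u.tryLock(uuid, "cancel")) // true: first claim wins
	fmt.Println(u.tryLock(uuid, "kill"))   // false: "cancel" holds it
	u.unlock(uuid)
	fmt.Println(u.tryLock(uuid, "kill"))   // true again after unlock
}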

commit db439331ed1888868e97e18a253257c8b40567da
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Mar 15 15:21:39 2019 -0400

    14977: Drop containers when the controller says they don't exist.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index bbe47625a..9cb210d0e 100644
--- a/lib/dispatchcloud/container/queue.go
+++ b/lib/dispatchcloud/container/queue.go
@@ -5,6 +5,7 @@
 package container
 
 import (
+	"errors"
 	"io"
 	"sync"
 	"time"
@@ -398,32 +399,62 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
 	}
 	apply(avail)
 
-	var missing []string
+	missing := map[string]bool{}
 	cq.mtx.Lock()
 	for uuid, ent := range cq.current {
 		if next[uuid] == nil &&
 			ent.Container.State != arvados.ContainerStateCancelled &&
 			ent.Container.State != arvados.ContainerStateComplete {
-			missing = append(missing, uuid)
+			missing[uuid] = true
 		}
 	}
 	cq.mtx.Unlock()
 
-	for i, page := 0, 20; i < len(missing); i += page {
-		batch := missing[i:]
-		if len(batch) > page {
-			batch = batch[:page]
+	for len(missing) > 0 {
+		var batch []string
+		for uuid := range missing {
+			batch = append(batch, uuid)
+			if len(batch) == 20 {
+				break
+			}
 		}
+		filters := []arvados.Filter{{"uuid", "in", batch}}
 		ended, err := cq.fetchAll(arvados.ResourceListParams{
 			Select:  selectParam,
 			Order:   "uuid",
 			Count:   "none",
-			Filters: []arvados.Filter{{"uuid", "in", batch}},
+			Filters: filters,
 		})
 		if err != nil {
 			return nil, err
 		}
+		if len(ended) == 0 {
+			// This is the only case where we can conclude
+			// a container has been deleted from the
+			// database. A short (but non-zero) page, on
+			// the other hand, can be caused by a response
+			// size limit.
+			for _, uuid := range batch {
+				cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+				delete(missing, uuid)
+				cq.mtx.Lock()
+				cq.delEnt(uuid, cq.current[uuid].Container.State)
+				cq.mtx.Unlock()
+			}
+			continue
+		}
 		apply(ended)
+		for _, ctr := range ended {
+			if _, ok := missing[ctr.UUID]; !ok {
+				msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+				cq.logger.WithFields(logrus.Fields{
+					"ContainerUUID": ctr.UUID,
+					"Filters":       filters,
+				}).Error(msg)
+				return nil, errors.New(msg)
+			}
+			delete(missing, ctr.UUID)
+		}
 	}
 	return next, nil
 }
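
Note: the rewritten loop in poll() turns missing into a set and drains
it in batches of up to 20 UUIDs. The termination argument is worth
spelling out: each pass either gets an empty page, in which case the
whole batch is dropped from both the queue and the set, or gets a
non-empty page, in which case every returned UUID is removed from the
set (the "BUG?" check guarantees each returned UUID was actually
requested), so the set strictly shrinks on every iteration. The same
drain-in-batches pattern, stripped to its core, with the controller
request replaced by a hypothetical fetch function:

package main

import "fmt"

// drainMissing repeatedly asks fetch about batches of up to batchSize
// UUIDs and removes from the set every UUID that is either returned
// by fetch or (when fetch returns nothing) conclusively absent.
func drainMissing(missing map[string]bool, batchSize int, fetch func([]string) []string) {
	for len(missing) > 0 {
		var batch []string
		for uuid := range missing {
			batch = append(batch, uuid)
			if len(batch) == batchSize {
				break
			}
		}
		found := fetch(batch)
		if len(found) == 0 {
			// Only an empty response proves the whole batch is
			// gone; a short but non-empty page might just be a
			// response size limit.
			for _, uuid := range batch {
				delete(missing, uuid)
			}
			continue
		}
		for _, uuid := range found {
			delete(missing, uuid)
		}
	}
}

func main() {
	missing := map[string]bool{"a": true, "b": true, "c": true}
	// Hypothetical fetch: pretends only "b" still exists server-side.
	fetch := func(batch []string) []string {
		var out []string
		for _, uuid := range batch {
			if uuid == "b" {
				out = append(out, uuid)
			}
		}
		return out
	}
	drainMissing(missing, 2, fetch)
	fmt.Println("still unresolved:", len(missing)) // 0
}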

-----------------------------------------------------------------------


hooks/post-receive