[ARVADOS] created: 1.3.0-526-g2a748e79c
From: Git user <git at public.curoverse.com>
Date: Fri Mar 15 20:35:13 UTC 2019
at 2a748e79c3a72454d70e40f39fcad9dabf4943cc (commit)
commit 2a748e79c3a72454d70e40f39fcad9dabf4943cc
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Mar 15 16:19:51 2019 -0400
14977: Don't try fixStaleLocks until worker pool state is loaded.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index e81c2c091..e90935e2a 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -419,8 +419,12 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
}
// CountWorkers returns the current number of workers in each state.
+//
+// CountWorkers blocks, if necessary, until the initial instance list
+// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
wp.setupOnce.Do(wp.setup)
+ wp.waitUntilLoaded()
wp.mtx.Lock()
defer wp.mtx.Unlock()
r := map[State]int{}
@@ -786,6 +790,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
}
if !wp.loaded {
+ notify = true
wp.loaded = true
wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
}
@@ -795,6 +800,17 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
}
}
+func (wp *Pool) waitUntilLoaded() {
+ ch := wp.Subscribe()
+ wp.mtx.RLock()
+ defer wp.mtx.RUnlock()
+ for !wp.loaded {
+ wp.mtx.RUnlock()
+ <-ch
+ wp.mtx.RLock()
+ }
+}
+
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
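
For illustration, a minimal, self-contained sketch of the wait-until-loaded
pattern the hunk above adds: a caller subscribes to pool updates, then
re-checks a "loaded" flag under the lock each time it is notified, releasing
the lock while it waits so sync() can make progress. The miniPool type, its
fields, and main() below are simplified stand-ins for this sketch, not the
real dispatchcloud/worker types.

package main

import (
	"fmt"
	"sync"
	"time"
)

// miniPool keeps only the pieces needed to illustrate waitUntilLoaded.
type miniPool struct {
	mtx         sync.RWMutex
	loaded      bool
	subscribers []chan struct{}
}

// Subscribe returns a buffered channel that receives a value when the
// pool's state changes (here, when the initial instance list is loaded).
func (p *miniPool) Subscribe() <-chan struct{} {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	ch := make(chan struct{}, 1)
	p.subscribers = append(p.subscribers, ch)
	return ch
}

// notify wakes all subscribers without blocking.
func (p *miniPool) notify() {
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	for _, ch := range p.subscribers {
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}

// sync marks the pool loaded on its first run and notifies waiters.
func (p *miniPool) sync() {
	p.mtx.Lock()
	notify := !p.loaded
	p.loaded = true
	p.mtx.Unlock()
	if notify {
		p.notify()
	}
}

// waitUntilLoaded blocks until sync() has run at least once.
func (p *miniPool) waitUntilLoaded() {
	ch := p.Subscribe()
	p.mtx.RLock()
	defer p.mtx.RUnlock()
	for !p.loaded {
		p.mtx.RUnlock()
		<-ch
		p.mtx.RLock()
	}
}

func main() {
	p := &miniPool{}
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate the initial cloud list call
		p.sync()
	}()
	p.waitUntilLoaded()
	fmt.Println("initial instance list loaded")
}
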
commit b268e5520ceed74300b01cc80857e020a8e4ba05
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Mar 15 16:11:48 2019 -0400
14977: Kill containers that don't exist according to controller.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go
index 148b653c2..1f9338f7b 100644
--- a/lib/dispatchcloud/scheduler/fix_stale_locks.go
+++ b/lib/dispatchcloud/scheduler/fix_stale_locks.go
@@ -47,7 +47,6 @@ waiting:
// Give up.
break waiting
}
-
}
for _, uuid := range stale {
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 4296a1364..9f26877f5 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -335,3 +335,40 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
}
c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
}
+
+func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+ pool := stubPool{
+ unalloc: map[arvados.InstanceType]int{
+ test.InstanceType(2): 0,
+ },
+ idle: map[arvados.InstanceType]int{
+ test.InstanceType(2): 0,
+ },
+ running: map[string]time.Time{
+ test.ContainerUUID(2): time.Time{},
+ },
+ }
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Containers: []arvados.Container{
+ {
+ // create a new worker
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ },
+ }
+ queue.Update()
+ sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+ c.Check(pool.running, check.HasLen, 1)
+ sch.sync()
+ for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+ }
+ c.Check(pool.Running(), check.HasLen, 0)
+}
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
index 23fc621de..28b9fd338 100644
--- a/lib/dispatchcloud/scheduler/sync.go
+++ b/lib/dispatchcloud/scheduler/sync.go
@@ -30,11 +30,11 @@ func (sch *Scheduler) sync() {
switch ent.Container.State {
case arvados.ContainerStateRunning:
if !running {
- go sch.cancel(ent, "not running on any worker")
+ go sch.cancel(uuid, "not running on any worker")
} else if !exited.IsZero() && qUpdated.After(exited) {
- go sch.cancel(ent, "state=\"Running\" after crunch-run exited")
+ go sch.cancel(uuid, "state=\"Running\" after crunch-run exited")
} else if ent.Container.Priority == 0 {
- go sch.kill(ent, "priority=0")
+ go sch.kill(uuid, "priority=0")
}
case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
if running {
@@ -46,7 +46,7 @@ func (sch *Scheduler) sync() {
// of kill() will be to make the
// worker available for the next
// container.
- go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
} else {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
@@ -60,13 +60,13 @@ func (sch *Scheduler) sync() {
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go sch.kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("state=%q", ent.Container.State))
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
go sch.requeue(ent, "crunch-run exited")
} else if running && exited.IsZero() && ent.Container.Priority == 0 {
- go sch.kill(ent, "priority=0")
+ go sch.kill(uuid, "priority=0")
} else if !running && ent.Container.Priority == 0 {
go sch.requeue(ent, "priority=0")
}
@@ -77,10 +77,14 @@ func (sch *Scheduler) sync() {
}).Error("BUG: unexpected state")
}
}
+ for uuid := range running {
+ if _, known := qEntries[uuid]; !known {
+ go sch.kill(uuid, "not in queue")
+ }
+ }
}
-func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
+func (sch *Scheduler) cancel(uuid string, reason string) {
if !sch.uuidLock(uuid, "cancel") {
return
}
@@ -93,8 +97,7 @@ func (sch *Scheduler) cancel(ent container.QueueEnt, reason string) {
}
}
-func (sch *Scheduler) kill(ent container.QueueEnt, reason string) {
- uuid := ent.Container.UUID
+func (sch *Scheduler) kill(uuid string, reason string) {
logger := sch.logger.WithField("ContainerUUID", uuid)
logger.Debugf("killing crunch-run process because %s", reason)
sch.pool.KillContainer(uuid)
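
For illustration, a minimal sketch of the new "not in queue" pass at the end
of sch.sync() above: anything the worker pool reports as running that the
controller's queue no longer contains gets killed. The killOrphans helper,
the map types, and the "ctr-*" names are hypothetical simplifications, not
the real scheduler API.

package main

import "fmt"

// killOrphans invokes kill for every container the pool reports as running
// but the queue does not know about -- the same check the scheduler now
// performs after walking the queue entries.
func killOrphans(running, queued map[string]bool, kill func(uuid, reason string)) {
	for uuid := range running {
		if !queued[uuid] {
			kill(uuid, "not in queue")
		}
	}
}

func main() {
	running := map[string]bool{"ctr-in-queue": true, "ctr-orphaned": true}
	queued := map[string]bool{"ctr-in-queue": true}
	killOrphans(running, queued, func(uuid, reason string) {
		fmt.Printf("killing crunch-run process for %s because %s\n", uuid, reason)
	})
}
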
commit db439331ed1888868e97e18a253257c8b40567da
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Mar 15 15:21:39 2019 -0400
14977: Drop containers when the controller says they don't exist.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index bbe47625a..9cb210d0e 100644
--- a/lib/dispatchcloud/container/queue.go
+++ b/lib/dispatchcloud/container/queue.go
@@ -5,6 +5,7 @@
package container
import (
+ "errors"
"io"
"sync"
"time"
@@ -398,32 +399,62 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
}
apply(avail)
- var missing []string
+ missing := map[string]bool{}
cq.mtx.Lock()
for uuid, ent := range cq.current {
if next[uuid] == nil &&
ent.Container.State != arvados.ContainerStateCancelled &&
ent.Container.State != arvados.ContainerStateComplete {
- missing = append(missing, uuid)
+ missing[uuid] = true
}
}
cq.mtx.Unlock()
- for i, page := 0, 20; i < len(missing); i += page {
- batch := missing[i:]
- if len(batch) > page {
- batch = batch[:page]
+ for len(missing) > 0 {
+ var batch []string
+ for uuid := range missing {
+ batch = append(batch, uuid)
+ if len(batch) == 20 {
+ break
+ }
}
+ filters := []arvados.Filter{{"uuid", "in", batch}}
ended, err := cq.fetchAll(arvados.ResourceListParams{
Select: selectParam,
Order: "uuid",
Count: "none",
- Filters: []arvados.Filter{{"uuid", "in", batch}},
+ Filters: filters,
})
if err != nil {
return nil, err
}
+ if len(ended) == 0 {
+ // This is the only case where we can conclude
+ // a container has been deleted from the
+ // database. A short (but non-zero) page, on
+ // the other hand, can be caused by a response
+ // size limit.
+ for _, uuid := range batch {
+ cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+ delete(missing, uuid)
+ cq.mtx.Lock()
+ cq.delEnt(uuid, cq.current[uuid].Container.State)
+ cq.mtx.Unlock()
+ }
+ continue
+ }
apply(ended)
+ for _, ctr := range ended {
+ if _, ok := missing[ctr.UUID]; !ok {
+ msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "Filters": filters,
+ }).Error(msg)
+ return nil, errors.New(msg)
+ }
+ delete(missing, ctr.UUID)
+ }
}
return next, nil
}
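
For illustration, a self-contained sketch of the batching logic above, with
fetchEnded and onDeleted as hypothetical stand-ins for the filtered
controller query and the delEnt call. It reproduces the key points: only an
empty result page proves a batch of containers was deleted (a short but
non-empty page can be caused by a response size limit), and an unrequested
UUID in the response aborts rather than risk looping forever.

package main

import (
	"errors"
	"fmt"
)

// resolveMissing drains the set of "missing" container UUIDs in batches of
// up to 20: an empty page means the whole batch was deleted; otherwise only
// the UUIDs actually returned are removed from the set.
func resolveMissing(missing map[string]bool,
	fetchEnded func(batch []string) ([]string, error),
	onDeleted func(uuid string)) error {
	for len(missing) > 0 {
		var batch []string
		for uuid := range missing {
			batch = append(batch, uuid)
			if len(batch) == 20 {
				break
			}
		}
		ended, err := fetchEnded(batch)
		if err != nil {
			return err
		}
		if len(ended) == 0 {
			// Only an empty page proves deletion.
			for _, uuid := range batch {
				onDeleted(uuid)
				delete(missing, uuid)
			}
			continue
		}
		for _, uuid := range ended {
			if !missing[uuid] {
				return errors.New("server response did not match requested filters")
			}
			delete(missing, uuid)
		}
	}
	return nil
}

func main() {
	missing := map[string]bool{"ctr-cancelled": true, "ctr-deleted": true}
	// Pretend the controller still knows about ctr-cancelled only.
	fetch := func(batch []string) ([]string, error) {
		var ended []string
		for _, uuid := range batch {
			if uuid == "ctr-cancelled" {
				ended = append(ended, uuid)
			}
		}
		return ended, nil
	}
	err := resolveMissing(missing, fetch, func(uuid string) {
		fmt.Println("dropping container not found by controller:", uuid)
	})
	fmt.Println("err:", err)
}
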
-----------------------------------------------------------------------
hooks/post-receive
--