[ARVADOS] created: 2.1.0-1844-g1789aa86c
Git user
git at public.arvados.org
Wed Jan 26 18:32:28 UTC 2022
at 1789aa86c580495f0a722289cec41c4e31872e26 (commit)
commit 1789aa86c580495f0a722289cec41c4e31872e26
Author: Tom Clegg <tom at curii.com>
Date: Wed Jan 26 13:32:13 2022 -0500
18670: Fix abandoned job tracker during race.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 00c75154f..a0a61f2b6 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -172,10 +172,12 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
}
tracker.updates <- c
go func() {
+ fallbackState := Queued
err := d.RunContainer(d, c, tracker.updates)
if err != nil {
text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+ fallbackState = Cancelled
var logBuf bytes.Buffer
fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
if len(err.AvailableTypes) == 0 {
@@ -189,7 +191,6 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
}
}
text = logBuf.String()
- d.UpdateState(c.UUID, Cancelled)
}
d.Logger.Printf("%s", text)
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
@@ -197,12 +198,30 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
"event_type": "dispatch",
"properties": map[string]string{"text": text}}}
d.Arv.Create("logs", lr, nil)
- d.Unlock(c.UUID)
}
-
- d.mtx.Lock()
- delete(d.trackers, c.UUID)
- d.mtx.Unlock()
+ // If checkListForUpdates() doesn't close the tracker
+ // after 2 queue updates, try to move the container to
+ // the fallback state, which should eventually work
+ // and cause the tracker to close.
+ updates := 0
+ for upd := range tracker.updates {
+ updates++
+ if upd.State == Locked || upd.State == Running {
+ // Tracker didn't clean up before
+ // returning -- or this is the first
+ // update and it contains stale
+ // information from before
+ // RunContainer() returned.
+ if updates < 2 {
+ // Avoid generating confusing
+ // logs / API calls in the
+ // stale-info case.
+ continue
+ }
+ d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+ d.UpdateState(c.UUID, fallbackState)
+ }
+ }
}()
return tracker
}
@@ -263,12 +282,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
switch c.State {
- case Queued:
+ case Queued, Cancelled, Complete:
+ d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
tracker.close()
+ delete(d.trackers, c.UUID)
case Locked, Running:
+ d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
tracker.update(c)
- case Cancelled, Complete:
- tracker.close()
}
} else {
switch c.State {
commit c361e51569e28f30bd034ac240b936346224a0d0
Author: Tom Clegg <tom at curii.com>
Date: Mon Jan 24 23:48:17 2022 -0500
18670: Fix unreliable test.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index c9ed5582b..0d9324784 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -119,7 +119,7 @@ func (disp *dispatcher) init() {
disp.lsfcli.logger = disp.logger
disp.lsfqueue = lsfqueue{
logger: disp.logger,
- period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+ period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
lsfcli: &disp.lsfcli,
}
disp.ArvClient.AuthToken = disp.AuthToken
@@ -256,7 +256,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
// Try "bkill" every few seconds until the LSF job disappears
// from the queue.
- ticker := time.NewTicker(5 * time.Second)
+ ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
defer ticker.Stop()
for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
err := disp.lsfcli.Bkill(qent.ID)
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
index c678a9a48..a99983f34 100644
--- a/lib/lsf/dispatch_test.go
+++ b/lib/lsf/dispatch_test.go
@@ -44,7 +44,8 @@ func (s *suite) SetUpTest(c *check.C) {
c.Assert(err, check.IsNil)
cluster, err := cfg.GetCluster("")
c.Assert(err, check.IsNil)
- cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+ cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
+ cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
@@ -243,16 +244,19 @@ func (s *suite) TestSubmit(c *check.C) {
}
// "queuedcontainer" should be running
if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
+ c.Log("Lookup(queuedcontainer) == false")
continue
}
// "lockedcontainer" should be cancelled because it
// has priority 0 (no matching container requests)
- if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+ if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+ c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
continue
}
// "crTooBig" should be cancelled because lsf stub
// reports there is no suitable instance type
- if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+ if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+ c.Logf("Lookup(crTooBig) == true, ent = %#v", ent)
continue
}
var ctr arvados.Container
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 3ed4d0c18..60f01640a 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -58,7 +58,7 @@ func (q *lsfqueue) getNext() map[string]bjobsEntry {
func (q *lsfqueue) init() {
q.updated = sync.NewCond(&q.mutex)
q.nextReady = make(chan (<-chan struct{}))
- ticker := time.NewTicker(time.Second)
+ ticker := time.NewTicker(q.period)
go func() {
for range ticker.C {
// Send a new "next update ready" channel to
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list