[ARVADOS] updated: 2.1.0-1844-gd5cf3475a
Git user
git at public.arvados.org
Wed Jan 26 17:02:18 UTC 2022
Summary of changes:
sdk/go/dispatch/dispatch.go | 41 ++++++++++++++++++++++++++++++++---------
1 file changed, 32 insertions(+), 9 deletions(-)
via d5cf3475a95e001de0adcde436e84f1e10d3084f (commit)
from c361e51569e28f30bd034ac240b936346224a0d0 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit d5cf3475a95e001de0adcde436e84f1e10d3084f
Author: Tom Clegg <tom at curii.com>
Date: Wed Jan 26 10:32:30 2022 -0500
18670: Fix abandoned job tracker during race.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 00c75154f..1a1be126b 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -172,10 +172,12 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
}
tracker.updates <- c
go func() {
+ fallbackState := Queued
err := d.RunContainer(d, c, tracker.updates)
if err != nil {
text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+ fallbackState = Cancelled
var logBuf bytes.Buffer
fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
if len(err.AvailableTypes) == 0 {
@@ -189,7 +191,6 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
}
}
text = logBuf.String()
- d.UpdateState(c.UUID, Cancelled)
}
d.Logger.Printf("%s", text)
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
@@ -197,12 +198,33 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
"event_type": "dispatch",
"properties": map[string]string{"text": text}}}
d.Arv.Create("logs", lr, nil)
- d.Unlock(c.UUID)
}
-
- d.mtx.Lock()
- delete(d.trackers, c.UUID)
- d.mtx.Unlock()
+ // If checkListForUpdates() doesn't close the tracker
+ // after 2 queue updates, try to move the container to
+ // the fallback state, which should eventually work
+ // and cause the tracker to close.
+ updates := 0
+ for upd := range tracker.updates {
+ updates++
+ switch upd.State {
+ case Queued, Complete, Cancelled:
+ return
+ case Locked, Running:
+ // Tracker didn't clean up before
+ // returning -- or this is the first
+ // update and it contains stale
+ // information from before
+ // RunContainer() returned.
+ if updates < 2 {
+ // Avoid generating confusing
+ // logs / API calls in the
+ // stale-info case.
+ continue
+ }
+ d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+ d.UpdateState(c.UUID, fallbackState)
+ }
+ }
}()
return tracker
}
@@ -263,12 +285,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
switch c.State {
- case Queued:
+ case Queued, Cancelled, Complete:
+ d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
tracker.close()
+ delete(d.trackers, c.UUID)
case Locked, Running:
+ d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
tracker.update(c)
- case Cancelled, Complete:
- tracker.close()
}
} else {
switch c.State {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list