[ARVADOS] created: 2.1.0-1844-g1789aa86c

Git user git at public.arvados.org
Wed Jan 26 18:32:28 UTC 2022


        at  1789aa86c580495f0a722289cec41c4e31872e26 (commit)


commit 1789aa86c580495f0a722289cec41c4e31872e26
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jan 26 13:32:13 2022 -0500

    18670: Fix abandoned job tracker during race.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 00c75154f..a0a61f2b6 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -172,10 +172,12 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 	}
 	tracker.updates <- c
 	go func() {
+		fallbackState := Queued
 		err := d.RunContainer(d, c, tracker.updates)
 		if err != nil {
 			text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
 			if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+				fallbackState = Cancelled
 				var logBuf bytes.Buffer
 				fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
 				if len(err.AvailableTypes) == 0 {
@@ -189,7 +191,6 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 					}
 				}
 				text = logBuf.String()
-				d.UpdateState(c.UUID, Cancelled)
 			}
 			d.Logger.Printf("%s", text)
 			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
@@ -197,12 +198,30 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 				"event_type":  "dispatch",
 				"properties":  map[string]string{"text": text}}}
 			d.Arv.Create("logs", lr, nil)
-			d.Unlock(c.UUID)
 		}
-
-		d.mtx.Lock()
-		delete(d.trackers, c.UUID)
-		d.mtx.Unlock()
+		// If checkListForUpdates() doesn't close the tracker
+		// after 2 queue updates, try to move the container to
+		// the fallback state, which should eventually work
+		// and cause the tracker to close.
+		updates := 0
+		for upd := range tracker.updates {
+			updates++
+			if upd.State == Locked || upd.State == Running {
+				// RunContainer() returned without
+				// moving the container out of
+				// Locked/Running -- or this is the
+				// first update and it contains stale
+				// information from before it returned.
+				if updates < 2 {
+					// Avoid generating confusing
+					// logs / API calls in the
+					// stale-info case.
+					continue
+				}
+				d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+				d.UpdateState(c.UUID, fallbackState)
+			}
+		}
 	}()
 	return tracker
 }
@@ -263,12 +282,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
 			d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if alreadyTracking {
 			switch c.State {
-			case Queued:
+			case Queued, Cancelled, Complete:
+				d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
 				tracker.close()
+				delete(d.trackers, c.UUID)
 			case Locked, Running:
+				d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
 				tracker.update(c)
-			case Cancelled, Complete:
-				tracker.close()
 			}
 		} else {
 			switch c.State {

commit c361e51569e28f30bd034ac240b936346224a0d0
Author: Tom Clegg <tom at curii.com>
Date:   Mon Jan 24 23:48:17 2022 -0500

    18670: Fix unreliable test.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index c9ed5582b..0d9324784 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -119,7 +119,7 @@ func (disp *dispatcher) init() {
 	disp.lsfcli.logger = disp.logger
 	disp.lsfqueue = lsfqueue{
 		logger: disp.logger,
-		period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+		period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
 		lsfcli: &disp.lsfcli,
 	}
 	disp.ArvClient.AuthToken = disp.AuthToken
@@ -256,7 +256,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 
 	// Try "bkill" every few seconds until the LSF job disappears
 	// from the queue.
-	ticker := time.NewTicker(5 * time.Second)
+	ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
 	defer ticker.Stop()
 	for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
 		err := disp.lsfcli.Bkill(qent.ID)
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
index c678a9a48..a99983f34 100644
--- a/lib/lsf/dispatch_test.go
+++ b/lib/lsf/dispatch_test.go
@@ -44,7 +44,8 @@ func (s *suite) SetUpTest(c *check.C) {
 	c.Assert(err, check.IsNil)
 	cluster, err := cfg.GetCluster("")
 	c.Assert(err, check.IsNil)
-	cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+	cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4)
+	cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4)
 	s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
 	s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
 		return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
@@ -243,16 +244,19 @@ func (s *suite) TestSubmit(c *check.C) {
 		}
 		// "queuedcontainer" should be running
 		if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
+			c.Log("Lookup(queuedcontainer) == false")
 			continue
 		}
 		// "lockedcontainer" should be cancelled because it
 		// has priority 0 (no matching container requests)
-		if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+		if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+			c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
 			continue
 		}
 		// "crTooBig" should be cancelled because lsf stub
 		// reports there is no suitable instance type
-		if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+		if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+			c.Logf("Lookup(crTooBig) == true, ent = %#v", ent)
 			continue
 		}
 		var ctr arvados.Container
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 3ed4d0c18..60f01640a 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -58,7 +58,7 @@ func (q *lsfqueue) getNext() map[string]bjobsEntry {
 func (q *lsfqueue) init() {
 	q.updated = sync.NewCond(&q.mutex)
 	q.nextReady = make(chan (<-chan struct{}))
-	ticker := time.NewTicker(time.Second)
+	ticker := time.NewTicker(q.period)
 	go func() {
 		for range ticker.C {
 			// Send a new "next update ready" channel to

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list