[ARVADOS] updated: 2.1.0-977-gba09052b2
Git user
git at public.arvados.org
Fri Jul 2 21:10:48 UTC 2021
Summary of changes:
lib/lsf/dispatch.go | 18 ++++++++++-
lib/lsf/lsfqueue.go | 87 +++++++++++++++++++++++++++++++++++------------------
2 files changed, 74 insertions(+), 31 deletions(-)
via ba09052b27956a75ec07790bff5f8613795cd75d (commit)
via ee403420f6a88c7d7188f1b75d0e4bddaec9d483 (commit)
from c1ca6ec2e9809774f9bce3969295db1c70a5ccc2 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit ba09052b27956a75ec07790bff5f8613795cd75d
Author: Tom Clegg <tom at curii.com>
Date: Fri Jul 2 17:08:08 2021 -0400
17756: Add checkLsfQueueForOrphans.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index ff95d0db2..b7032dc73 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -11,6 +11,7 @@ import (
"fmt"
"math"
"net/http"
+ "regexp"
"strings"
"sync"
"time"
@@ -315,8 +316,23 @@ func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string
}
}
+// Check the next bjobs report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel existing
+// Arvados LSF jobs (started by a previous dispatch process) that
+// never released their LSF job allocations even though their
+// container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
func (disp *dispatcher) checkLsfQueueForOrphans() {
- disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+ containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+ for _, uuid := range disp.lsfqueue.All() {
+ if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+ continue
+ }
+ err := disp.arvDispatcher.TrackContainer(uuid)
+ if err != nil {
+ disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+ }
+ }
}
func execScript(args []string) []byte {
commit ee403420f6a88c7d7188f1b75d0e4bddaec9d483
Author: Tom Clegg <tom at curii.com>
Date: Fri Jul 2 17:07:27 2021 -0400
17756: Rewrite lsfqueue "wait for next update".
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 65f38690c..cedd0becd 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -16,66 +16,93 @@ type lsfqueue struct {
period time.Duration
lsfcli *lsfcli
- initOnce sync.Once
- mutex sync.Mutex
- needUpdate chan bool
- updated *sync.Cond
- latest map[string]bjobsEntry
+ initOnce sync.Once
+ mutex sync.Mutex
+ nextReady chan (<-chan struct{})
+ updated *sync.Cond
+ latest map[string]bjobsEntry
}
// JobID waits for the next queue update (so even a job that was only
// submitted a nanosecond ago will show up) and then returns the LSF
// job ID corresponding to the given container UUID.
func (q *lsfqueue) JobID(uuid string) (int, bool) {
- q.initOnce.Do(q.init)
- q.mutex.Lock()
- defer q.mutex.Unlock()
- select {
- case q.needUpdate <- true:
- default:
- // an update is already pending
- }
- q.updated.Wait()
- ent, ok := q.latest[uuid]
- q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
+ ent, ok := q.getNext()[uuid]
return ent.id, ok
}
+// All waits for the next queue update, then returns the names of all
+// jobs in the queue. Used by checkLsfQueueForOrphans().
+func (q *lsfqueue) All() []string {
+ latest := q.getNext()
+ names := make([]string, 0, len(latest))
+ for name := range latest {
+ names = append(names, name)
+ }
+ return names
+}
+
func (q *lsfqueue) SetPriority(uuid string, priority int64) {
q.initOnce.Do(q.init)
q.logger.Debug("SetPriority is not implemented")
}
+func (q *lsfqueue) getNext() map[string]bjobsEntry {
+ q.initOnce.Do(q.init)
+ <-(<-q.nextReady)
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ return q.latest
+}
+
func (q *lsfqueue) init() {
q.updated = sync.NewCond(&q.mutex)
- q.needUpdate = make(chan bool, 1)
+ q.nextReady = make(chan (<-chan struct{}))
ticker := time.NewTicker(time.Second)
go func() {
- for range q.needUpdate {
- q.logger.Debug("running bjobs")
- ents, err := q.lsfcli.Bjobs()
- if err != nil {
- q.logger.Warnf("bjobs: %s", err)
- // Retry on the next tick, don't wait
- // for another new call to JobID().
+ for range ticker.C {
+ // Send a new "next update ready" channel to
+ // the next goroutine that wants one (and any
+ // others that have already queued up since
+ // the first one started waiting).
+ //
+ // Below, when we get a new update, we'll
+ // signal that to the other goroutines by
+ // closing the ready chan.
+ ready := make(chan struct{})
+ q.nextReady <- ready
+ for {
select {
- case q.needUpdate <- true:
+ case q.nextReady <- ready:
+ continue
default:
}
+ break
+ }
+ // Run bjobs repeatedly if needed, until we
+ // get valid output.
+ var ents []bjobsEntry
+ for {
+ q.logger.Debug("running bjobs")
+ var err error
+ ents, err = q.lsfcli.Bjobs()
+ if err == nil {
+ break
+ }
+ q.logger.Warnf("bjobs: %s", err)
<-ticker.C
- continue
}
next := make(map[string]bjobsEntry, len(ents))
for _, ent := range ents {
next[ent.name] = ent
}
+ // Replace q.latest and notify all the
+ // goroutines that the "next update" they
+ // asked for is now ready.
q.mutex.Lock()
q.latest = next
- q.updated.Broadcast()
- q.logger.Debugf("waking up waiters with latest %v", q.latest)
q.mutex.Unlock()
- // Limit "bjobs" invocations to 1 per second
- <-ticker.C
+ close(ready)
}
}()
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list