[ARVADOS] updated: 2.1.0-977-gba09052b2

Git user git at public.arvados.org
Fri Jul 2 21:10:48 UTC 2021


Summary of changes:
 lib/lsf/dispatch.go | 18 ++++++++++-
 lib/lsf/lsfqueue.go | 87 +++++++++++++++++++++++++++++++++++------------------
 2 files changed, 74 insertions(+), 31 deletions(-)

       via  ba09052b27956a75ec07790bff5f8613795cd75d (commit)
       via  ee403420f6a88c7d7188f1b75d0e4bddaec9d483 (commit)
      from  c1ca6ec2e9809774f9bce3969295db1c70a5ccc2 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit ba09052b27956a75ec07790bff5f8613795cd75d
Author: Tom Clegg <tom at curii.com>
Date:   Fri Jul 2 17:08:08 2021 -0400

    17756: Add checkLsfQueueForOrphans.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index ff95d0db2..b7032dc73 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -11,6 +11,7 @@ import (
 	"fmt"
 	"math"
 	"net/http"
+	"regexp"
 	"strings"
 	"sync"
 	"time"
@@ -315,8 +316,23 @@ func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string
 	}
 }
 
+// Check the next bjobs report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel existing
+// Arvados LSF jobs (started by a previous dispatch process) that
+// never released their LSF job allocations even though their
+// container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
 func (disp *dispatcher) checkLsfQueueForOrphans() {
-	disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+	containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+	for _, uuid := range disp.lsfqueue.All() {
+		if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+			continue
+		}
+		err := disp.arvDispatcher.TrackContainer(uuid)
+		if err != nil {
+			disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+		}
+	}
 }
 
 func execScript(args []string) []byte {
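
For reference, a minimal standalone sketch (not part of the commit) of the filter applied above: only queue entries that look like Arvados container UUIDs ("xxxxx-dz642-xxxxxxxxxxxxxxx") and carry this cluster's five-character prefix are handed to TrackContainer. The cluster ID "zzzzz" and the job names below are made-up examples.

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	clusterID := "zzzzz" // hypothetical cluster prefix
	containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
	for _, name := range []string{
		"zzzzz-dz642-0123456789abcde", // container UUID on this cluster: track it
		"qqqqq-dz642-0123456789abcde", // container from another cluster: skip
		"weekly-backup-job",           // unrelated LSF job: skip
	} {
		if containerUuidPattern.MatchString(name) && strings.HasPrefix(name, clusterID) {
			fmt.Printf("would call TrackContainer(%q)\n", name)
		}
	}
}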

commit ee403420f6a88c7d7188f1b75d0e4bddaec9d483
Author: Tom Clegg <tom at curii.com>
Date:   Fri Jul 2 17:07:27 2021 -0400

    17756: Rewrite lsfqueue "wait for next update".
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 65f38690c..cedd0becd 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -16,66 +16,93 @@ type lsfqueue struct {
 	period time.Duration
 	lsfcli *lsfcli
 
-	initOnce   sync.Once
-	mutex      sync.Mutex
-	needUpdate chan bool
-	updated    *sync.Cond
-	latest     map[string]bjobsEntry
+	initOnce  sync.Once
+	mutex     sync.Mutex
+	nextReady chan (<-chan struct{})
+	updated   *sync.Cond
+	latest    map[string]bjobsEntry
 }
 
 // JobID waits for the next queue update (so even a job that was only
 // submitted a nanosecond ago will show up) and then returns the LSF
 // job ID corresponding to the given container UUID.
 func (q *lsfqueue) JobID(uuid string) (int, bool) {
-	q.initOnce.Do(q.init)
-	q.mutex.Lock()
-	defer q.mutex.Unlock()
-	select {
-	case q.needUpdate <- true:
-	default:
-		// an update is already pending
-	}
-	q.updated.Wait()
-	ent, ok := q.latest[uuid]
-	q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
+	ent, ok := q.getNext()[uuid]
 	return ent.id, ok
 }
 
+// All waits for the next queue update, then returns the names of all
+// jobs in the queue. Used by checkLsfQueueForOrphans().
+func (q *lsfqueue) All() []string {
+	latest := q.getNext()
+	names := make([]string, 0, len(latest))
+	for name := range latest {
+		names = append(names, name)
+	}
+	return names
+}
+
 func (q *lsfqueue) SetPriority(uuid string, priority int64) {
 	q.initOnce.Do(q.init)
 	q.logger.Debug("SetPriority is not implemented")
 }
 
+func (q *lsfqueue) getNext() map[string]bjobsEntry {
+	q.initOnce.Do(q.init)
+	<-(<-q.nextReady)
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	return q.latest
+}
+
 func (q *lsfqueue) init() {
 	q.updated = sync.NewCond(&q.mutex)
-	q.needUpdate = make(chan bool, 1)
+	q.nextReady = make(chan (<-chan struct{}))
 	ticker := time.NewTicker(time.Second)
 	go func() {
-		for range q.needUpdate {
-			q.logger.Debug("running bjobs")
-			ents, err := q.lsfcli.Bjobs()
-			if err != nil {
-				q.logger.Warnf("bjobs: %s", err)
-				// Retry on the next tick, don't wait
-				// for another new call to JobID().
+		for range ticker.C {
+			// Send a new "next update ready" channel to
+			// the next goroutine that wants one (and any
+			// others that have already queued up since
+			// the first one started waiting).
+			//
+			// Below, when we get a new update, we'll
+			// signal that to the other goroutines by
+			// closing the ready chan.
+			ready := make(chan struct{})
+			q.nextReady <- ready
+			for {
 				select {
-				case q.needUpdate <- true:
+				case q.nextReady <- ready:
+					continue
 				default:
 				}
+				break
+			}
+			// Run bjobs repeatedly if needed, until we
+			// get valid output.
+			var ents []bjobsEntry
+			for {
+				q.logger.Debug("running bjobs")
+				var err error
+				ents, err = q.lsfcli.Bjobs()
+				if err == nil {
+					break
+				}
+				q.logger.Warnf("bjobs: %s", err)
 				<-ticker.C
-				continue
 			}
 			next := make(map[string]bjobsEntry, len(ents))
 			for _, ent := range ents {
 				next[ent.name] = ent
 			}
+			// Replace q.latest and notify all the
+			// goroutines that the "next update" they
+			// asked for is now ready.
 			q.mutex.Lock()
 			q.latest = next
-			q.updated.Broadcast()
-			q.logger.Debugf("waking up waiters with latest %v", q.latest)
 			q.mutex.Unlock()
-			// Limit "bjobs" invocations to 1 per second
-			<-ticker.C
+			close(ready)
 		}
 	}()
 }
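
The rewritten update loop relies on a small hand-off pattern: each caller of getNext() receives a "ready" channel via nextReady, the polling goroutine runs one bjobs cycle, replaces q.latest, and then closes that channel so every waiting caller wakes up and reads the fresh snapshot. Below is a minimal standalone sketch of that pattern; the type, field names, and the fake update are illustrative only, not taken from the Arvados tree beyond what the diff shows.

package main

import (
	"fmt"
	"sync"
	"time"
)

type queue struct {
	mutex     sync.Mutex
	nextReady chan (<-chan struct{})
	latest    map[string]int
}

// getNext blocks until the next poll cycle finishes, then returns the
// snapshot produced by that cycle.
func (q *queue) getNext() map[string]int {
	ready := <-q.nextReady // get the "next update" channel...
	<-ready                // ...and wait for it to be closed
	q.mutex.Lock()
	defer q.mutex.Unlock()
	return q.latest
}

// poll runs at most one update per tick, and only when at least one
// caller is blocked in getNext (the first send below blocks until then).
func (q *queue) poll() {
	ticker := time.NewTicker(100 * time.Millisecond)
	for i := 0; ; i++ {
		<-ticker.C
		ready := make(chan struct{})
		q.nextReady <- ready // wait for one waiter to pick this up
		for {                // hand the same channel to any other queued waiters
			select {
			case q.nextReady <- ready:
				continue
			default:
			}
			break
		}
		q.mutex.Lock()
		q.latest = map[string]int{"update": i} // stand-in for a bjobs snapshot
		q.mutex.Unlock()
		close(ready) // wake every caller holding this channel
	}
}

func main() {
	q := &queue{nextReady: make(chan (<-chan struct{}))}
	go q.poll()
	fmt.Println(q.getNext()) // map[update:0]
	fmt.Println(q.getNext()) // map[update:1]
}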

-----------------------------------------------------------------------


hooks/post-receive
-- 

More information about the arvados-commits mailing list