[ARVADOS] updated: 17f4a03d71d8d6130f796d61fa49b8480bf555b6

Git user git at public.curoverse.com
Fri Feb 24 16:53:29 EST 2017


Summary of changes:
 sdk/go/arvadostest/fixtures.go                     |  4 +-
 sdk/go/dispatch/dispatch.go                        | 45 ++++++++++++++--------
 sdk/go/dispatch/dispatch_test.go                   | 39 +++++++++++--------
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 38 ++++++++----------
 services/crunch-dispatch-slurm/squeue.go           | 16 ++++++--
 5 files changed, 82 insertions(+), 60 deletions(-)

       via  17f4a03d71d8d6130f796d61fa49b8480bf555b6 (commit)
      from  dccfd637a120e10166a4d9f6b75381c69121f24c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 17f4a03d71d8d6130f796d61fa49b8480bf555b6
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Feb 24 16:46:46 2017 -0500

    10979: Check for orphans only once at startup. Add missing Lock() in
    squeue checker. Avoid holding mtx while waiting for API response.
    Ensure RunContainer actually gets called in test case.
    
    refs #10979

diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index a7ab0fe..70393a6 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -16,6 +16,8 @@ const (
 
 	Dispatch1Token    = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
 	Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
+
+	QueuedContainerUUID = "zzzzz-dz642-queuedcontainer"
 )
 
 // PathologicalManifest : A valid manifest designed to test
@@ -39,5 +41,3 @@ var (
 
 // BlobSigningKey used by the test servers
 const BlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
-
-const QueuedContainerUuid = "zzzzz-dz642-queuedcontainer"
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 49c756e..261444a 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -203,28 +203,41 @@ func (d *Dispatcher) Unlock(uuid string) error {
 	return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
 
-// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state.
-func (d *Dispatcher) TrackContainer(uuid string) {
-	d.mtx.Lock()
-	defer d.mtx.Unlock()
-
-	if d.trackers == nil {
-		d.trackers = make(map[string]*runTracker)
-	}
-
-	_, alreadyTracking := d.trackers[uuid]
-	if alreadyTracking {
-		return
-	}
-
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
 	var cntr arvados.Container
 	err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
 	if err != nil {
-		log.Printf("Error getting container %s: %s", uuid, err)
-		return
+		return err
+	}
+	if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+		return nil
 	}
 
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+		return nil
+	}
+	if d.trackers == nil {
+		d.trackers = make(map[string]*runTracker)
+	}
 	d.trackers[uuid] = d.start(cntr)
+	switch cntr.State {
+	case Queued, Cancelled, Complete:
+		d.trackers[uuid].close()
+	}
+	return nil
 }
 
 type runTracker struct {
diff --git a/sdk/go/dispatch/dispatch_test.go b/sdk/go/dispatch/dispatch_test.go
index 57b6126..08ce512 100644
--- a/sdk/go/dispatch/dispatch_test.go
+++ b/sdk/go/dispatch/dispatch_test.go
@@ -1,38 +1,43 @@
 package dispatch
 
 import (
+	"time"
+
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-	"os/exec"
-
 	. "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
-var _ = Suite(&DispatchTestSuite{})
+var _ = Suite(&suite{})
 
-type DispatchTestSuite struct{}
+type suite struct{}
 
-func (s *DispatchTestSuite) SetUpSuite(c *C) {
+func (s *suite) SetUpSuite(c *C) {
 	arvadostest.StartAPI()
 }
 
-func (s *DispatchTestSuite) TearDownSuite(c *C) {
+func (s *suite) TearDownSuite(c *C) {
 	arvadostest.StopAPI()
 }
 
-func (s *DispatchTestSuite) TestTrackContainer(c *C) {
+func (s *suite) TestTrackContainer(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	c.Assert(err, Equals, nil)
-
-	runContainer := func(d *Dispatcher, ctr arvados.Container) *exec.Cmd { return exec.Command("echo") }
-	d := &Dispatcher{Arv: arv, RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
-		go runContainer(dsp, ctr)
-	}}
-	d.trackers = make(map[string]*runTracker)
-
-	d.TrackContainer(arvadostest.QueuedContainerUuid)
-	_, tracking := d.trackers[arvadostest.QueuedContainerUuid]
-	c.Assert(tracking, Equals, true)
+	arv.ApiToken = arvadostest.Dispatch1Token
+
+	done := make(chan bool, 1)
+	time.AfterFunc(10*time.Second, func() { done <- false })
+	d := &Dispatcher{
+		Arv: arv,
+		RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+			for ctr := range status {
+				c.Logf("%#v", ctr)
+			}
+			done <- true
+		},
+	}
+	d.TrackContainer(arvadostest.QueuedContainerUUID)
+	c.Assert(<-done, Equals, true)
 }
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 7cb14fa..d84d461 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -123,32 +123,28 @@ func doMain() error {
 		log.Printf("Error notifying init daemon: %v", err)
 	}
 
-	containerTrackerTicker := trackContainers(dispatcher)
-	defer containerTrackerTicker.Stop()
+	go checkSqueueForOrphans(dispatcher, sqCheck)
 
 	return dispatcher.Run(context.Background())
 }
 
-var containerUuidPattern = regexp.MustCompile(`[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
-
-// Start a goroutine to check squeue report periodically, and
-// invoke TrackContainer for all the containers in the report.
-func trackContainers(dispatcher *dispatch.Dispatcher) *time.Ticker {
-	ticker := time.NewTicker(sqCheck.Period)
-	go func() {
-		for {
-			select {
-			case <-ticker.C:
-				for uuid := range sqCheck.AllUuids() {
-					match := containerUuidPattern.MatchString(uuid)
-					if match {
-						dispatcher.TrackContainer(uuid)
-					}
-				}
-			}
+var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+
+// Check the next squeue report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel slurm
+// jobs started by a previous dispatch process that never released
+// their slurm allocations even though their container states are
+// Cancelled or Complete. See https://dev.arvados.org/issues/10979
+func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
+	for _, uuid := range sqCheck.All() {
+		if !containerUuidPattern.MatchString(uuid) {
+			continue
+		}
+		err := dispatcher.TrackContainer(uuid)
+		if err != nil {
+			log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
 		}
-	}()
-	return ticker
+	}
 }
 
 // sbatchCmd
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 0c90679..85fadbd 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -49,7 +49,7 @@ func (sqc *SqueueChecker) Stop() {
 
 // check gets the names of jobs in the SLURM queue (running and
 // queued). If it succeeds, it updates squeue.uuids and wakes up any
-// goroutines that are waiting in HasUUID().
+// goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
 	// Mutex between squeue sync and running sbatch or scancel.  This
 	// establishes a sequence so that squeue doesn't run concurrently with
@@ -93,7 +93,15 @@ func (sqc *SqueueChecker) start() {
 	}()
 }
 
-// All Uuids in squeue
-func (sqc *SqueueChecker) AllUuids() map[string]bool {
-	return sqc.uuids
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+	sqc.L.Lock()
+	defer sqc.L.Unlock()
+	sqc.Wait()
+	var uuids []string
+	for uuid := range sqc.uuids {
+		uuids = append(uuids, uuid)
+	}
+	return uuids
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list