[ARVADOS] updated: 17f4a03d71d8d6130f796d61fa49b8480bf555b6
Git user
git at public.curoverse.com
Fri Feb 24 16:53:29 EST 2017
Summary of changes:
sdk/go/arvadostest/fixtures.go | 4 +-
sdk/go/dispatch/dispatch.go | 45 ++++++++++++++--------
sdk/go/dispatch/dispatch_test.go | 39 +++++++++++--------
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 38 ++++++++----------
services/crunch-dispatch-slurm/squeue.go | 16 ++++++--
5 files changed, 82 insertions(+), 60 deletions(-)
via 17f4a03d71d8d6130f796d61fa49b8480bf555b6 (commit)
from dccfd637a120e10166a4d9f6b75381c69121f24c (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 17f4a03d71d8d6130f796d61fa49b8480bf555b6
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Feb 24 16:46:46 2017 -0500
10979: Check for orphans only once at startup. Add missing Lock() in
squeue checker. Avoid holding mtx while waiting for API response.
Ensure RunContainer actually gets called in test case.
refs #10979
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index a7ab0fe..70393a6 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -16,6 +16,8 @@ const (
Dispatch1Token = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
+
+ QueuedContainerUUID = "zzzzz-dz642-queuedcontainer"
)
// PathologicalManifest : A valid manifest designed to test
@@ -39,5 +41,3 @@ var (
// BlobSigningKey used by the test servers
const BlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
-
-const QueuedContainerUuid = "zzzzz-dz642-queuedcontainer"
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 49c756e..261444a 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -203,28 +203,41 @@ func (d *Dispatcher) Unlock(uuid string) error {
return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
}
-// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state.
-func (d *Dispatcher) TrackContainer(uuid string) {
- d.mtx.Lock()
- defer d.mtx.Unlock()
-
- if d.trackers == nil {
- d.trackers = make(map[string]*runTracker)
- }
-
- _, alreadyTracking := d.trackers[uuid]
- if alreadyTracking {
- return
- }
-
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
var cntr arvados.Container
err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
if err != nil {
- log.Printf("Error getting container %s: %s", uuid, err)
- return
+ return err
+ }
+ if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+ return nil
}
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+ return nil
+ }
+ if d.trackers == nil {
+ d.trackers = make(map[string]*runTracker)
+ }
d.trackers[uuid] = d.start(cntr)
+ switch cntr.State {
+ case Queued, Cancelled, Complete:
+ d.trackers[uuid].close()
+ }
+ return nil
}
type runTracker struct {
diff --git a/sdk/go/dispatch/dispatch_test.go b/sdk/go/dispatch/dispatch_test.go
index 57b6126..08ce512 100644
--- a/sdk/go/dispatch/dispatch_test.go
+++ b/sdk/go/dispatch/dispatch_test.go
@@ -1,38 +1,43 @@
package dispatch
import (
+ "time"
+
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "os/exec"
-
. "gopkg.in/check.v1"
)
// Gocheck boilerplate
-var _ = Suite(&DispatchTestSuite{})
+var _ = Suite(&suite{})
-type DispatchTestSuite struct{}
+type suite struct{}
-func (s *DispatchTestSuite) SetUpSuite(c *C) {
+func (s *suite) SetUpSuite(c *C) {
arvadostest.StartAPI()
}
-func (s *DispatchTestSuite) TearDownSuite(c *C) {
+func (s *suite) TearDownSuite(c *C) {
arvadostest.StopAPI()
}
-func (s *DispatchTestSuite) TestTrackContainer(c *C) {
+func (s *suite) TestTrackContainer(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, Equals, nil)
-
- runContainer := func(d *Dispatcher, ctr arvados.Container) *exec.Cmd { return exec.Command("echo") }
- d := &Dispatcher{Arv: arv, RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
- go runContainer(dsp, ctr)
- }}
- d.trackers = make(map[string]*runTracker)
-
- d.TrackContainer(arvadostest.QueuedContainerUuid)
- _, tracking := d.trackers[arvadostest.QueuedContainerUuid]
- c.Assert(tracking, Equals, true)
+ arv.ApiToken = arvadostest.Dispatch1Token
+
+ done := make(chan bool, 1)
+ time.AfterFunc(10*time.Second, func() { done <- false })
+ d := &Dispatcher{
+ Arv: arv,
+ RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+ for ctr := range status {
+ c.Logf("%#v", ctr)
+ }
+ done <- true
+ },
+ }
+ d.TrackContainer(arvadostest.QueuedContainerUUID)
+ c.Assert(<-done, Equals, true)
}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 7cb14fa..d84d461 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -123,32 +123,28 @@ func doMain() error {
log.Printf("Error notifying init daemon: %v", err)
}
- containerTrackerTicker := trackContainers(dispatcher)
- defer containerTrackerTicker.Stop()
+ go checkSqueueForOrphans(dispatcher, sqCheck)
return dispatcher.Run(context.Background())
}
-var containerUuidPattern = regexp.MustCompile(`[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
-
-// Start a goroutine to check squeue report periodically, and
-// invoke TrackContainer for all the containers in the report.
-func trackContainers(dispatcher *dispatch.Dispatcher) *time.Ticker {
- ticker := time.NewTicker(sqCheck.Period)
- go func() {
- for {
- select {
- case <-ticker.C:
- for uuid := range sqCheck.AllUuids() {
- match := containerUuidPattern.MatchString(uuid)
- if match {
- dispatcher.TrackContainer(uuid)
- }
- }
- }
+var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+
+// Check the next squeue report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel slurm
+// jobs started by a previous dispatch process that never released
+// their slurm allocations even though their container states are
+// Cancelled or Complete. See https://dev.arvados.org/issues/10979
+func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
+ for _, uuid := range sqCheck.All() {
+ if !containerUuidPattern.MatchString(uuid) {
+ continue
+ }
+ err := dispatcher.TrackContainer(uuid)
+ if err != nil {
+ log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
}
- }()
- return ticker
+ }
}
// sbatchCmd
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 0c90679..85fadbd 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -49,7 +49,7 @@ func (sqc *SqueueChecker) Stop() {
// check gets the names of jobs in the SLURM queue (running and
// queued). If it succeeds, it updates squeue.uuids and wakes up any
-// goroutines that are waiting in HasUUID().
+// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
// Mutex between squeue sync and running sbatch or scancel. This
// establishes a sequence so that squeue doesn't run concurrently with
@@ -93,7 +93,15 @@ func (sqc *SqueueChecker) start() {
}()
}
-// All Uuids in squeue
-func (sqc *SqueueChecker) AllUuids() map[string]bool {
- return sqc.uuids
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
+ sqc.Wait()
+ var uuids []string
+ for uuid := range sqc.uuids {
+ uuids = append(uuids, uuid)
+ }
+ return uuids
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list