[ARVADOS] created: 1.1.0-30-g32713b4
Git user
git at public.curoverse.com
Tue Oct 17 10:31:27 EDT 2017
at 32713b4b3c2e1685b79acb24059a5b817cf6cbfc (commit)
commit 32713b4b3c2e1685b79acb24059a5b817cf6cbfc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Oct 17 10:30:42 2017 -0400
12446: Avoid listing every container uuid in status query.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 356d087..491897c 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -73,16 +73,56 @@ func (d *Dispatcher) Run(ctx context.Context) error {
defer poll.Stop()
for {
- tracked := d.trackedUUIDs()
- d.checkForUpdates([][]interface{}{
- {"uuid", "in", tracked}})
+ // Clear the "seen" flag
+ d.mtx.Lock()
+ for _, tracker := range d.trackers {
+ tracker.seen = false
+ }
+ d.mtx.Unlock()
+
+ // Containers I currently own (Locked/Running)
d.checkForUpdates([][]interface{}{
- {"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", tracked}})
+ {"locked_by_uuid", "=", d.auth.UUID}})
+
+ // Containers I should try to dispatch
d.checkForUpdates([][]interface{}{
{"state", "=", Queued},
- {"priority", ">", "0"},
- {"uuid", "not in", tracked}})
+ {"priority", ">", "0"}})
+
+ // Containers I know about that didn't fall into either of
+ // the above two categories (probably Complete/Cancelled)
+ var missed []string
+ d.mtx.Lock()
+ for uuid, tracker := range d.trackers {
+ if tracker.seen == false {
+ missed = append(missed, uuid)
+ }
+ }
+ d.mtx.Unlock()
+
+ for len(missed) > 0 {
+ var batch []string
+ if len(missed) > 20 {
+ batch = missed[0:20]
+ missed = missed[20:]
+ } else {
+ batch = missed
+ missed = missed[0:0]
+ }
+ d.checkForUpdates([][]interface{}{
+ {"uuid", "in", batch}})
+ }
+
+ // Containers that I know about that didn't show up in any
+ // query.
+ d.mtx.Lock()
+ for _, tracker := range d.trackers {
+ if tracker.seen == false {
+ tracker.close()
+ }
+ }
+ d.mtx.Unlock()
+
select {
case <-poll.C:
continue
@@ -92,29 +132,15 @@ func (d *Dispatcher) Run(ctx context.Context) error {
}
}
-func (d *Dispatcher) trackedUUIDs() []string {
- d.mtx.Lock()
- defer d.mtx.Unlock()
- if len(d.trackers) == 0 {
- // API bug: ["uuid", "not in", []] does not work as
- // expected, but this does:
- return []string{"this-uuid-does-not-exist"}
- }
- uuids := make([]string, 0, len(d.trackers))
- for x := range d.trackers {
- uuids = append(uuids, x)
- }
- return uuids
-}
-
// Start a runner in a new goroutine, and send the initial container
// record to its updates channel.
func (d *Dispatcher) start(c arvados.Container) *runTracker {
- tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+ tracker := &runTracker{updates: make(chan arvados.Container, 1), seen: true}
tracker.updates <- c
go func() {
d.RunContainer(d, c, tracker.updates)
-
+ // RunContainer blocks for the lifetime of the container. When
+ // it returns, the tracker should delete itself.
d.mtx.Lock()
delete(d.trackers, c.UUID)
d.mtx.Unlock()
@@ -149,6 +175,10 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
for _, c := range containers {
tracker, alreadyTracking := d.trackers[c.UUID]
+ if alreadyTracking {
+ tracker.seen = true
+ }
+
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
@@ -246,6 +276,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error {
type runTracker struct {
closing bool
+ seen bool
updates chan arvados.Container
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list