[ARVADOS] created: 1.1.0-30-g1b411c1

Git user git at public.curoverse.com
Thu Oct 19 11:14:56 EDT 2017


        at  1b411c1882a37ae8f88c0b770994dd257cf3dee4 (commit)


commit 1b411c1882a37ae8f88c0b770994dd257cf3dee4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Oct 17 10:30:42 2017 -0400

    12446: Avoid listing every container uuid in status query.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 356d087..3289c67 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -73,38 +73,73 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	defer poll.Stop()
 
 	for {
-		tracked := d.trackedUUIDs()
-		d.checkForUpdates([][]interface{}{
-			{"uuid", "in", tracked}})
-		d.checkForUpdates([][]interface{}{
-			{"locked_by_uuid", "=", d.auth.UUID},
-			{"uuid", "not in", tracked}})
-		d.checkForUpdates([][]interface{}{
-			{"state", "=", Queued},
-			{"priority", ">", "0"},
-			{"uuid", "not in", tracked}})
 		select {
 		case <-poll.C:
-			continue
+			break
 		case <-ctx.Done():
 			return ctx.Err()
 		}
-	}
-}
 
-func (d *Dispatcher) trackedUUIDs() []string {
-	d.mtx.Lock()
-	defer d.mtx.Unlock()
-	if len(d.trackers) == 0 {
-		// API bug: ["uuid", "not in", []] does not work as
-		// expected, but this does:
-		return []string{"this-uuid-does-not-exist"}
-	}
-	uuids := make([]string, 0, len(d.trackers))
-	for x := range d.trackers {
-		uuids = append(uuids, x)
+		todo := make(map[string]*runTracker)
+		d.mtx.Lock()
+		// Make a copy of trackers
+		for uuid, tracker := range d.trackers {
+			todo[uuid] = tracker
+		}
+		d.mtx.Unlock()
+
+		// Containers I currently own (Locked/Running)
+		querySuccess := d.checkForUpdates([][]interface{}{
+			{"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+		// Containers I should try to dispatch
+		querySuccess = d.checkForUpdates([][]interface{}{
+			{"state", "=", Queued},
+			{"priority", ">", "0"}}, todo) && querySuccess
+
+		if !querySuccess {
+			// There was an error in one of the previous queries,
+			// we probably didn't get updates for all the
+			// containers we should have.  Don't check them
+			// individually because it may be expensive.
+			continue
+		}
+
+		// Containers I know about but didn't fall into the
+		// above two categories (probably Complete/Cancelled)
+		var missed []string
+		for uuid := range todo {
+			missed = append(missed, uuid)
+		}
+
+		for len(missed) > 0 {
+			var batch []string
+			if len(missed) > 20 {
+				batch = missed[0:20]
+				missed = missed[20:]
+			} else {
+				batch = missed
+				missed = missed[0:0]
+			}
+			querySuccess = d.checkForUpdates([][]interface{}{
+				{"uuid", "in", batch}}, todo) && querySuccess
+		}
+
+		if !querySuccess {
+			// There was an error in one of the previous queries, we probably
+			// didn't see all the containers we should have, so don't shut down
+			// the missed containers.
+			continue
+		}
+
+		// Containers that I know about that didn't show up in any
+		// query should be let go.
+		for uuid, tracker := range todo {
+			log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+			tracker.close()
+		}
+
 	}
-	return uuids
 }
 
 // Start a runner in a new goroutine, and send the initial container
@@ -114,7 +149,8 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 	tracker.updates <- c
 	go func() {
 		d.RunContainer(d, c, tracker.updates)
-
+		// RunContainer blocks for the lifetime of the container.  When
+		// it returns, the tracker should delete itself.
 		d.mtx.Lock()
 		delete(d.trackers, c.UUID)
 		d.mtx.Unlock()
@@ -122,7 +158,7 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 	return tracker
 }
 
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
 	params := arvadosclient.Dict{
 		"filters": filters,
 		"order":   []string{"priority desc"}}
@@ -133,14 +169,15 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
 		err := d.Arv.List("containers", params, &list)
 		if err != nil {
 			log.Printf("Error getting list of containers: %q", err)
-			return
+			return false
 		}
 		more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
-		d.checkListForUpdates(list.Items)
+		d.checkListForUpdates(list.Items, todo)
 	}
+	return true
 }
 
-func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
 	if d.trackers == nil {
@@ -149,6 +186,8 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
 
 	for _, c := range containers {
 		tracker, alreadyTracking := d.trackers[c.UUID]
+		delete(todo, c.UUID)
+
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
 			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if alreadyTracking {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list