[ARVADOS] created: 1.1.0-30-g32713b4

Git user git at public.curoverse.com
Tue Oct 17 10:31:27 EDT 2017


        at  32713b4b3c2e1685b79acb24059a5b817cf6cbfc (commit)


commit 32713b4b3c2e1685b79acb24059a5b817cf6cbfc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Oct 17 10:30:42 2017 -0400

    12446: Avoid listing every container uuid in status query.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 356d087..491897c 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -73,16 +73,56 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	defer poll.Stop()
 
 	for {
-		tracked := d.trackedUUIDs()
-		d.checkForUpdates([][]interface{}{
-			{"uuid", "in", tracked}})
+		// Clear the "seen" flag
+		d.mtx.Lock()
+		for _, tracker := range d.trackers {
+			tracker.seen = false
+		}
+		d.mtx.Unlock()
+
+		// Containers I currently own (Locked/Running)
 		d.checkForUpdates([][]interface{}{
-			{"locked_by_uuid", "=", d.auth.UUID},
-			{"uuid", "not in", tracked}})
+			{"locked_by_uuid", "=", d.auth.UUID}})
+
+		// Containers I should try to dispatch
 		d.checkForUpdates([][]interface{}{
 			{"state", "=", Queued},
-			{"priority", ">", "0"},
-			{"uuid", "not in", tracked}})
+			{"priority", ">", "0"}})
+
+		// Containers I know about but that didn't fall into the
+		// above two categories (probably Complete/Cancelled)
+		var missed []string
+		d.mtx.Lock()
+		for uuid, tracker := range d.trackers {
+			if tracker.seen == false {
+				missed = append(missed, uuid)
+			}
+		}
+		d.mtx.Unlock()
+
+		for len(missed) > 0 {
+			var batch []string
+			if len(missed) > 20 {
+				batch = missed[0:20]
+				missed = missed[20:]
+			} else {
+				batch = missed
+				missed = missed[0:0]
+			}
+			d.checkForUpdates([][]interface{}{
+				{"uuid", "in", batch}})
+		}
+
+		// Containers that I know about that didn't show up in any
+		// query.
+		d.mtx.Lock()
+		for _, tracker := range d.trackers {
+			if tracker.seen == false {
+				tracker.close()
+			}
+		}
+		d.mtx.Unlock()
+
 		select {
 		case <-poll.C:
 			continue
@@ -92,29 +132,15 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	}
 }
 
-func (d *Dispatcher) trackedUUIDs() []string {
-	d.mtx.Lock()
-	defer d.mtx.Unlock()
-	if len(d.trackers) == 0 {
-		// API bug: ["uuid", "not in", []] does not work as
-		// expected, but this does:
-		return []string{"this-uuid-does-not-exist"}
-	}
-	uuids := make([]string, 0, len(d.trackers))
-	for x := range d.trackers {
-		uuids = append(uuids, x)
-	}
-	return uuids
-}
-
 // Start a runner in a new goroutine, and send the initial container
 // record to its updates channel.
 func (d *Dispatcher) start(c arvados.Container) *runTracker {
-	tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+	tracker := &runTracker{updates: make(chan arvados.Container, 1), seen: true}
 	tracker.updates <- c
 	go func() {
 		d.RunContainer(d, c, tracker.updates)
-
+		// RunContainer blocks for the lifetime of the container.  When
+		// it returns, the tracker should delete itself.
 		d.mtx.Lock()
 		delete(d.trackers, c.UUID)
 		d.mtx.Unlock()
@@ -149,6 +175,10 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
 
 	for _, c := range containers {
 		tracker, alreadyTracking := d.trackers[c.UUID]
+		if alreadyTracking {
+			tracker.seen = true
+		}
+
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
 			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if alreadyTracking {
@@ -246,6 +276,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error {
 
 type runTracker struct {
 	closing bool
+	seen    bool
 	updates chan arvados.Container
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list