[ARVADOS] updated: cff6cd57cbbaa032602ca2fb930c78812c5824dc

Git user git at public.curoverse.com
Fri Feb 10 13:11:55 EST 2017


Summary of changes:
 sdk/go/dispatch/dispatch.go | 171 ++++++++++++++++++++++++++------------------
 1 file changed, 102 insertions(+), 69 deletions(-)

       via  cff6cd57cbbaa032602ca2fb930c78812c5824dc (commit)
       via  6b1bf77727379c7ffaf1620399c37ea0106a0909 (commit)
      from  ddee3839f8a82b889f84171e2354108cb20f93e0 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit cff6cd57cbbaa032602ca2fb930c78812c5824dc
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Feb 10 13:11:52 2017 -0500

    10701: Add back MinRetryPeriod throttle. Update comments/identifiers.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index f805298..722d4ee 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,6 +1,5 @@
-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
 package dispatch
 
 import (
@@ -23,10 +22,17 @@ const (
 )
 
 type Dispatcher struct {
-	Arv            *arvadosclient.ArvadosClient
-	PollPeriod     time.Duration
+	Arv *arvadosclient.ArvadosClient
+
+	// Queue polling frequency
+	PollPeriod time.Duration
+
+	// Time to wait between successive attempts to run the same container
 	MinRetryPeriod time.Duration
-	RunContainer   Runner
+
+	// Func that implements the container lifecycle. Must be set
+	// to a non-nil DispatchFunc before calling Run().
+	RunContainer DispatchFunc
 
 	auth     arvados.APIClientAuthorization
 	mtx      sync.Mutex
@@ -34,17 +40,30 @@ type Dispatcher struct {
 	throttle throttle
 }
 
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
-
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and
+// waits until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
 	err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
 	if err != nil {
 		return fmt.Errorf("error getting my token UUID: %v", err)
 	}
 
+	d.throttle.hold = d.MinRetryPeriod
+
 	poll := time.NewTicker(d.PollPeriod)
 	defer poll.Stop()
 
@@ -139,13 +158,20 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
 		} else {
 			switch c.State {
 			case Queued:
-				if err := d.lock(c.UUID); err != nil {
+				if !d.throttle.Check(c.UUID) {
+					break
+				}
+				err := d.lock(c.UUID)
+				if err != nil {
 					log.Printf("debug: error locking container %s: %s", c.UUID, err)
-				} else {
-					c.State = Locked
-					d.running[c.UUID] = d.start(c)
+					break
 				}
+				c.State = Locked
+				d.running[c.UUID] = d.start(c)
 			case Locked, Running:
+				if !d.throttle.Check(c.UUID) {
+					break
+				}
 				d.running[c.UUID] = d.start(c)
 			case Cancelled, Complete:
 				tracker.close()

commit 6b1bf77727379c7ffaf1620399c37ea0106a0909
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Feb 10 04:56:28 2017 -0500

    10701: cleanup

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 7342c3b..f805298 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -22,30 +22,6 @@ const (
 	Cancelled = arvados.ContainerStateCancelled
 )
 
-type runner struct {
-	closing bool
-	updates chan arvados.Container
-}
-
-func (ex *runner) close() {
-	if !ex.closing {
-		close(ex.updates)
-	}
-	ex.closing = true
-}
-
-func (ex *runner) update(c arvados.Container) {
-	if ex.closing {
-		return
-	}
-	select {
-	case <-ex.updates:
-		log.Print("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
-	default:
-	}
-	ex.updates <- c
-}
-
 type Dispatcher struct {
 	Arv            *arvadosclient.ArvadosClient
 	PollPeriod     time.Duration
@@ -54,7 +30,7 @@ type Dispatcher struct {
 
 	auth     arvados.APIClientAuthorization
 	mtx      sync.Mutex
-	running  map[string]*runner
+	running  map[string]*runTracker
 	throttle throttle
 }
 
@@ -73,25 +49,15 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	defer poll.Stop()
 
 	for {
-		running := make([]string, 0, len(d.running))
-		d.mtx.Lock()
-		for uuid := range d.running {
-			running = append(running, uuid)
-		}
-		d.mtx.Unlock()
-		if len(running) == 0 {
-			// API bug: ["uuid", "not in", []] does not match everything
-			running = []string{"X"}
-		}
 		d.checkForUpdates([][]interface{}{
-			{"uuid", "in", running}})
+			{"uuid", "in", d.runningUUIDs()}})
+		d.checkForUpdates([][]interface{}{
+			{"locked_by_uuid", "=", d.auth.UUID},
+			{"uuid", "not in", d.runningUUIDs()}})
 		d.checkForUpdates([][]interface{}{
 			{"state", "=", Queued},
 			{"priority", ">", "0"},
-			{"uuid", "not in", running}})
-		d.checkForUpdates([][]interface{}{
-			{"locked_by_uuid", "=", d.auth.UUID},
-			{"uuid", "not in", running}})
+			{"uuid", "not in", d.runningUUIDs()}})
 		select {
 		case <-poll.C:
 			continue
@@ -101,21 +67,34 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 	}
 }
 
-func (d *Dispatcher) start(c arvados.Container) *runner {
-	ex := &runner{
-		updates: make(chan arvados.Container, 1),
+func (d *Dispatcher) runningUUIDs() []string {
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	if len(d.running) == 0 {
+		// API bug: ["uuid", "not in", []] does not match everything
+		return []string{"X"}
 	}
-	if d.running == nil {
-		d.running = make(map[string]*runner)
+	uuids := make([]string, 0, len(d.running))
+	for x := range d.running {
+		uuids = append(uuids, x)
 	}
-	d.running[c.UUID] = ex
+	return uuids
+}
+
+// Start a runner in a new goroutine, and send the initial container
+// record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+	updates := make(chan arvados.Container, 1)
+	tracker := &runTracker{updates: updates}
+	tracker.updates <- c
 	go func() {
-		d.RunContainer(d, c, ex.updates)
+		d.RunContainer(d, c, tracker.updates)
+
 		d.mtx.Lock()
 		delete(d.running, c.UUID)
 		d.mtx.Unlock()
 	}()
-	return ex
+	return tracker
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
@@ -140,32 +119,36 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
 
 	d.mtx.Lock()
 	defer d.mtx.Unlock()
+	if d.running == nil {
+		d.running = make(map[string]*runTracker)
+	}
+
 	for _, c := range list.Items {
-		ex, running := d.running[c.UUID]
+		tracker, running := d.running[c.UUID]
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
 			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if running {
 			switch c.State {
 			case Queued:
-				ex.close()
+				tracker.close()
 			case Locked, Running:
-				ex.update(c)
+				tracker.update(c)
 			case Cancelled, Complete:
-				ex.close()
+				tracker.close()
 			}
 		} else {
 			switch c.State {
 			case Queued:
 				if err := d.lock(c.UUID); err != nil {
-					log.Printf("Error locking container %s: %s", c.UUID, err)
+					log.Printf("debug: error locking container %s: %s", c.UUID, err)
 				} else {
 					c.State = Locked
-					d.start(c).update(c)
+					d.running[c.UUID] = d.start(c)
 				}
 			case Locked, Running:
-				d.start(c).update(c)
+				d.running[c.UUID] = d.start(c)
 			case Cancelled, Complete:
-				ex.close()
+				tracker.close()
 			}
 		}
 	}
@@ -192,3 +175,27 @@ func (d *Dispatcher) lock(uuid string) error {
 func (d *Dispatcher) Unlock(uuid string) error {
 	return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
+
+type runTracker struct {
+	closing bool
+	updates chan<- arvados.Container
+}
+
+func (tracker *runTracker) close() {
+	if !tracker.closing {
+		close(tracker.updates)
+	}
+	tracker.closing = true
+}
+
+func (tracker *runTracker) update(c arvados.Container) {
+	if tracker.closing {
+		return
+	}
+	select {
+	case <-tracker.updates:
+		log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+	default:
+	}
+	tracker.updates <- c
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list