[ARVADOS] updated: cff6cd57cbbaa032602ca2fb930c78812c5824dc
Git user
git at public.curoverse.com
Fri Feb 10 13:11:55 EST 2017
Summary of changes:
sdk/go/dispatch/dispatch.go | 171 ++++++++++++++++++++++++++------------------
1 file changed, 102 insertions(+), 69 deletions(-)
via cff6cd57cbbaa032602ca2fb930c78812c5824dc (commit)
via 6b1bf77727379c7ffaf1620399c37ea0106a0909 (commit)
from ddee3839f8a82b889f84171e2354108cb20f93e0 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit cff6cd57cbbaa032602ca2fb930c78812c5824dc
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Feb 10 13:11:52 2017 -0500
10701: Add back MinRetryPeriod throttle. Update comments/identifiers.
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index f805298..722d4ee 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -1,6 +1,5 @@
-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
package dispatch
import (
@@ -23,10 +22,17 @@ const (
)
type Dispatcher struct {
- Arv *arvadosclient.ArvadosClient
- PollPeriod time.Duration
+ Arv *arvadosclient.ArvadosClient
+
+ // Queue polling frequency
+ PollPeriod time.Duration
+
+ // Time to wait between successive attempts to run the same container
MinRetryPeriod time.Duration
- RunContainer Runner
+
+ // Func that implements the container lifecycle. Must be set
+ // to a non-nil DispatchFunc before calling Run().
+ RunContainer DispatchFunc
auth arvados.APIClientAuthorization
mtx sync.Mutex
@@ -34,17 +40,30 @@ type Dispatcher struct {
throttle throttle
}
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
-
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and waits
+// until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
func (d *Dispatcher) Run(ctx context.Context) error {
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
return fmt.Errorf("error getting my token UUID: %v", err)
}
+ d.throttle.hold = d.MinRetryPeriod
+
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
@@ -139,13 +158,20 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
} else {
switch c.State {
case Queued:
- if err := d.lock(c.UUID); err != nil {
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
+ err := d.lock(c.UUID)
+ if err != nil {
log.Printf("debug: error locking container %s: %s", c.UUID, err)
- } else {
- c.State = Locked
- d.running[c.UUID] = d.start(c)
+ break
}
+ c.State = Locked
+ d.running[c.UUID] = d.start(c)
case Locked, Running:
+ if !d.throttle.Check(c.UUID) {
+ break
+ }
d.running[c.UUID] = d.start(c)
case Cancelled, Complete:
tracker.close()
commit 6b1bf77727379c7ffaf1620399c37ea0106a0909
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Feb 10 04:56:28 2017 -0500
10701: cleanup
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 7342c3b..f805298 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -22,30 +22,6 @@ const (
Cancelled = arvados.ContainerStateCancelled
)
-type runner struct {
- closing bool
- updates chan arvados.Container
-}
-
-func (ex *runner) close() {
- if !ex.closing {
- close(ex.updates)
- }
- ex.closing = true
-}
-
-func (ex *runner) update(c arvados.Container) {
- if ex.closing {
- return
- }
- select {
- case <-ex.updates:
- log.Print("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
- default:
- }
- ex.updates <- c
-}
-
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
PollPeriod time.Duration
@@ -54,7 +30,7 @@ type Dispatcher struct {
auth arvados.APIClientAuthorization
mtx sync.Mutex
- running map[string]*runner
+ running map[string]*runTracker
throttle throttle
}
@@ -73,25 +49,15 @@ func (d *Dispatcher) Run(ctx context.Context) error {
defer poll.Stop()
for {
- running := make([]string, 0, len(d.running))
- d.mtx.Lock()
- for uuid := range d.running {
- running = append(running, uuid)
- }
- d.mtx.Unlock()
- if len(running) == 0 {
- // API bug: ["uuid", "not in", []] does not match everything
- running = []string{"X"}
- }
d.checkForUpdates([][]interface{}{
- {"uuid", "in", running}})
+ {"uuid", "in", d.runningUUIDs()}})
+ d.checkForUpdates([][]interface{}{
+ {"locked_by_uuid", "=", d.auth.UUID},
+ {"uuid", "not in", d.runningUUIDs()}})
d.checkForUpdates([][]interface{}{
{"state", "=", Queued},
{"priority", ">", "0"},
- {"uuid", "not in", running}})
- d.checkForUpdates([][]interface{}{
- {"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", running}})
+ {"uuid", "not in", d.runningUUIDs()}})
select {
case <-poll.C:
continue
@@ -101,21 +67,34 @@ func (d *Dispatcher) Run(ctx context.Context) error {
}
}
-func (d *Dispatcher) start(c arvados.Container) *runner {
- ex := &runner{
- updates: make(chan arvados.Container, 1),
+func (d *Dispatcher) runningUUIDs() []string {
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ if len(d.running) == 0 {
+ // API bug: ["uuid", "not in", []] does not match everything
+ return []string{"X"}
}
- if d.running == nil {
- d.running = make(map[string]*runner)
+ uuids := make([]string, 0, len(d.running))
+ for x := range d.running {
+ uuids = append(uuids, x)
}
- d.running[c.UUID] = ex
+ return uuids
+}
+
+// Start a runner in a new goroutine, and send the initial container
+// record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+ updates := make(chan arvados.Container, 1)
+ tracker := &runTracker{updates: updates}
+ tracker.updates <- c
go func() {
- d.RunContainer(d, c, ex.updates)
+ d.RunContainer(d, c, tracker.updates)
+
d.mtx.Lock()
delete(d.running, c.UUID)
d.mtx.Unlock()
}()
- return ex
+ return tracker
}
func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
@@ -140,32 +119,36 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
d.mtx.Lock()
defer d.mtx.Unlock()
+ if d.running == nil {
+ d.running = make(map[string]*runTracker)
+ }
+
for _, c := range list.Items {
- ex, running := d.running[c.UUID]
+ tracker, running := d.running[c.UUID]
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if running {
switch c.State {
case Queued:
- ex.close()
+ tracker.close()
case Locked, Running:
- ex.update(c)
+ tracker.update(c)
case Cancelled, Complete:
- ex.close()
+ tracker.close()
}
} else {
switch c.State {
case Queued:
if err := d.lock(c.UUID); err != nil {
- log.Printf("Error locking container %s: %s", c.UUID, err)
+ log.Printf("debug: error locking container %s: %s", c.UUID, err)
} else {
c.State = Locked
- d.start(c).update(c)
+ d.running[c.UUID] = d.start(c)
}
case Locked, Running:
- d.start(c).update(c)
+ d.running[c.UUID] = d.start(c)
case Cancelled, Complete:
- ex.close()
+ tracker.close()
}
}
}
@@ -192,3 +175,27 @@ func (d *Dispatcher) lock(uuid string) error {
func (d *Dispatcher) Unlock(uuid string) error {
return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
}
+
+type runTracker struct {
+ closing bool
+ updates chan<- arvados.Container
+}
+
+func (tracker *runTracker) close() {
+ if !tracker.closing {
+ close(tracker.updates)
+ }
+ tracker.closing = true
+}
+
+func (tracker *runTracker) update(c arvados.Container) {
+ if tracker.closing {
+ return
+ }
+ select {
+ case <-tracker.updates:
+ log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+ default:
+ }
+ tracker.updates <- c
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list