[ARVADOS] updated: 1.3.0-120-gb3dbac270
Git user
git at public.curoverse.com
Tue Dec 18 15:54:41 EST 2018
Summary of changes:
README.md | 10 +-
build/package-build-dockerfiles/Makefile | 2 +-
build/package-test-dockerfiles/Makefile | 2 +-
doc/index.html.liquid | 2 +-
.../getting_started/community.html.textile.liquid | 4 +-
lib/dispatchcloud/dispatcher.go | 4 +
lib/dispatchcloud/dispatcher_test.go | 213 +++-----------
lib/dispatchcloud/node_size.go | 68 ++++-
lib/dispatchcloud/node_size_test.go | 22 ++
lib/dispatchcloud/scheduler/run_queue.go | 11 +
lib/dispatchcloud/scheduler/scheduler.go | 5 +
lib/dispatchcloud/test/queue.go | 14 +-
lib/dispatchcloud/test/stub_driver.go | 195 ++++++++++---
lib/dispatchcloud/worker/pool.go | 55 +++-
lib/dispatchcloud/worker/pool_test.go | 21 +-
lib/dispatchcloud/worker/worker.go | 1 +
sdk/cwl/arvados_cwl/arvcontainer.py | 2 +-
sdk/cwl/arvados_cwl/arvjob.py | 2 +-
sdk/cwl/arvados_cwl/runner.py | 2 +-
sdk/cwl/setup.py | 4 +-
sdk/go/arvados/contextgroup.go | 95 +++++++
sdk/go/arvados/fs_collection.go | 306 ++++++++++++++-------
sdk/go/arvados/fs_collection_test.go | 50 +++-
sdk/go/arvados/throttle.go | 17 ++
sdk/python/arvados/arvfile.py | 38 ++-
sdk/python/arvados/collection.py | 39 ++-
sdk/python/arvados/commands/put.py | 88 +++++-
sdk/python/arvados/commands/run.py | 4 +-
sdk/python/arvados/keep.py | 13 +
sdk/python/tests/test_arv_put.py | 168 ++++++++++-
sdk/python/tests/test_arvfile.py | 55 ++--
sdk/python/tests/test_collections.py | 63 ++++-
.../app/controllers/arvados/v1/links_controller.rb | 4 +-
services/api/app/models/arvados_model.rb | 2 +
services/api/app/models/collection.rb | 4 +-
...20181213183234_add_expression_index_to_links.rb | 11 +
services/api/db/structure.sql | 16 ++
services/api/lib/arvados_model_updates.rb | 17 ++
services/api/lib/record_filters.rb | 31 ++-
services/api/test/fixtures/links.yml | 14 +
.../functional/arvados/v1/links_controller_test.rb | 17 ++
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 9 +-
.../crunch-dispatch-slurm_test.go | 9 +-
services/crunch-run/crunchrun.go | 12 +-
services/crunch-run/crunchrun_test.go | 124 ++++++---
.../lib/arvbox/docker/service/controller/run | 8 +-
tools/sync-groups/sync-groups.go | 8 +-
47 files changed, 1359 insertions(+), 502 deletions(-)
create mode 100644 sdk/go/arvados/contextgroup.go
create mode 100644 sdk/go/arvados/throttle.go
create mode 100644 services/api/db/migrate/20181213183234_add_expression_index_to_links.rb
via b3dbac2705f2bcffbf346d0b7efb85e2566b11f8 (commit)
via 61379d58674c14d31beab12113cd731a2dc24fd8 (commit)
via cecff7f38bb5894efd1d8e449310b8654938bc15 (commit)
via e8f99cfef7cfbfcf1a1485d69250f24ced3fd609 (commit)
via a758d03b74f1c62f7ed442f4a4e571e5bba29ddd (commit)
via 772fc9527716fea221ae88ea3200db94f754afde (commit)
via cd68182c48ec70804a2acbf95331ec25629a9e28 (commit)
via 882727d49c58c519bb65eae92b1eaf3e37f7199c (commit)
via aab8398af23a6a01713be7c219d78bec36ffe5ad (commit)
via 79379f5d68466922964d71f03d9bbe1b918f6252 (commit)
via 32fc16adf894bb6299f94054b69ff66f133a66e7 (commit)
via 0bebb7ca32bb96b75d5996137b8e385b60f6476a (commit)
via 746deaa52d2fda99e9265fe20a9f265d8e6816f0 (commit)
via 69e1bb1544b601e103b5fce06f3ec4cbed03d272 (commit)
via d7d074d790366338a01736552e916c5e4b5cef69 (commit)
via 29fee810dd128be904354d439f8c85a3318f64f8 (commit)
via cec011b7718536de42ebd683aa96bee92cbca06c (commit)
via 6804c679b365a237621bef37eb11c40ba87c4f10 (commit)
via cd4bb0b5bde62d7fe32cc105d6311dd318ebe304 (commit)
via 55c142e4f5887810dc4c73b3498dad9e25f25347 (commit)
via cbf93e8d897448dbd52369afe89fef2392140ff1 (commit)
via 0c21d2bd2d0fd351a4e546002a249b0b748061eb (commit)
via 9085ebc570deb7387e17e64776d9694ed99b913a (commit)
via 96165e8fcea44632979ddf7db02c1b70cb95d0f4 (commit)
via 81dd4a91b279b3229fb359df6c5dbf07571083ac (commit)
via 0a6b4ed36bd06b37412f020f0622e0c89e4e1653 (commit)
via 4e32f0b140ec0ec7f96c1f9eaae00950c176ff03 (commit)
via 2aa58f31ac8fc696361214a05ab9ba75a5140b08 (commit)
via 8ff3fd06e165a275f53884d1d20287b68c1b32bd (commit)
via cd7a746df5e9bf8a5770d06410b3fe1908282a7b (commit)
via d1571f495b0e0e05c833d4666924bcb6a288b33d (commit)
via 4a093ba4a1e14275a9500f2c65dd48528bc1e095 (commit)
via 0731e1f9019fa841ef496e5e6e308e41deb7585a (commit)
via f09d4a342cced4915b76632fb996936b93f1cfc1 (commit)
via 3f76b6309bc82cb2d62feb2eab10b55aa8b257ce (commit)
via 4d27cabda122ebda4c8688580af9088c8732a074 (commit)
via 7317196f547cf8c5e0ec87d1526322c305faba30 (commit)
via f1dfa6a0288d9a0380a73349db6083a85bda15d6 (commit)
via df3485f29656a5c4e3f1d7c09ccfdf3c6274d312 (commit)
via 7ec3ab46727a04fb374debbad9e08b1f719f0ac2 (commit)
via 81b8420a261a095a269a46d965b2fc0ee6ecf793 (commit)
via f62eb24060bb89c89faaa2998206961923c7b0b4 (commit)
via c031d4145d8ab1a11463acf5b20ef4df1afe00a4 (commit)
via 471ac065323e087f2f844d21d1c39929191eba71 (commit)
via 934d880aa5d10ed3382f9924a9a9f5694b41f266 (commit)
via 18f8ceb73765a71a71449414f4b4f87ad620028a (commit)
via 665379525fdf1a4a2ec89a45dcc3203a1ecfcf22 (commit)
via 6fc7bd0626e93dd20fc58167300186e9f8820638 (commit)
via 1e02903b90dbaf1f0e9fac222f65e3969b5f0352 (commit)
via ea36c4ba2652d8b6996e2c68535c22a7dc1a3807 (commit)
via 127e57b8ea812b9d03b67c0118230a475b727158 (commit)
via a88f7ad9728ee6968367928c6d3d7613bbf290ec (commit)
via a45b256a3261fff8f168321c02e61b94ae9b4a64 (commit)
via 50f8d8487ad5156058087438b670d7c6f8a8d718 (commit)
via f929cc124fc1fde8f1a6a12679e327b34aa88bde (commit)
via 478fb1838aea03ebad17b66926ef03aac536707b (commit)
via e36e81150649a6457c9cbf0101130cfdb776336f (commit)
via 78c18757e42c40178d7a9eaf78f7b6d167bee926 (commit)
via 9b3d58d6fa3d7c300e006af16ce8072bb68eca30 (commit)
via 70d02ffd2919ffb4148cdfd8cb8566db7a01345c (commit)
via 80905cf49077ecf797f6d0ae3c375ca1a7f7df20 (commit)
via d9d145af78c61d900447434735cab7f3dc64fbdd (commit)
from 076d8a0d4885f7bfa22962a745a5aacb48fa9be1 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full, below.
commit b3dbac2705f2bcffbf346d0b7efb85e2566b11f8
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 15:08:57 2018 -0500
14360: Replace OnComplete/OnCancel with ExecuteContainer callback.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 42767a159..737688023 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -121,16 +121,17 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- onComplete := func(uuid string) {
+ executeContainer := func(ctr arvados.Container) int {
mtx.Lock()
defer mtx.Unlock()
- if _, ok := waiting[uuid]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
+ if _, ok := waiting[ctr.UUID]; !ok {
+ c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
}
- delete(waiting, uuid)
+ delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
+ return int(rand.Uint32() & 0x3)
}
n := 0
s.stubDriver.Queue = queue
@@ -138,7 +139,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
n++
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
- stubvm.CtrExit = int(rand.Uint32() & 0x3)
+ stubvm.ExecuteContainer = executeContainer
switch n % 7 {
case 0:
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
@@ -147,8 +148,6 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
default:
stubvm.CrunchRunCrashRate = 0.1
}
- stubvm.OnComplete = onComplete
- stubvm.OnCancel = onComplete
}
start := time.Now()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 5c2633c49..8bdfaa947 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -28,10 +28,18 @@ type StubDriver struct {
HostKey ssh.Signer
AuthorizedKeys []ssh.PublicKey
- // SetupVM, if set, is called upon creation of each new VM.
- SetupVM func(*StubVM)
+ // SetupVM, if set, is called upon creation of each new
+ // StubVM. This is the caller's opportunity to customize the
+ // VM's error rate and other behaviors.
+ SetupVM func(*StubVM)
+
+ // StubVM's fake crunch-run uses this Queue to read and update
+ // container state.
+ Queue *Queue
+
+ // Frequency of artificially introduced errors on calls to
+ // Destroy. 0=always succeed, 1=always fail.
ErrorRateDestroy float64
- Queue *Queue
instanceSets []*StubInstanceSet
}
@@ -120,9 +128,7 @@ type StubVM struct {
CrunchRunMissing bool
CrunchRunCrashRate float64
CrunchRunDetachDelay time.Duration
- CtrExit int
- OnCancel func(string)
- OnComplete func(string)
+ ExecuteContainer func(arvados.Container) int
sis *StubInstanceSet
id cloud.InstanceID
@@ -196,22 +202,18 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
logger.Print("[test] container was killed")
return
}
+ if svm.ExecuteContainer != nil {
+ ctr.ExitCode = svm.ExecuteContainer(ctr)
+ }
// TODO: Check whether the stub instance has
// been destroyed, and if so, don't call
- // onComplete. Then "container finished twice"
- // can be classified as a bug.
+ // queue.Notify. Then "container finished
+ // twice" can be classified as a bug.
if crashluck < svm.CrunchRunCrashRate {
logger.Print("[test] crashing crunch-run stub")
- if svm.OnCancel != nil && ctr.State == arvados.ContainerStateRunning {
- svm.OnCancel(uuid)
- }
} else {
ctr.State = arvados.ContainerStateComplete
- ctr.ExitCode = svm.CtrExit
queue.Notify(ctr)
- if svm.OnComplete != nil {
- svm.OnComplete(uuid)
- }
}
logger.Print("[test] exiting crunch-run stub")
svm.Lock()
commit 61379d58674c14d31beab12113cd731a2dc24fd8
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 14:11:23 2018 -0500
14360: Call ChooseType just once per container.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index ceca8c63f..e18a2b536 100644
--- a/lib/dispatchcloud/test/queue.go
+++ b/lib/dispatchcloud/test/queue.go
@@ -134,10 +134,15 @@ func (q *Queue) Update() error {
if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
continue
}
- it, _ := q.ChooseType(&ctr)
- upd[ctr.UUID] = container.QueueEnt{
- Container: ctr,
- InstanceType: it,
+ if ent, ok := upd[ctr.UUID]; ok {
+ ent.Container = ctr
+ upd[ctr.UUID] = ent
+ } else {
+ it, _ := q.ChooseType(&ctr)
+ upd[ctr.UUID] = container.QueueEnt{
+ Container: ctr,
+ InstanceType: it,
+ }
}
}
q.entries = upd
commit cecff7f38bb5894efd1d8e449310b8654938bc15
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 13:54:40 2018 -0500
14360: Remove log spam.
Just don't reserve space for an image if there is no image.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go
index 2fac799f4..d7f458561 100644
--- a/lib/dispatchcloud/node_size.go
+++ b/lib/dispatchcloud/node_size.go
@@ -6,7 +6,6 @@ package dispatchcloud
import (
"errors"
- "log"
"regexp"
"sort"
"strconv"
@@ -34,12 +33,10 @@ var pdhRegexp = regexp.MustCompile(`^[0-9a-f]{32}\+(\d+)$`)
func estimateDockerImageSize(collectionPDH string) int64 {
m := pdhRegexp.FindStringSubmatch(collectionPDH)
if m == nil {
- log.Printf("estimateDockerImageSize: '%v' did not match pdhRegexp, returning 0", collectionPDH)
return 0
}
n, err := strconv.ParseInt(m[1], 10, 64)
if err != nil || n < 122 {
- log.Printf("estimateDockerImageSize: short manifest %v or error (%v), returning 0", n, err)
return 0
}
// To avoid having to fetch the collection, take advantage of
commit e8f99cfef7cfbfcf1a1485d69250f24ced3fd609
Merge: a758d03b7 882727d49
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 10:02:55 2018 -0500
Merge branch 'master' into 14360-dispatch-cloud
refs #14360
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --cc lib/dispatchcloud/node_size.go
index e77c862b3,339e042c1..2fac799f4
--- a/lib/dispatchcloud/node_size.go
+++ b/lib/dispatchcloud/node_size.go
@@@ -6,7 -6,13 +6,10 @@@ package dispatchclou
import (
"errors"
+ "log"
- "os/exec"
+ "regexp"
"sort"
+ "strconv"
- "strings"
- "time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
commit a758d03b74f1c62f7ed442f4a4e571e5bba29ddd
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 02:59:05 2018 -0500
14360: Shut down between tests to eliminate leaking logs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 1a47e8490..bea6ed3cc 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -51,6 +51,7 @@ type dispatcher struct {
setupOnce sync.Once
stop chan struct{}
+ stopped chan struct{}
}
// Start starts the dispatcher. Start can be called multiple times
@@ -79,6 +80,7 @@ func (disp *dispatcher) Close() {
case disp.stop <- struct{}{}:
default:
}
+ <-disp.stopped
}
// Make a worker.Executor for the given instance.
@@ -109,6 +111,7 @@ func (disp *dispatcher) initialize() {
}
}
disp.stop = make(chan struct{}, 1)
+ disp.stopped = make(chan struct{})
disp.logger = logrus.StandardLogger()
if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
@@ -144,6 +147,7 @@ func (disp *dispatcher) initialize() {
}
func (disp *dispatcher) run() {
+ defer close(disp.stopped)
defer disp.instanceSet.Stop()
staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 7d3ab7930..42767a159 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -179,8 +179,12 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
}
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
- drivers["test"] = s.stubDriver
s.cluster.ManagementToken = "abcdefgh"
+ drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
+
for _, token := range []string{"abc", ""} {
req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
if token != "" {
@@ -197,8 +201,12 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
}
func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
- drivers["test"] = s.stubDriver
s.cluster.ManagementToken = ""
+ drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
+
for _, token := range []string{"abc", ""} {
req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
if token != "" {
@@ -214,6 +222,9 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
s.cluster.ManagementToken = "abcdefgh"
s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
drivers["test"] = s.stubDriver
+ s.disp.setupOnce.Do(s.disp.initialize)
+ s.disp.queue = &test.Queue{}
+ go s.disp.run()
type instance struct {
Instance string
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 9a5fb10d5..3971a5319 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -37,6 +37,7 @@ type Scheduler struct {
runOnce sync.Once
stop chan struct{}
+ stopped chan struct{}
}
// New returns a new unstarted Scheduler.
@@ -51,6 +52,7 @@ func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, stale
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
stop: make(chan struct{}),
+ stopped: make(chan struct{}),
locking: map[string]bool{},
}
}
@@ -64,9 +66,12 @@ func (sch *Scheduler) Start() {
// Stop.
func (sch *Scheduler) Stop() {
close(sch.stop)
+ <-sch.stopped
}
func (sch *Scheduler) run() {
+ defer close(sch.stopped)
+
// Ensure the queue is fetched once before attempting anything.
for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
sch.logger.Errorf("error updating queue: %s", err)
commit 772fc9527716fea221ae88ea3200db94f754afde
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 02:27:27 2018 -0500
14360: Clean up stub driver.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 0eaf63d90..7d3ab7930 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -6,19 +6,14 @@ package dispatchcloud
import (
"encoding/json"
- "fmt"
- "io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"os"
- "regexp"
- "strings"
"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Sirupsen/logrus"
@@ -28,141 +23,6 @@ import (
var _ = check.Suite(&DispatcherSuite{})
-// fakeCloud provides an exec method that can be used as a
-// test.StubExecFunc. It calls the provided makeVM func when called
-// with a previously unseen instance ID. Calls to exec are passed on
-// to the *fakeVM for the appropriate instance ID.
-type fakeCloud struct {
- queue *test.Queue
- makeVM func(cloud.Instance) *fakeVM
- onComplete func(string)
- onCancel func(string)
- vms map[cloud.InstanceID]*fakeVM
- sync.Mutex
-}
-
-func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
- fc.Lock()
- fvm, ok := fc.vms[inst.ID()]
- if !ok {
- if fc.vms == nil {
- fc.vms = make(map[cloud.InstanceID]*fakeVM)
- }
- fvm = fc.makeVM(inst)
- fc.vms[inst.ID()] = fvm
- }
- fc.Unlock()
- return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
-}
-
-// fakeVM is a fake VM with configurable delays and failure modes.
-type fakeVM struct {
- boot time.Time
- broken time.Time
- crunchRunMissing bool
- crunchRunCrashRate float64
- crunchRunDetachDelay time.Duration
- ctrExit int
- running map[string]bool
- completed []string
- sync.Mutex
-}
-
-func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
- uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
- if eta := fvm.boot.Sub(time.Now()); eta > 0 {
- fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
- return 1
- }
- if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
- fmt.Fprintf(stderr, "cannot fork\n")
- return 2
- }
- if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
- fmt.Fprint(stderr, "crunch-run: command not found\n")
- return 1
- }
- if strings.HasPrefix(command, "crunch-run --detach ") {
- fvm.Lock()
- if fvm.running == nil {
- fvm.running = map[string]bool{}
- }
- fvm.running[uuid] = true
- fvm.Unlock()
- time.Sleep(fvm.crunchRunDetachDelay)
- fmt.Fprintf(stderr, "starting %s\n", uuid)
- logger := logrus.WithField("ContainerUUID", uuid)
- logger.Printf("[test] starting crunch-run stub")
- go func() {
- crashluck := rand.Float64()
- ctr, ok := queue.Get(uuid)
- if !ok {
- logger.Print("[test] container not in queue")
- return
- }
- if crashluck > fvm.crunchRunCrashRate/2 {
- time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
- ctr.State = arvados.ContainerStateRunning
- queue.Notify(ctr)
- }
-
- time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
- fvm.Lock()
- _, running := fvm.running[uuid]
- fvm.Unlock()
- if !running {
- logger.Print("[test] container was killed")
- return
- }
- // TODO: Check whether the stub instance has
- // been destroyed, and if so, don't call
- // onComplete. Then "container finished twice"
- // can be classified as a bug.
- if crashluck < fvm.crunchRunCrashRate {
- logger.Print("[test] crashing crunch-run stub")
- if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
- onCancel(uuid)
- }
- } else {
- ctr.State = arvados.ContainerStateComplete
- ctr.ExitCode = fvm.ctrExit
- queue.Notify(ctr)
- if onComplete != nil {
- onComplete(uuid)
- }
- }
- logger.Print("[test] exiting crunch-run stub")
- fvm.Lock()
- defer fvm.Unlock()
- delete(fvm.running, uuid)
- }()
- return 0
- }
- if command == "crunch-run --list" {
- fvm.Lock()
- defer fvm.Unlock()
- for uuid := range fvm.running {
- fmt.Fprintf(stdout, "%s\n", uuid)
- }
- return 0
- }
- if strings.HasPrefix(command, "crunch-run --kill ") {
- fvm.Lock()
- defer fvm.Unlock()
- if fvm.running[uuid] {
- delete(fvm.running, uuid)
- } else {
- fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
- }
- return 0
- }
- if command == "true" {
- return 0
- }
- fmt.Fprintf(stderr, "%q: command not found", command)
- return 1
-}
-
type DispatcherSuite struct {
cluster *arvados.Cluster
instanceSet *test.LameInstanceSet
@@ -183,13 +43,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
s.stubDriver = &test.StubDriver{
- Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
- c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
- return 1
- },
- HostKey: hostpriv,
- AuthorizedKeys: []ssh.PublicKey{dispatchpub},
-
+ HostKey: hostpriv,
+ AuthorizedKeys: []ssh.PublicKey{dispatchpub},
ErrorRateDestroy: 0.1,
}
@@ -278,29 +133,23 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
}
}
n := 0
- fc := &fakeCloud{
- queue: queue,
- makeVM: func(inst cloud.Instance) *fakeVM {
- n++
- fvm := &fakeVM{
- boot: time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
- crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
- ctrExit: int(rand.Uint32() & 0x3),
- }
- switch n % 7 {
- case 0:
- fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
- case 1:
- fvm.crunchRunMissing = true
- default:
- fvm.crunchRunCrashRate = 0.1
- }
- return fvm
- },
- onComplete: onComplete,
- onCancel: onComplete,
+ s.stubDriver.Queue = queue
+ s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+ n++
+ stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
+ stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
+ stubvm.CtrExit = int(rand.Uint32() & 0x3)
+ switch n % 7 {
+ case 0:
+ stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+ case 1:
+ stubvm.CrunchRunMissing = true
+ default:
+ stubvm.CrunchRunCrashRate = 0.1
+ }
+ stubvm.OnComplete = onComplete
+ stubvm.OnCancel = onComplete
}
- s.stubDriver.Exec = fc.exec
start := time.Now()
go s.disp.run()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 808d7bd3a..5c2633c49 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -10,25 +10,28 @@ import (
"fmt"
"io"
math_rand "math/rand"
+ "regexp"
+ "strings"
"sync"
+ "time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
"github.com/mitchellh/mapstructure"
"golang.org/x/crypto/ssh"
)
-type StubExecFunc func(instance cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
-
// A StubDriver implements cloud.Driver by setting up local SSH
-// servers that pass their command execution requests to the provided
-// SSHExecFunc.
+// servers that do fake command executions.
type StubDriver struct {
- Exec StubExecFunc
HostKey ssh.Signer
AuthorizedKeys []ssh.PublicKey
+ // SetupVM, if set, is called upon creation of each new VM.
+ SetupVM func(*StubVM)
ErrorRateDestroy float64
+ Queue *Queue
instanceSets []*StubInstanceSet
}
@@ -37,7 +40,7 @@ type StubDriver struct {
func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
sis := StubInstanceSet{
driver: sd,
- servers: map[cloud.InstanceID]*stubServer{},
+ servers: map[cloud.InstanceID]*StubVM{},
}
sd.instanceSets = append(sd.instanceSets, &sis)
return &sis, mapstructure.Decode(params, &sis)
@@ -52,7 +55,7 @@ func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
type StubInstanceSet struct {
driver *StubDriver
- servers map[cloud.InstanceID]*stubServer
+ servers map[cloud.InstanceID]*StubVM
mtx sync.RWMutex
stopped bool
}
@@ -67,23 +70,22 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
if authKey != nil {
ak = append([]ssh.PublicKey{authKey}, ak...)
}
- var ss *stubServer
- ss = &stubServer{
+ svm := &StubVM{
sis: sis,
id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
tags: copyTags(tags),
providerType: it.ProviderType,
- SSHService: SSHService{
- HostKey: sis.driver.HostKey,
- AuthorizedKeys: ak,
- Exec: func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
- return sis.driver.Exec(ss.Instance(), command, stdin, stdout, stderr)
- },
- },
}
-
- sis.servers[ss.id] = ss
- return ss.Instance(), nil
+ svm.SSHService = SSHService{
+ HostKey: sis.driver.HostKey,
+ AuthorizedKeys: ak,
+ Exec: svm.Exec,
+ }
+ if setup := sis.driver.SetupVM; setup != nil {
+ setup(svm)
+ }
+ sis.servers[svm.id] = svm
+ return svm.Instance(), nil
}
func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
@@ -105,47 +107,152 @@ func (sis *StubInstanceSet) Stop() {
sis.stopped = true
}
-// stubServer is a fake server that runs an SSH service. It represents
-// a VM running in a fake cloud.
+// StubVM is a fake server that runs an SSH service. It represents a
+// VM running in a fake cloud.
//
// Note this is distinct from a stubInstance, which is a snapshot of
-// the VM's metadata. As with a VM in a real cloud, the stubServer
-// keeps running (and might change IP addresses, shut down, etc.)
-// without updating any stubInstances that have been returned to
-// callers.
-type stubServer struct {
+// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
+// running (and might change IP addresses, shut down, etc.) without
+// updating any stubInstances that have been returned to callers.
+type StubVM struct {
+ Boot time.Time
+ Broken time.Time
+ CrunchRunMissing bool
+ CrunchRunCrashRate float64
+ CrunchRunDetachDelay time.Duration
+ CtrExit int
+ OnCancel func(string)
+ OnComplete func(string)
+
sis *StubInstanceSet
id cloud.InstanceID
tags cloud.InstanceTags
providerType string
SSHService SSHService
+ running map[string]bool
sync.Mutex
}
-func (ss *stubServer) Instance() stubInstance {
- ss.Lock()
- defer ss.Unlock()
+func (svm *StubVM) Instance() stubInstance {
+ svm.Lock()
+ defer svm.Unlock()
return stubInstance{
- ss: ss,
- addr: ss.SSHService.Address(),
+ svm: svm,
+ addr: svm.SSHService.Address(),
// We deliberately return a cached/stale copy of the
// real tags here, so that (Instance)Tags() sometimes
// returns old data after a call to
// (Instance)SetTags(). This is permitted by the
// driver interface, and this might help remind
// callers that they need to tolerate it.
- tags: copyTags(ss.tags),
+ tags: copyTags(svm.tags),
+ }
+}
+
+func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ queue := svm.sis.driver.Queue
+ uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
+ if eta := svm.Boot.Sub(time.Now()); eta > 0 {
+ fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
+ return 1
+ }
+ if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
+ fmt.Fprintf(stderr, "cannot fork\n")
+ return 2
+ }
+ if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
+ fmt.Fprint(stderr, "crunch-run: command not found\n")
+ return 1
+ }
+ if strings.HasPrefix(command, "crunch-run --detach ") {
+ svm.Lock()
+ if svm.running == nil {
+ svm.running = map[string]bool{}
+ }
+ svm.running[uuid] = true
+ svm.Unlock()
+ time.Sleep(svm.CrunchRunDetachDelay)
+ fmt.Fprintf(stderr, "starting %s\n", uuid)
+ logger := logrus.WithField("ContainerUUID", uuid)
+ logger.Printf("[test] starting crunch-run stub")
+ go func() {
+ crashluck := math_rand.Float64()
+ ctr, ok := queue.Get(uuid)
+ if !ok {
+ logger.Print("[test] container not in queue")
+ return
+ }
+ if crashluck > svm.CrunchRunCrashRate/2 {
+ time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+ ctr.State = arvados.ContainerStateRunning
+ queue.Notify(ctr)
+ }
+
+ time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+ svm.Lock()
+ _, running := svm.running[uuid]
+ svm.Unlock()
+ if !running {
+ logger.Print("[test] container was killed")
+ return
+ }
+ // TODO: Check whether the stub instance has
+ // been destroyed, and if so, don't call
+ // onComplete. Then "container finished twice"
+ // can be classified as a bug.
+ if crashluck < svm.CrunchRunCrashRate {
+ logger.Print("[test] crashing crunch-run stub")
+ if svm.OnCancel != nil && ctr.State == arvados.ContainerStateRunning {
+ svm.OnCancel(uuid)
+ }
+ } else {
+ ctr.State = arvados.ContainerStateComplete
+ ctr.ExitCode = svm.CtrExit
+ queue.Notify(ctr)
+ if svm.OnComplete != nil {
+ svm.OnComplete(uuid)
+ }
+ }
+ logger.Print("[test] exiting crunch-run stub")
+ svm.Lock()
+ defer svm.Unlock()
+ delete(svm.running, uuid)
+ }()
+ return 0
+ }
+ if command == "crunch-run --list" {
+ svm.Lock()
+ defer svm.Unlock()
+ for uuid := range svm.running {
+ fmt.Fprintf(stdout, "%s\n", uuid)
+ }
+ return 0
+ }
+ if strings.HasPrefix(command, "crunch-run --kill ") {
+ svm.Lock()
+ defer svm.Unlock()
+ if svm.running[uuid] {
+ delete(svm.running, uuid)
+ } else {
+ fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+ }
+ return 0
+ }
+ if command == "true" {
+ return 0
}
+ fmt.Fprintf(stderr, "%q: command not found", command)
+ return 1
}
type stubInstance struct {
- ss *stubServer
+ svm *StubVM
addr string
tags cloud.InstanceTags
}
func (si stubInstance) ID() cloud.InstanceID {
- return si.ss.id
+ return si.svm.id
}
func (si stubInstance) Address() string {
@@ -153,28 +260,28 @@ func (si stubInstance) Address() string {
}
func (si stubInstance) Destroy() error {
- if math_rand.Float64() < si.ss.sis.driver.ErrorRateDestroy {
+ if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
return errors.New("instance could not be destroyed")
}
- si.ss.SSHService.Close()
- sis := si.ss.sis
+ si.svm.SSHService.Close()
+ sis := si.svm.sis
sis.mtx.Lock()
defer sis.mtx.Unlock()
- delete(sis.servers, si.ss.id)
+ delete(sis.servers, si.svm.id)
return nil
}
func (si stubInstance) ProviderType() string {
- return si.ss.providerType
+ return si.svm.providerType
}
func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
tags = copyTags(tags)
- ss := si.ss
+ svm := si.svm
go func() {
- ss.Lock()
- defer ss.Unlock()
- ss.tags = tags
+ svm.Lock()
+ defer svm.Unlock()
+ svm.tags = tags
}()
return nil
}
@@ -184,7 +291,7 @@ func (si stubInstance) Tags() cloud.InstanceTags {
}
func (si stubInstance) String() string {
- return string(si.ss.id)
+ return string(si.svm.id)
}
func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
@@ -193,7 +300,7 @@ func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) erro
if err != nil {
return err
}
- sig, err := si.ss.sis.driver.HostKey.Sign(rand.Reader, buf)
+ sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
if err != nil {
return err
}
commit cd68182c48ec70804a2acbf95331ec25629a9e28
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Dec 17 23:37:47 2018 -0500
14360: Avoid overreporting instances during Create/List race.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index a43b96ed8..722d4e918 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -121,7 +121,7 @@ type Pool struct {
// private state
subscribers map[<-chan struct{}]chan<- struct{}
- creating map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+ creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
workers map[cloud.InstanceID]*worker
loaded bool // loaded list of instances from InstanceSet at least once
exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
@@ -171,25 +171,41 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.
-//
-// The returned counts should be interpreted as upper bounds, rather
-// than exact counts: they are sometimes artificially high when a
-// newly created instance appears in the driver's Instances() list
-// before the Create() call returns.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
wp.setupOnce.Do(wp.setup)
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- u := map[arvados.InstanceType]int{}
- for it, c := range wp.creating {
- u[it] = c
+ unalloc := map[arvados.InstanceType]int{}
+ creating := map[arvados.InstanceType]int{}
+ for it, times := range wp.creating {
+ creating[it] = len(times)
}
for _, wkr := range wp.workers {
- if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
- u[wkr.instType]++
+ if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+ continue
}
+ it := wkr.instType
+ unalloc[it]++
+ if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+ // If up to N new workers appear in
+ // Instances() while we are waiting for N
+ // Create() calls to complete, we assume we're
+ // just seeing a race between Instances() and
+ // Create() responses.
+ //
+ // The other common reason why nodes have
+ // state==Unknown is that they appeared at
+ // startup, before any Create calls. They
+ // don't match the above timing condition, so
+ // we never mistakenly attribute them to
+ // pending Create calls.
+ creating[it]--
+ }
+ }
+ for it, c := range creating {
+ unalloc[it] += c
}
- return u
+ return unalloc
}
// Create a new instance with the given type, and add it to the worker
@@ -204,13 +220,21 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
return wp.atQuotaErr
}
tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
- wp.creating[it]++
+ now := time.Now()
+ wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wp.creating[it]--
+ // Remove our timestamp marker from wp.creating
+ for i, t := range wp.creating[it] {
+ if t == now {
+ copy(wp.creating[it][i:], wp.creating[it][i+1:])
+ wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
+ break
+ }
+ }
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
wp.atQuotaUntil = time.Now().Add(time.Minute)
@@ -266,6 +290,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
state: initialState,
instance: inst,
instType: it,
+ appeared: now,
probed: now,
busy: now,
updated: now,
@@ -579,7 +604,7 @@ func (wp *Pool) Instances() []InstanceView {
}
func (wp *Pool) setup() {
- wp.creating = map[arvados.InstanceType]int{}
+ wp.creating = map[arvados.InstanceType][]time.Time{}
wp.exited = map[string]time.Time{}
wp.workers = map[cloud.InstanceID]*worker{}
wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index f8667e06c..3867e2c63 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -68,23 +68,16 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
pool.Create(type2)
c.Check(pool.Unallocated()[type1], check.Equals, 1)
c.Check(pool.Unallocated()[type2], check.Equals, 2)
- // Unblock the pending Create calls and (before calling Sync!)
- // wait for the pool to process the returned instances.
+
+ // Unblock the pending Create calls.
go lameInstanceSet.Release(3)
- suite.wait(c, pool, notify, func() bool {
- list, err := lameInstanceSet.Instances(nil)
- return err == nil && len(list) == 3
- })
- c.Check(pool.Unallocated()[type1], check.Equals, 1)
- c.Check(pool.Unallocated()[type2], check.Equals, 2)
- pool.getInstancesAndSync()
- // Returned counts can be temporarily higher than 1 and 2 if
- // poll ran before Create() returned.
- c.Check(pool.Unallocated()[type1], check.Not(less), 1)
- c.Check(pool.Unallocated()[type2], check.Not(less), 2)
+ // Wait for each instance to either return from its Create
+ // call, or show up in a poll.
suite.wait(c, pool, notify, func() bool {
- return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 2
+ pool.mtx.RLock()
+ defer pool.mtx.RUnlock()
+ return len(pool.workers) == 3
})
c.Check(pool.Shutdown(type2), check.Equals, true)
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 85104a13a..a0a61c159 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -64,6 +64,7 @@ type worker struct {
instType arvados.InstanceType
vcpus int64
memory int64
+ appeared time.Time
probed time.Time
updated time.Time
busy time.Time
commit aab8398af23a6a01713be7c219d78bec36ffe5ad
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Dec 17 15:14:11 2018 -0500
14360: Comment stubServer.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index bfe0f8f26..808d7bd3a 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -105,6 +105,14 @@ func (sis *StubInstanceSet) Stop() {
sis.stopped = true
}
+// stubServer is a fake server that runs an SSH service. It represents
+// a VM running in a fake cloud.
+//
+// Note this is distinct from a stubInstance, which is a snapshot of
+// the VM's metadata. As with a VM in a real cloud, the stubServer
+// keeps running (and might change IP addresses, shut down, etc.)
+// without updating any stubInstances that have been returned to
+// callers.
type stubServer struct {
sis *StubInstanceSet
id cloud.InstanceID
commit 32fc16adf894bb6299f94054b69ff66f133a66e7
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Dec 17 13:49:43 2018 -0500
14360: Locking comment.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index fda04d52b..ceca8c63f 100644
--- a/lib/dispatchcloud/test/queue.go
+++ b/lib/dispatchcloud/test/queue.go
@@ -95,6 +95,7 @@ func (q *Queue) Unsubscribe(ch <-chan struct{}) {
delete(q.subscribers, ch)
}
+// caller must have lock.
func (q *Queue) notify() {
for _, ch := range q.subscribers {
select {
commit 0bebb7ca32bb96b75d5996137b8e385b60f6476a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Dec 17 13:47:14 2018 -0500
14360: Fix log spam on normal race condition.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 7b432adc6..c8c25131e 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -131,6 +131,17 @@ func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
logger.Debug("locking in progress, doing nothing")
return
}
+ if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
+ // This happens if the container has been cancelled or
+ // locked since runQueue called sch.queue.Entries(),
+ // possibly by a bgLock() call from a previous
+ // runQueue iteration. In any case, we will respond
+ // appropriately on the next runQueue iteration, which
+ // will have already been triggered by the queue
+ // update.
+ logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
+ return
+ }
sch.locking[uuid] = true
go func() {
defer func() {
commit 69e1bb1544b601e103b5fce06f3ec4cbed03d272
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Dec 17 10:57:40 2018 -0500
14360: Comment TestDispatchToStubDriver.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index ed987154e..0eaf63d90 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -235,6 +235,10 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) {
s.disp.Close()
}
+// DispatchToStubDriver checks that the dispatcher wires everything
+// together effectively. It uses a real scheduler and worker pool with
+// a fake queue and cloud driver. The fake cloud driver injects
+// artificial errors in order to exercise a variety of code paths.
func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list