[ARVADOS] updated: 1.3.0-120-gb3dbac270

Git user git at public.curoverse.com
Tue Dec 18 15:54:41 EST 2018


Summary of changes:
 README.md                                          |  10 +-
 build/package-build-dockerfiles/Makefile           |   2 +-
 build/package-test-dockerfiles/Makefile            |   2 +-
 doc/index.html.liquid                              |   2 +-
 .../getting_started/community.html.textile.liquid  |   4 +-
 lib/dispatchcloud/dispatcher.go                    |   4 +
 lib/dispatchcloud/dispatcher_test.go               | 213 +++-----------
 lib/dispatchcloud/node_size.go                     |  68 ++++-
 lib/dispatchcloud/node_size_test.go                |  22 ++
 lib/dispatchcloud/scheduler/run_queue.go           |  11 +
 lib/dispatchcloud/scheduler/scheduler.go           |   5 +
 lib/dispatchcloud/test/queue.go                    |  14 +-
 lib/dispatchcloud/test/stub_driver.go              | 195 ++++++++++---
 lib/dispatchcloud/worker/pool.go                   |  55 +++-
 lib/dispatchcloud/worker/pool_test.go              |  21 +-
 lib/dispatchcloud/worker/worker.go                 |   1 +
 sdk/cwl/arvados_cwl/arvcontainer.py                |   2 +-
 sdk/cwl/arvados_cwl/arvjob.py                      |   2 +-
 sdk/cwl/arvados_cwl/runner.py                      |   2 +-
 sdk/cwl/setup.py                                   |   4 +-
 sdk/go/arvados/contextgroup.go                     |  95 +++++++
 sdk/go/arvados/fs_collection.go                    | 306 ++++++++++++++-------
 sdk/go/arvados/fs_collection_test.go               |  50 +++-
 sdk/go/arvados/throttle.go                         |  17 ++
 sdk/python/arvados/arvfile.py                      |  38 ++-
 sdk/python/arvados/collection.py                   |  39 ++-
 sdk/python/arvados/commands/put.py                 |  88 +++++-
 sdk/python/arvados/commands/run.py                 |   4 +-
 sdk/python/arvados/keep.py                         |  13 +
 sdk/python/tests/test_arv_put.py                   | 168 ++++++++++-
 sdk/python/tests/test_arvfile.py                   |  55 ++--
 sdk/python/tests/test_collections.py               |  63 ++++-
 .../app/controllers/arvados/v1/links_controller.rb |   4 +-
 services/api/app/models/arvados_model.rb           |   2 +
 services/api/app/models/collection.rb              |   4 +-
 ...20181213183234_add_expression_index_to_links.rb |  11 +
 services/api/db/structure.sql                      |  16 ++
 services/api/lib/arvados_model_updates.rb          |  17 ++
 services/api/lib/record_filters.rb                 |  31 ++-
 services/api/test/fixtures/links.yml               |  14 +
 .../functional/arvados/v1/links_controller_test.rb |  17 ++
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |   9 +-
 .../crunch-dispatch-slurm_test.go                  |   9 +-
 services/crunch-run/crunchrun.go                   |  12 +-
 services/crunch-run/crunchrun_test.go              | 124 ++++++---
 .../lib/arvbox/docker/service/controller/run       |   8 +-
 tools/sync-groups/sync-groups.go                   |   8 +-
 47 files changed, 1359 insertions(+), 502 deletions(-)
 create mode 100644 sdk/go/arvados/contextgroup.go
 create mode 100644 sdk/go/arvados/throttle.go
 create mode 100644 services/api/db/migrate/20181213183234_add_expression_index_to_links.rb

       via  b3dbac2705f2bcffbf346d0b7efb85e2566b11f8 (commit)
       via  61379d58674c14d31beab12113cd731a2dc24fd8 (commit)
       via  cecff7f38bb5894efd1d8e449310b8654938bc15 (commit)
       via  e8f99cfef7cfbfcf1a1485d69250f24ced3fd609 (commit)
       via  a758d03b74f1c62f7ed442f4a4e571e5bba29ddd (commit)
       via  772fc9527716fea221ae88ea3200db94f754afde (commit)
       via  cd68182c48ec70804a2acbf95331ec25629a9e28 (commit)
       via  882727d49c58c519bb65eae92b1eaf3e37f7199c (commit)
       via  aab8398af23a6a01713be7c219d78bec36ffe5ad (commit)
       via  79379f5d68466922964d71f03d9bbe1b918f6252 (commit)
       via  32fc16adf894bb6299f94054b69ff66f133a66e7 (commit)
       via  0bebb7ca32bb96b75d5996137b8e385b60f6476a (commit)
       via  746deaa52d2fda99e9265fe20a9f265d8e6816f0 (commit)
       via  69e1bb1544b601e103b5fce06f3ec4cbed03d272 (commit)
       via  d7d074d790366338a01736552e916c5e4b5cef69 (commit)
       via  29fee810dd128be904354d439f8c85a3318f64f8 (commit)
       via  cec011b7718536de42ebd683aa96bee92cbca06c (commit)
       via  6804c679b365a237621bef37eb11c40ba87c4f10 (commit)
       via  cd4bb0b5bde62d7fe32cc105d6311dd318ebe304 (commit)
       via  55c142e4f5887810dc4c73b3498dad9e25f25347 (commit)
       via  cbf93e8d897448dbd52369afe89fef2392140ff1 (commit)
       via  0c21d2bd2d0fd351a4e546002a249b0b748061eb (commit)
       via  9085ebc570deb7387e17e64776d9694ed99b913a (commit)
       via  96165e8fcea44632979ddf7db02c1b70cb95d0f4 (commit)
       via  81dd4a91b279b3229fb359df6c5dbf07571083ac (commit)
       via  0a6b4ed36bd06b37412f020f0622e0c89e4e1653 (commit)
       via  4e32f0b140ec0ec7f96c1f9eaae00950c176ff03 (commit)
       via  2aa58f31ac8fc696361214a05ab9ba75a5140b08 (commit)
       via  8ff3fd06e165a275f53884d1d20287b68c1b32bd (commit)
       via  cd7a746df5e9bf8a5770d06410b3fe1908282a7b (commit)
       via  d1571f495b0e0e05c833d4666924bcb6a288b33d (commit)
       via  4a093ba4a1e14275a9500f2c65dd48528bc1e095 (commit)
       via  0731e1f9019fa841ef496e5e6e308e41deb7585a (commit)
       via  f09d4a342cced4915b76632fb996936b93f1cfc1 (commit)
       via  3f76b6309bc82cb2d62feb2eab10b55aa8b257ce (commit)
       via  4d27cabda122ebda4c8688580af9088c8732a074 (commit)
       via  7317196f547cf8c5e0ec87d1526322c305faba30 (commit)
       via  f1dfa6a0288d9a0380a73349db6083a85bda15d6 (commit)
       via  df3485f29656a5c4e3f1d7c09ccfdf3c6274d312 (commit)
       via  7ec3ab46727a04fb374debbad9e08b1f719f0ac2 (commit)
       via  81b8420a261a095a269a46d965b2fc0ee6ecf793 (commit)
       via  f62eb24060bb89c89faaa2998206961923c7b0b4 (commit)
       via  c031d4145d8ab1a11463acf5b20ef4df1afe00a4 (commit)
       via  471ac065323e087f2f844d21d1c39929191eba71 (commit)
       via  934d880aa5d10ed3382f9924a9a9f5694b41f266 (commit)
       via  18f8ceb73765a71a71449414f4b4f87ad620028a (commit)
       via  665379525fdf1a4a2ec89a45dcc3203a1ecfcf22 (commit)
       via  6fc7bd0626e93dd20fc58167300186e9f8820638 (commit)
       via  1e02903b90dbaf1f0e9fac222f65e3969b5f0352 (commit)
       via  ea36c4ba2652d8b6996e2c68535c22a7dc1a3807 (commit)
       via  127e57b8ea812b9d03b67c0118230a475b727158 (commit)
       via  a88f7ad9728ee6968367928c6d3d7613bbf290ec (commit)
       via  a45b256a3261fff8f168321c02e61b94ae9b4a64 (commit)
       via  50f8d8487ad5156058087438b670d7c6f8a8d718 (commit)
       via  f929cc124fc1fde8f1a6a12679e327b34aa88bde (commit)
       via  478fb1838aea03ebad17b66926ef03aac536707b (commit)
       via  e36e81150649a6457c9cbf0101130cfdb776336f (commit)
       via  78c18757e42c40178d7a9eaf78f7b6d167bee926 (commit)
       via  9b3d58d6fa3d7c300e006af16ce8072bb68eca30 (commit)
       via  70d02ffd2919ffb4148cdfd8cb8566db7a01345c (commit)
       via  80905cf49077ecf797f6d0ae3c375ca1a7f7df20 (commit)
       via  d9d145af78c61d900447434735cab7f3dc64fbdd (commit)
      from  076d8a0d4885f7bfa22962a745a5aacb48fa9be1 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b3dbac2705f2bcffbf346d0b7efb85e2566b11f8
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 15:08:57 2018 -0500

    14360: Replace OnComplete/OnCancel with ExecuteContainer callback.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 42767a159..737688023 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -121,16 +121,17 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 	for _, ctr := range queue.Containers {
 		waiting[ctr.UUID] = struct{}{}
 	}
-	onComplete := func(uuid string) {
+	executeContainer := func(ctr arvados.Container) int {
 		mtx.Lock()
 		defer mtx.Unlock()
-		if _, ok := waiting[uuid]; !ok {
-			c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
+		if _, ok := waiting[ctr.UUID]; !ok {
+			c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
 		}
-		delete(waiting, uuid)
+		delete(waiting, ctr.UUID)
 		if len(waiting) == 0 {
 			close(done)
 		}
+		return int(rand.Uint32() & 0x3)
 	}
 	n := 0
 	s.stubDriver.Queue = queue
@@ -138,7 +139,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 		n++
 		stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
 		stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
-		stubvm.CtrExit = int(rand.Uint32() & 0x3)
+		stubvm.ExecuteContainer = executeContainer
 		switch n % 7 {
 		case 0:
 			stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
@@ -147,8 +148,6 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 		default:
 			stubvm.CrunchRunCrashRate = 0.1
 		}
-		stubvm.OnComplete = onComplete
-		stubvm.OnCancel = onComplete
 	}
 
 	start := time.Now()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 5c2633c49..8bdfaa947 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -28,10 +28,18 @@ type StubDriver struct {
 	HostKey        ssh.Signer
 	AuthorizedKeys []ssh.PublicKey
 
-	// SetupVM, if set, is called upon creation of each new VM.
-	SetupVM          func(*StubVM)
+	// SetupVM, if set, is called upon creation of each new
+	// StubVM. This is the caller's opportunity to customize the
+	// VM's error rate and other behaviors.
+	SetupVM func(*StubVM)
+
+	// StubVM's fake crunch-run uses this Queue to read and update
+	// container state.
+	Queue *Queue
+
+	// Frequency of artificially introduced errors on calls to
+	// Destroy. 0=always succeed, 1=always fail.
 	ErrorRateDestroy float64
-	Queue            *Queue
 
 	instanceSets []*StubInstanceSet
 }
@@ -120,9 +128,7 @@ type StubVM struct {
 	CrunchRunMissing     bool
 	CrunchRunCrashRate   float64
 	CrunchRunDetachDelay time.Duration
-	CtrExit              int
-	OnCancel             func(string)
-	OnComplete           func(string)
+	ExecuteContainer     func(arvados.Container) int
 
 	sis          *StubInstanceSet
 	id           cloud.InstanceID
@@ -196,22 +202,18 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
 				logger.Print("[test] container was killed")
 				return
 			}
+			if svm.ExecuteContainer != nil {
+				ctr.ExitCode = svm.ExecuteContainer(ctr)
+			}
 			// TODO: Check whether the stub instance has
 			// been destroyed, and if so, don't call
-			// onComplete. Then "container finished twice"
-			// can be classified as a bug.
+			// queue.Notify. Then "container finished
+			// twice" can be classified as a bug.
 			if crashluck < svm.CrunchRunCrashRate {
 				logger.Print("[test] crashing crunch-run stub")
-				if svm.OnCancel != nil && ctr.State == arvados.ContainerStateRunning {
-					svm.OnCancel(uuid)
-				}
 			} else {
 				ctr.State = arvados.ContainerStateComplete
-				ctr.ExitCode = svm.CtrExit
 				queue.Notify(ctr)
-				if svm.OnComplete != nil {
-					svm.OnComplete(uuid)
-				}
 			}
 			logger.Print("[test] exiting crunch-run stub")
 			svm.Lock()

commit 61379d58674c14d31beab12113cd731a2dc24fd8
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 14:11:23 2018 -0500

    14360: Call ChooseType just once per container.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index ceca8c63f..e18a2b536 100644
--- a/lib/dispatchcloud/test/queue.go
+++ b/lib/dispatchcloud/test/queue.go
@@ -134,10 +134,15 @@ func (q *Queue) Update() error {
 		if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
 			continue
 		}
-		it, _ := q.ChooseType(&ctr)
-		upd[ctr.UUID] = container.QueueEnt{
-			Container:    ctr,
-			InstanceType: it,
+		if ent, ok := upd[ctr.UUID]; ok {
+			ent.Container = ctr
+			upd[ctr.UUID] = ent
+		} else {
+			it, _ := q.ChooseType(&ctr)
+			upd[ctr.UUID] = container.QueueEnt{
+				Container:    ctr,
+				InstanceType: it,
+			}
 		}
 	}
 	q.entries = upd

commit cecff7f38bb5894efd1d8e449310b8654938bc15
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 13:54:40 2018 -0500

    14360: Remove log spam.
    
    Just don't reserve space for an image if there is no image.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go
index 2fac799f4..d7f458561 100644
--- a/lib/dispatchcloud/node_size.go
+++ b/lib/dispatchcloud/node_size.go
@@ -6,7 +6,6 @@ package dispatchcloud
 
 import (
 	"errors"
-	"log"
 	"regexp"
 	"sort"
 	"strconv"
@@ -34,12 +33,10 @@ var pdhRegexp = regexp.MustCompile(`^[0-9a-f]{32}\+(\d+)$`)
 func estimateDockerImageSize(collectionPDH string) int64 {
 	m := pdhRegexp.FindStringSubmatch(collectionPDH)
 	if m == nil {
-		log.Printf("estimateDockerImageSize: '%v' did not match pdhRegexp, returning 0", collectionPDH)
 		return 0
 	}
 	n, err := strconv.ParseInt(m[1], 10, 64)
 	if err != nil || n < 122 {
-		log.Printf("estimateDockerImageSize: short manifest %v or error (%v), returning 0", n, err)
 		return 0
 	}
 	// To avoid having to fetch the collection, take advantage of

commit e8f99cfef7cfbfcf1a1485d69250f24ced3fd609
Merge: a758d03b7 882727d49
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 10:02:55 2018 -0500

    Merge branch 'master' into 14360-dispatch-cloud
    
    refs #14360
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --cc lib/dispatchcloud/node_size.go
index e77c862b3,339e042c1..2fac799f4
--- a/lib/dispatchcloud/node_size.go
+++ b/lib/dispatchcloud/node_size.go
@@@ -6,7 -6,13 +6,10 @@@ package dispatchclou
  
  import (
  	"errors"
+ 	"log"
 -	"os/exec"
+ 	"regexp"
  	"sort"
+ 	"strconv"
 -	"strings"
 -	"time"
  
  	"git.curoverse.com/arvados.git/sdk/go/arvados"
  )

commit a758d03b74f1c62f7ed442f4a4e571e5bba29ddd
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 02:59:05 2018 -0500

    14360: Shutdown between tests to eliminate leaking logs.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 1a47e8490..bea6ed3cc 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -51,6 +51,7 @@ type dispatcher struct {
 
 	setupOnce sync.Once
 	stop      chan struct{}
+	stopped   chan struct{}
 }
 
 // Start starts the dispatcher. Start can be called multiple times
@@ -79,6 +80,7 @@ func (disp *dispatcher) Close() {
 	case disp.stop <- struct{}{}:
 	default:
 	}
+	<-disp.stopped
 }
 
 // Make a worker.Executor for the given instance.
@@ -109,6 +111,7 @@ func (disp *dispatcher) initialize() {
 		}
 	}
 	disp.stop = make(chan struct{}, 1)
+	disp.stopped = make(chan struct{})
 	disp.logger = logrus.StandardLogger()
 
 	if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
@@ -144,6 +147,7 @@ func (disp *dispatcher) initialize() {
 }
 
 func (disp *dispatcher) run() {
+	defer close(disp.stopped)
 	defer disp.instanceSet.Stop()
 
 	staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 7d3ab7930..42767a159 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -179,8 +179,12 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 }
 
 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
-	drivers["test"] = s.stubDriver
 	s.cluster.ManagementToken = "abcdefgh"
+	drivers["test"] = s.stubDriver
+	s.disp.setupOnce.Do(s.disp.initialize)
+	s.disp.queue = &test.Queue{}
+	go s.disp.run()
+
 	for _, token := range []string{"abc", ""} {
 		req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
 		if token != "" {
@@ -197,8 +201,12 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
 }
 
 func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
-	drivers["test"] = s.stubDriver
 	s.cluster.ManagementToken = ""
+	drivers["test"] = s.stubDriver
+	s.disp.setupOnce.Do(s.disp.initialize)
+	s.disp.queue = &test.Queue{}
+	go s.disp.run()
+
 	for _, token := range []string{"abc", ""} {
 		req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
 		if token != "" {
@@ -214,6 +222,9 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 	s.cluster.ManagementToken = "abcdefgh"
 	s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
 	drivers["test"] = s.stubDriver
+	s.disp.setupOnce.Do(s.disp.initialize)
+	s.disp.queue = &test.Queue{}
+	go s.disp.run()
 
 	type instance struct {
 		Instance             string
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
index 9a5fb10d5..3971a5319 100644
--- a/lib/dispatchcloud/scheduler/scheduler.go
+++ b/lib/dispatchcloud/scheduler/scheduler.go
@@ -37,6 +37,7 @@ type Scheduler struct {
 
 	runOnce sync.Once
 	stop    chan struct{}
+	stopped chan struct{}
 }
 
 // New returns a new unstarted Scheduler.
@@ -51,6 +52,7 @@ func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, stale
 		staleLockTimeout:    staleLockTimeout,
 		queueUpdateInterval: queueUpdateInterval,
 		stop:                make(chan struct{}),
+		stopped:             make(chan struct{}),
 		locking:             map[string]bool{},
 	}
 }
@@ -64,9 +66,12 @@ func (sch *Scheduler) Start() {
 // Stop.
 func (sch *Scheduler) Stop() {
 	close(sch.stop)
+	<-sch.stopped
 }
 
 func (sch *Scheduler) run() {
+	defer close(sch.stopped)
+
 	// Ensure the queue is fetched once before attempting anything.
 	for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
 		sch.logger.Errorf("error updating queue: %s", err)

commit 772fc9527716fea221ae88ea3200db94f754afde
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 02:27:27 2018 -0500

    14360: Clean up stub driver.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 0eaf63d90..7d3ab7930 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -6,19 +6,14 @@ package dispatchcloud
 
 import (
 	"encoding/json"
-	"fmt"
-	"io"
 	"io/ioutil"
 	"math/rand"
 	"net/http"
 	"net/http/httptest"
 	"os"
-	"regexp"
-	"strings"
 	"sync"
 	"time"
 
-	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"github.com/Sirupsen/logrus"
@@ -28,141 +23,6 @@ import (
 
 var _ = check.Suite(&DispatcherSuite{})
 
-// fakeCloud provides an exec method that can be used as a
-// test.StubExecFunc. It calls the provided makeVM func when called
-// with a previously unseen instance ID. Calls to exec are passed on
-// to the *fakeVM for the appropriate instance ID.
-type fakeCloud struct {
-	queue      *test.Queue
-	makeVM     func(cloud.Instance) *fakeVM
-	onComplete func(string)
-	onCancel   func(string)
-	vms        map[cloud.InstanceID]*fakeVM
-	sync.Mutex
-}
-
-func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
-	fc.Lock()
-	fvm, ok := fc.vms[inst.ID()]
-	if !ok {
-		if fc.vms == nil {
-			fc.vms = make(map[cloud.InstanceID]*fakeVM)
-		}
-		fvm = fc.makeVM(inst)
-		fc.vms[inst.ID()] = fvm
-	}
-	fc.Unlock()
-	return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
-}
-
-// fakeVM is a fake VM with configurable delays and failure modes.
-type fakeVM struct {
-	boot                 time.Time
-	broken               time.Time
-	crunchRunMissing     bool
-	crunchRunCrashRate   float64
-	crunchRunDetachDelay time.Duration
-	ctrExit              int
-	running              map[string]bool
-	completed            []string
-	sync.Mutex
-}
-
-func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
-	uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
-	if eta := fvm.boot.Sub(time.Now()); eta > 0 {
-		fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
-		return 1
-	}
-	if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
-		fmt.Fprintf(stderr, "cannot fork\n")
-		return 2
-	}
-	if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
-		fmt.Fprint(stderr, "crunch-run: command not found\n")
-		return 1
-	}
-	if strings.HasPrefix(command, "crunch-run --detach ") {
-		fvm.Lock()
-		if fvm.running == nil {
-			fvm.running = map[string]bool{}
-		}
-		fvm.running[uuid] = true
-		fvm.Unlock()
-		time.Sleep(fvm.crunchRunDetachDelay)
-		fmt.Fprintf(stderr, "starting %s\n", uuid)
-		logger := logrus.WithField("ContainerUUID", uuid)
-		logger.Printf("[test] starting crunch-run stub")
-		go func() {
-			crashluck := rand.Float64()
-			ctr, ok := queue.Get(uuid)
-			if !ok {
-				logger.Print("[test] container not in queue")
-				return
-			}
-			if crashluck > fvm.crunchRunCrashRate/2 {
-				time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
-				ctr.State = arvados.ContainerStateRunning
-				queue.Notify(ctr)
-			}
-
-			time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
-			fvm.Lock()
-			_, running := fvm.running[uuid]
-			fvm.Unlock()
-			if !running {
-				logger.Print("[test] container was killed")
-				return
-			}
-			// TODO: Check whether the stub instance has
-			// been destroyed, and if so, don't call
-			// onComplete. Then "container finished twice"
-			// can be classified as a bug.
-			if crashluck < fvm.crunchRunCrashRate {
-				logger.Print("[test] crashing crunch-run stub")
-				if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
-					onCancel(uuid)
-				}
-			} else {
-				ctr.State = arvados.ContainerStateComplete
-				ctr.ExitCode = fvm.ctrExit
-				queue.Notify(ctr)
-				if onComplete != nil {
-					onComplete(uuid)
-				}
-			}
-			logger.Print("[test] exiting crunch-run stub")
-			fvm.Lock()
-			defer fvm.Unlock()
-			delete(fvm.running, uuid)
-		}()
-		return 0
-	}
-	if command == "crunch-run --list" {
-		fvm.Lock()
-		defer fvm.Unlock()
-		for uuid := range fvm.running {
-			fmt.Fprintf(stdout, "%s\n", uuid)
-		}
-		return 0
-	}
-	if strings.HasPrefix(command, "crunch-run --kill ") {
-		fvm.Lock()
-		defer fvm.Unlock()
-		if fvm.running[uuid] {
-			delete(fvm.running, uuid)
-		} else {
-			fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
-		}
-		return 0
-	}
-	if command == "true" {
-		return 0
-	}
-	fmt.Fprintf(stderr, "%q: command not found", command)
-	return 1
-}
-
 type DispatcherSuite struct {
 	cluster     *arvados.Cluster
 	instanceSet *test.LameInstanceSet
@@ -183,13 +43,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
 	_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
 	s.stubDriver = &test.StubDriver{
-		Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
-			c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
-			return 1
-		},
-		HostKey:        hostpriv,
-		AuthorizedKeys: []ssh.PublicKey{dispatchpub},
-
+		HostKey:          hostpriv,
+		AuthorizedKeys:   []ssh.PublicKey{dispatchpub},
 		ErrorRateDestroy: 0.1,
 	}
 
@@ -278,29 +133,23 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 		}
 	}
 	n := 0
-	fc := &fakeCloud{
-		queue: queue,
-		makeVM: func(inst cloud.Instance) *fakeVM {
-			n++
-			fvm := &fakeVM{
-				boot:                 time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
-				crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
-				ctrExit:              int(rand.Uint32() & 0x3),
-			}
-			switch n % 7 {
-			case 0:
-				fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
-			case 1:
-				fvm.crunchRunMissing = true
-			default:
-				fvm.crunchRunCrashRate = 0.1
-			}
-			return fvm
-		},
-		onComplete: onComplete,
-		onCancel:   onComplete,
+	s.stubDriver.Queue = queue
+	s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+		n++
+		stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
+		stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
+		stubvm.CtrExit = int(rand.Uint32() & 0x3)
+		switch n % 7 {
+		case 0:
+			stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+		case 1:
+			stubvm.CrunchRunMissing = true
+		default:
+			stubvm.CrunchRunCrashRate = 0.1
+		}
+		stubvm.OnComplete = onComplete
+		stubvm.OnCancel = onComplete
 	}
-	s.stubDriver.Exec = fc.exec
 
 	start := time.Now()
 	go s.disp.run()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 808d7bd3a..5c2633c49 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -10,25 +10,28 @@ import (
 	"fmt"
 	"io"
 	math_rand "math/rand"
+	"regexp"
+	"strings"
 	"sync"
+	"time"
 
 	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/Sirupsen/logrus"
 	"github.com/mitchellh/mapstructure"
 	"golang.org/x/crypto/ssh"
 )
 
-type StubExecFunc func(instance cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
-
 // A StubDriver implements cloud.Driver by setting up local SSH
-// servers that pass their command execution requests to the provided
-// SSHExecFunc.
+// servers that do fake command executions.
 type StubDriver struct {
-	Exec           StubExecFunc
 	HostKey        ssh.Signer
 	AuthorizedKeys []ssh.PublicKey
 
+	// SetupVM, if set, is called upon creation of each new VM.
+	SetupVM          func(*StubVM)
 	ErrorRateDestroy float64
+	Queue            *Queue
 
 	instanceSets []*StubInstanceSet
 }
@@ -37,7 +40,7 @@ type StubDriver struct {
 func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
 	sis := StubInstanceSet{
 		driver:  sd,
-		servers: map[cloud.InstanceID]*stubServer{},
+		servers: map[cloud.InstanceID]*StubVM{},
 	}
 	sd.instanceSets = append(sd.instanceSets, &sis)
 	return &sis, mapstructure.Decode(params, &sis)
@@ -52,7 +55,7 @@ func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
 
 type StubInstanceSet struct {
 	driver  *StubDriver
-	servers map[cloud.InstanceID]*stubServer
+	servers map[cloud.InstanceID]*StubVM
 	mtx     sync.RWMutex
 	stopped bool
 }
@@ -67,23 +70,22 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 	if authKey != nil {
 		ak = append([]ssh.PublicKey{authKey}, ak...)
 	}
-	var ss *stubServer
-	ss = &stubServer{
+	svm := &StubVM{
 		sis:          sis,
 		id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
 		tags:         copyTags(tags),
 		providerType: it.ProviderType,
-		SSHService: SSHService{
-			HostKey:        sis.driver.HostKey,
-			AuthorizedKeys: ak,
-			Exec: func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
-				return sis.driver.Exec(ss.Instance(), command, stdin, stdout, stderr)
-			},
-		},
 	}
-
-	sis.servers[ss.id] = ss
-	return ss.Instance(), nil
+	svm.SSHService = SSHService{
+		HostKey:        sis.driver.HostKey,
+		AuthorizedKeys: ak,
+		Exec:           svm.Exec,
+	}
+	if setup := sis.driver.SetupVM; setup != nil {
+		setup(svm)
+	}
+	sis.servers[svm.id] = svm
+	return svm.Instance(), nil
 }
 
 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
@@ -105,47 +107,152 @@ func (sis *StubInstanceSet) Stop() {
 	sis.stopped = true
 }
 
-// stubServer is a fake server that runs an SSH service. It represents
-// a VM running in a fake cloud.
+// StubVM is a fake server that runs an SSH service. It represents a
+// VM running in a fake cloud.
 //
 // Note this is distinct from a stubInstance, which is a snapshot of
-// the VM's metadata. As with a VM in a real cloud, the stubServer
-// keeps running (and might change IP addresses, shut down, etc.)
-// without updating any stubInstances that have been returned to
-// callers.
-type stubServer struct {
+// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
+// running (and might change IP addresses, shut down, etc.)  without
+// updating any stubInstances that have been returned to callers.
+type StubVM struct {
+	Boot                 time.Time
+	Broken               time.Time
+	CrunchRunMissing     bool
+	CrunchRunCrashRate   float64
+	CrunchRunDetachDelay time.Duration
+	CtrExit              int
+	OnCancel             func(string)
+	OnComplete           func(string)
+
 	sis          *StubInstanceSet
 	id           cloud.InstanceID
 	tags         cloud.InstanceTags
 	providerType string
 	SSHService   SSHService
+	running      map[string]bool
 	sync.Mutex
 }
 
-func (ss *stubServer) Instance() stubInstance {
-	ss.Lock()
-	defer ss.Unlock()
+func (svm *StubVM) Instance() stubInstance {
+	svm.Lock()
+	defer svm.Unlock()
 	return stubInstance{
-		ss:   ss,
-		addr: ss.SSHService.Address(),
+		svm:  svm,
+		addr: svm.SSHService.Address(),
 		// We deliberately return a cached/stale copy of the
 		// real tags here, so that (Instance)Tags() sometimes
 		// returns old data after a call to
 		// (Instance)SetTags().  This is permitted by the
 		// driver interface, and this might help remind
 		// callers that they need to tolerate it.
-		tags: copyTags(ss.tags),
+		tags: copyTags(svm.tags),
+	}
+}
+
+func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+	queue := svm.sis.driver.Queue
+	uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
+	if eta := svm.Boot.Sub(time.Now()); eta > 0 {
+		fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
+		return 1
+	}
+	if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
+		fmt.Fprintf(stderr, "cannot fork\n")
+		return 2
+	}
+	if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
+		fmt.Fprint(stderr, "crunch-run: command not found\n")
+		return 1
+	}
+	if strings.HasPrefix(command, "crunch-run --detach ") {
+		svm.Lock()
+		if svm.running == nil {
+			svm.running = map[string]bool{}
+		}
+		svm.running[uuid] = true
+		svm.Unlock()
+		time.Sleep(svm.CrunchRunDetachDelay)
+		fmt.Fprintf(stderr, "starting %s\n", uuid)
+		logger := logrus.WithField("ContainerUUID", uuid)
+		logger.Printf("[test] starting crunch-run stub")
+		go func() {
+			crashluck := math_rand.Float64()
+			ctr, ok := queue.Get(uuid)
+			if !ok {
+				logger.Print("[test] container not in queue")
+				return
+			}
+			if crashluck > svm.CrunchRunCrashRate/2 {
+				time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+				ctr.State = arvados.ContainerStateRunning
+				queue.Notify(ctr)
+			}
+
+			time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+			svm.Lock()
+			_, running := svm.running[uuid]
+			svm.Unlock()
+			if !running {
+				logger.Print("[test] container was killed")
+				return
+			}
+			// TODO: Check whether the stub instance has
+			// been destroyed, and if so, don't call
+			// onComplete. Then "container finished twice"
+			// can be classified as a bug.
+			if crashluck < svm.CrunchRunCrashRate {
+				logger.Print("[test] crashing crunch-run stub")
+				if svm.OnCancel != nil && ctr.State == arvados.ContainerStateRunning {
+					svm.OnCancel(uuid)
+				}
+			} else {
+				ctr.State = arvados.ContainerStateComplete
+				ctr.ExitCode = svm.CtrExit
+				queue.Notify(ctr)
+				if svm.OnComplete != nil {
+					svm.OnComplete(uuid)
+				}
+			}
+			logger.Print("[test] exiting crunch-run stub")
+			svm.Lock()
+			defer svm.Unlock()
+			delete(svm.running, uuid)
+		}()
+		return 0
+	}
+	if command == "crunch-run --list" {
+		svm.Lock()
+		defer svm.Unlock()
+		for uuid := range svm.running {
+			fmt.Fprintf(stdout, "%s\n", uuid)
+		}
+		return 0
+	}
+	if strings.HasPrefix(command, "crunch-run --kill ") {
+		svm.Lock()
+		defer svm.Unlock()
+		if svm.running[uuid] {
+			delete(svm.running, uuid)
+		} else {
+			fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+		}
+		return 0
+	}
+	if command == "true" {
+		return 0
 	}
+	fmt.Fprintf(stderr, "%q: command not found", command)
+	return 1
 }
 
 type stubInstance struct {
-	ss   *stubServer
+	svm  *StubVM
 	addr string
 	tags cloud.InstanceTags
 }
 
 func (si stubInstance) ID() cloud.InstanceID {
-	return si.ss.id
+	return si.svm.id
 }
 
 func (si stubInstance) Address() string {
@@ -153,28 +260,28 @@ func (si stubInstance) Address() string {
 }
 
 func (si stubInstance) Destroy() error {
-	if math_rand.Float64() < si.ss.sis.driver.ErrorRateDestroy {
+	if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
 		return errors.New("instance could not be destroyed")
 	}
-	si.ss.SSHService.Close()
-	sis := si.ss.sis
+	si.svm.SSHService.Close()
+	sis := si.svm.sis
 	sis.mtx.Lock()
 	defer sis.mtx.Unlock()
-	delete(sis.servers, si.ss.id)
+	delete(sis.servers, si.svm.id)
 	return nil
 }
 
 func (si stubInstance) ProviderType() string {
-	return si.ss.providerType
+	return si.svm.providerType
 }
 
 func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
 	tags = copyTags(tags)
-	ss := si.ss
+	svm := si.svm
 	go func() {
-		ss.Lock()
-		defer ss.Unlock()
-		ss.tags = tags
+		svm.Lock()
+		defer svm.Unlock()
+		svm.tags = tags
 	}()
 	return nil
 }
@@ -184,7 +291,7 @@ func (si stubInstance) Tags() cloud.InstanceTags {
 }
 
 func (si stubInstance) String() string {
-	return string(si.ss.id)
+	return string(si.svm.id)
 }
 
 func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
@@ -193,7 +300,7 @@ func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) erro
 	if err != nil {
 		return err
 	}
-	sig, err := si.ss.sis.driver.HostKey.Sign(rand.Reader, buf)
+	sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
 	if err != nil {
 		return err
 	}

commit cd68182c48ec70804a2acbf95331ec25629a9e28
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Dec 17 23:37:47 2018 -0500

    14360: Avoid overreporting instances during Create/List race.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index a43b96ed8..722d4e918 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -121,7 +121,7 @@ type Pool struct {
 
 	// private state
 	subscribers  map[<-chan struct{}]chan<- struct{}
-	creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+	creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
 	workers      map[cloud.InstanceID]*worker
 	loaded       bool                 // loaded list of instances from InstanceSet at least once
 	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
@@ -171,25 +171,41 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 
 // Unallocated returns the number of unallocated (creating + booting +
 // idle + unknown) workers for each instance type.
-//
-// The returned counts should be interpreted as upper bounds, rather
-// than exact counts: they are sometimes artificially high when a
-// newly created instance appears in the driver's Instances() list
-// before the Create() call returns.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.RLock()
 	defer wp.mtx.RUnlock()
-	u := map[arvados.InstanceType]int{}
-	for it, c := range wp.creating {
-		u[it] = c
+	unalloc := map[arvados.InstanceType]int{}
+	creating := map[arvados.InstanceType]int{}
+	for it, times := range wp.creating {
+		creating[it] = len(times)
 	}
 	for _, wkr := range wp.workers {
-		if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
-			u[wkr.instType]++
+		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+			continue
 		}
+		it := wkr.instType
+		unalloc[it]++
+		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+			// If up to N new workers appear in
+			// Instances() while we are waiting for N
+			// Create() calls to complete, we assume we're
+			// just seeing a race between Instances() and
+			// Create() responses.
+			//
+			// The other common reason why nodes have
+			// state==Unknown is that they appeared at
+			// startup, before any Create calls. They
+			// don't match the above timing condition, so
+			// we never mistakenly attribute them to
+			// pending Create calls.
+			creating[it]--
+		}
+	}
+	for it, c := range creating {
+		unalloc[it] += c
 	}
-	return u
+	return unalloc
 }
 
 // Create a new instance with the given type, and add it to the worker
@@ -204,13 +220,21 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 		return wp.atQuotaErr
 	}
 	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
-	wp.creating[it]++
+	now := time.Now()
+	wp.creating[it] = append(wp.creating[it], now)
 	go func() {
 		defer wp.notify()
 		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
 		wp.mtx.Lock()
 		defer wp.mtx.Unlock()
-		wp.creating[it]--
+		// Remove our timestamp marker from wp.creating
+		for i, t := range wp.creating[it] {
+			if t == now {
+				copy(wp.creating[it][i:], wp.creating[it][i+1:])
+				wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
+				break
+			}
+		}
 		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
 			wp.atQuotaErr = err
 			wp.atQuotaUntil = time.Now().Add(time.Minute)
@@ -266,6 +290,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		state:    initialState,
 		instance: inst,
 		instType: it,
+		appeared: now,
 		probed:   now,
 		busy:     now,
 		updated:  now,
@@ -579,7 +604,7 @@ func (wp *Pool) Instances() []InstanceView {
 }
 
 func (wp *Pool) setup() {
-	wp.creating = map[arvados.InstanceType]int{}
+	wp.creating = map[arvados.InstanceType][]time.Time{}
 	wp.exited = map[string]time.Time{}
 	wp.workers = map[cloud.InstanceID]*worker{}
 	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index f8667e06c..3867e2c63 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -68,23 +68,16 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	pool.Create(type2)
 	c.Check(pool.Unallocated()[type1], check.Equals, 1)
 	c.Check(pool.Unallocated()[type2], check.Equals, 2)
-	// Unblock the pending Create calls and (before calling Sync!)
-	// wait for the pool to process the returned instances.
+
+	// Unblock the pending Create calls.
 	go lameInstanceSet.Release(3)
-	suite.wait(c, pool, notify, func() bool {
-		list, err := lameInstanceSet.Instances(nil)
-		return err == nil && len(list) == 3
-	})
 
-	c.Check(pool.Unallocated()[type1], check.Equals, 1)
-	c.Check(pool.Unallocated()[type2], check.Equals, 2)
-	pool.getInstancesAndSync()
-	// Returned counts can be temporarily higher than 1 and 2 if
-	// poll ran before Create() returned.
-	c.Check(pool.Unallocated()[type1], check.Not(less), 1)
-	c.Check(pool.Unallocated()[type2], check.Not(less), 2)
+	// Wait for each instance to either return from its Create
+	// call, or show up in a poll.
 	suite.wait(c, pool, notify, func() bool {
-		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 2
+		pool.mtx.RLock()
+		defer pool.mtx.RUnlock()
+		return len(pool.workers) == 3
 	})
 
 	c.Check(pool.Shutdown(type2), check.Equals, true)
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 85104a13a..a0a61c159 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -64,6 +64,7 @@ type worker struct {
 	instType  arvados.InstanceType
 	vcpus     int64
 	memory    int64
+	appeared  time.Time
 	probed    time.Time
 	updated   time.Time
 	busy      time.Time

commit aab8398af23a6a01713be7c219d78bec36ffe5ad
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Dec 17 15:14:11 2018 -0500

    14360: Comment stubServer.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index bfe0f8f26..808d7bd3a 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -105,6 +105,14 @@ func (sis *StubInstanceSet) Stop() {
 	sis.stopped = true
 }
 
+// stubServer is a fake server that runs an SSH service. It represents
+// a VM running in a fake cloud.
+//
+// Note this is distinct from a stubInstance, which is a snapshot of
+// the VM's metadata. As with a VM in a real cloud, the stubServer
+// keeps running (and might change IP addresses, shut down, etc.)
+// without updating any stubInstances that have been returned to
+// callers.
 type stubServer struct {
 	sis          *StubInstanceSet
 	id           cloud.InstanceID

commit 32fc16adf894bb6299f94054b69ff66f133a66e7
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Dec 17 13:49:43 2018 -0500

    14360: Locking comment.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
index fda04d52b..ceca8c63f 100644
--- a/lib/dispatchcloud/test/queue.go
+++ b/lib/dispatchcloud/test/queue.go
@@ -95,6 +95,7 @@ func (q *Queue) Unsubscribe(ch <-chan struct{}) {
 	delete(q.subscribers, ch)
 }
 
+// caller must have lock.
 func (q *Queue) notify() {
 	for _, ch := range q.subscribers {
 		select {

commit 0bebb7ca32bb96b75d5996137b8e385b60f6476a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Dec 17 13:47:14 2018 -0500

    14360: Fix log spam on normal race condition.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 7b432adc6..c8c25131e 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -131,6 +131,17 @@ func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
 		logger.Debug("locking in progress, doing nothing")
 		return
 	}
+	if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
+		// This happens if the container has been cancelled or
+		// locked since runQueue called sch.queue.Entries(),
+		// possibly by a bgLock() call from a previous
+		// runQueue iteration. In any case, we will respond
+		// appropriately on the next runQueue iteration, which
+		// will have already been triggered by the queue
+		// update.
+		logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
+		return
+	}
 	sch.locking[uuid] = true
 	go func() {
 		defer func() {

commit 69e1bb1544b601e103b5fce06f3ec4cbed03d272
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Dec 17 10:57:40 2018 -0500

    14360: Comment TestDispatchToStubDriver.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index ed987154e..0eaf63d90 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -235,6 +235,10 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) {
 	s.disp.Close()
 }
 
+// DispatchToStubDriver checks that the dispatcher wires everything
+// together effectively. It uses a real scheduler and worker pool with
+// a fake queue and cloud driver. The fake cloud driver injects
+// artificial errors in order to exercise a variety of code paths.
 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 	drivers["test"] = s.stubDriver
 	s.disp.setupOnce.Do(s.disp.initialize)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list