[ARVADOS] updated: 1.3.0-191-g788d0a185

Git user git at public.curoverse.com
Thu Jan 24 15:02:47 EST 2019


Summary of changes:
 lib/dispatchcloud/test/stub_driver.go     |   4 +-
 lib/dispatchcloud/worker/pool_test.go     |  13 +-
 lib/dispatchcloud/worker/throttle.go      |  10 +-
 lib/dispatchcloud/worker/throttle_test.go |   4 +-
 lib/dispatchcloud/worker/worker.go        |  73 ++++++++---
 lib/dispatchcloud/worker/worker_test.go   | 205 ++++++++++++++++++++++++++++++
 6 files changed, 267 insertions(+), 42 deletions(-)
 create mode 100644 lib/dispatchcloud/worker/worker_test.go

  discards  e8d02d1d3d6804000abd81ad4ed07ecd53630b8e (commit)
  discards  dd38b0af5dc3eed696fbaf289108fbd8bb9e705c (commit)
       via  788d0a18592e246b6db62fdb4618fdf9b5eed9b8 (commit)
       via  19b7075b60ef4252c85dca4a0a0f8b0d9e67498a (commit)
       via  de9d45e3a238df8e9f0b2833b86c5e54fec37c7a (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not an ancestor of the new revision.  This
situation occurs when you force-push a change (git push --force) and
generate a repository containing something like this:

 * -- * -- B -- O -- O -- O (e8d02d1d3d6804000abd81ad4ed07ecd53630b8e)
            \
             N -- N -- N (788d0a18592e246b6db62fdb4618fdf9b5eed9b8)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions in the N
branch, starting from the common base, B.

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list those revisions
in full below.


commit 788d0a18592e246b6db62fdb4618fdf9b5eed9b8
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 24 14:59:14 2019 -0500

    14325: Don't shutdown busy VMs even if boot probe fails.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 64afa3acf..2277a3f23 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -50,9 +50,7 @@ type StubDriver struct {
 }
 
 // InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID,
-	logger logrus.FieldLogger) (cloud.InstanceSet, error) {
-
+func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
 	sis := StubInstanceSet{
 		driver:  sd,
 		servers: map[cloud.InstanceID]*StubVM{},
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 26087193f..aa8195843 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -5,7 +5,6 @@
 package worker
 
 import (
-	"io"
 	"time"
 
 	"git.curoverse.com/arvados.git/lib/cloud"
@@ -50,7 +49,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
 	pool := &Pool{
 		logger:      logrus.StandardLogger(),
-		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+		newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
 		instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
@@ -186,13 +185,3 @@ func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, rea
 	}
 	c.Check(ready(), check.Equals, true)
 }
-
-type stubExecutor struct{}
-
-func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-
-func (*stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
-	return nil, nil, nil
-}
-
-func (*stubExecutor) Close() {}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 0872e594f..a75d2bbb8 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,6 +6,7 @@ package worker
 
 import (
 	"bytes"
+	"fmt"
 	"strings"
 	"sync"
 	"time"
@@ -144,38 +145,65 @@ func (wkr *worker) ProbeAndUpdate() {
 	}
 }
 
-// should be called in a new goroutine
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
 func (wkr *worker) probeAndUpdate() {
 	wkr.mtx.Lock()
 	updated := wkr.updated
-	needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
-	needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+	initialState := wkr.state
 	wkr.mtx.Unlock()
-	if !needProbeBooted && !needProbeRunning {
-		return
-	}
 
 	var (
+		booted   bool
 		ctrUUIDs []string
 		ok       bool
 		stderr   []byte
 	)
-	if needProbeBooted {
-		ok, stderr = wkr.probeBooted()
-		wkr.mtx.Lock()
-		if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+
+	switch initialState {
+	case StateShutdown:
+		return
+	case StateIdle, StateRunning:
+		booted = true
+	case StateUnknown, StateBooting:
+	default:
+		panic(fmt.Sprintf("unknown state %s", initialState))
+	}
+
+	if !booted {
+		booted, stderr = wkr.probeBooted()
+		if !booted {
+			// Pretend this probe succeeded if another
+			// concurrent attempt succeeded.
+			wkr.mtx.Lock()
+			booted = wkr.state == StateRunning || wkr.state == StateIdle
+			wkr.mtx.Unlock()
+		} else {
 			wkr.logger.Info("instance booted; will try probeRunning")
-			needProbeRunning = true
 		}
-		wkr.mtx.Unlock()
 	}
-	if needProbeRunning {
+	if booted || wkr.state == StateUnknown {
 		ctrUUIDs, ok, stderr = wkr.probeRunning()
 	}
 	logger := wkr.logger.WithField("stderr", string(stderr))
 	wkr.mtx.Lock()
 	defer wkr.mtx.Unlock()
-	if !ok {
+	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
 		if wkr.state == StateShutdown && wkr.updated.After(updated) {
 			// Skip the logging noise if shutdown was
 			// initiated during probe.
@@ -186,10 +214,10 @@ func (wkr *worker) probeAndUpdate() {
 			"Duration": dur,
 			"State":    wkr.state,
 		})
-		if wkr.state == StateBooting && !needProbeRunning {
-			// If we know the instance has never passed a
-			// boot probe, it's not noteworthy that it
-			// hasn't passed this probe.
+		if !booted {
+			// While we're polling the VM to see if it's
+			// finished booting, failures are not
+			// noteworthy, so we log at Debug level.
 			logger.Debug("new instance not responding")
 		} else {
 			logger.Info("instance not responding")
@@ -234,11 +262,16 @@ func (wkr *worker) probeAndUpdate() {
 			changed = true
 		}
 	}
-	if wkr.state == StateUnknown || wkr.state == StateBooting {
+	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
 		// Note: this will change again below if
 		// len(wkr.starting)+len(wkr.running) > 0.
 		wkr.state = StateIdle
 		changed = true
+	} else if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+		logger.WithFields(logrus.Fields{
+			"RunningContainers": len(running),
+			"State":             wkr.state,
+		}).Info("crunch-run probe succeeded, but boot probe is still failing")
 	}
 	if !changed {
 		return
@@ -251,7 +284,7 @@ func (wkr *worker) probeAndUpdate() {
 		wkr.state = StateIdle
 	}
 	wkr.updated = updateTime
-	if needProbeBooted {
+	if booted && (initialState == StateUnknown || initialState == StateBooting) {
 		logger.WithFields(logrus.Fields{
 			"RunningContainers": len(running),
 			"State":             wkr.state,
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
new file mode 100644
index 000000000..3072c12a7
--- /dev/null
+++ b/lib/dispatchcloud/worker/worker_test.go
@@ -0,0 +1,205 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"errors"
+	"io"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&WorkerSuite{})
+
+type WorkerSuite struct{}
+
+func (suite *WorkerSuite) TestProbe(c *check.C) {
+	logger := logrus.StandardLogger()
+	bootTimeout := time.Minute
+	idleTimeout := time.Second
+
+	is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
+	c.Assert(err, check.IsNil)
+	inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+	c.Assert(err, check.IsNil)
+
+	type trialT struct {
+		testCaseComment string // displayed in test output to help identify failure case
+		age             time.Duration
+		state           State
+		running         int
+		respBoot        stubResp // zero value is success
+		respRun         stubResp // zero value is success + nothing running
+		expectState     State
+		expectRunning   int
+	}
+
+	errFail := errors.New("failed")
+	respFail := stubResp{"", "command failed\n", errFail}
+	respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
+	for _, trial := range []trialT{
+		{
+			testCaseComment: "Unknown, probes fail",
+			state:           StateUnknown,
+			respBoot:        respFail,
+			respRun:         respFail,
+			expectState:     StateUnknown,
+		},
+		{
+			testCaseComment: "Unknown, boot probe fails, but one container is running",
+			state:           StateUnknown,
+			respBoot:        respFail,
+			respRun:         respContainerRunning,
+			expectState:     StateUnknown,
+			expectRunning:   1,
+		},
+		{
+			testCaseComment: "Unknown, boot probe fails, previously running container has exited",
+			state:           StateUnknown,
+			running:         1,
+			respBoot:        respFail,
+			expectState:     StateUnknown,
+			expectRunning:   0,
+		},
+		{
+			testCaseComment: "Unknown, boot timeout exceeded, boot probe fails",
+			state:           StateUnknown,
+			age:             bootTimeout + time.Second,
+			respBoot:        respFail,
+			respRun:         respFail,
+			expectState:     StateShutdown,
+		},
+		{
+			testCaseComment: "Unknown, boot timeout exceeded, boot probe succeeds but crunch-run fails",
+			state:           StateUnknown,
+			age:             bootTimeout + time.Second,
+			respRun:         respFail,
+			expectState:     StateShutdown,
+		},
+		{
+			testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but crunch-run succeeds",
+			state:           StateUnknown,
+			age:             bootTimeout + time.Second,
+			respBoot:        respFail,
+			expectState:     StateShutdown,
+		},
+		{
+			testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but container is running",
+			state:           StateUnknown,
+			age:             bootTimeout + time.Second,
+			respBoot:        respFail,
+			respRun:         respContainerRunning,
+			expectState:     StateUnknown,
+			expectRunning:   1,
+		},
+		{
+			testCaseComment: "Booting, boot probe fails, run probe fails",
+			state:           StateBooting,
+			respBoot:        respFail,
+			respRun:         respFail,
+			expectState:     StateBooting,
+		},
+		{
+			testCaseComment: "Booting, boot probe fails, run probe succeeds (but isn't expected to be called)",
+			state:           StateBooting,
+			respBoot:        respFail,
+			expectState:     StateBooting,
+		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, run probe fails",
+			state:           StateBooting,
+			respRun:         respFail,
+			expectState:     StateBooting,
+		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, run probe succeeds",
+			state:           StateBooting,
+			expectState:     StateIdle,
+		},
+		{
+			testCaseComment: "Booting, boot probe succeeds, run probe succeeds, container is running",
+			state:           StateBooting,
+			respRun:         respContainerRunning,
+			expectState:     StateRunning,
+			expectRunning:   1,
+		},
+		{
+			testCaseComment: "Booting, boot timeout exceeded",
+			state:           StateBooting,
+			age:             bootTimeout + time.Second,
+			respRun:         respFail,
+			expectState:     StateShutdown,
+		},
+		{
+			testCaseComment: "Idle, one container running",
+			state:           StateIdle,
+			respRun:         respContainerRunning,
+			expectState:     StateRunning,
+			expectRunning:   1,
+		},
+		{
+			testCaseComment: "Running, one container still running",
+			state:           StateRunning,
+			running:         1,
+			respRun:         respContainerRunning,
+			expectState:     StateRunning,
+			expectRunning:   1,
+		},
+	} {
+		c.Logf("------- %#v", trial)
+		ctime := time.Now().Add(-trial.age)
+		exr := stubExecutor{
+			"bootprobe":         trial.respBoot,
+			"crunch-run --list": trial.respRun,
+		}
+		wp := &Pool{
+			newExecutor:      func(cloud.Instance) Executor { return exr },
+			bootProbeCommand: "bootprobe",
+			timeoutBooting:   bootTimeout,
+			timeoutIdle:      idleTimeout,
+			exited:           map[string]time.Time{},
+		}
+		wkr := &worker{
+			logger:   logger,
+			executor: exr,
+			wp:       wp,
+			mtx:      &wp.mtx,
+			state:    trial.state,
+			instance: inst,
+			appeared: ctime,
+			busy:     ctime,
+			probed:   ctime,
+			updated:  ctime,
+		}
+		if trial.running > 0 {
+			wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+		}
+		wkr.probeAndUpdate()
+		c.Check(wkr.state, check.Equals, trial.expectState)
+		c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+	}
+}
+
+type stubResp struct {
+	stdout string
+	stderr string
+	err    error
+}
+type stubExecutor map[string]stubResp
+
+func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se stubExecutor) Close()                         {}
+func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+	resp, ok := se[cmd]
+	if !ok {
+		return nil, []byte("command not found\n"), errors.New("command not found")
+	}
+	return []byte(resp.stdout), []byte(resp.stderr), resp.err
+}

commit 19b7075b60ef4252c85dca4a0a0f8b0d9e67498a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Jan 22 17:30:52 2019 -0500

    14325: Wake up scheduler when quota timeout expires.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index ea51d6c3e..2e6cdb162 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -155,15 +155,21 @@ type Pool struct {
 	mMemoryInuse       prometheus.Gauge
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //	ch := wp.Subscribe()
 //	defer wp.Unsubscribe(ch)
 //	for range ch {
-//		// ...try scheduling some work...
+//		tryScheduling(wp)
 //		if done {
 //			break
 //		}
@@ -264,6 +270,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
 				wp.atQuotaErr = err
 				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+				time.AfterFunc(quotaErrorTTL, wp.notify)
 			}
 			logger.WithError(err).Error("create failed")
 			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)

commit de9d45e3a238df8e9f0b2833b86c5e54fec37c7a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Jan 22 17:13:56 2019 -0500

    14325: Obey EarliestRetry specified by cloud rate-limiting errors.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index e4a881aeb..3e3f5ee19 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -128,7 +128,7 @@ func (disp *dispatcher) initialize() {
 	if err != nil {
 		disp.logger.Fatalf("error initializing driver: %s", err)
 	}
-	disp.instanceSet = &instanceSetProxy{instanceSet}
+	disp.instanceSet = instanceSet
 	disp.reg = prometheus.NewRegistry()
 	disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
 	disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 93677a896..9ef170e46 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -43,9 +43,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
 	_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
 	s.stubDriver = &test.StubDriver{
-		HostKey:          hostpriv,
-		AuthorizedKeys:   []ssh.PublicKey{dispatchpub},
-		ErrorRateDestroy: 0.1,
+		HostKey:                   hostpriv,
+		AuthorizedKeys:            []ssh.PublicKey{dispatchpub},
+		ErrorRateDestroy:          0.1,
+		MinTimeBetweenCreateCalls: time.Millisecond,
 	}
 
 	s.cluster = &arvados.Cluster{
@@ -254,8 +255,8 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 
 	ch := s.disp.pool.Subscribe()
 	defer s.disp.pool.Unsubscribe(ch)
-	err := s.disp.pool.Create(test.InstanceType(1))
-	c.Check(err, check.IsNil)
+	ok := s.disp.pool.Create(test.InstanceType(1))
+	c.Check(ok, check.Equals, true)
 	<-ch
 
 	sr = getInstances()
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
deleted file mode 100644
index e728b67cd..000000000
--- a/lib/dispatchcloud/instance_set_proxy.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package dispatchcloud
-
-import (
-	"git.curoverse.com/arvados.git/lib/cloud"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"golang.org/x/crypto/ssh"
-)
-
-type instanceSetProxy struct {
-	cloud.InstanceSet
-}
-
-func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
-	// TODO: return if Create failed recently with a RateLimitError or QuotaError
-	return is.InstanceSet.Create(it, id, tags, pk)
-}
-
-func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
-	// TODO: return if Instances failed recently with a RateLimitError
-	return is.InstanceSet.Instances(tags)
-}
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 59700c393..e1b575c8a 100644
--- a/lib/dispatchcloud/scheduler/interfaces.go
+++ b/lib/dispatchcloud/scheduler/interfaces.go
@@ -34,7 +34,7 @@ type WorkerPool interface {
 	Unallocated() map[arvados.InstanceType]int
 	CountWorkers() map[worker.State]int
 	AtQuota() bool
-	Create(arvados.InstanceType) error
+	Create(arvados.InstanceType) bool
 	Shutdown(arvados.InstanceType) bool
 	StartContainer(arvados.InstanceType, arvados.Container) bool
 	KillContainer(uuid string)
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 8e74caef0..ecdae7f87 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -7,7 +7,6 @@ package scheduler
 import (
 	"sort"
 
-	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"github.com/sirupsen/logrus"
@@ -62,11 +61,13 @@ tryrun:
 				break tryrun
 			} else {
 				logger.Info("creating new instance")
-				err := sch.pool.Create(it)
-				if err != nil {
-					if _, ok := err.(cloud.QuotaError); !ok {
-						logger.WithError(err).Warn("error creating worker")
-					}
+				if !sch.pool.Create(it) {
+					// (Note pool.Create works
+					// asynchronously and logs its
+					// own failures, so we don't
+					// need to log this as a
+					// failure.)
+
 					sch.queue.Unlock(ctr.UUID)
 					// Don't let lower-priority
 					// containers starve this one
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 8945f88a1..b586d62c9 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -5,7 +5,6 @@
 package scheduler
 
 import (
-	"errors"
 	"time"
 
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
@@ -56,14 +55,14 @@ func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
 	}
 	return r
 }
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
 	p.creates = append(p.creates, it)
 	if p.canCreate < 1 {
-		return stubQuotaError{errors.New("quota")}
+		return false
 	}
 	p.canCreate--
 	p.unalloc[it]++
-	return nil
+	return true
 }
 func (p *stubPool) KillContainer(uuid string) {
 	p.running[uuid] = time.Now()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 7c948f4d3..64afa3acf 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -41,6 +41,11 @@ type StubDriver struct {
 	// Destroy. 0=always succeed, 1=always fail.
 	ErrorRateDestroy float64
 
+	// If Create() or Instances() is called too frequently, return
+	// rate-limiting errors.
+	MinTimeBetweenCreateCalls    time.Duration
+	MinTimeBetweenInstancesCalls time.Duration
+
 	instanceSets []*StubInstanceSet
 }
 
@@ -68,6 +73,9 @@ type StubInstanceSet struct {
 	servers map[cloud.InstanceID]*StubVM
 	mtx     sync.RWMutex
 	stopped bool
+
+	allowCreateCall    time.Time
+	allowInstancesCall time.Time
 }
 
 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
@@ -76,6 +84,12 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 	if sis.stopped {
 		return nil, errors.New("StubInstanceSet: Create called after Stop")
 	}
+	if sis.allowCreateCall.After(time.Now()) {
+		return nil, RateLimitError{sis.allowCreateCall}
+	} else {
+		sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
+	}
+
 	ak := sis.driver.AuthorizedKeys
 	if authKey != nil {
 		ak = append([]ssh.PublicKey{authKey}, ak...)
@@ -101,6 +115,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
 	sis.mtx.RLock()
 	defer sis.mtx.RUnlock()
+	if sis.allowInstancesCall.After(time.Now()) {
+		return nil, RateLimitError{sis.allowInstancesCall}
+	} else {
+		sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
+	}
 	var r []cloud.Instance
 	for _, ss := range sis.servers {
 		r = append(r, ss.Instance())
@@ -117,6 +136,11 @@ func (sis *StubInstanceSet) Stop() {
 	sis.stopped = true
 }
 
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 2c2d977d8..ea51d6c3e 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -67,6 +67,9 @@ const (
 	// Time after a quota error to try again anyway, even if no
 	// instances have been shutdown.
 	quotaErrorTTL = time.Minute
+
+	// Time between "X failed because rate limiting" messages
+	logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -85,7 +88,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 	wp := &Pool{
 		logger:             logger,
 		arvClient:          arvClient,
-		instanceSet:        instanceSet,
+		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
 		newExecutor:        newExecutor,
 		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
 		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
@@ -115,7 +118,7 @@ type Pool struct {
 	// configuration
 	logger             logrus.FieldLogger
 	arvClient          *arvados.Client
-	instanceSet        cloud.InstanceSet
+	instanceSet        *throttledInstanceSet
 	newExecutor        func(cloud.Instance) Executor
 	bootProbeCommand   string
 	imageID            cloud.ImageID
@@ -140,6 +143,9 @@ type Pool struct {
 	mtx          sync.RWMutex
 	setupOnce    sync.Once
 
+	throttleCreate    throttle
+	throttleInstances throttle
+
 	mInstances         prometheus.Gauge
 	mInstancesPrice    prometheus.Gauge
 	mContainersRunning prometheus.Gauge
@@ -222,13 +228,18 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
-	if time.Now().Before(wp.atQuotaUntil) {
-		return wp.atQuotaErr
+	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+		return false
 	}
 	tags := cloud.InstanceTags{
 		tagKeyInstanceType: it.Name,
@@ -249,17 +260,18 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 				break
 			}
 		}
-		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-			wp.atQuotaErr = err
-			wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
-		}
 		if err != nil {
+			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+				wp.atQuotaErr = err
+				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+			}
 			logger.WithError(err).Error("create failed")
+			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
 			return
 		}
 		wp.updateWorker(inst, it, StateBooting)
 	}()
-	return nil
+	return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
@@ -677,10 +689,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
 	wp.setupOnce.Do(wp.setup)
+	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+		return err
+	}
 	wp.logger.Debug("getting instance list")
 	threshold := time.Now()
 	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
 	if err != nil {
+		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
 		return err
 	}
 	wp.sync(threshold, instances)
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 9c4542ea6..26087193f 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -51,7 +51,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	pool := &Pool{
 		logger:      logrus.StandardLogger(),
 		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
-		instanceSet: lameInstanceSet,
+		instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
 			type2.Name: type2,
diff --git a/lib/dispatchcloud/worker/throttle.go b/lib/dispatchcloud/worker/throttle.go
new file mode 100644
index 000000000..c5ea79323
--- /dev/null
+++ b/lib/dispatchcloud/worker/throttle.go
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"github.com/sirupsen/logrus"
+)
+
+type throttle struct {
+	err   error
+	until time.Time
+	mtx   sync.Mutex
+}
+
+// CheckRateLimitError checks whether the given error is a
+// cloud.RateLimitError, and if so, ensures Error() returns a non-nil
+// error until the rate limiting holdoff period expires.
+//
+// If a notify func is given, it will be called after the holdoff
+// period expires.
+func (thr *throttle) CheckRateLimitError(err error, logger logrus.FieldLogger, callType string, notify func()) {
+	rle, ok := err.(cloud.RateLimitError)
+	if !ok {
+		return
+	}
+	until := rle.EarliestRetry()
+	if !until.After(time.Now()) {
+		return
+	}
+	dur := until.Sub(time.Now())
+	logger.WithFields(logrus.Fields{
+		"CallType": callType,
+		"Duration": dur,
+		"ResumeAt": until,
+	}).Info("suspending remote calls due to rate-limit error")
+	thr.ErrorUntil(fmt.Errorf("remote calls are suspended for %s, until %s", dur, until), until, notify)
+}
+
+func (thr *throttle) ErrorUntil(err error, until time.Time, notify func()) {
+	thr.mtx.Lock()
+	defer thr.mtx.Unlock()
+	thr.err, thr.until = err, until
+	if notify != nil {
+		time.AfterFunc(until.Sub(time.Now()), notify)
+	}
+}
+
+func (thr *throttle) Error() error {
+	thr.mtx.Lock()
+	defer thr.mtx.Unlock()
+	if thr.err != nil && time.Now().After(thr.until) {
+		thr.err = nil
+	}
+	return thr.err
+}
+
+type throttledInstanceSet struct {
+	cloud.InstanceSet
+	throttleCreate    throttle
+	throttleInstances throttle
+}
diff --git a/lib/dispatchcloud/worker/throttle_test.go b/lib/dispatchcloud/worker/throttle_test.go
new file mode 100644
index 000000000..045b61741
--- /dev/null
+++ b/lib/dispatchcloud/worker/throttle_test.go
@@ -0,0 +1,32 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"errors"
+	"time"
+
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ThrottleSuite{})
+
+type ThrottleSuite struct{}
+
+func (s *ThrottleSuite) TestRateLimitError(c *check.C) {
+	var t throttle
+	c.Check(t.Error(), check.IsNil)
+	t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Second), nil)
+	c.Check(t.Error(), check.NotNil)
+	t.ErrorUntil(nil, time.Now(), nil)
+	c.Check(t.Error(), check.IsNil)
+
+	notified := false
+	t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Millisecond), func() { notified = true })
+	c.Check(t.Error(), check.NotNil)
+	time.Sleep(time.Millisecond * 10)
+	c.Check(t.Error(), check.IsNil)
+	c.Check(notified, check.Equals, true)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list