[ARVADOS] updated: 1.3.0-190-ge8d02d1d3

Git user git at public.curoverse.com
Tue Jan 22 17:31:52 EST 2019


Summary of changes:
 lib/dispatchcloud/dispatcher.go               |  5 +-
 lib/dispatchcloud/dispatcher_test.go          | 11 ++--
 lib/dispatchcloud/instance_set_proxy.go       | 75 -----------------------
 lib/dispatchcloud/instance_set_proxy_test.go  | 88 ---------------------------
 lib/dispatchcloud/scheduler/interfaces.go     |  2 +-
 lib/dispatchcloud/scheduler/run_queue.go      | 13 ++--
 lib/dispatchcloud/scheduler/run_queue_test.go |  7 +--
 lib/dispatchcloud/test/stub_driver.go         | 28 ++++++---
 lib/dispatchcloud/worker/pool.go              | 49 +++++++++++----
 lib/dispatchcloud/worker/pool_test.go         |  2 +-
 lib/dispatchcloud/worker/throttle.go          | 68 +++++++++++++++++++++
 lib/dispatchcloud/worker/throttle_test.go     | 32 ++++++++++
 12 files changed, 173 insertions(+), 207 deletions(-)
 delete mode 100644 lib/dispatchcloud/instance_set_proxy.go
 delete mode 100644 lib/dispatchcloud/instance_set_proxy_test.go
 create mode 100644 lib/dispatchcloud/worker/throttle.go
 create mode 100644 lib/dispatchcloud/worker/throttle_test.go

  discards  3f1cd591126d255f842fb068de0d6788cff02949 (commit)
       via  e8d02d1d3d6804000abd81ad4ed07ecd53630b8e (commit)
       via  dd38b0af5dc3eed696fbaf289108fbd8bb9e705c (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (3f1cd591126d255f842fb068de0d6788cff02949)
            \
             N -- N -- N (e8d02d1d3d6804000abd81ad4ed07ecd53630b8e)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so we report here only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit e8d02d1d3d6804000abd81ad4ed07ecd53630b8e
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Jan 22 17:30:52 2019 -0500

    14325: Wake up scheduler when quota timeout expires.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index ea51d6c3e..2e6cdb162 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -155,15 +155,21 @@ type Pool struct {
 	mMemoryInuse       prometheus.Gauge
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //	ch := wp.Subscribe()
 //	defer wp.Unsubscribe(ch)
 //	for range ch {
-//		// ...try scheduling some work...
+//		tryScheduling(wp)
 //		if done {
 //			break
 //		}
@@ -264,6 +270,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
 			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
 				wp.atQuotaErr = err
 				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+				time.AfterFunc(quotaErrorTTL, wp.notify)
 			}
 			logger.WithError(err).Error("create failed")
 			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)

commit dd38b0af5dc3eed696fbaf289108fbd8bb9e705c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Jan 22 17:13:56 2019 -0500

    14325: Obey EarliestRetry specified by cloud rate-limiting errors.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index e4a881aeb..3e3f5ee19 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -128,7 +128,7 @@ func (disp *dispatcher) initialize() {
 	if err != nil {
 		disp.logger.Fatalf("error initializing driver: %s", err)
 	}
-	disp.instanceSet = &instanceSetProxy{instanceSet}
+	disp.instanceSet = instanceSet
 	disp.reg = prometheus.NewRegistry()
 	disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
 	disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 93677a896..9ef170e46 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -43,9 +43,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
 	_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
 	s.stubDriver = &test.StubDriver{
-		HostKey:          hostpriv,
-		AuthorizedKeys:   []ssh.PublicKey{dispatchpub},
-		ErrorRateDestroy: 0.1,
+		HostKey:                   hostpriv,
+		AuthorizedKeys:            []ssh.PublicKey{dispatchpub},
+		ErrorRateDestroy:          0.1,
+		MinTimeBetweenCreateCalls: time.Millisecond,
 	}
 
 	s.cluster = &arvados.Cluster{
@@ -254,8 +255,8 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 
 	ch := s.disp.pool.Subscribe()
 	defer s.disp.pool.Unsubscribe(ch)
-	err := s.disp.pool.Create(test.InstanceType(1))
-	c.Check(err, check.IsNil)
+	ok := s.disp.pool.Create(test.InstanceType(1))
+	c.Check(ok, check.Equals, true)
 	<-ch
 
 	sr = getInstances()
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
deleted file mode 100644
index e728b67cd..000000000
--- a/lib/dispatchcloud/instance_set_proxy.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package dispatchcloud
-
-import (
-	"git.curoverse.com/arvados.git/lib/cloud"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"golang.org/x/crypto/ssh"
-)
-
-type instanceSetProxy struct {
-	cloud.InstanceSet
-}
-
-func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
-	// TODO: return if Create failed recently with a RateLimitError or QuotaError
-	return is.InstanceSet.Create(it, id, tags, pk)
-}
-
-func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
-	// TODO: return if Instances failed recently with a RateLimitError
-	return is.InstanceSet.Instances(tags)
-}
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 59700c393..e1b575c8a 100644
--- a/lib/dispatchcloud/scheduler/interfaces.go
+++ b/lib/dispatchcloud/scheduler/interfaces.go
@@ -34,7 +34,7 @@ type WorkerPool interface {
 	Unallocated() map[arvados.InstanceType]int
 	CountWorkers() map[worker.State]int
 	AtQuota() bool
-	Create(arvados.InstanceType) error
+	Create(arvados.InstanceType) bool
 	Shutdown(arvados.InstanceType) bool
 	StartContainer(arvados.InstanceType, arvados.Container) bool
 	KillContainer(uuid string)
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 8e74caef0..ecdae7f87 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -7,7 +7,6 @@ package scheduler
 import (
 	"sort"
 
-	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"github.com/sirupsen/logrus"
@@ -62,11 +61,13 @@ tryrun:
 				break tryrun
 			} else {
 				logger.Info("creating new instance")
-				err := sch.pool.Create(it)
-				if err != nil {
-					if _, ok := err.(cloud.QuotaError); !ok {
-						logger.WithError(err).Warn("error creating worker")
-					}
+				if !sch.pool.Create(it) {
+					// (Note pool.Create works
+					// asynchronously and logs its
+					// own failures, so we don't
+					// need to log this as a
+					// failure.)
+
 					sch.queue.Unlock(ctr.UUID)
 					// Don't let lower-priority
 					// containers starve this one
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 8945f88a1..b586d62c9 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -5,7 +5,6 @@
 package scheduler
 
 import (
-	"errors"
 	"time"
 
 	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
@@ -56,14 +55,14 @@ func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
 	}
 	return r
 }
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
 	p.creates = append(p.creates, it)
 	if p.canCreate < 1 {
-		return stubQuotaError{errors.New("quota")}
+		return false
 	}
 	p.canCreate--
 	p.unalloc[it]++
-	return nil
+	return true
 }
 func (p *stubPool) KillContainer(uuid string) {
 	p.running[uuid] = time.Now()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 7c948f4d3..64afa3acf 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -41,6 +41,11 @@ type StubDriver struct {
 	// Destroy. 0=always succeed, 1=always fail.
 	ErrorRateDestroy float64
 
+	// If Create() or Instances() is called too frequently, return
+	// rate-limiting errors.
+	MinTimeBetweenCreateCalls    time.Duration
+	MinTimeBetweenInstancesCalls time.Duration
+
 	instanceSets []*StubInstanceSet
 }
 
@@ -68,6 +73,9 @@ type StubInstanceSet struct {
 	servers map[cloud.InstanceID]*StubVM
 	mtx     sync.RWMutex
 	stopped bool
+
+	allowCreateCall    time.Time
+	allowInstancesCall time.Time
 }
 
 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
@@ -76,6 +84,12 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 	if sis.stopped {
 		return nil, errors.New("StubInstanceSet: Create called after Stop")
 	}
+	if sis.allowCreateCall.After(time.Now()) {
+		return nil, RateLimitError{sis.allowCreateCall}
+	} else {
+		sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
+	}
+
 	ak := sis.driver.AuthorizedKeys
 	if authKey != nil {
 		ak = append([]ssh.PublicKey{authKey}, ak...)
@@ -101,6 +115,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
 	sis.mtx.RLock()
 	defer sis.mtx.RUnlock()
+	if sis.allowInstancesCall.After(time.Now()) {
+		return nil, RateLimitError{sis.allowInstancesCall}
+	} else {
+		sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
+	}
 	var r []cloud.Instance
 	for _, ss := range sis.servers {
 		r = append(r, ss.Instance())
@@ -117,6 +136,11 @@ func (sis *StubInstanceSet) Stop() {
 	sis.stopped = true
 }
 
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 2c2d977d8..ea51d6c3e 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -67,6 +67,9 @@ const (
 	// Time after a quota error to try again anyway, even if no
 	// instances have been shutdown.
 	quotaErrorTTL = time.Minute
+
+	// Time between "X failed because rate limiting" messages
+	logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -85,7 +88,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
 	wp := &Pool{
 		logger:             logger,
 		arvClient:          arvClient,
-		instanceSet:        instanceSet,
+		instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
 		newExecutor:        newExecutor,
 		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
 		imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
@@ -115,7 +118,7 @@ type Pool struct {
 	// configuration
 	logger             logrus.FieldLogger
 	arvClient          *arvados.Client
-	instanceSet        cloud.InstanceSet
+	instanceSet        *throttledInstanceSet
 	newExecutor        func(cloud.Instance) Executor
 	bootProbeCommand   string
 	imageID            cloud.ImageID
@@ -140,6 +143,9 @@ type Pool struct {
 	mtx          sync.RWMutex
 	setupOnce    sync.Once
 
+	throttleCreate    throttle
+	throttleInstances throttle
+
 	mInstances         prometheus.Gauge
 	mInstancesPrice    prometheus.Gauge
 	mContainersRunning prometheus.Gauge
@@ -222,13 +228,18 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
 	logger := wp.logger.WithField("InstanceType", it.Name)
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
-	if time.Now().Before(wp.atQuotaUntil) {
-		return wp.atQuotaErr
+	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+		return false
 	}
 	tags := cloud.InstanceTags{
 		tagKeyInstanceType: it.Name,
@@ -249,17 +260,18 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 				break
 			}
 		}
-		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-			wp.atQuotaErr = err
-			wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
-		}
 		if err != nil {
+			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+				wp.atQuotaErr = err
+				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+			}
 			logger.WithError(err).Error("create failed")
+			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
 			return
 		}
 		wp.updateWorker(inst, it, StateBooting)
 	}()
-	return nil
+	return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
@@ -677,10 +689,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
 	wp.setupOnce.Do(wp.setup)
+	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+		return err
+	}
 	wp.logger.Debug("getting instance list")
 	threshold := time.Now()
 	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
 	if err != nil {
+		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
 		return err
 	}
 	wp.sync(threshold, instances)
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 9c4542ea6..26087193f 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -51,7 +51,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	pool := &Pool{
 		logger:      logrus.StandardLogger(),
 		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
-		instanceSet: lameInstanceSet,
+		instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
 			type2.Name: type2,
diff --git a/lib/dispatchcloud/worker/throttle.go b/lib/dispatchcloud/worker/throttle.go
new file mode 100644
index 000000000..22c87e0fd
--- /dev/null
+++ b/lib/dispatchcloud/worker/throttle.go
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"github.com/sirupsen/logrus"
+)
+
+type throttle struct {
+	err   error
+	until time.Time
+	mtx   sync.Mutex
+}
+
+// CheckRateLimitError checks whether the given error is a
+// cloud.RateLimitError, and if so, ensures Error() returns a non-nil
+// error until the rate limiting holdoff period expires.
+//
+// If a notify func is given, it will be called after the holdoff
+// period expires.
+func (thr *throttle) CheckRateLimitError(err error, logger logrus.FieldLogger, callType string, notify func()) {
+	rle, ok := err.(cloud.RateLimitError)
+	if !ok {
+		return
+	}
+	until := rle.EarliestRetry()
+	if !until.After(time.Now()) {
+		return
+	}
+	dur := until.Sub(time.Now())
+	logger.WithFields(logrus.Fields{
+		"CallType": callType,
+		"Duration": dur,
+		"ResumeAt": until,
+	}).Info("suspending remote calls due to rate-limit error")
+	thr.ErrorUntil(fmt.Errorf("remote calls are suspended for %s, until %s", dur, until), until)
+	if notify != nil {
+		time.AfterFunc(dur, notify)
+	}
+}
+
+func (thr *throttle) ErrorUntil(err error, until time.Time) {
+	thr.mtx.Lock()
+	defer thr.mtx.Unlock()
+	thr.err, thr.until = err, until
+}
+
+func (thr *throttle) Error() error {
+	thr.mtx.Lock()
+	defer thr.mtx.Unlock()
+	if thr.err != nil && time.Now().After(thr.until) {
+		thr.err = nil
+	}
+	return thr.err
+}
+
+type throttledInstanceSet struct {
+	cloud.InstanceSet
+	throttleCreate    throttle
+	throttleInstances throttle
+}
diff --git a/lib/dispatchcloud/worker/throttle_test.go b/lib/dispatchcloud/worker/throttle_test.go
new file mode 100644
index 000000000..6fdf9eeda
--- /dev/null
+++ b/lib/dispatchcloud/worker/throttle_test.go
@@ -0,0 +1,32 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+	"errors"
+	"time"
+
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ThrottleSuite{})
+
+type ThrottleSuite struct{}
+
+func (s *ThrottleSuite) TestRateLimitError(c *check.C) {
+	var t throttle
+	c.Check(t.Error(), check.IsNil)
+	t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Second), nil)
+	c.Check(t.Error(), check.NotNil)
+	t.ErrorUntil(nil, time.Now())
+	c.Check(t.Error(), check.IsNil)
+
+	notified := false
+	t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Millisecond), func() { notified = true })
+	c.Check(t.Error(), check.NotNil)
+	time.Sleep(time.Millisecond)
+	c.Check(t.Error(), check.IsNil)
+	c.Check(notified, check.Equals, true)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list