[ARVADOS] updated: 1.3.0-190-ge8d02d1d3
Git user
git at public.curoverse.com
Tue Jan 22 17:31:52 EST 2019
Summary of changes:
 lib/dispatchcloud/dispatcher.go               |  5 +-
 lib/dispatchcloud/dispatcher_test.go          | 11 ++--
 lib/dispatchcloud/instance_set_proxy.go       | 75 -----------------------
 lib/dispatchcloud/instance_set_proxy_test.go  | 88 ---------------------------
 lib/dispatchcloud/scheduler/interfaces.go     |  2 +-
 lib/dispatchcloud/scheduler/run_queue.go      | 13 ++--
 lib/dispatchcloud/scheduler/run_queue_test.go |  7 +--
 lib/dispatchcloud/test/stub_driver.go         | 28 ++++++---
 lib/dispatchcloud/worker/pool.go              | 49 +++++++++++----
 lib/dispatchcloud/worker/pool_test.go         |  2 +-
 lib/dispatchcloud/worker/throttle.go          | 68 +++++++++++++++++++++
 lib/dispatchcloud/worker/throttle_test.go     | 32 ++++++++++
 12 files changed, 173 insertions(+), 207 deletions(-)
delete mode 100644 lib/dispatchcloud/instance_set_proxy.go
delete mode 100644 lib/dispatchcloud/instance_set_proxy_test.go
create mode 100644 lib/dispatchcloud/worker/throttle.go
create mode 100644 lib/dispatchcloud/worker/throttle_test.go
  discards  3f1cd591126d255f842fb068de0d6788cff02949 (commit)
       via  e8d02d1d3d6804000abd81ad4ed07ecd53630b8e (commit)
       via  dd38b0af5dc3eed696fbaf289108fbd8bb9e705c (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (3f1cd591126d255f842fb068de0d6788cff02949)
            \
             N -- N -- N (e8d02d1d3d6804000abd81ad4ed07ecd53630b8e)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e8d02d1d3d6804000abd81ad4ed07ecd53630b8e
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Jan 22 17:30:52 2019 -0500
14325: Wake up scheduler when quota timeout expires.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index ea51d6c3e..2e6cdb162 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -155,15 +155,21 @@ type Pool struct {
mMemoryInuse prometheus.Gauge
}
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
//
// Example:
//
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
// for range ch {
-// // ...try scheduling some work...
+// tryScheduling(wp)
// if done {
// break
// }
@@ -264,6 +270,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
wp.atQuotaErr = err
wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
}
logger.WithError(err).Error("create failed")
wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
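The wake-up mechanism described in the revised Subscribe comment, combined
with the time.AfterFunc call added above, boils down to a small pattern:
each subscriber gets a one-element buffered channel, and notify does a
non-blocking send, so events that arrive while a channel is already ready
are coalesced rather than queued. A minimal, self-contained sketch of that
pattern (illustrative only -- the names pool, subscribers, and notify
follow the doc comment, not the actual Pool internals):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type pool struct {
        mtx         sync.Mutex
        subscribers map[chan struct{}]bool
    }

    // Subscribe returns a buffered channel that becomes ready after any
    // event delivered via notify.
    func (p *pool) Subscribe() chan struct{} {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        if p.subscribers == nil {
            p.subscribers = map[chan struct{}]bool{}
        }
        ch := make(chan struct{}, 1) // buffer of 1: notify never blocks
        p.subscribers[ch] = true
        return ch
    }

    func (p *pool) Unsubscribe(ch chan struct{}) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        delete(p.subscribers, ch)
    }

    func (p *pool) notify() {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        for ch := range p.subscribers {
            select {
            case ch <- struct{}{}: // wake the subscriber
            default: // channel already ready; drop the extra event
            }
        }
    }

    func main() {
        p := &pool{}
        ch := p.Subscribe()
        defer p.Unsubscribe(ch)
        // Analogous to the quota-timeout change: schedule a future wakeup.
        time.AfterFunc(10*time.Millisecond, p.notify)
        <-ch
        fmt.Println("scheduler woken")
    }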
commit dd38b0af5dc3eed696fbaf289108fbd8bb9e705c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Jan 22 17:13:56 2019 -0500
14325: Obey EarliestRetry specified by cloud rate-limiting errors.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index e4a881aeb..3e3f5ee19 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -128,7 +128,7 @@ func (disp *dispatcher) initialize() {
if err != nil {
disp.logger.Fatalf("error initializing driver: %s", err)
}
- disp.instanceSet = &instanceSetProxy{instanceSet}
+ disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 93677a896..9ef170e46 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -43,9 +43,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
s.stubDriver = &test.StubDriver{
- HostKey: hostpriv,
- AuthorizedKeys: []ssh.PublicKey{dispatchpub},
- ErrorRateDestroy: 0.1,
+ HostKey: hostpriv,
+ AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+ ErrorRateDestroy: 0.1,
+ MinTimeBetweenCreateCalls: time.Millisecond,
}
s.cluster = &arvados.Cluster{
@@ -254,8 +255,8 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
ch := s.disp.pool.Subscribe()
defer s.disp.pool.Unsubscribe(ch)
- err := s.disp.pool.Create(test.InstanceType(1))
- c.Check(err, check.IsNil)
+ ok := s.disp.pool.Create(test.InstanceType(1))
+ c.Check(ok, check.Equals, true)
<-ch
sr = getInstances()
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
deleted file mode 100644
index e728b67cd..000000000
--- a/lib/dispatchcloud/instance_set_proxy.go
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package dispatchcloud
-
-import (
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "golang.org/x/crypto/ssh"
-)
-
-type instanceSetProxy struct {
- cloud.InstanceSet
-}
-
-func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
- // TODO: return if Create failed recently with a RateLimitError or QuotaError
- return is.InstanceSet.Create(it, id, tags, pk)
-}
-
-func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
- // TODO: return if Instances failed recently with a RateLimitError
- return is.InstanceSet.Instances(tags)
-}
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
index 59700c393..e1b575c8a 100644
--- a/lib/dispatchcloud/scheduler/interfaces.go
+++ b/lib/dispatchcloud/scheduler/interfaces.go
@@ -34,7 +34,7 @@ type WorkerPool interface {
Unallocated() map[arvados.InstanceType]int
CountWorkers() map[worker.State]int
AtQuota() bool
- Create(arvados.InstanceType) error
+ Create(arvados.InstanceType) bool
Shutdown(arvados.InstanceType) bool
StartContainer(arvados.InstanceType, arvados.Container) bool
KillContainer(uuid string)
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 8e74caef0..ecdae7f87 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -7,7 +7,6 @@ package scheduler
import (
"sort"
- "git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
@@ -62,11 +61,13 @@ tryrun:
break tryrun
} else {
logger.Info("creating new instance")
- err := sch.pool.Create(it)
- if err != nil {
- if _, ok := err.(cloud.QuotaError); !ok {
- logger.WithError(err).Warn("error creating worker")
- }
+ if !sch.pool.Create(it) {
+ // (Note pool.Create works
+ // asynchronously and logs its
+ // own failures, so we don't
+ // need to log this as a
+ // failure.)
+
sch.queue.Unlock(ctr.UUID)
// Don't let lower-priority
// containers starve this one
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 8945f88a1..b586d62c9 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -5,7 +5,6 @@
package scheduler
import (
- "errors"
"time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
@@ -56,14 +55,14 @@ func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
}
return r
}
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
p.creates = append(p.creates, it)
if p.canCreate < 1 {
- return stubQuotaError{errors.New("quota")}
+ return false
}
p.canCreate--
p.unalloc[it]++
- return nil
+ return true
}
func (p *stubPool) KillContainer(uuid string) {
p.running[uuid] = time.Now()
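The signature change from "Create(...) error" to "Create(...) bool" shifts
error reporting into the pool: the scheduler only learns whether an attempt
was started, and the pool logs the reason when one wasn't. A toy sketch of
the calling convention (stub names, not the real scheduler or pool):

    package main

    import "fmt"

    type pool struct{ holdoff bool }

    // Create reports whether an instance-creation attempt was started.
    // Pre-existing error states (quota exceeded, rate-limit holdoff) are
    // logged by the pool itself, so callers just branch on the bool.
    func (p *pool) Create() bool {
        if p.holdoff {
            return false // the pool already logged why
        }
        // ...creation would proceed asynchronously here...
        return true
    }

    func main() {
        p := &pool{holdoff: true}
        if !p.Create() {
            fmt.Println("unlock container; retry after the holdoff ends")
        }
    }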
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 7c948f4d3..64afa3acf 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -41,6 +41,11 @@ type StubDriver struct {
// Destroy. 0=always succeed, 1=always fail.
ErrorRateDestroy float64
+ // If Create() or Instances() is called too frequently, return
+ // rate-limiting errors.
+ MinTimeBetweenCreateCalls time.Duration
+ MinTimeBetweenInstancesCalls time.Duration
+
instanceSets []*StubInstanceSet
}
@@ -68,6 +73,9 @@ type StubInstanceSet struct {
servers map[cloud.InstanceID]*StubVM
mtx sync.RWMutex
stopped bool
+
+ allowCreateCall time.Time
+ allowInstancesCall time.Time
}
func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
@@ -76,6 +84,12 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
if sis.stopped {
return nil, errors.New("StubInstanceSet: Create called after Stop")
}
+ if sis.allowCreateCall.After(time.Now()) {
+ return nil, RateLimitError{sis.allowCreateCall}
+ } else {
+ sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
+ }
+
ak := sis.driver.AuthorizedKeys
if authKey != nil {
ak = append([]ssh.PublicKey{authKey}, ak...)
@@ -101,6 +115,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
sis.mtx.RLock()
defer sis.mtx.RUnlock()
+ if sis.allowInstancesCall.After(time.Now()) {
+ return nil, RateLimitError{sis.allowInstancesCall}
+ } else {
+ sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
+ }
var r []cloud.Instance
for _, ss := range sis.servers {
r = append(r, ss.Instance())
@@ -117,6 +136,11 @@ func (sis *StubInstanceSet) Stop() {
sis.stopped = true
}
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
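The stub's RateLimitError is what drives the new throttling path: the cloud
package's rate-limiting errors expose an EarliestRetry() method, and callers
detect them with a type assertion. A hedged sketch of the detection step (the
local earliestRetrier interface is an illustrative stand-in for
cloud.RateLimitError, not its real declaration):

    package main

    import (
        "fmt"
        "time"
    )

    // earliestRetrier mirrors the shape of cloud.RateLimitError.
    type earliestRetrier interface {
        error
        EarliestRetry() time.Time
    }

    type rateLimitError struct{ retry time.Time }

    func (e rateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.retry) }
    func (e rateLimitError) EarliestRetry() time.Time { return e.retry }

    func main() {
        var err error = rateLimitError{time.Now().Add(time.Second)}
        if rle, ok := err.(earliestRetrier); ok {
            fmt.Println("suspend calls until", rle.EarliestRetry())
        }
    }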
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 2c2d977d8..ea51d6c3e 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -67,6 +67,9 @@ const (
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
quotaErrorTTL = time.Minute
+
+ // Time between "X failed because rate limiting" messages
+ logRateLimitErrorInterval = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -85,7 +88,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
wp := &Pool{
logger: logger,
arvClient: arvClient,
- instanceSet: instanceSet,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
@@ -115,7 +118,7 @@ type Pool struct {
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
- instanceSet cloud.InstanceSet
+ instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
imageID cloud.ImageID
@@ -140,6 +143,9 @@ type Pool struct {
mtx sync.RWMutex
setupOnce sync.Once
+ throttleCreate throttle
+ throttleInstances throttle
+
mInstances prometheus.Gauge
mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
@@ -222,13 +228,18 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) {
- return wp.atQuotaErr
+ if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+ return false
}
tags := cloud.InstanceTags{
tagKeyInstanceType: it.Name,
@@ -249,17 +260,18 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
break
}
}
- if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
- wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
- }
if err != nil {
+ if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ }
logger.WithError(err).Error("create failed")
+ wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
wp.updateWorker(inst, it, StateBooting)
}()
- return nil
+ return true
}
// AtQuota returns true if Create is not expected to work at the
@@ -677,10 +689,14 @@ func (wp *Pool) notify() {
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
+ if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+ return err
+ }
wp.logger.Debug("getting instance list")
threshold := time.Now()
instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
if err != nil {
+ wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
}
wp.sync(threshold, instances)
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 9c4542ea6..26087193f 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -51,7 +51,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
pool := &Pool{
logger: logrus.StandardLogger(),
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
- instanceSet: lameInstanceSet,
+ instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
type2.Name: type2,
diff --git a/lib/dispatchcloud/worker/throttle.go b/lib/dispatchcloud/worker/throttle.go
new file mode 100644
index 000000000..22c87e0fd
--- /dev/null
+++ b/lib/dispatchcloud/worker/throttle.go
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "github.com/sirupsen/logrus"
+)
+
+type throttle struct {
+ err error
+ until time.Time
+ mtx sync.Mutex
+}
+
+// CheckRateLimitError checks whether the given error is a
+// cloud.RateLimitError, and if so, ensures Error() returns a non-nil
+// error until the rate limiting holdoff period expires.
+//
+// If a notify func is given, it will be called after the holdoff
+// period expires.
+func (thr *throttle) CheckRateLimitError(err error, logger logrus.FieldLogger, callType string, notify func()) {
+ rle, ok := err.(cloud.RateLimitError)
+ if !ok {
+ return
+ }
+ until := rle.EarliestRetry()
+ if !until.After(time.Now()) {
+ return
+ }
+ dur := until.Sub(time.Now())
+ logger.WithFields(logrus.Fields{
+ "CallType": callType,
+ "Duration": dur,
+ "ResumeAt": until,
+ }).Info("suspending remote calls due to rate-limit error")
+ thr.ErrorUntil(fmt.Errorf("remote calls are suspended for %s, until %s", dur, until), until, notify)
+}
+
+func (thr *throttle) ErrorUntil(err error, until time.Time, notify func()) {
+ thr.mtx.Lock()
+ defer thr.mtx.Unlock()
+ thr.err, thr.until = err, until
+ if notify != nil {
+ time.AfterFunc(until.Sub(time.Now()), notify)
+ }
+}
+
+func (thr *throttle) Error() error {
+ thr.mtx.Lock()
+ defer thr.mtx.Unlock()
+ if thr.err != nil && time.Now().After(thr.until) {
+ thr.err = nil
+ }
+ return thr.err
+}
+
+type throttledInstanceSet struct {
+ cloud.InstanceSet
+ throttleCreate throttle
+ throttleInstances throttle
+}
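In use, a caller checks throttle.Error() before each remote call and feeds
any failure through CheckRateLimitError, which is what Create and
getInstancesAndSync do in pool.go above. A condensed, self-contained sketch
of that guard pattern (the throttle here is a simplified copy of the one
above, and the failing "remote call" is simulated):

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    type throttle struct {
        mtx   sync.Mutex
        err   error
        until time.Time
    }

    func (thr *throttle) ErrorUntil(err error, until time.Time, notify func()) {
        thr.mtx.Lock()
        defer thr.mtx.Unlock()
        thr.err, thr.until = err, until
        if notify != nil {
            time.AfterFunc(until.Sub(time.Now()), notify)
        }
    }

    func (thr *throttle) Error() error {
        thr.mtx.Lock()
        defer thr.mtx.Unlock()
        if thr.err != nil && time.Now().After(thr.until) {
            thr.err = nil // holdoff expired; resume normal operation
        }
        return thr.err
    }

    func main() {
        var thr throttle
        call := func() error {
            if err := thr.Error(); err != nil {
                return err // still suspended: skip the remote call
            }
            // Simulate the cloud API rejecting us with a rate-limit error.
            until := time.Now().Add(50 * time.Millisecond)
            err := errors.New("rate limited")
            thr.ErrorUntil(err, until, func() { fmt.Println("holdoff over") })
            return err
        }
        fmt.Println("first call: ", call()) // hits the API, trips the throttle
        fmt.Println("second call:", call()) // short-circuited by the throttle
        time.Sleep(100 * time.Millisecond)  // wait out the holdoff
        fmt.Println("after holdoff:", thr.Error())
    }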
diff --git a/lib/dispatchcloud/worker/throttle_test.go b/lib/dispatchcloud/worker/throttle_test.go
new file mode 100644
index 000000000..6fdf9eeda
--- /dev/null
+++ b/lib/dispatchcloud/worker/throttle_test.go
@@ -0,0 +1,32 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "errors"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ThrottleSuite{})
+
+type ThrottleSuite struct{}
+
+func (s *ThrottleSuite) TestRateLimitError(c *check.C) {
+ var t throttle
+ c.Check(t.Error(), check.IsNil)
+ t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Second), nil)
+ c.Check(t.Error(), check.NotNil)
+ t.ErrorUntil(nil, time.Now(), nil)
+ c.Check(t.Error(), check.IsNil)
+
+ notified := false
+ t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Millisecond), func() { notified = true })
+ c.Check(t.Error(), check.NotNil)
+ time.Sleep(time.Millisecond * 10)
+ c.Check(t.Error(), check.IsNil)
+ c.Check(notified, check.Equals, true)
+}
-----------------------------------------------------------------------
hooks/post-receive
--