[ARVADOS] updated: 1.3.0-189-g3f1cd5911

Git user git at public.curoverse.com
Mon Jan 21 15:21:42 EST 2019


Summary of changes:
 lib/dispatchcloud/dispatcher.go              |  5 +-
 lib/dispatchcloud/dispatcher_test.go         |  4 +-
 lib/dispatchcloud/instance_set_proxy.go      | 58 ++++++++++++++++--
 lib/dispatchcloud/instance_set_proxy_test.go | 88 ++++++++++++++++++++++++++++
 lib/dispatchcloud/test/stub_driver.go        | 21 ++++++-
 lib/dispatchcloud/worker/pool.go             |  6 +-
 lib/dispatchcloud/worker/worker.go           | 30 ++++++----
 7 files changed, 193 insertions(+), 19 deletions(-)
 create mode 100644 lib/dispatchcloud/instance_set_proxy_test.go

       via  3f1cd591126d255f842fb068de0d6788cff02949 (commit)
       via  262d59632f85b34ef4e2bcb1ee323a6e3b4435ed (commit)
       via  332f82b60422b3c445b3fd2d69ddd45915d23ff8 (commit)
       via  38cf2cccc4c3ec520d3d7ab85dbe27f427f6d394 (commit)
       via  3300a5ebde5463808e098e489d47b756d8613774 (commit)
      from  61381f4be1dbba9c56df342093ee614f0d5a28df (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 3f1cd591126d255f842fb068de0d6788cff02949
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Jan 21 15:20:11 2019 -0500

    14325: Obey EarliestRetry specified by cloud rate-limiting errors.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index e4a881aeb..c8ce2df7b 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -128,7 +128,10 @@ func (disp *dispatcher) initialize() {
 	if err != nil {
 		disp.logger.Fatalf("error initializing driver: %s", err)
 	}
-	disp.instanceSet = &instanceSetProxy{instanceSet}
+	disp.instanceSet = &instanceSetProxy{
+		InstanceSet: instanceSet,
+		Logger:      disp.logger,
+	}
 	disp.reg = prometheus.NewRegistry()
 	disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
 	disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
index e728b67cd..ea0552df1 100644
--- a/lib/dispatchcloud/instance_set_proxy.go
+++ b/lib/dispatchcloud/instance_set_proxy.go
@@ -5,21 +5,71 @@
 package dispatchcloud
 
 import (
+	"sync"
+	"time"
+
 	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
 	"golang.org/x/crypto/ssh"
 )
 
 type instanceSetProxy struct {
 	cloud.InstanceSet
+	Logger logrus.FieldLogger
+
+	errCreate    errorUntil
+	errInstances errorUntil
 }
 
 func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
-	// TODO: return if Create failed recently with a RateLimitError or QuotaError
-	return is.InstanceSet.Create(it, id, tags, pk)
+	if err := is.errCreate.Err(); err != nil {
+		return nil, err
+	}
+	inst, err := is.InstanceSet.Create(it, id, tags, pk)
+	is.errCreate.Check(err, is.Logger, "Create()")
+	return inst, err
 }
 
 func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
-	// TODO: return if Instances failed recently with a RateLimitError
-	return is.InstanceSet.Instances(tags)
+	if err := is.errInstances.Err(); err != nil {
+		return nil, err
+	}
+	cis, err := is.InstanceSet.Instances(tags)
+	is.errInstances.Check(err, is.Logger, "Instances()")
+	return cis, err
+}
+
+type errorUntil struct {
+	err error
+	exp time.Time
+	mtx sync.Mutex
+}
+
+func (eu *errorUntil) Check(err error, logger logrus.FieldLogger, method string) {
+	var exp time.Time
+	if err, ok := err.(cloud.RateLimitError); !ok {
+		return
+	} else {
+		exp = err.EarliestRetry()
+	}
+	eu.mtx.Lock()
+	defer eu.mtx.Unlock()
+	if exp.After(eu.exp) {
+		eu.err = err
+		eu.exp = exp
+		logger.WithFields(logrus.Fields{
+			"EarliestRetry": exp,
+			"Method":        method,
+		}).WithError(err).Warn("RateLimitError")
+	}
+}
+
+func (eu *errorUntil) Err() error {
+	eu.mtx.Lock()
+	defer eu.mtx.Unlock()
+	if eu.exp.After(time.Now()) {
+		return eu.err
+	}
+	return nil
 }
diff --git a/lib/dispatchcloud/instance_set_proxy_test.go b/lib/dispatchcloud/instance_set_proxy_test.go
new file mode 100644
index 000000000..011125ced
--- /dev/null
+++ b/lib/dispatchcloud/instance_set_proxy_test.go
@@ -0,0 +1,88 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"errors"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
+	check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&InstanceSetProxySuite{})
+
+type InstanceSetProxySuite struct{}
+
+func (s *InstanceSetProxySuite) TestRateLimitError(c *check.C) {
+	logger := logrus.StandardLogger()
+	driver := &test.StubDriver{}
+	is, err := driver.InstanceSet(nil, "", logger)
+	c.Assert(err, check.IsNil)
+
+	var proxy cloud.InstanceSet
+	for _, testcase := range []struct {
+		induce  func(error)
+		attempt func() error
+	}{
+		// Rate limit error returned by Create()
+		{
+			func(err error) {
+				driver.InduceCreateError = err
+			},
+			func() error {
+				_, err := proxy.Create(arvados.InstanceType{}, cloud.ImageID(""), nil, nil)
+				return err
+			},
+		},
+		// Rate limit error returned by Instances()
+		{
+			func(err error) {
+				driver.InduceInstancesError = err
+			},
+			func() error {
+				_, err := proxy.Instances(nil)
+				return err
+			},
+		},
+	} {
+		var induced error
+		proxy = &instanceSetProxy{
+			InstanceSet: is,
+			Logger:      logger,
+		}
+		c.Check(testcase.attempt(), check.IsNil)
+
+		// A rate-limiting error whose timeout has already
+		// passed is returned from the call that encounters
+		// it, but the very next call gets passed through to
+		// the driver.
+		induced = test.RateLimitError{time.Now().Add(-time.Nanosecond)}
+		testcase.induce(induced)
+		c.Check(testcase.attempt(), check.Equals, induced)
+		testcase.induce(nil)
+		c.Check(testcase.attempt(), check.IsNil)
+
+		// Non-rate-limiting errors are not cached either.
+		induced = errors.New("spurious error")
+		testcase.induce(induced)
+		c.Check(testcase.attempt(), check.Equals, induced)
+		testcase.induce(nil)
+		c.Check(testcase.attempt(), check.IsNil)
+
+		// A rate-limit error gets returned from the call
+		// where it's encountered, as well as any subsequent
+		// calls, until its timeout expires.
+		induced = test.RateLimitError{time.Now().Add(time.Second)}
+		testcase.induce(induced)
+		c.Check(testcase.attempt(), check.Equals, induced)
+		testcase.induce(nil)
+		c.Check(testcase.attempt(), check.Equals, induced)
+		c.Check(testcase.attempt(), check.Equals, induced)
+	}
+}
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 7c948f4d3..82ff1b681 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -41,6 +41,11 @@ type StubDriver struct {
 	// Destroy. 0=always succeed, 1=always fail.
 	ErrorRateDestroy float64
 
+	// If non-nil, return these errors from Create() and
+	// Instances() calls.
+	InduceCreateError    error
+	InduceInstancesError error
+
 	instanceSets []*StubInstanceSet
 }
 
@@ -71,6 +76,9 @@ type StubInstanceSet struct {
 }
 
 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+	if err := sis.driver.InduceCreateError; err != nil {
+		return nil, err
+	}
 	sis.mtx.Lock()
 	defer sis.mtx.Unlock()
 	if sis.stopped {
@@ -99,6 +107,9 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 }
 
 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+	if err := sis.driver.InduceInstancesError; err != nil {
+		return nil, err
+	}
 	sis.mtx.RLock()
 	defer sis.mtx.RUnlock()
 	var r []cloud.Instance
@@ -117,6 +128,11 @@ func (sis *StubInstanceSet) Stop() {
 	sis.stopped = true
 }
 
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //

commit 262d59632f85b34ef4e2bcb1ee323a6e3b4435ed
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Jan 21 15:19:53 2019 -0500

    14325: Move magic number to const.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index f9566241d..2c2d977d8 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -63,6 +63,10 @@ const (
 	defaultTimeoutBooting     = time.Minute * 10
 	defaultTimeoutProbe       = time.Minute * 10
 	defaultTimeoutShutdown    = time.Second * 10
+
+	// Time after a quota error to try again anyway, even if no
+	// instances have been shutdown.
+	quotaErrorTTL = time.Minute
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -247,7 +251,7 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 		}
 		if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
 			wp.atQuotaErr = err
-			wp.atQuotaUntil = time.Now().Add(time.Minute)
+			wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
 		}
 		if err != nil {
 			logger.WithError(err).Error("create failed")

commit 332f82b60422b3c445b3fd2d69ddd45915d23ff8
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Jan 21 10:19:17 2019 -0500

    14325: Log when a worker's first probe succeeds.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 6532158c8..93677a896 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -52,8 +52,8 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 		CloudVMs: arvados.CloudVMs{
 			Driver:          "test",
 			SyncInterval:    arvados.Duration(10 * time.Millisecond),
-			TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
-			TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
+			TimeoutIdle:     arvados.Duration(150 * time.Millisecond),
+			TimeoutBooting:  arvados.Duration(150 * time.Millisecond),
 			TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
 			TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
 		},
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 1cab5f0c2..0872e594f 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -235,19 +235,29 @@ func (wkr *worker) probeAndUpdate() {
 		}
 	}
 	if wkr.state == StateUnknown || wkr.state == StateBooting {
+		// Note: this will change again below if
+		// len(wkr.starting)+len(wkr.running) > 0.
 		wkr.state = StateIdle
 		changed = true
 	}
-	if changed {
-		wkr.running = running
-		if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
-			wkr.state = StateRunning
-		} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
-			wkr.state = StateIdle
-		}
-		wkr.updated = updateTime
-		go wkr.wp.notify()
+	if !changed {
+		return
+	}
+
+	wkr.running = running
+	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+		wkr.state = StateRunning
+	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+		wkr.state = StateIdle
+	}
+	wkr.updated = updateTime
+	if needProbeBooted {
+		logger.WithFields(logrus.Fields{
+			"RunningContainers": len(running),
+			"State":             wkr.state,
+		}).Info("probes succeeded, instance is in service")
 	}
+	go wkr.wp.notify()
 }
 
 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {

commit 38cf2cccc4c3ec520d3d7ab85dbe27f427f6d394
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Jan 21 10:09:09 2019 -0500

    14325: Shutdown unknown-state worker after boot timeout.
    
    (instead of idle timeout)
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 1c72cc4b7..1cab5f0c2 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -293,7 +293,7 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) {
 		return
 	}
 	label, threshold := "", wkr.wp.timeoutProbe
-	if wkr.state == StateBooting {
+	if wkr.state == StateUnknown || wkr.state == StateBooting {
 		label, threshold = "new ", wkr.wp.timeoutBooting
 	}
 	if dur < threshold {

commit 3300a5ebde5463808e098e489d47b756d8613774
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Jan 21 09:38:58 2019 -0500

    14325: Include instance ID in log messages from stub VM.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index ab6d079b9..7c948f4d3 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -187,7 +187,10 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 		svm.Unlock()
 		time.Sleep(svm.CrunchRunDetachDelay)
 		fmt.Fprintf(stderr, "starting %s\n", uuid)
-		logger := logrus.WithField("ContainerUUID", uuid)
+		logger := logrus.WithFields(logrus.Fields{
+			"Instance":      svm.id,
+			"ContainerUUID": uuid,
+		})
 		logger.Printf("[test] starting crunch-run stub")
 		go func() {
 			crashluck := math_rand.Float64()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list