[arvados] updated: 2.5.0-113-g494cb7431
git repository hosting
git@public.arvados.org
Wed Feb 15 17:10:34 UTC 2023
Summary of changes:
lib/dispatchcloud/dispatcher_test.go | 42 +++++++++++++++++++++++----
lib/dispatchcloud/scheduler/run_queue.go | 6 +++-
lib/dispatchcloud/scheduler/run_queue_test.go | 16 +++++-----
lib/dispatchcloud/scheduler/sync_test.go | 4 +--
lib/dispatchcloud/worker/pool.go | 11 +++++--
sdk/go/arvados/client.go | 3 +-
sdk/go/arvados/limiter.go | 28 ++++++++++++++----
7 files changed, 82 insertions(+), 28 deletions(-)
via 494cb743161590207494ea1fd9c32afdfb89c8d1 (commit)
via 28f11611d09fa18d8dd09073ff3dc89a8337fe53 (commit)
from 663bc76408fc7b1a7a02c0f82aa2bf003c30b78a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 494cb743161590207494ea1fd9c32afdfb89c8d1
Author: Tom Clegg <tom@curii.com>
Date: Wed Feb 15 12:09:42 2023 -0500
19973: Test throttling after receiving 503 error.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 2d486da5f..a9ed95c7c 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -6,11 +6,13 @@ package dispatchcloud
import (
"context"
+ "crypto/tls"
"encoding/json"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
+ "net/url"
"os"
"sync"
"time"
@@ -28,11 +30,12 @@ import (
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
- ctx context.Context
- cancel context.CancelFunc
- cluster *arvados.Cluster
- stubDriver *test.StubDriver
- disp *dispatcher
+ ctx context.Context
+ cancel context.CancelFunc
+ cluster *arvados.Cluster
+ stubDriver *test.StubDriver
+ disp *dispatcher
+ error503Server *httptest.Server
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
@@ -100,6 +103,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
arvClient, err := arvados.NewClientFromConfig(s.cluster)
c.Check(err, check.IsNil)
+ s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) }))
+ arvClient.Client = &http.Client{
+ Transport: &http.Transport{
+ Proxy: s.arvClientProxy(c),
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: true}}}
+
s.disp = &dispatcher{
Cluster: s.cluster,
Context: s.ctx,
@@ -115,6 +125,20 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
func (s *DispatcherSuite) TearDownTest(c *check.C) {
s.cancel()
s.disp.Close()
+ s.error503Server.Close()
+}
+
+// Intercept outgoing API requests for "/503" and respond HTTP
+// 503. This lets us force (*arvados.Client)Last503() to return
+// something.
+func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
+ return func(req *http.Request) (*url.URL, error) {
+ if req.URL.Path == "/503" {
+ return url.Parse(s.error503Server.URL)
+ } else {
+ return nil, nil
+ }
+ }
}
// DispatchToStubDriver checks that the dispatcher wires everything
@@ -157,6 +181,10 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
return
}
delete(waiting, ctr.UUID)
+ if len(waiting) == 100 {
+ // trigger scheduler maxConcurrency limit
+ s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
+ }
if len(waiting) == 0 {
close(done)
}
@@ -230,7 +258,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
- c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} 0.*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} [0-9]+\n.*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="disappeared"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="failure"} [^0].*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="success"} [^0].*`)
@@ -250,6 +278,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="success"} [0-9e+.]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="fail"} [0-9]*`)
c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="fail"} [0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*last_503_time [1-9][0-9e+.]*`)
+ c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
}
func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
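Aside: the test above exercises the dispatcher's 503 handling by using http.Transport's Proxy hook as an interception point: any request whose path is "/503" is routed to a stub TLS server that answers 503 to everything, including the proxy CONNECT. A minimal standalone sketch of that trick (stub503 and the target URL are invented for illustration; only net/http and net/http/httptest are assumed):

package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

func main() {
	// Stub server that answers every request -- including the proxy
	// CONNECT -- with 503 Service Unavailable.
	stub503 := httptest.NewTLSServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusServiceUnavailable)
		}))
	defer stub503.Close()

	client := &http.Client{Transport: &http.Transport{
		// Proxy is consulted once per outgoing request: a non-nil
		// URL routes the request through the stub; (nil, nil) means
		// "no proxy" for every other path.
		Proxy: func(req *http.Request) (*url.URL, error) {
			if req.URL.Path == "/503" {
				return url.Parse(stub503.URL)
			}
			return nil, nil
		},
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
	}}

	// The https target forces a CONNECT to the stub; the 503 reply
	// surfaces as an error, not as an *http.Response.
	_, err := client.Get("https://example.invalid/503")
	fmt.Println(err) // ...: Service Unavailable
}

Because the target is https, the transport issues a CONNECT to the stub, gets 503 back, and reports it as an error wrapping "Service Unavailable" rather than as a response with StatusCode 503; the limiter.go change further down handles exactly that case.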
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 057ff8d6e..d0074ae28 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -62,7 +62,11 @@ func (sch *Scheduler) runQueue() {
sch.maxConcurrency = max
}
}
- sch.mLast503Time.Set(float64(sch.last503time.Unix()))
+ if sch.last503time.IsZero() {
+ sch.mLast503Time.Set(0)
+ } else {
+ sch.mLast503Time.Set(float64(sch.last503time.Unix()))
+ }
sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
sch.logger.WithFields(logrus.Fields{
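Aside on the guard above: the zero time.Time is 0001-01-01T00:00:00Z, so calling .Unix() on it yields -62135596800; without the IsZero check, the last_503_time gauge would export that huge negative number instead of 0 before any 503 has been observed. A minimal sketch of the pattern:

package main

import (
	"fmt"
	"time"
)

func main() {
	var last503 time.Time // zero value: 0001-01-01T00:00:00Z
	fmt.Println(last503.Unix()) // -62135596800: misleading as a metric

	var gaugeValue float64
	if last503.IsZero() {
		gaugeValue = 0 // report "never" as 0, not year-1 Unix seconds
	} else {
		gaugeValue = float64(last503.Unix())
	}
	fmt.Println(gaugeValue) // 0
}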
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 3abcba6c7..c270eef49 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -397,10 +397,15 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
wp.mtx.Lock()
defer wp.mtx.Unlock()
- wkr := wp.workers[inst.ID()]
+ wkr, ok := wp.workers[inst.ID()]
+ if !ok {
+ // race: inst was removed from the pool
+ return
+ }
if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
- // the node is not in booting state (can happen if a-d-c is restarted) OR
- // this is not the first SSH connection
+ // the node is not in booting state (can happen if
+ // a-d-c is restarted) OR this is not the first SSH
+ // connection
return
}
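Aside: indexing a Go map with a missing key returns the value type's zero value, and since wp.workers holds pointers that zero value is nil, so the old code could dereference nil when the instance had already been removed from the pool. The comma-ok form makes the race harmless. A minimal sketch (the worker type and map key are illustrative):

package main

import "fmt"

type worker struct{ state string }

func main() {
	workers := map[string]*worker{}
	// Plain indexing would return nil for a missing key, and a later
	// wkr.state would panic; comma-ok detects the removal instead.
	wkr, ok := workers["instance-gone"]
	if !ok {
		fmt.Println("instance already removed from pool; nothing to do")
		return
	}
	fmt.Println(wkr.state)
}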
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 5ace3b9d8..4eed6086a 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -277,8 +277,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
}
resp, err := c.httpClient().Do(req)
- c.requestLimiter.Report(resp, err)
- if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
+ if c.requestLimiter.Report(resp, err) {
c.last503.Store(time.Now())
}
if err == nil {
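The c.last503.Store(time.Now()) call suggests last503 is a sync/atomic Value backing the (*arvados.Client)Last503() accessor mentioned in the test comment earlier. A minimal sketch of that pattern, with the field and method shapes assumed rather than copied from client.go:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// client stands in for arvados.Client; the real field and method live
// in sdk/go/arvados/client.go and may differ in detail.
type client struct{ last503 atomic.Value }

func (c *client) Last503() time.Time {
	t, _ := c.last503.Load().(time.Time)
	return t // zero time if no 503 has been recorded yet
}

func main() {
	c := &client{}
	fmt.Println(c.Last503().IsZero()) // true
	c.last503.Store(time.Now())
	fmt.Println(c.Last503().IsZero()) // false
}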
diff --git a/sdk/go/arvados/limiter.go b/sdk/go/arvados/limiter.go
index ecbe0b180..f62264c63 100644
--- a/sdk/go/arvados/limiter.go
+++ b/sdk/go/arvados/limiter.go
@@ -6,7 +6,9 @@ package arvados
import (
"context"
+ "errors"
"net/http"
+ "net/url"
"sync"
"time"
)
@@ -91,13 +93,24 @@ func (rl *requestLimiter) Release() {
// Report uses the return values from (*http.Client)Do() to adjust the
// outgoing request limit (increase on success, decrease on 503).
-func (rl *requestLimiter) Report(resp *http.Response, err error) {
- if err != nil {
- return
- }
+//
+// Return value is true if the response was a 503.
+func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
rl.lock.Lock()
defer rl.lock.Unlock()
- if resp.StatusCode == http.StatusServiceUnavailable {
+ is503 := false
+ if err != nil {
+ uerr := &url.Error{}
+ if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
+ // This is how http.Client reports 503 from proxy server
+ is503 = true
+ } else {
+ return false
+ }
+ } else {
+ is503 = resp.StatusCode == http.StatusServiceUnavailable
+ }
+ if is503 {
if rl.limit == 0 {
// Concurrency was unlimited until now.
// Calculate new limit based on actual
@@ -112,7 +125,9 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) {
// a second.
rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
}
- } else if resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
+ return true
+ }
+ if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
// After each non-server-error response, increase
// concurrency limit by at least 10% -- but not beyond
// 2x the highest concurrency level we've seen without
@@ -127,4 +142,5 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) {
}
rl.cond.Broadcast()
}
+ return false
}
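The key subtlety in the new Report: when a proxy refuses the CONNECT with 503, http.Client reports it as a *url.Error whose inner error text is "Service Unavailable", not as a response with StatusCode 503, so both shapes have to be recognized. A condensed sketch of the detection logic (is503 is an invented name):

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/url"
)

// is503 condenses the two detection paths from Report above.
func is503(resp *http.Response, err error) bool {
	if err != nil {
		uerr := &url.Error{}
		// A proxy-refused CONNECT arrives as a *url.Error wrapping
		// "Service Unavailable" rather than as a 503 response.
		return errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable"
	}
	return resp.StatusCode == http.StatusServiceUnavailable
}

func main() {
	fmt.Println(is503(&http.Response{StatusCode: 503}, nil)) // true
	proxyErr := &url.Error{Op: "Get", URL: "https://x/503", Err: errors.New("Service Unavailable")}
	fmt.Println(is503(nil, proxyErr))                        // true
	fmt.Println(is503(&http.Response{StatusCode: 200}, nil)) // false
}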
commit 28f11611d09fa18d8dd09073ff3dc89a8337fe53
Author: Tom Clegg <tom@curii.com>
Date: Fri Feb 10 12:17:34 2023 -0500
19973: Update tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 5b5fa960a..c6ed2187e 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -188,7 +188,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
running: map[string]time.Time{},
canCreate: 0,
}
- New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
@@ -244,7 +244,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
starts: []string{},
canCreate: 0,
}
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
sch.sync()
sch.runQueue()
@@ -303,7 +303,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
starts: []string{},
canCreate: 1,
}
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
for i := 0; i < 30; i++ {
sch.runQueue()
sch.sync()
@@ -405,7 +405,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
},
}
queue.Update()
- New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
@@ -449,7 +449,7 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
},
}
queue.Update()
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
c.Check(pool.running, check.HasLen, 1)
sch.sync()
for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
@@ -482,7 +482,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
pool := stubPool{
unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
}
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
sch.updateMetrics()
@@ -494,7 +494,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
// 'over quota' metric will be 1 because no workers are available and canCreate defaults
// to zero.
pool = stubPool{}
- sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
sch.updateMetrics()
@@ -527,7 +527,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
running: map[string]time.Time{},
}
- sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
sch.updateMetrics()
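All of these call sites gain an *arvados.Client as the second argument, implying scheduler.New's signature changed accordingly. arvados.NewClientFromEnv is the SDK helper that builds a client from the ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables; a minimal usage sketch (assuming the module path git.arvados.org/arvados.git):

package main

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	// Builds a client from ARVADOS_API_HOST, ARVADOS_API_TOKEN (and
	// related) environment variables; in the tests above it simply
	// satisfies the new *arvados.Client parameter of scheduler.New.
	client := arvados.NewClientFromEnv()
	fmt.Println(client.APIHost)
}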
diff --git a/lib/dispatchcloud/scheduler/sync_test.go b/lib/dispatchcloud/scheduler/sync_test.go
index a3ff0636e..df254cd32 100644
--- a/lib/dispatchcloud/scheduler/sync_test.go
+++ b/lib/dispatchcloud/scheduler/sync_test.go
@@ -48,7 +48,7 @@ func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.sync()
ents, _ = queue.Entries()
@@ -80,7 +80,7 @@ func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
// Sync shouldn't cancel the container because it might be
// running on the VM with state=="unknown".
-----------------------------------------------------------------------
hooks/post-receive
--