[arvados] updated: 2.5.0-113-g494cb7431

git repository hosting git at public.arvados.org
Wed Feb 15 17:10:34 UTC 2023


Summary of changes:
 lib/dispatchcloud/dispatcher_test.go          | 42 +++++++++++++++++++++++----
 lib/dispatchcloud/scheduler/run_queue.go      |  6 +++-
 lib/dispatchcloud/scheduler/run_queue_test.go | 16 +++++-----
 lib/dispatchcloud/scheduler/sync_test.go      |  4 +--
 lib/dispatchcloud/worker/pool.go              | 11 +++++--
 sdk/go/arvados/client.go                      |  3 +-
 sdk/go/arvados/limiter.go                     | 28 ++++++++++++++----
 7 files changed, 82 insertions(+), 28 deletions(-)

       via  494cb743161590207494ea1fd9c32afdfb89c8d1 (commit)
       via  28f11611d09fa18d8dd09073ff3dc89a8337fe53 (commit)
      from  663bc76408fc7b1a7a02c0f82aa2bf003c30b78a (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 494cb743161590207494ea1fd9c32afdfb89c8d1
Author: Tom Clegg <tom at curii.com>
Date:   Wed Feb 15 12:09:42 2023 -0500

    19973: Test throttling after receiving 503 error.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 2d486da5f..a9ed95c7c 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -6,11 +6,13 @@ package dispatchcloud
 
 import (
 	"context"
+	"crypto/tls"
 	"encoding/json"
 	"io/ioutil"
 	"math/rand"
 	"net/http"
 	"net/http/httptest"
+	"net/url"
 	"os"
 	"sync"
 	"time"
@@ -28,11 +30,12 @@ import (
 var _ = check.Suite(&DispatcherSuite{})
 
 type DispatcherSuite struct {
-	ctx        context.Context
-	cancel     context.CancelFunc
-	cluster    *arvados.Cluster
-	stubDriver *test.StubDriver
-	disp       *dispatcher
+	ctx            context.Context
+	cancel         context.CancelFunc
+	cluster        *arvados.Cluster
+	stubDriver     *test.StubDriver
+	disp           *dispatcher
+	error503Server *httptest.Server
 }
 
 func (s *DispatcherSuite) SetUpTest(c *check.C) {
@@ -100,6 +103,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 	arvClient, err := arvados.NewClientFromConfig(s.cluster)
 	c.Check(err, check.IsNil)
 
+	s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) }))
+	arvClient.Client = &http.Client{
+		Transport: &http.Transport{
+			Proxy: s.arvClientProxy(c),
+			TLSClientConfig: &tls.Config{
+				InsecureSkipVerify: true}}}
+
 	s.disp = &dispatcher{
 		Cluster:   s.cluster,
 		Context:   s.ctx,
@@ -115,6 +125,20 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 func (s *DispatcherSuite) TearDownTest(c *check.C) {
 	s.cancel()
 	s.disp.Close()
+	s.error503Server.Close()
+}
+
+// Intercept outgoing API requests for "/503" and respond HTTP
+// 503. This lets us force (*arvados.Client)Last503() to return
+// something.
+func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
+	return func(req *http.Request) (*url.URL, error) {
+		if req.URL.Path == "/503" {
+			return url.Parse(s.error503Server.URL)
+		} else {
+			return nil, nil
+		}
+	}
 }
 
 // DispatchToStubDriver checks that the dispatcher wires everything
@@ -157,6 +181,10 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 			return
 		}
 		delete(waiting, ctr.UUID)
+		if len(waiting) == 100 {
+			// trigger scheduler maxConcurrency limit
+			s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
+		}
 		if len(waiting) == 0 {
 			close(done)
 		}
@@ -230,7 +258,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="0",operation="Destroy"} [^0].*`)
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="Create"} [^0].*`)
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*driver_operations{error="1",operation="List"} 0\n.*`)
-	c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} 0.*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="aborted"} [0-9]+\n.*`)
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="disappeared"} [^0].*`)
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="failure"} [^0].*`)
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*boot_outcomes{outcome="success"} [^0].*`)
@@ -250,6 +278,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="success"} [0-9e+.]*`)
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_count{outcome="fail"} [0-9]*`)
 	c.Check(resp.Body.String(), check.Matches, `(?ms).*run_probe_duration_seconds_sum{outcome="fail"} [0-9e+.]*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*last_503_time [1-9][0-9e+.]*`)
+	c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
 }
 
 func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
index 057ff8d6e..d0074ae28 100644
--- a/lib/dispatchcloud/scheduler/run_queue.go
+++ b/lib/dispatchcloud/scheduler/run_queue.go
@@ -62,7 +62,11 @@ func (sch *Scheduler) runQueue() {
 			sch.maxConcurrency = max
 		}
 	}
-	sch.mLast503Time.Set(float64(sch.last503time.Unix()))
+	if sch.last503time.IsZero() {
+		sch.mLast503Time.Set(0)
+	} else {
+		sch.mLast503Time.Set(float64(sch.last503time.Unix()))
+	}
 	sch.mMaxContainerConcurrency.Set(float64(sch.maxConcurrency))
 
 	sch.logger.WithFields(logrus.Fields{
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 3abcba6c7..c270eef49 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -397,10 +397,15 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
 	wp.mtx.Lock()
 	defer wp.mtx.Unlock()
-	wkr := wp.workers[inst.ID()]
+	wkr, ok := wp.workers[inst.ID()]
+	if !ok {
+		// race: inst was removed from the pool
+		return
+	}
 	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
-		// the node is not in booting state (can happen if a-d-c is restarted) OR
-		// this is not the first SSH connection
+		// the node is not in booting state (can happen if
+		// a-d-c is restarted) OR this is not the first SSH
+		// connection
 		return
 	}
 
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 5ace3b9d8..4eed6086a 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -277,8 +277,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
 	}
 
 	resp, err := c.httpClient().Do(req)
-	c.requestLimiter.Report(resp, err)
-	if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
+	if c.requestLimiter.Report(resp, err) {
 		c.last503.Store(time.Now())
 	}
 	if err == nil {
diff --git a/sdk/go/arvados/limiter.go b/sdk/go/arvados/limiter.go
index ecbe0b180..f62264c63 100644
--- a/sdk/go/arvados/limiter.go
+++ b/sdk/go/arvados/limiter.go
@@ -6,7 +6,9 @@ package arvados
 
 import (
 	"context"
+	"errors"
 	"net/http"
+	"net/url"
 	"sync"
 	"time"
 )
@@ -91,13 +93,24 @@ func (rl *requestLimiter) Release() {
 
 // Report uses the return values from (*http.Client)Do() to adjust the
 // outgoing request limit (increase on success, decrease on 503).
-func (rl *requestLimiter) Report(resp *http.Response, err error) {
-	if err != nil {
-		return
-	}
+//
+// Return value is true if the response was a 503.
+func (rl *requestLimiter) Report(resp *http.Response, err error) bool {
 	rl.lock.Lock()
 	defer rl.lock.Unlock()
-	if resp.StatusCode == http.StatusServiceUnavailable {
+	is503 := false
+	if err != nil {
+		uerr := &url.Error{}
+		if errors.As(err, &uerr) && uerr.Err.Error() == "Service Unavailable" {
+			// This is how http.Client reports 503 from proxy server
+			is503 = true
+		} else {
+			return false
+		}
+	} else {
+		is503 = resp.StatusCode == http.StatusServiceUnavailable
+	}
+	if is503 {
 		if rl.limit == 0 {
 			// Concurrency was unlimited until now.
 			// Calculate new limit based on actual
@@ -112,7 +125,9 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) {
 			// a second.
 			rl.quietUntil = time.Now().Add(requestLimiterQuietPeriod)
 		}
-	} else if resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
+		return true
+	}
+	if err == nil && resp.StatusCode >= 200 && resp.StatusCode < 400 && rl.limit > 0 {
 		// After each non-server-error response, increase
 		// concurrency limit by at least 10% -- but not beyond
 		// 2x the highest concurrency level we've seen without
@@ -127,4 +142,5 @@ func (rl *requestLimiter) Report(resp *http.Response, err error) {
 		}
 		rl.cond.Broadcast()
 	}
+	return false
 }

commit 28f11611d09fa18d8dd09073ff3dc89a8337fe53
Author: Tom Clegg <tom at curii.com>
Date:   Fri Feb 10 12:17:34 2023 -0500

    19973: Update tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
index 5b5fa960a..c6ed2187e 100644
--- a/lib/dispatchcloud/scheduler/run_queue_test.go
+++ b/lib/dispatchcloud/scheduler/run_queue_test.go
@@ -188,7 +188,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 		running:   map[string]time.Time{},
 		canCreate: 0,
 	}
-	New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
 	c.Check(pool.running, check.HasLen, 1)
@@ -244,7 +244,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 			starts:    []string{},
 			canCreate: 0,
 		}
-		sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+		sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 		sch.runQueue()
 		sch.sync()
 		sch.runQueue()
@@ -303,7 +303,7 @@ func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
 		starts:    []string{},
 		canCreate: 1,
 	}
-	sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 	for i := 0; i < 30; i++ {
 		sch.runQueue()
 		sch.sync()
@@ -405,7 +405,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
 		},
 	}
 	queue.Update()
-	New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+	New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
 	c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
 	c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
 	running := map[string]bool{}
@@ -449,7 +449,7 @@ func (*SchedulerSuite) TestKillNonexistentContainer(c *check.C) {
 		},
 	}
 	queue.Update()
-	sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 	c.Check(pool.running, check.HasLen, 1)
 	sch.sync()
 	for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
@@ -482,7 +482,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 	pool := stubPool{
 		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
 	}
-	sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 	sch.runQueue()
 	sch.updateMetrics()
 
@@ -494,7 +494,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 	// 'over quota' metric will be 1 because no workers are available and canCreate defaults
 	// to zero.
 	pool = stubPool{}
-	sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 	sch.runQueue()
 	sch.updateMetrics()
 
@@ -527,7 +527,7 @@ func (*SchedulerSuite) TestContainersMetrics(c *check.C) {
 		unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
 		running: map[string]time.Time{},
 	}
-	sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch = New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 	sch.runQueue()
 	sch.updateMetrics()
 
diff --git a/lib/dispatchcloud/scheduler/sync_test.go b/lib/dispatchcloud/scheduler/sync_test.go
index a3ff0636e..df254cd32 100644
--- a/lib/dispatchcloud/scheduler/sync_test.go
+++ b/lib/dispatchcloud/scheduler/sync_test.go
@@ -48,7 +48,7 @@ func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
 	ents, _ := queue.Entries()
 	c.Check(ents, check.HasLen, 1)
 
-	sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 	sch.sync()
 
 	ents, _ = queue.Entries()
@@ -80,7 +80,7 @@ func (*SchedulerSuite) TestCancelOrphanedContainers(c *check.C) {
 	ents, _ := queue.Entries()
 	c.Check(ents, check.HasLen, 1)
 
-	sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+	sch := New(ctx, arvados.NewClientFromEnv(), &queue, &pool, nil, time.Millisecond, time.Millisecond)
 
 	// Sync shouldn't cancel the container because it might be
 	// running on the VM with state=="unknown".

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list