[ARVADOS] updated: 1.3.0-183-g3bf30090d

Git user git at public.curoverse.com
Fri Jan 18 13:29:53 EST 2019


Summary of changes:
 lib/dispatchcloud/container/queue.go            |  15 ++-
 lib/dispatchcloud/container/queue_test.go       | 122 ++++++++++++++++++++++++
 lib/dispatchcloud/ssh_executor/executor_test.go |  47 +++++++--
 lib/dispatchcloud/worker/pool.go                |  11 +++
 sdk/go/arvados/client.go                        |   9 +-
 sdk/go/arvadostest/fixtures.go                  |   4 +
 6 files changed, 198 insertions(+), 10 deletions(-)
 create mode 100644 lib/dispatchcloud/container/queue_test.go

       via  3bf30090d15962ea34b761ce8ca5b43a972ba7f1 (commit)
       via  003fe66cd741d4ac7e841da56eda30d7ea88f392 (commit)
       via  f696f142eb5dcc2b5daac56ea38f457c4106a8a7 (commit)
       via  3eab0b129b4756a665b4ff0143ef6335295ed1cb (commit)
       via  06b0ca6bdf1cb278f361d6eebcf9fe965c4f350f (commit)
      from  1d80809a16fc97d7351824d2c921578133a93f65 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 3bf30090d15962ea34b761ce8ca5b43a972ba7f1
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Jan 18 13:29:19 2019 -0500

    14325: Cancel containers with unsatisfiable runtime constraints.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index 965407e51..1d126540a 100644
--- a/lib/dispatchcloud/container/queue.go
+++ b/lib/dispatchcloud/container/queue.go
@@ -212,8 +212,19 @@ func (cq *Queue) Update() error {
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
 	it, err := cq.chooseType(&ctr)
 	if err != nil {
-		// FIXME: throttle warnings, cancel after timeout
-		cq.logger.Warnf("cannot run %s", &ctr)
+		cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+		go func() {
+			err := cq.Cancel(ctr.UUID)
+			if err != nil {
+				if ctr, ok := cq.Get(ctr.UUID); !ok || ctr.State == arvados.ContainerStateCancelled {
+					// Don't bother logging if the
+					// failure comes from losing a
+					// race.
+					return
+				}
+				cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("cancel failed")
+			}
+		}()
 		return
 	}
 	cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
diff --git a/lib/dispatchcloud/container/queue_test.go b/lib/dispatchcloud/container/queue_test.go
index 9d2f83090..aa17a0585 100644
--- a/lib/dispatchcloud/container/queue_test.go
+++ b/lib/dispatchcloud/container/queue_test.go
@@ -5,6 +5,7 @@
 package container
 
 import (
+	"errors"
 	"sync"
 	"testing"
 	"time"
@@ -24,9 +25,18 @@ var _ = check.Suite(&IntegrationSuite{})
 
 type IntegrationSuite struct{}
 
-func (*IntegrationSuite) TestControllerBackedQueue(c *check.C) {
+func (suite *IntegrationSuite) TearDownTest(c *check.C) {
+	err := arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+	c.Check(err, check.IsNil)
+}
+
+func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
+	typeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+		return arvados.InstanceType{Name: "testType"}, nil
+	}
+
 	client := arvados.NewClientFromEnv()
-	cq := NewQueue(logrus.StandardLogger(), nil, testTypeChooser, client)
+	cq := NewQueue(logrus.StandardLogger(), nil, typeChooser, client)
 
 	err := cq.Update()
 	c.Check(err, check.IsNil)
@@ -77,6 +87,36 @@ func (*IntegrationSuite) TestControllerBackedQueue(c *check.C) {
 	c.Check(err, check.ErrorMatches, `.*State cannot change from Complete to Cancelled.*`)
 }
 
-func testTypeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
-	return arvados.InstanceType{Name: "testType"}, nil
+func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) {
+	errorTypeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+		return arvados.InstanceType{}, errors.New("no suitable type")
+	}
+
+	client := arvados.NewClientFromEnv()
+	cq := NewQueue(logrus.StandardLogger(), nil, errorTypeChooser, client)
+
+	var ctr arvados.Container
+	err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
+	c.Check(err, check.IsNil)
+	c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
+
+	cq.Update()
+
+	// Wait for the cancel operation to take effect. Container
+	// will have state=Cancelled or just disappear from the queue.
+	suite.waitfor(c, time.Second, func() bool {
+		err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
+		return err == nil && ctr.State == arvados.ContainerStateCancelled
+	})
+
+}
+
+func (suite *IntegrationSuite) waitfor(c *check.C, timeout time.Duration, fn func() bool) {
+	defer func() {
+		c.Check(fn(), check.Equals, true)
+	}()
+	deadline := time.Now().Add(timeout)
+	for !fn() && time.Now().Before(deadline) {
+		time.Sleep(timeout / 1000)
+	}
 }

commit 003fe66cd741d4ac7e841da56eda30d7ea88f392
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 17 16:31:55 2019 -0500

    14325: Test dispatch container queue against controller.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/container/queue_test.go b/lib/dispatchcloud/container/queue_test.go
new file mode 100644
index 000000000..9d2f83090
--- /dev/null
+++ b/lib/dispatchcloud/container/queue_test.go
@@ -0,0 +1,82 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package container
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"github.com/sirupsen/logrus"
+	check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&IntegrationSuite{})
+
+type IntegrationSuite struct{}
+
+func (*IntegrationSuite) TestControllerBackedQueue(c *check.C) {
+	client := arvados.NewClientFromEnv()
+	cq := NewQueue(logrus.StandardLogger(), nil, testTypeChooser, client)
+
+	err := cq.Update()
+	c.Check(err, check.IsNil)
+
+	ents, threshold := cq.Entries()
+	c.Check(len(ents), check.Not(check.Equals), 0)
+	c.Check(time.Since(threshold) < time.Minute, check.Equals, true)
+	c.Check(time.Since(threshold) > 0, check.Equals, true)
+
+	_, ok := ents[arvadostest.QueuedContainerUUID]
+	c.Check(ok, check.Equals, true)
+
+	var wg sync.WaitGroup
+	for uuid, ent := range ents {
+		c.Check(ent.Container.UUID, check.Equals, uuid)
+		c.Check(ent.InstanceType.Name, check.Equals, "testType")
+		c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued)
+		c.Check(ent.Container.Priority > 0, check.Equals, true)
+
+		ctr, ok := cq.Get(uuid)
+		c.Check(ok, check.Equals, true)
+		c.Check(ctr.UUID, check.Equals, uuid)
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			err := cq.Unlock(uuid)
+			c.Check(err, check.NotNil)
+			err = cq.Lock(uuid)
+			c.Check(err, check.IsNil)
+			ctr, ok := cq.Get(uuid)
+			c.Check(ok, check.Equals, true)
+			c.Check(ctr.State, check.Equals, arvados.ContainerStateLocked)
+			err = cq.Lock(uuid)
+			c.Check(err, check.NotNil)
+			err = cq.Unlock(uuid)
+			c.Check(err, check.IsNil)
+			ctr, ok = cq.Get(uuid)
+			c.Check(ok, check.Equals, true)
+			c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
+			err = cq.Unlock(uuid)
+			c.Check(err, check.NotNil)
+		}()
+	}
+	wg.Wait()
+
+	err = cq.Cancel(arvadostest.CompletedContainerUUID)
+	c.Check(err, check.ErrorMatches, `.*State cannot change from Complete to Cancelled.*`)
+}
+
+func testTypeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+	return arvados.InstanceType{Name: "testType"}, nil
+}
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index e0f248313..4f648e9b4 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -41,6 +41,10 @@ const (
 	QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
 	QueuedContainerUUID        = "zzzzz-dz642-queuedcontainer"
 
+	RunningContainerUUID = "zzzzz-dz642-runningcontainr"
+
+	CompletedContainerUUID = "zzzzz-dz642-compltcontainer"
+
 	ArvadosRepoUUID = "zzzzz-s0uqq-arvadosrepo0123"
 	ArvadosRepoName = "arvados"
 	FooRepoUUID     = "zzzzz-s0uqq-382brsig8rp3666"

commit f696f142eb5dcc2b5daac56ea38f457c4106a8a7
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 17 16:29:18 2019 -0500

    14325: Fix dropped request params when body not specified by caller.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index cca9f9bf1..787e01ab8 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -210,14 +210,19 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
 	if err != nil {
 		return err
 	}
-	if (method == "GET" || body != nil) && urlValues != nil {
-		// FIXME: what if params don't fit in URL
+	if urlValues == nil {
+		// Nothing to send
+	} else if method == "GET" || method == "HEAD" || body != nil {
+		// Must send params in query part of URL (FIXME: what
+		// if resulting URL is too long?)
 		u, err := url.Parse(urlString)
 		if err != nil {
 			return err
 		}
 		u.RawQuery = urlValues.Encode()
 		urlString = u.String()
+	} else {
+		body = strings.NewReader(urlValues.Encode())
 	}
 	req, err := http.NewRequest(method, urlString, body)
 	if err != nil {

commit 3eab0b129b4756a665b4ff0143ef6335295ed1cb
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 17 15:08:04 2019 -0500

    14325: Test that ssh_executor obeys a negative VerifyHostKey result.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
index 526840b13..f8565b4a7 100644
--- a/lib/dispatchcloud/ssh_executor/executor_test.go
+++ b/lib/dispatchcloud/ssh_executor/executor_test.go
@@ -6,6 +6,7 @@ package ssh_executor
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"net"
@@ -52,15 +53,49 @@ func (tt *testTarget) Port() string {
 	return p
 }
 
+type mitmTarget struct {
+	test.SSHService
+}
+
+func (*mitmTarget) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+	return fmt.Errorf("host key failed verification: %#v", key)
+}
+
 type ExecutorSuite struct{}
 
+func (s *ExecutorSuite) TestBadHostKey(c *check.C) {
+	_, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
+	clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
+	target := &mitmTarget{
+		SSHService: test.SSHService{
+			Exec: func(map[string]string, string, io.Reader, io.Writer, io.Writer) uint32 {
+				c.Error("Target Exec func called even though host key verification failed")
+				return 0
+			},
+			HostKey:        hostpriv,
+			AuthorizedKeys: []ssh.PublicKey{clientpub},
+		},
+	}
+
+	err := target.Start()
+	c.Check(err, check.IsNil)
+	c.Logf("target address %q", target.Address())
+	defer target.Close()
+
+	exr := New(target)
+	exr.SetSigners(clientpriv)
+
+	_, _, err = exr.Execute(nil, "true", nil)
+	c.Check(err, check.ErrorMatches, "host key failed verification: .*")
+}
+
 func (s *ExecutorSuite) TestExecute(c *check.C) {
 	command := `foo 'bar' "baz"`
 	stdinData := "foobar\nbaz\n"
 	_, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
 	clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
 	for _, exitcode := range []int{0, 1, 2} {
-		srv := &testTarget{
+		target := &testTarget{
 			SSHService: test.SSHService{
 				Exec: func(env map[string]string, cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
 					c.Check(env["TESTVAR"], check.Equals, "test value")
@@ -89,12 +124,12 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
 				AuthorizedKeys: []ssh.PublicKey{clientpub},
 			},
 		}
-		err := srv.Start()
+		err := target.Start()
 		c.Check(err, check.IsNil)
-		c.Logf("srv address %q", srv.Address())
-		defer srv.Close()
+		c.Logf("target address %q", target.Address())
+		defer target.Close()
 
-		exr := New(srv)
+		exr := New(target)
 		exr.SetSigners(clientpriv)
 
 		// Use the default target port (ssh). Execute will
@@ -111,7 +146,7 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
 		c.Check(err, check.ErrorMatches, `.*connection refused.*`)
 
 		// Use the test server's listening port.
-		exr.SetTargetPort(srv.Port())
+		exr.SetTargetPort(target.Port())
 
 		done := make(chan bool)
 		go func() {

commit 06b0ca6bdf1cb278f361d6eebcf9fe965c4f350f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Jan 17 14:35:29 2019 -0500

    14325: Add total hourly price metric.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index e69351317..f5dc01e27 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -137,6 +137,7 @@ type Pool struct {
 	setupOnce    sync.Once
 
 	mInstances         prometheus.Gauge
+	mInstancesPrice    prometheus.Gauge
 	mContainersRunning prometheus.Gauge
 	mVCPUs             prometheus.Gauge
 	mVCPUsInuse        prometheus.Gauge
@@ -481,6 +482,13 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
 		Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
 	})
 	reg.MustRegister(wp.mInstances)
+	wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "instances_price_total",
+		Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
+	})
+	reg.MustRegister(wp.mInstancesPrice)
 	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
 		Namespace: "arvados",
 		Subsystem: "dispatchcloud",
@@ -531,8 +539,10 @@ func (wp *Pool) updateMetrics() {
 	wp.mtx.RLock()
 	defer wp.mtx.RUnlock()
 
+	var price float64
 	var alloc, cpu, cpuInuse, mem, memInuse int64
 	for _, wkr := range wp.workers {
+		price += wkr.instType.Price
 		cpu += int64(wkr.instType.VCPUs)
 		mem += int64(wkr.instType.RAM)
 		if len(wkr.running)+len(wkr.starting) == 0 {
@@ -543,6 +553,7 @@ func (wp *Pool) updateMetrics() {
 		memInuse += int64(wkr.instType.RAM)
 	}
 	wp.mInstances.Set(float64(len(wp.workers)))
+	wp.mInstancesPrice.Set(price)
 	wp.mContainersRunning.Set(float64(alloc))
 	wp.mVCPUs.Set(float64(cpu))
 	wp.mMemory.Set(float64(mem))

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list