[ARVADOS] created: 1.3.0-539-gfe8ba4d73

Git user git at public.curoverse.com
Tue Mar 19 21:02:43 UTC 2019


        at  fe8ba4d738d1680f40b28ea01d08efea4a16574c (commit)


commit fe8ba4d738d1680f40b28ea01d08efea4a16574c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Mar 19 17:02:05 2019 -0400

    14807: Report queue metrics.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index 4e807a12a..af17aaf39 100644
--- a/lib/dispatchcloud/container/queue.go
+++ b/lib/dispatchcloud/container/queue.go
@@ -53,7 +53,6 @@ func (c *QueueEnt) String() string {
 // cache up to date.
 type Queue struct {
 	logger     logrus.FieldLogger
-	reg        *prometheus.Registry
 	chooseType typeChooser
 	client     APIClient
 
@@ -79,14 +78,17 @@ type Queue struct {
 // Arvados cluster's queue during Update, chooseType will be called to
 // assign an appropriate arvados.InstanceType for the queue entry.
 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
-	return &Queue{
+	cq := &Queue{
 		logger:      logger,
-		reg:         reg,
 		chooseType:  chooseType,
 		client:      client,
 		current:     map[string]QueueEnt{},
 		subscribers: map[<-chan struct{}]chan struct{}{},
 	}
+	if reg != nil {
+		go cq.runMetrics(reg)
+	}
+	return cq
 }
 
 // Subscribe returns a channel that becomes ready to receive when an
@@ -487,3 +489,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
 	}
 	return results, nil
 }
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+	mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "queue_entries",
+		Help:      "Number of active container entries in the controller database.",
+	}, []string{"state", "instance_type"})
+	reg.MustRegister(mEntries)
+
+	type entKey struct {
+		state arvados.ContainerState
+		inst  string
+	}
+	count := map[entKey]int{}
+
+	ch := cq.Subscribe()
+	defer cq.Unsubscribe(ch)
+	for range ch {
+		for k := range count {
+			count[k] = 0
+		}
+		ents, _ := cq.Entries()
+		for _, ent := range ents {
+			count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+		}
+		for k, v := range count {
+			mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+		}
+	}
+}

commit 2986810337995e8cd4876eb20096da915cea9edf
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Mar 19 13:40:10 2019 -0400

    14807: Add "kill instance" management API.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index adf1028b3..52ddbf56b 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -39,6 +39,7 @@ type pool interface {
 	scheduler.WorkerPool
 	Instances() []worker.InstanceView
 	SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+	KillInstance(id cloud.InstanceID, reason string) error
 	Stop()
 }
 
@@ -147,6 +148,7 @@ func (disp *dispatcher) initialize() {
 		mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
 		mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
 		mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+		mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
 		metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
 			ErrorLog: disp.logger,
 		})
@@ -212,6 +214,20 @@ func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
 	disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
 }
 
+// Management API: shutdown/destroy specified instance now.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+	id := cloud.InstanceID(r.FormValue("instance_id"))
+	if id == "" {
+		httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+		return
+	}
+	err := disp.pool.KillInstance(id)
+	if err != nil {
+		httpserver.Error(w, err.Error(), http.StatusNotFound)
+		return
+	}
+}
+
 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
 	id := cloud.InstanceID(r.FormValue("instance_id"))
 	if id == "" {
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 81a658535..f3ac4b7c1 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -691,6 +691,21 @@ func (wp *Pool) Instances() []InstanceView {
 	return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+	wkr, ok := wp.workers[id]
+	if !ok {
+		return errors.New("instance not found")
+	}
+	logger.WithFields(logrus.Fields{
+		"Instance": wkr.instance,
+		"Reason":   reason,
+	}).Info("shutting down")
+	wkr.shutdown()
+	return nil
+}
+
 func (wp *Pool) setup() {
 	wp.creating = map[string]createCall{}
 	wp.exited = map[string]time.Time{}

commit fad0a6c5a1c0675f9c52d3d18ce1573847d66c0f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Mar 18 17:27:40 2019 -0400

    14807: Configurable rate limit for cloud provider API calls.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 44d5a0ae7..d1ee4b135 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -49,12 +49,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
 	s.cluster = &arvados.Cluster{
 		CloudVMs: arvados.CloudVMs{
-			Driver:          "test",
-			SyncInterval:    arvados.Duration(10 * time.Millisecond),
-			TimeoutIdle:     arvados.Duration(150 * time.Millisecond),
-			TimeoutBooting:  arvados.Duration(150 * time.Millisecond),
-			TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
-			TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+			Driver:               "test",
+			SyncInterval:         arvados.Duration(10 * time.Millisecond),
+			TimeoutIdle:          arvados.Duration(150 * time.Millisecond),
+			TimeoutBooting:       arvados.Duration(150 * time.Millisecond),
+			TimeoutProbe:         arvados.Duration(15 * time.Millisecond),
+			TimeoutShutdown:      arvados.Duration(5 * time.Millisecond),
+			MaxCloudOpsPerSecond: 500,
 		},
 		Dispatch: arvados.Dispatch{
 			PrivateKey:         string(dispatchprivraw),
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index 0343f85b9..eb1e48737 100644
--- a/lib/dispatchcloud/driver.go
+++ b/lib/dispatchcloud/driver.go
@@ -6,12 +6,14 @@ package dispatchcloud
 
 import (
 	"fmt"
+	"time"
 
 	"git.curoverse.com/arvados.git/lib/cloud"
 	"git.curoverse.com/arvados.git/lib/cloud/azure"
 	"git.curoverse.com/arvados.git/lib/cloud/ec2"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/crypto/ssh"
 )
 
 var drivers = map[string]cloud.Driver{
@@ -24,5 +26,33 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger
 	if !ok {
 		return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
 	}
-	return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+	is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+	if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+		is = &rateLimitedInstanceSet{
+			InstanceSet: is,
+			ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
+		}
+	}
+	return is, err
+}
+
+type rateLimitedInstanceSet struct {
+	cloud.InstanceSet
+	ticker *time.Ticker
+}
+
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+	<-is.ticker.C
+	inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+	return &rateLimitedInstance{inst, is.ticker}, err
+}
+
+type rateLimitedInstance struct {
+	cloud.Instance
+	ticker *time.Ticker
+}
+
+func (inst *rateLimitedInstance) Destroy() error {
+	<-inst.ticker.C
+	return inst.Instance.Destroy()
 }
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 73addb739..7c87ff029 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -154,6 +154,9 @@ type CloudVMs struct {
 	// Time after shutdown to retry shutdown
 	TimeoutShutdown Duration
 
+	// Maximum create/destroy-instance operations per second
+	MaxCloudOpsPerSecond int
+
 	ImageID string
 
 	Driver           string

commit 7858b49698e1d2c1d619b19b6db2978f2ab067be
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Mar 18 16:32:58 2019 -0400

    14807: Drain instances that crunch-run reports broken.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 7268f106a..44d5a0ae7 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -157,6 +157,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
 			stubvm.CrunchRunMissing = true
 		default:
 			stubvm.CrunchRunCrashRate = 0.1
+			stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)+200) * time.Millisecond)
 		}
 	}
 
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index a4521eab7..02346a970 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -181,6 +181,7 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 type StubVM struct {
 	Boot                  time.Time
 	Broken                time.Time
+	ReportBroken          time.Time
 	CrunchRunMissing      bool
 	CrunchRunCrashRate    float64
 	CrunchRunDetachDelay  time.Duration
@@ -314,6 +315,9 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
 		for uuid := range svm.running {
 			fmt.Fprintf(stdout, "%s\n", uuid)
 		}
+		if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+			fmt.Fprintln(stdout, "broken")
+		}
 		return 0
 	}
 	if strings.HasPrefix(command, "crunch-run --kill ") {
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 41117c1d4..49c5057b3 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -5,7 +5,6 @@
 package worker
 
 import (
-	"bytes"
 	"fmt"
 	"strings"
 	"sync"
@@ -215,11 +214,16 @@ func (wkr *worker) probeAndUpdate() {
 			logger.Info("instance booted; will try probeRunning")
 		}
 	}
+	reportedBroken := false
 	if booted || wkr.state == StateUnknown {
-		ctrUUIDs, ok = wkr.probeRunning()
+		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
 	}
 	wkr.mtx.Lock()
 	defer wkr.mtx.Unlock()
+	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+		logger.Info("probe reported broken instance")
+		wkr.setIdleBehavior(IdleBehaviorDrain)
+	}
 	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
 		if wkr.state == StateShutdown && wkr.updated.After(updated) {
 			// Skip the logging noise if shutdown was
@@ -313,7 +317,7 @@ func (wkr *worker) probeAndUpdate() {
 	go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
 	cmd := "crunch-run --list"
 	if u := wkr.instance.RemoteUser(); u != "root" {
 		cmd = "sudo " + cmd
@@ -325,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
 			"stdout":  string(stdout),
 			"stderr":  string(stderr),
 		}).WithError(err).Warn("probe failed")
-		return nil, false
+		return
 	}
-	stdout = bytes.TrimRight(stdout, "\n")
-	if len(stdout) == 0 {
-		return nil, true
+	ok = true
+	for _, s := range strings.Split(string(stdout), "\n") {
+		if s == "broken" {
+			reportsBroken = true
+		} else if s != "" {
+			running = append(running, s)
+		}
 	}
-	return strings.Split(string(stdout), "\n"), true
+	return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go
index 933692bdc..852ccb6ec 100644
--- a/services/crunch-run/background.go
+++ b/services/crunch-run/background.go
@@ -20,6 +20,7 @@ var (
 	lockdir    = "/var/lock"
 	lockprefix = "crunch-run-"
 	locksuffix = ".lock"
+	brokenfile = "crunch-run-broken"
 )
 
 // procinfo is saved in each process's lockfile.
@@ -146,7 +147,10 @@ func ListProcesses(stdout, stderr io.Writer) int {
 		if info.IsDir() && path != walkdir {
 			return filepath.SkipDir
 		}
-		if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+		if name := info.Name(); name == brokenfile {
+			fmt.Fprintln(stdout, "broken")
+			return nil
+		} else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
 			return nil
 		}
 		if info.Size() == 0 {
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 0576337aa..3925b0b7b 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -222,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
 	if *brokenNodeHook == "" {
-		runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+		path := filepath.Join(lockdir, brokenfile)
+		runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+		f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+		if err != nil {
+			runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+			return
+		}
+		f.Close()
 	} else {
 		runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
 		// run killme script
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 17e5e1458..60729c019 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -2049,7 +2049,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
 
 	c.Check(api.CalledWith("container.state", "Queued"), NotNil)
 	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+	c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
 }
 
 func (s *TestSuite) TestFullBrokenDocker3(c *C) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list