[ARVADOS] updated: 1.3.0-176-gdf9d7f8d6

Git user git at public.curoverse.com
Wed Jan 16 17:13:20 EST 2019


Summary of changes:
 apps/workbench/app/helpers/application_helper.rb   |   4 +
 .../app/views/users/_virtual_machines.html.erb     |   4 +-
 apps/workbench/package-build.version               |   1 +
 apps/workbench/test/the.patch                      |   3 +
 build/run-tests.sh                                 |   2 +
 doc/README.textile                                 |   2 +-
 doc/Rakefile                                       |   2 +-
 doc/_config.yml                                    |   1 +
 .../ssh-access-unix.html.textile.liquid            |  19 +-
 lib/cloud/azure.go                                 | 692 +++++++++++++++++++++
 lib/cloud/azure_test.go                            | 348 +++++++++++
 .../worker => cloud}/gocheck_test.go               |   2 +-
 lib/cloud/interfaces.go                            |  11 +-
 lib/controller/federation_test.go                  |   2 +-
 lib/controller/server_test.go                      |   2 +-
 lib/dispatchcloud/container/queue.go               |   2 +-
 lib/dispatchcloud/dispatcher.go                    |  52 +-
 lib/dispatchcloud/dispatcher_test.go               |   2 +-
 lib/dispatchcloud/driver.go                        |   9 +-
 lib/dispatchcloud/readme_states.txt                |   4 +
 lib/dispatchcloud/scheduler/run_queue.go           |   2 +-
 lib/dispatchcloud/scheduler/run_queue_test.go      |   2 +-
 lib/dispatchcloud/scheduler/scheduler.go           |   2 +-
 lib/dispatchcloud/scheduler/sync.go                |   2 +-
 lib/dispatchcloud/test/stub_driver.go              |   6 +-
 lib/dispatchcloud/worker/pool.go                   |  88 ++-
 lib/dispatchcloud/worker/pool_test.go              |  47 +-
 lib/dispatchcloud/worker/worker.go                 | 100 ++-
 lib/service/cmd.go                                 |   2 +-
 sdk/go/ctxlog/log.go                               |   2 +-
 sdk/go/dispatch/dispatch.go                        |   2 +-
 sdk/go/httpserver/logger.go                        |   2 +-
 sdk/go/httpserver/logger_test.go                   |   2 +-
 sdk/go/httpserver/metrics.go                       |   2 +-
 .../crunch-dispatch-local/crunch-dispatch-local.go |   2 +-
 .../crunch-dispatch-local_test.go                  |   2 +-
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |   2 +-
 .../crunch-dispatch-slurm_test.go                  |   2 +-
 services/crunch-dispatch-slurm/squeue_test.go      |   2 +-
 services/health/main.go                            |   2 +-
 services/keep-balance/balance.go                   |   2 +-
 services/keep-balance/balance_run_test.go          |   2 +-
 services/keep-balance/integration_test.go          |   2 +-
 services/keep-balance/main.go                      |   2 +-
 services/keep-balance/server.go                    |   2 +-
 services/keep-web/handler.go                       |   2 +-
 services/keep-web/main.go                          |   2 +-
 services/keepproxy/keepproxy.go                    |   2 +-
 services/keepstore/config.go                       |   2 +-
 services/keepstore/config_test.go                  |   2 +-
 services/ws/router.go                              |   2 +-
 services/ws/session_v0.go                          |   2 +-
 tools/arvbox/bin/arvbox                            |  18 +-
 tools/arvbox/lib/arvbox/docker/Dockerfile.base     |   4 +
 tools/arvbox/lib/arvbox/docker/Dockerfile.demo     |   6 +-
 tools/arvbox/lib/arvbox/docker/api-setup.sh        |  11 +-
 tools/arvbox/lib/arvbox/docker/common.sh           |  10 +-
 .../service/{api => certificate}/log/main/.gitstub |   0
 .../docker/service/{api => certificate}/log/run    |   0
 .../lib/arvbox/docker/service/certificate/run      |  81 +++
 .../arvbox/lib/arvbox/docker/service/composer/run  |   2 +-
 .../lib/arvbox/docker/service/gitolite/run-service |   2 +-
 .../lib/arvbox/docker/service/nginx/run-service    |  69 +-
 .../lib/arvbox/docker/service/ready/run-service    |   3 +-
 tools/arvbox/lib/arvbox/docker/service/ssh/run     |   2 +-
 .../lib/arvbox/docker/service/sso/run-service      |   8 +-
 .../arvbox/docker/service/websockets/run-service   |   4 +-
 .../arvbox/lib/arvbox/docker/service/workbench/run |   2 +
 .../arvbox/docker/service/workbench/run-service    |  10 +-
 .../service/{api => workbench2}/log/main/.gitstub  |   0
 .../docker/service/{api => workbench2}/log/run     |   0
 .../docker/service/{composer => workbench2}/run    |   2 +-
 .../arvbox/docker/service/workbench2/run-service   |  33 +
 vendor/vendor.json                                 | 106 +++-
 74 files changed, 1651 insertions(+), 183 deletions(-)
 create mode 100644 apps/workbench/package-build.version
 create mode 100644 apps/workbench/test/the.patch
 create mode 100644 lib/cloud/azure.go
 create mode 100644 lib/cloud/azure_test.go
 copy lib/{dispatchcloud/worker => cloud}/gocheck_test.go (93%)
 copy tools/arvbox/lib/arvbox/docker/service/{api => certificate}/log/main/.gitstub (100%)
 copy tools/arvbox/lib/arvbox/docker/service/{api => certificate}/log/run (100%)
 create mode 100755 tools/arvbox/lib/arvbox/docker/service/certificate/run
 copy tools/arvbox/lib/arvbox/docker/service/{api => workbench2}/log/main/.gitstub (100%)
 copy tools/arvbox/lib/arvbox/docker/service/{api => workbench2}/log/run (100%)
 copy tools/arvbox/lib/arvbox/docker/service/{composer => workbench2}/run (69%)
 create mode 100755 tools/arvbox/lib/arvbox/docker/service/workbench2/run-service

  discards  54643dadce1988a815b85a52349cb8a33cbdbf65 (commit)
  discards  de12a680c6cd6395a700aca5d03604465afa3df5 (commit)
       via  df9d7f8d64a2a51dea5552641f44afd3e43c636b (commit)
       via  2a018b15662ae5f0b30d1d11eb2d0ffa685964e0 (commit)
       via  efaa80ea0f18dfeaf344a5250fa37090451f0699 (commit)
       via  a7da936b6106bff17a7830124b1fd87ff01ad91e (commit)
       via  e62dc0ffea0983f6a17ef16b368a0dceb62b98ea (commit)
       via  a27348943c6e35eee983d6a333eb6977394c3f13 (commit)
       via  6756ee790a6c685b96bc1ec80b67e7d6c94e9846 (commit)
       via  3dca6408c9886ae3591486ed43410096b0a28488 (commit)
       via  f398e9d75d32fbfa6804041353997bf74195e489 (commit)
       via  443a0b96316ed46600dc5035193adae6ac4d1f74 (commit)
       via  855a0afbc604487ddaedaed0cc1a4ad6da34b602 (commit)
       via  1b2a4c98249e3a46242f7d10c00a70feeeb2a843 (commit)
       via  7f5b859e82a65ac27c403ff2416c471f9c770ac1 (commit)
       via  dc19260dea6ee0b7186a50245ad31c2cbe5e6b2a (commit)
       via  5dd601ec0484d193d5d71975dc1d6b85ea901f88 (commit)
       via  351505f3a2b2b4460a4214097d63f53ab00ec42f (commit)
       via  925644cc7c7e6761566a4059ac9ed9987653931e (commit)
       via  e3dc6df1694586eca13f2f20eb064b8b16d115e5 (commit)
       via  40e5a87a683bf463bac2f6a41beb8b2ee96bab27 (commit)
       via  146257d644f6149d5504aa0652286ddef6a2049e (commit)
       via  ce01f835b561911f87e0d264ac72dbabc552281f (commit)
       via  e7dfae5f99f9291789be40a6505738307137360f (commit)
       via  137f46c8040d22e70e4b17a495e4171c59a8033f (commit)
       via  ca801f2402482c9192f3c75461134c10834f2fcc (commit)
       via  95e21426a5a27c24f7aa561df53d88aee6e0360f (commit)
       via  abecf37b0d4028782f44d04540402a4857352735 (commit)
       via  c548afa3245de240910c0b8ca6cf2a9e8800555a (commit)
       via  3f649213bb9165b33a0ece597a4f852093cb409e (commit)
       via  ee8dedc9286a12d95c5a4d9eefe4daf4a40d2528 (commit)
       via  acc46e4fdbc9098ac2b4c522b17473b58b087013 (commit)
       via  b2b98cbc9c213d2831405b6eb37e0f52fa2e9e17 (commit)
       via  36c705beb497a5ef8348e5f977ee6ad6674c7d11 (commit)
       via  51c1daf863f3e1920f758f73b4e5d70ff2c706d6 (commit)
       via  e6bc5eeacc812ac41aadeec6fd7a907b5aab0237 (commit)
       via  1b2d73e101e1fb27de201b33142c081e80bb7fc6 (commit)
       via  76e3ea1d0d2f865272000a5e12f64266a90b3794 (commit)
       via  9c013e7742d72254ae747948f96237698198aa85 (commit)
       via  28aa8adcc90108922b1274f5568ce3c5745cca78 (commit)
       via  3259e9de3d048f7a0b27e67098811640e9da8230 (commit)
       via  2a6cb99cf7a21a273efe8dc793929b74149871f6 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not an ancestor of the new revision: some
commits in the old history are absent from the new history.  This situation
occurs when you --force push a change and generate a repository containing
something like this:

 * -- * -- B -- O -- O -- O (54643dadce1988a815b85a52349cb8a33cbdbf65)
            \
             N -- N -- N (df9d7f8d64a2a51dea5552641f44afd3e43c636b)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the N
branch since the common base, B.

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.


commit df9d7f8d64a2a51dea5552641f44afd3e43c636b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Wed Jan 16 17:09:40 2019 -0500

    14325: Add management API endpoints for drain/hold/run behavior.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 9a6ccded3..788463330 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -21,6 +21,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/auth"
 	"git.curoverse.com/arvados.git/sdk/go/httpserver"
+	"github.com/julienschmidt/httprouter"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/sirupsen/logrus"
@@ -35,6 +36,7 @@ const (
 type pool interface {
 	scheduler.WorkerPool
 	Instances() []worker.InstanceView
+	SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
 	Stop()
 }
 
@@ -135,14 +137,17 @@ func (disp *dispatcher) initialize() {
 			http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
 		})
 	} else {
-		mux := http.NewServeMux()
-		mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
-		mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
+		mux := httprouter.New()
+		mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+		mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
+		mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
+		mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
+		mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
 		metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
 			ErrorLog: disp.logger,
 		})
-		mux.Handle("/metrics", metricsH)
-		mux.Handle("/metrics.json", metricsH)
+		mux.Handler("GET", "/metrics", metricsH)
+		mux.Handler("GET", "/metrics.json", metricsH)
 		disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
 	}
 }
@@ -169,10 +174,6 @@ func (disp *dispatcher) run() {
 
 // Management API: all active and queued containers.
 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
-	if r.Method != "GET" {
-		httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
-		return
-	}
 	var resp struct {
 		Items []container.QueueEnt
 	}
@@ -185,13 +186,34 @@ func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
 
 // Management API: all active instances (cloud VMs).
 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
-	if r.Method != "GET" {
-		httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
-		return
-	}
 	var resp struct {
 		Items []worker.InstanceView
 	}
 	resp.Items = disp.pool.Instances()
 	json.NewEncoder(w).Encode(resp)
 }
+
+// Management API: set idle behavior to "hold" for specified instance.
+func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
+	disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
+}
+
+// Management API: set idle behavior to "drain" for specified instance.
+func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
+	disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
+}
+
+// Management API: set idle behavior to "run" for specified instance.
+func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
+	disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
+}
+
+func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
+	params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
+	id := cloud.InstanceID(params.ByName("instance_id"))
+	err := disp.pool.SetIdleBehavior(id, want)
+	if err != nil {
+		httpserver.Error(w, err.Error(), http.StatusNotFound)
+		return
+	}
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index ec5fa21e2..1b24804b0 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -349,6 +349,12 @@
 			"revisionTime": "2015-08-17T12:26:01Z"
 		},
 		{
+			"checksumSHA1": "X7g98YfLr+zM7aN76AZvAfpZyfk=",
+			"path": "github.com/julienschmidt/httprouter",
+			"revision": "adbc77eec0d91467376ca515bc3a14b8434d0f18",
+			"revisionTime": "2018-04-11T15:45:01Z"
+		},
+		{
 			"checksumSHA1": "oX6jFQD74oOApvDIhOzW2dXpg5Q=",
 			"path": "github.com/kevinburke/ssh_config",
 			"revision": "802051befeb51da415c46972b5caf36e7c33c53d",

commit 2a018b15662ae5f0b30d1d11eb2d0ffa685964e0
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Wed Jan 16 17:09:34 2019 -0500

    14325: Add IdleBehavior to worker pool.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 25b57c6e5..e69351317 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,6 +5,7 @@
 package worker
 
 import (
+	"errors"
 	"io"
 	"sort"
 	"strings"
@@ -19,18 +20,19 @@ import (
 
 const (
 	tagKeyInstanceType = "InstanceType"
-	tagKeyHold         = "Hold"
+	tagKeyIdleBehavior = "IdleBehavior"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
-	Instance             string
+	Instance             cloud.InstanceID
 	Price                float64
 	ArvadosInstanceType  string
 	ProviderInstanceType string
 	LastContainerUUID    string
 	LastBusy             time.Time
 	WorkerState          string
+	IdleBehavior         IdleBehavior
 }
 
 // An Executor executes shell commands on a remote host.
@@ -173,7 +175,8 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 }
 
 // Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type.  Workers in
+// hold/drain mode are not included.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 	wp.setupOnce.Do(wp.setup)
 	wp.mtx.RLock()
@@ -184,7 +187,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 		creating[it] = len(times)
 	}
 	for _, wkr := range wp.workers {
-		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+		if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
 			continue
 		}
 		it := wkr.instType
@@ -222,7 +225,10 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 	if time.Now().Before(wp.atQuotaUntil) {
 		return wp.atQuotaErr
 	}
-	tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
+	tags := cloud.InstanceTags{
+		tagKeyInstanceType: it.Name,
+		tagKeyIdleBehavior: string(IdleBehaviorRun),
+	}
 	now := time.Now()
 	wp.creating[it] = append(wp.creating[it], now)
 	go func() {
@@ -259,6 +265,21 @@ func (wp *Pool) AtQuota() bool {
 	return time.Now().Before(wp.atQuotaUntil)
 }
 
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+	wp.mtx.Lock()
+	defer wp.mtx.Unlock()
+	wkr, ok := wp.workers[id]
+	if !ok {
+		return errors.New("requested instance does not exist")
+	}
+	wkr.idleBehavior = idleBehavior
+	wkr.saveTags()
+	wkr.shutdownIfIdle()
+	return nil
+}
+
 // Add or update worker attached to the given instance. Use
 // initialState if a new worker is created.
 //
@@ -274,32 +295,46 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 		if initialState == StateBooting && wkr.state == StateUnknown {
 			wkr.state = StateBooting
 		}
+		wkr.saveTags()
 		return wkr, false
 	}
-	if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
-		initialState = StateHold
+
+	// If an instance has a valid IdleBehavior tag when it first
+	// appears, initialize the new worker accordingly (this is how
+	// we restore IdleBehavior that was set by a prior dispatch
+	// process); otherwise, default to "run". After this,
+	// wkr.idleBehavior is the source of truth, and will only be
+	// changed via SetIdleBehavior().
+	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+	if !validIdleBehavior[idleBehavior] {
+		idleBehavior = IdleBehaviorRun
 	}
+
 	logger := wp.logger.WithFields(logrus.Fields{
 		"InstanceType": it.Name,
 		"Instance":     inst,
 	})
-	logger.WithField("State", initialState).Infof("instance appeared in cloud")
+	logger.WithFields(logrus.Fields{
+		"State":        initialState,
+		"IdleBehavior": idleBehavior,
+	}).Infof("instance appeared in cloud")
 	now := time.Now()
 	wkr := &worker{
-		mtx:      &wp.mtx,
-		wp:       wp,
-		logger:   logger,
-		executor: wp.newExecutor(inst),
-		state:    initialState,
-		instance: inst,
-		instType: it,
-		appeared: now,
-		probed:   now,
-		busy:     now,
-		updated:  now,
-		running:  make(map[string]struct{}),
-		starting: make(map[string]struct{}),
-		probing:  make(chan struct{}, 1),
+		mtx:          &wp.mtx,
+		wp:           wp,
+		logger:       logger,
+		executor:     wp.newExecutor(inst),
+		state:        initialState,
+		idleBehavior: idleBehavior,
+		instance:     inst,
+		instType:     it,
+		appeared:     now,
+		probed:       now,
+		busy:         now,
+		updated:      now,
+		running:      make(map[string]struct{}),
+		starting:     make(map[string]struct{}),
+		probing:      make(chan struct{}, 1),
 	}
 	wp.workers[id] = wkr
 	return wkr, true
@@ -322,7 +357,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
 		// TODO: shutdown the worker with the longest idle
 		// time (Idle) or the earliest create time (Booting)
 		for _, wkr := range wp.workers {
-			if wkr.state == tryState && wkr.instType == it {
+			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
 				logger.WithField("Instance", wkr.instance).Info("shutting down")
 				wkr.shutdown()
 				return true
@@ -590,18 +625,19 @@ func (wp *Pool) Instances() []InstanceView {
 	wp.mtx.Lock()
 	for _, w := range wp.workers {
 		r = append(r, InstanceView{
-			Instance:             w.instance.String(),
+			Instance:             w.instance.ID(),
 			Price:                w.instType.Price,
 			ArvadosInstanceType:  w.instType.Name,
 			ProviderInstanceType: w.instType.ProviderType,
 			LastContainerUUID:    w.lastUUID,
 			LastBusy:             w.busy,
 			WorkerState:          w.state.String(),
+			IdleBehavior:         w.idleBehavior,
 		})
 	}
 	wp.mtx.Unlock()
 	sort.Slice(r, func(i, j int) bool {
-		return strings.Compare(r[i].Instance, r[j].Instance) < 0
+		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
 	})
 	return r
 }
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 5c9fbb38c..7e84613eb 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -47,6 +47,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 	lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
 	type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
 	type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+	type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
 	pool := &Pool{
 		logger:      logrus.StandardLogger(),
 		newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
@@ -54,6 +55,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 		instanceTypes: arvados.InstanceTypeMap{
 			type1.Name: type1,
 			type2.Name: type2,
+			type3.Name: type3,
 		},
 	}
 	notify := pool.Subscribe()
@@ -63,23 +65,39 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 
 	c.Check(pool.Unallocated()[type1], check.Equals, 0)
 	c.Check(pool.Unallocated()[type2], check.Equals, 0)
+	c.Check(pool.Unallocated()[type3], check.Equals, 0)
 	pool.Create(type2)
 	pool.Create(type1)
 	pool.Create(type2)
+	pool.Create(type3)
 	c.Check(pool.Unallocated()[type1], check.Equals, 1)
 	c.Check(pool.Unallocated()[type2], check.Equals, 2)
+	c.Check(pool.Unallocated()[type3], check.Equals, 1)
 
 	// Unblock the pending Create calls.
-	go lameInstanceSet.Release(3)
+	go lameInstanceSet.Release(4)
 
 	// Wait for each instance to either return from its Create
 	// call, or show up in a poll.
 	suite.wait(c, pool, notify, func() bool {
 		pool.mtx.RLock()
 		defer pool.mtx.RUnlock()
-		return len(pool.workers) == 3
+		return len(pool.workers) == 4
 	})
 
+	// Place type3 node on admin-hold
+	for _, instv := range pool.Instances() {
+		if instv.ArvadosInstanceType == type3.Name {
+			pool.SetIdleBehavior(instv.Instance, IdleBehaviorHold)
+			break
+		}
+	}
+	c.Check(pool.Shutdown(type3), check.Equals, false)
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated()[type3] == 0
+	})
+
+	// Shutdown both type2 nodes
 	c.Check(pool.Shutdown(type2), check.Equals, true)
 	suite.wait(c, pool, notify, func() bool {
 		return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
@@ -99,16 +117,35 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 		}
 		break
 	}
+
+	// Shutdown type1 node
 	c.Check(pool.Shutdown(type1), check.Equals, true)
 	suite.wait(c, pool, notify, func() bool {
-		return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+		return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
 	})
 	select {
 	case <-notify2:
 	case <-time.After(time.Second):
 		c.Error("notify did not receive")
 	}
-	go lameInstanceSet.Release(3) // unblock Destroy calls
+
+	// Place type3 node on admin-drain so it shuts down right away
+	for _, instv := range pool.Instances() {
+		if instv.ArvadosInstanceType == type3.Name {
+			pool.SetIdleBehavior(instv.Instance, IdleBehaviorDrain)
+			break
+		}
+	}
+	suite.wait(c, pool, notify, func() bool {
+		return pool.Unallocated()[type3] == 0
+	})
+
+	go lameInstanceSet.Release(4) // unblock Destroy calls
+
+	suite.wait(c, pool, notify, func() bool {
+		pool.getInstancesAndSync()
+		return len(pool.Instances()) == 0
+	})
 }
 
 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 1adc7c29f..1c72cc4b7 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -15,6 +15,11 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
+const (
+	// TODO: configurable
+	maxPingFailTime = 10 * time.Minute
+)
+
 // State indicates whether a worker is available to do work, and (if
 // not) whether/when it is expected to become ready.
 type State int
@@ -25,12 +30,6 @@ const (
 	StateIdle                  // instance booted, no containers are running
 	StateRunning               // instance is running one or more containers
 	StateShutdown              // worker has stopped monitoring the instance
-	StateHold                  // running, but not available to run new containers
-)
-
-const (
-	// TODO: configurable
-	maxPingFailTime = 10 * time.Minute
 )
 
 var stateString = map[State]string{
@@ -39,7 +38,6 @@ var stateString = map[State]string{
 	StateIdle:     "idle",
 	StateRunning:  "running",
 	StateShutdown: "shutdown",
-	StateHold:     "hold",
 }
 
 // String implements fmt.Stringer.
@@ -53,26 +51,42 @@ func (s State) MarshalText() ([]byte, error) {
 	return []byte(stateString[s]), nil
 }
 
+// IdleBehavior indicates the behavior desired when a node becomes idle.
+type IdleBehavior string
+
+const (
+	IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
+	IdleBehaviorHold               = "hold"  // don't shutdown or run more containers
+	IdleBehaviorDrain              = "drain" // shutdown immediately when idle
+)
+
+var validIdleBehavior = map[IdleBehavior]bool{
+	IdleBehaviorRun:   true,
+	IdleBehaviorHold:  true,
+	IdleBehaviorDrain: true,
+}
+
 type worker struct {
 	logger   logrus.FieldLogger
 	executor Executor
 	wp       *Pool
 
-	mtx       sync.Locker // must be wp's Locker.
-	state     State
-	instance  cloud.Instance
-	instType  arvados.InstanceType
-	vcpus     int64
-	memory    int64
-	appeared  time.Time
-	probed    time.Time
-	updated   time.Time
-	busy      time.Time
-	destroyed time.Time
-	lastUUID  string
-	running   map[string]struct{} // remember to update state idle<->running when this changes
-	starting  map[string]struct{} // remember to update state idle<->running when this changes
-	probing   chan struct{}
+	mtx          sync.Locker // must be wp's Locker.
+	state        State
+	idleBehavior IdleBehavior
+	instance     cloud.Instance
+	instType     arvados.InstanceType
+	vcpus        int64
+	memory       int64
+	appeared     time.Time
+	probed       time.Time
+	updated      time.Time
+	busy         time.Time
+	destroyed    time.Time
+	lastUUID     string
+	running      map[string]struct{} // remember to update state idle<->running when this changes
+	starting     map[string]struct{} // remember to update state idle<->running when this changes
+	probing      chan struct{}
 }
 
 // caller must have lock.
@@ -275,7 +289,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) {
-	if wkr.state == StateHold {
+	if wkr.idleBehavior == IdleBehaviorHold {
 		return
 	}
 	label, threshold := "", wkr.wp.timeoutProbe
@@ -295,19 +309,25 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) {
 
 // caller must have lock.
 func (wkr *worker) shutdownIfIdle() bool {
-	if wkr.state != StateIdle {
+	if wkr.idleBehavior == IdleBehaviorHold {
+		return false
+	}
+	if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
 		return false
 	}
 	age := time.Since(wkr.busy)
-	if age < wkr.wp.timeoutIdle {
+	if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
 		return false
 	}
-	wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+	wkr.logger.WithFields(logrus.Fields{
+		"Age":          age,
+		"IdleBehavior": wkr.idleBehavior,
+	}).Info("shutdown idle worker")
 	wkr.shutdown()
 	return true
 }
 
-// caller must have lock
+// caller must have lock.
 func (wkr *worker) shutdown() {
 	now := time.Now()
 	wkr.updated = now
@@ -322,3 +342,27 @@ func (wkr *worker) shutdown() {
 		}
 	}()
 }
+
+// Save worker tags to cloud provider metadata, if they don't already
+// match. Caller must have lock.
+func (wkr *worker) saveTags() {
+	instance := wkr.instance
+	have := instance.Tags()
+	want := cloud.InstanceTags{
+		tagKeyInstanceType: wkr.instType.Name,
+		tagKeyIdleBehavior: string(wkr.idleBehavior),
+	}
+	go func() {
+		for k, v := range want {
+			if v == have[k] {
+				continue
+			}
+			err := instance.SetTags(want)
+			if err != nil {
+				wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+			}
+			break
+
+		}
+	}()
+}

commit efaa80ea0f18dfeaf344a5250fa37090451f0699
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Dec 20 16:09:32 2018 -0500

    14325: Add worker state diagram.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/readme_states.txt b/lib/dispatchcloud/readme_states.txt
new file mode 100644
index 000000000..b654bbfaf
--- /dev/null
+++ b/lib/dispatchcloud/readme_states.txt
@@ -0,0 +1,31 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# cpan -I -T install Graph::Easy
+# (eval `perl -I ~/perl5/lib/perl5 -Mlocal::lib`; cpan -T install Graph::Easy)
+# graph-easy --as=svg < readme_states.txt
+
+[Nonexistent] - appears in cloud list -> [Unknown]
+[Nonexistent] - create() returns ID -> [Booting]
+[Unknown] - create() returns ID -> [Booting]
+[Unknown] - boot timeout -> [Shutdown]
+[Booting] - boot+run probes succeed -> [Idle]
+[Idle] - idle timeout -> [Shutdown]
+[Idle] - probe timeout -> [Shutdown]
+[Idle] - want=drain -> [Shutdown]
+[Idle] - container starts -> [Running]
+[Running] - container ends -> [Idle]
+[Running] - container ends, want=drain -> [Shutdown]
+[Shutdown] - instance disappears from cloud -> [Gone]
+
+# Layouter fails if we add these
+#[Hold] - want=run -> [Booting]
+#[Hold] - want=drain -> [Shutdown]
+#[Running] - container ends, want=hold -> [Hold]
+#[Unknown] - want=hold -> [Hold]
+#[Booting] - want=hold -> [Hold]
+#[Idle] - want=hold -> [Hold]
+
+# Not worth saying?
+#[Booting] - boot probe succeeds, run probe fails -> [Booting]

commit a7da936b6106bff17a7830124b1fd87ff01ad91e
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Dec 18 15:50:17 2018 -0500

    14325: Propagate API env vars to crunch-run.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 2415094ac..9a6ccded3 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -127,7 +127,7 @@ func (disp *dispatcher) initialize() {
 	}
 	disp.instanceSet = &instanceSetProxy{instanceSet}
 	disp.reg = prometheus.NewRegistry()
-	disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+	disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
 	disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
 
 	if disp.Cluster.ManagementToken == "" {
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index b5dba9870..4b2478e94 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -76,12 +76,18 @@ func (exr *Executor) Target() cloud.ExecutorTarget {
 
 // Execute runs cmd on the target. If an existing connection is not
 // usable, it sets up a new connection to the current target.
-func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
 	session, err := exr.newSession()
 	if err != nil {
 		return nil, nil, err
 	}
 	defer session.Close()
+	for k, v := range env {
+		err = session.Setenv(k, v)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
 	var stdout, stderr bytes.Buffer
 	session.Stdin = stdin
 	session.Stdout = &stdout
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
index 8dabfecad..619e47383 100644
--- a/lib/dispatchcloud/ssh_executor/executor_test.go
+++ b/lib/dispatchcloud/ssh_executor/executor_test.go
@@ -42,7 +42,8 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
 	for _, exitcode := range []int{0, 1, 2} {
 		srv := &testTarget{
 			SSHService: test.SSHService{
-				Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+				Exec: func(env map[string]string, cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+					c.Check(env["TESTVAR"], check.Equals, "test value")
 					c.Check(cmd, check.Equals, command)
 					var wg sync.WaitGroup
 					wg.Add(2)
@@ -78,7 +79,7 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
 
 		done := make(chan bool)
 		go func() {
-			stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+			stdout, stderr, err := exr.Execute(map[string]string{"TESTVAR": "test value"}, command, bytes.NewBufferString(stdinData))
 			if exitcode == 0 {
 				c.Check(err, check.IsNil)
 			} else {
diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go
index b1e4e03b1..ed5995f4c 100644
--- a/lib/dispatchcloud/test/ssh_service.go
+++ b/lib/dispatchcloud/test/ssh_service.go
@@ -32,7 +32,7 @@ func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
 
 // An SSHExecFunc handles an "exec" session on a multiplexed SSH
 // connection.
-type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
 
 // An SSHService accepts SSH connections on an available TCP port and
 // passes clients' "exec" sessions to the provided SSHExecFunc.
@@ -146,22 +146,37 @@ func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
 			log.Printf("accept channel: %s", err)
 			return
 		}
-		var execReq struct {
-			Command string
-		}
+		didExec := false
+		sessionEnv := map[string]string{}
 		go func() {
 			for req := range reqs {
-				if req.Type == "exec" && execReq.Command == "" {
+				switch {
+				case didExec:
+					// Reject anything after exec
+					req.Reply(false, nil)
+				case req.Type == "exec":
+					var execReq struct {
+						Command string
+					}
 					req.Reply(true, nil)
 					ssh.Unmarshal(req.Payload, &execReq)
 					go func() {
 						var resp struct {
 							Status uint32
 						}
-						resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+						resp.Status = ss.Exec(sessionEnv, execReq.Command, ch, ch, ch.Stderr())
 						ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
 						ch.Close()
 					}()
+					didExec = true
+				case req.Type == "env":
+					var envReq struct {
+						Name  string
+						Value string
+					}
+					req.Reply(true, nil)
+					ssh.Unmarshal(req.Payload, &envReq)
+					sessionEnv[envReq.Name] = envReq.Value
 				}
 			}
 		}()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index f738e2066..ab6d079b9 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -157,7 +157,7 @@ func (svm *StubVM) Instance() stubInstance {
 	}
 }
 
-func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
 	queue := svm.sis.driver.Queue
 	uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
 	if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -173,6 +173,12 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
 		return 1
 	}
 	if strings.HasPrefix(command, "crunch-run --detach ") {
+		for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+			if env[name] == "" {
+				fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+				return 1
+			}
+		}
 		svm.Lock()
 		if svm.running == nil {
 			svm.running = map[string]bool{}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index fc3301d86..25b57c6e5 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -36,7 +36,7 @@ type InstanceView struct {
 // An Executor executes shell commands on a remote host.
 type Executor interface {
 	// Run cmd on the current target.
-	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 
 	// Use the given target for subsequent operations. The new
 	// target is the same host as the previous target, but it
@@ -75,9 +75,10 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
 	wp := &Pool{
 		logger:             logger,
+		arvClient:          arvClient,
 		instanceSet:        instanceSet,
 		newExecutor:        newExecutor,
 		bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
@@ -107,6 +108,7 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
 	// configuration
 	logger             logrus.FieldLogger
+	arvClient          *arvados.Client
 	instanceSet        cloud.InstanceSet
 	newExecutor        func(cloud.Instance) Executor
 	bootProbeCommand   string
@@ -411,7 +413,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
 		"Instance":      wkr.instance,
 	})
 	logger.Debug("killing process")
-	stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+	stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
 	if err != nil {
 		logger.WithFields(logrus.Fields{
 			"stderr": string(stderr),
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 7551caff9..5c9fbb38c 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -128,7 +128,7 @@ type stubExecutor struct{}
 
 func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
 
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (*stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
 	return nil, nil, nil
 }
 
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index db6bc185b..1adc7c29f 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -86,7 +86,11 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 	wkr.starting[ctr.UUID] = struct{}{}
 	wkr.state = StateRunning
 	go func() {
-		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+		env := map[string]string{
+			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+		}
+		stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
 		wkr.mtx.Lock()
 		defer wkr.mtx.Unlock()
 		now := time.Now()
@@ -234,7 +238,7 @@ func (wkr *worker) probeAndUpdate() {
 
 func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
 	cmd := "crunch-run --list"
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
 	if err != nil {
 		wkr.logger.WithFields(logrus.Fields{
 			"Command": cmd,
@@ -255,7 +259,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 	if cmd == "" {
 		cmd = "true"
 	}
-	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+	stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
 	logger := wkr.logger.WithFields(logrus.Fields{
 		"Command": cmd,
 		"stdout":  string(stdout),

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list