[ARVADOS] updated: 1.3.0-176-gdf9d7f8d6
Git user
git at public.curoverse.com
Wed Jan 16 17:13:20 EST 2019
Summary of changes:
apps/workbench/app/helpers/application_helper.rb | 4 +
.../app/views/users/_virtual_machines.html.erb | 4 +-
apps/workbench/package-build.version | 1 +
apps/workbench/test/the.patch | 3 +
build/run-tests.sh | 2 +
doc/README.textile | 2 +-
doc/Rakefile | 2 +-
doc/_config.yml | 1 +
.../ssh-access-unix.html.textile.liquid | 19 +-
lib/cloud/azure.go | 692 +++++++++++++++++++++
lib/cloud/azure_test.go | 348 +++++++++++
.../worker => cloud}/gocheck_test.go | 2 +-
lib/cloud/interfaces.go | 11 +-
lib/controller/federation_test.go | 2 +-
lib/controller/server_test.go | 2 +-
lib/dispatchcloud/container/queue.go | 2 +-
lib/dispatchcloud/dispatcher.go | 52 +-
lib/dispatchcloud/dispatcher_test.go | 2 +-
lib/dispatchcloud/driver.go | 9 +-
lib/dispatchcloud/readme_states.txt | 4 +
lib/dispatchcloud/scheduler/run_queue.go | 2 +-
lib/dispatchcloud/scheduler/run_queue_test.go | 2 +-
lib/dispatchcloud/scheduler/scheduler.go | 2 +-
lib/dispatchcloud/scheduler/sync.go | 2 +-
lib/dispatchcloud/test/stub_driver.go | 6 +-
lib/dispatchcloud/worker/pool.go | 88 ++-
lib/dispatchcloud/worker/pool_test.go | 47 +-
lib/dispatchcloud/worker/worker.go | 100 ++-
lib/service/cmd.go | 2 +-
sdk/go/ctxlog/log.go | 2 +-
sdk/go/dispatch/dispatch.go | 2 +-
sdk/go/httpserver/logger.go | 2 +-
sdk/go/httpserver/logger_test.go | 2 +-
sdk/go/httpserver/metrics.go | 2 +-
.../crunch-dispatch-local/crunch-dispatch-local.go | 2 +-
.../crunch-dispatch-local_test.go | 2 +-
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 2 +-
.../crunch-dispatch-slurm_test.go | 2 +-
services/crunch-dispatch-slurm/squeue_test.go | 2 +-
services/health/main.go | 2 +-
services/keep-balance/balance.go | 2 +-
services/keep-balance/balance_run_test.go | 2 +-
services/keep-balance/integration_test.go | 2 +-
services/keep-balance/main.go | 2 +-
services/keep-balance/server.go | 2 +-
services/keep-web/handler.go | 2 +-
services/keep-web/main.go | 2 +-
services/keepproxy/keepproxy.go | 2 +-
services/keepstore/config.go | 2 +-
services/keepstore/config_test.go | 2 +-
services/ws/router.go | 2 +-
services/ws/session_v0.go | 2 +-
tools/arvbox/bin/arvbox | 18 +-
tools/arvbox/lib/arvbox/docker/Dockerfile.base | 4 +
tools/arvbox/lib/arvbox/docker/Dockerfile.demo | 6 +-
tools/arvbox/lib/arvbox/docker/api-setup.sh | 11 +-
tools/arvbox/lib/arvbox/docker/common.sh | 10 +-
.../service/{api => certificate}/log/main/.gitstub | 0
.../docker/service/{api => certificate}/log/run | 0
.../lib/arvbox/docker/service/certificate/run | 81 +++
.../arvbox/lib/arvbox/docker/service/composer/run | 2 +-
.../lib/arvbox/docker/service/gitolite/run-service | 2 +-
.../lib/arvbox/docker/service/nginx/run-service | 69 +-
.../lib/arvbox/docker/service/ready/run-service | 3 +-
tools/arvbox/lib/arvbox/docker/service/ssh/run | 2 +-
.../lib/arvbox/docker/service/sso/run-service | 8 +-
.../arvbox/docker/service/websockets/run-service | 4 +-
.../arvbox/lib/arvbox/docker/service/workbench/run | 2 +
.../arvbox/docker/service/workbench/run-service | 10 +-
.../service/{api => workbench2}/log/main/.gitstub | 0
.../docker/service/{api => workbench2}/log/run | 0
.../docker/service/{composer => workbench2}/run | 2 +-
.../arvbox/docker/service/workbench2/run-service | 33 +
vendor/vendor.json | 106 +++-
74 files changed, 1651 insertions(+), 183 deletions(-)
create mode 100644 apps/workbench/package-build.version
create mode 100644 apps/workbench/test/the.patch
create mode 100644 lib/cloud/azure.go
create mode 100644 lib/cloud/azure_test.go
copy lib/{dispatchcloud/worker => cloud}/gocheck_test.go (93%)
copy tools/arvbox/lib/arvbox/docker/service/{api => certificate}/log/main/.gitstub (100%)
copy tools/arvbox/lib/arvbox/docker/service/{api => certificate}/log/run (100%)
create mode 100755 tools/arvbox/lib/arvbox/docker/service/certificate/run
copy tools/arvbox/lib/arvbox/docker/service/{api => workbench2}/log/main/.gitstub (100%)
copy tools/arvbox/lib/arvbox/docker/service/{api => workbench2}/log/run (100%)
copy tools/arvbox/lib/arvbox/docker/service/{composer => workbench2}/run (69%)
create mode 100755 tools/arvbox/lib/arvbox/docker/service/workbench2/run-service
discards 54643dadce1988a815b85a52349cb8a33cbdbf65 (commit)
discards de12a680c6cd6395a700aca5d03604465afa3df5 (commit)
via df9d7f8d64a2a51dea5552641f44afd3e43c636b (commit)
via 2a018b15662ae5f0b30d1d11eb2d0ffa685964e0 (commit)
via efaa80ea0f18dfeaf344a5250fa37090451f0699 (commit)
via a7da936b6106bff17a7830124b1fd87ff01ad91e (commit)
via e62dc0ffea0983f6a17ef16b368a0dceb62b98ea (commit)
via a27348943c6e35eee983d6a333eb6977394c3f13 (commit)
via 6756ee790a6c685b96bc1ec80b67e7d6c94e9846 (commit)
via 3dca6408c9886ae3591486ed43410096b0a28488 (commit)
via f398e9d75d32fbfa6804041353997bf74195e489 (commit)
via 443a0b96316ed46600dc5035193adae6ac4d1f74 (commit)
via 855a0afbc604487ddaedaed0cc1a4ad6da34b602 (commit)
via 1b2a4c98249e3a46242f7d10c00a70feeeb2a843 (commit)
via 7f5b859e82a65ac27c403ff2416c471f9c770ac1 (commit)
via dc19260dea6ee0b7186a50245ad31c2cbe5e6b2a (commit)
via 5dd601ec0484d193d5d71975dc1d6b85ea901f88 (commit)
via 351505f3a2b2b4460a4214097d63f53ab00ec42f (commit)
via 925644cc7c7e6761566a4059ac9ed9987653931e (commit)
via e3dc6df1694586eca13f2f20eb064b8b16d115e5 (commit)
via 40e5a87a683bf463bac2f6a41beb8b2ee96bab27 (commit)
via 146257d644f6149d5504aa0652286ddef6a2049e (commit)
via ce01f835b561911f87e0d264ac72dbabc552281f (commit)
via e7dfae5f99f9291789be40a6505738307137360f (commit)
via 137f46c8040d22e70e4b17a495e4171c59a8033f (commit)
via ca801f2402482c9192f3c75461134c10834f2fcc (commit)
via 95e21426a5a27c24f7aa561df53d88aee6e0360f (commit)
via abecf37b0d4028782f44d04540402a4857352735 (commit)
via c548afa3245de240910c0b8ca6cf2a9e8800555a (commit)
via 3f649213bb9165b33a0ece597a4f852093cb409e (commit)
via ee8dedc9286a12d95c5a4d9eefe4daf4a40d2528 (commit)
via acc46e4fdbc9098ac2b4c522b17473b58b087013 (commit)
via b2b98cbc9c213d2831405b6eb37e0f52fa2e9e17 (commit)
via 36c705beb497a5ef8348e5f977ee6ad6674c7d11 (commit)
via 51c1daf863f3e1920f758f73b4e5d70ff2c706d6 (commit)
via e6bc5eeacc812ac41aadeec6fd7a907b5aab0237 (commit)
via 1b2d73e101e1fb27de201b33142c081e80bb7fc6 (commit)
via 76e3ea1d0d2f865272000a5e12f64266a90b3794 (commit)
via 9c013e7742d72254ae747948f96237698198aa85 (commit)
via 28aa8adcc90108922b1274f5568ce3c5745cca78 (commit)
via 3259e9de3d048f7a0b27e67098811640e9da8230 (commit)
via 2a6cb99cf7a21a273efe8dc793929b74149871f6 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not an ancestor of the new revision. This
situation occurs when you force-push (git push --force) a change and
generate a repository containing something like this:
* -- * -- B -- O -- O -- O (54643dadce1988a815b85a52349cb8a33cbdbf65)
           \
            N -- N -- N (df9d7f8d64a2a51dea5552641f44afd3e43c636b)
When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions in the N
branch since the common base, B.
The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full below.
commit df9d7f8d64a2a51dea5552641f44afd3e43c636b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Wed Jan 16 17:09:40 2019 -0500
14325: Add management API endpoints for drain/hold/run behavior.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 9a6ccded3..788463330 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -21,6 +21,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
@@ -35,6 +36,7 @@ const (
type pool interface {
scheduler.WorkerPool
Instances() []worker.InstanceView
+ SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
Stop()
}
@@ -135,14 +137,17 @@ func (disp *dispatcher) initialize() {
http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
})
} else {
- mux := http.NewServeMux()
- mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
- mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
+ mux := httprouter.New()
+ mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+ mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
- mux.Handle("/metrics", metricsH)
- mux.Handle("/metrics.json", metricsH)
+ mux.Handler("GET", "/metrics", metricsH)
+ mux.Handler("GET", "/metrics.json", metricsH)
disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
}
}
@@ -169,10 +174,6 @@ func (disp *dispatcher) run() {
// Management API: all active and queued containers.
func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
- if r.Method != "GET" {
- httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
var resp struct {
Items []container.QueueEnt
}
@@ -185,13 +186,34 @@ func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
// Management API: all active instances (cloud VMs).
func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
- if r.Method != "GET" {
- httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
var resp struct {
Items []worker.InstanceView
}
resp.Items = disp.pool.Instances()
json.NewEncoder(w).Encode(resp)
}
+
+// Management API: set idle behavior to "hold" for specified instance.
+func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
+ disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
+}
+
+// Management API: set idle behavior to "drain" for specified instance.
+func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
+ disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
+}
+
+// Management API: set idle behavior to "run" for specified instance.
+func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
+ disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
+}
+
+func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
+ params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
+ id := cloud.InstanceID(params.ByName("instance_id"))
+ err := disp.pool.SetIdleBehavior(id, want)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ return
+ }
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index ec5fa21e2..1b24804b0 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -349,6 +349,12 @@
"revisionTime": "2015-08-17T12:26:01Z"
},
{
+ "checksumSHA1": "X7g98YfLr+zM7aN76AZvAfpZyfk=",
+ "path": "github.com/julienschmidt/httprouter",
+ "revision": "adbc77eec0d91467376ca515bc3a14b8434d0f18",
+ "revisionTime": "2018-04-11T15:45:01Z"
+ },
+ {
"checksumSHA1": "oX6jFQD74oOApvDIhOzW2dXpg5Q=",
"path": "github.com/kevinburke/ssh_config",
"revision": "802051befeb51da415c46972b5caf36e7c33c53d",
commit 2a018b15662ae5f0b30d1d11eb2d0ffa685964e0
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Wed Jan 16 17:09:34 2019 -0500
14325: Add IdleBehavior to worker pool.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 25b57c6e5..e69351317 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,6 +5,7 @@
package worker
import (
+ "errors"
"io"
"sort"
"strings"
@@ -19,18 +20,19 @@ import (
const (
tagKeyInstanceType = "InstanceType"
- tagKeyHold = "Hold"
+ tagKeyIdleBehavior = "IdleBehavior"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
- Instance string
+ Instance cloud.InstanceID
Price float64
ArvadosInstanceType string
ProviderInstanceType string
LastContainerUUID string
LastBusy time.Time
WorkerState string
+ IdleBehavior IdleBehavior
}
// An Executor executes shell commands on a remote host.
@@ -173,7 +175,8 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
}
// Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type. Workers in
+// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
wp.setupOnce.Do(wp.setup)
wp.mtx.RLock()
@@ -184,7 +187,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
creating[it] = len(times)
}
for _, wkr := range wp.workers {
- if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+ if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) || wkr.idleBehavior != IdleBehaviorRun {
continue
}
it := wkr.instType
@@ -222,7 +225,10 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
if time.Now().Before(wp.atQuotaUntil) {
return wp.atQuotaErr
}
- tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
+ }
now := time.Now()
wp.creating[it] = append(wp.creating[it], now)
go func() {
@@ -259,6 +265,21 @@ func (wp *Pool) AtQuota() bool {
return time.Now().Before(wp.atQuotaUntil)
}
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("requested instance does not exist")
+ }
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+ return nil
+}
+
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
@@ -274,32 +295,46 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
if initialState == StateBooting && wkr.state == StateUnknown {
wkr.state = StateBooting
}
+ wkr.saveTags()
return wkr, false
}
- if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
- initialState = StateHold
+
+ // If an instance has a valid IdleBehavior tag when it first
+ // appears, initialize the new worker accordingly (this is how
+ // we restore IdleBehavior that was set by a prior dispatch
+ // process); otherwise, default to "run". After this,
+ // wkr.idleBehavior is the source of truth, and will only be
+ // changed via SetIdleBehavior().
+ idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ if !validIdleBehavior[idleBehavior] {
+ idleBehavior = IdleBehaviorRun
}
+
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
"Instance": inst,
})
- logger.WithField("State", initialState).Infof("instance appeared in cloud")
+ logger.WithFields(logrus.Fields{
+ "State": initialState,
+ "IdleBehavior": idleBehavior,
+ }).Infof("instance appeared in cloud")
now := time.Now()
wkr := &worker{
- mtx: &wp.mtx,
- wp: wp,
- logger: logger,
- executor: wp.newExecutor(inst),
- state: initialState,
- instance: inst,
- instType: it,
- appeared: now,
- probed: now,
- busy: now,
- updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
- probing: make(chan struct{}, 1),
+ mtx: &wp.mtx,
+ wp: wp,
+ logger: logger,
+ executor: wp.newExecutor(inst),
+ state: initialState,
+ idleBehavior: idleBehavior,
+ instance: inst,
+ instType: it,
+ appeared: now,
+ probed: now,
+ busy: now,
+ updated: now,
+ running: make(map[string]struct{}),
+ starting: make(map[string]struct{}),
+ probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
@@ -322,7 +357,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
// TODO: shutdown the worker with the longest idle
// time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
- if wkr.state == tryState && wkr.instType == it {
+ if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
logger.WithField("Instance", wkr.instance).Info("shutting down")
wkr.shutdown()
return true
@@ -590,18 +625,19 @@ func (wp *Pool) Instances() []InstanceView {
wp.mtx.Lock()
for _, w := range wp.workers {
r = append(r, InstanceView{
- Instance: w.instance.String(),
+ Instance: w.instance.ID(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
LastBusy: w.busy,
WorkerState: w.state.String(),
+ IdleBehavior: w.idleBehavior,
})
}
wp.mtx.Unlock()
sort.Slice(r, func(i, j int) bool {
- return strings.Compare(r[i].Instance, r[j].Instance) < 0
+ return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
})
return r
}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 5c9fbb38c..7e84613eb 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -47,6 +47,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+ type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
logger: logrus.StandardLogger(),
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
@@ -54,6 +55,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
type2.Name: type2,
+ type3.Name: type3,
},
}
notify := pool.Subscribe()
@@ -63,23 +65,39 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
c.Check(pool.Unallocated()[type1], check.Equals, 0)
c.Check(pool.Unallocated()[type2], check.Equals, 0)
+ c.Check(pool.Unallocated()[type3], check.Equals, 0)
pool.Create(type2)
pool.Create(type1)
pool.Create(type2)
+ pool.Create(type3)
c.Check(pool.Unallocated()[type1], check.Equals, 1)
c.Check(pool.Unallocated()[type2], check.Equals, 2)
+ c.Check(pool.Unallocated()[type3], check.Equals, 1)
// Unblock the pending Create calls.
- go lameInstanceSet.Release(3)
+ go lameInstanceSet.Release(4)
// Wait for each instance to either return from its Create
// call, or show up in a poll.
suite.wait(c, pool, notify, func() bool {
pool.mtx.RLock()
defer pool.mtx.RUnlock()
- return len(pool.workers) == 3
+ return len(pool.workers) == 4
})
+ // Place type3 node on admin-hold
+ for _, instv := range pool.Instances() {
+ if instv.ArvadosInstanceType == type3.Name {
+ pool.SetIdleBehavior(instv.Instance, IdleBehaviorHold)
+ break
+ }
+ }
+ c.Check(pool.Shutdown(type3), check.Equals, false)
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 0
+ })
+
+ // Shutdown both type2 nodes
c.Check(pool.Shutdown(type2), check.Equals, true)
suite.wait(c, pool, notify, func() bool {
return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
@@ -99,16 +117,35 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
}
break
}
+
+ // Shutdown type1 node
c.Check(pool.Shutdown(type1), check.Equals, true)
suite.wait(c, pool, notify, func() bool {
- return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+ return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
})
select {
case <-notify2:
case <-time.After(time.Second):
c.Error("notify did not receive")
}
- go lameInstanceSet.Release(3) // unblock Destroy calls
+
+ // Place type3 node on admin-drain so it shuts down right away
+ for _, instv := range pool.Instances() {
+ if instv.ArvadosInstanceType == type3.Name {
+ pool.SetIdleBehavior(instv.Instance, IdleBehaviorDrain)
+ break
+ }
+ }
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 0
+ })
+
+ go lameInstanceSet.Release(4) // unblock Destroy calls
+
+ suite.wait(c, pool, notify, func() bool {
+ pool.getInstancesAndSync()
+ return len(pool.Instances()) == 0
+ })
}
func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 1adc7c29f..1c72cc4b7 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -15,6 +15,11 @@ import (
"github.com/sirupsen/logrus"
)
+const (
+ // TODO: configurable
+ maxPingFailTime = 10 * time.Minute
+)
+
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int
@@ -25,12 +30,6 @@ const (
StateIdle // instance booted, no containers are running
StateRunning // instance is running one or more containers
StateShutdown // worker has stopped monitoring the instance
- StateHold // running, but not available to run new containers
-)
-
-const (
- // TODO: configurable
- maxPingFailTime = 10 * time.Minute
)
var stateString = map[State]string{
@@ -39,7 +38,6 @@ var stateString = map[State]string{
StateIdle: "idle",
StateRunning: "running",
StateShutdown: "shutdown",
- StateHold: "hold",
}
// String implements fmt.Stringer.
@@ -53,26 +51,42 @@ func (s State) MarshalText() ([]byte, error) {
return []byte(stateString[s]), nil
}
+// IdleBehavior indicates the behavior desired when a node becomes idle.
+type IdleBehavior string
+
+const (
+ IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
+ IdleBehaviorHold = "hold" // don't shutdown or run more containers
+ IdleBehaviorDrain = "drain" // shutdown immediately when idle
+)
+
+var validIdleBehavior = map[IdleBehavior]bool{
+ IdleBehaviorRun: true,
+ IdleBehaviorHold: true,
+ IdleBehaviorDrain: true,
+}
+
type worker struct {
logger logrus.FieldLogger
executor Executor
wp *Pool
- mtx sync.Locker // must be wp's Locker.
- state State
- instance cloud.Instance
- instType arvados.InstanceType
- vcpus int64
- memory int64
- appeared time.Time
- probed time.Time
- updated time.Time
- busy time.Time
- destroyed time.Time
- lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
- probing chan struct{}
+ mtx sync.Locker // must be wp's Locker.
+ state State
+ idleBehavior IdleBehavior
+ instance cloud.Instance
+ instType arvados.InstanceType
+ vcpus int64
+ memory int64
+ appeared time.Time
+ probed time.Time
+ updated time.Time
+ busy time.Time
+ destroyed time.Time
+ lastUUID string
+ running map[string]struct{} // remember to update state idle<->running when this changes
+ starting map[string]struct{} // remember to update state idle<->running when this changes
+ probing chan struct{}
}
// caller must have lock.
@@ -275,7 +289,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) {
- if wkr.state == StateHold {
+ if wkr.idleBehavior == IdleBehaviorHold {
return
}
label, threshold := "", wkr.wp.timeoutProbe
@@ -295,19 +309,25 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) {
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
- if wkr.state != StateIdle {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ return false
+ }
+ if !(wkr.state == StateIdle || (wkr.state == StateBooting && wkr.idleBehavior == IdleBehaviorDrain)) {
return false
}
age := time.Since(wkr.busy)
- if age < wkr.wp.timeoutIdle {
+ if wkr.idleBehavior != IdleBehaviorDrain && age < wkr.wp.timeoutIdle {
return false
}
- wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+ wkr.logger.WithFields(logrus.Fields{
+ "Age": age,
+ "IdleBehavior": wkr.idleBehavior,
+ }).Info("shutdown idle worker")
wkr.shutdown()
return true
}
-// caller must have lock
+// caller must have lock.
func (wkr *worker) shutdown() {
now := time.Now()
wkr.updated = now
@@ -322,3 +342,27 @@ func (wkr *worker) shutdown() {
}
}()
}
+
+// Save worker tags to cloud provider metadata, if they don't already
+// match. Caller must have lock.
+func (wkr *worker) saveTags() {
+ instance := wkr.instance
+ have := instance.Tags()
+ want := cloud.InstanceTags{
+ tagKeyInstanceType: wkr.instType.Name,
+ tagKeyIdleBehavior: string(wkr.idleBehavior),
+ }
+ go func() {
+ for k, v := range want {
+ if v == have[k] {
+ continue
+ }
+ err := instance.SetTags(want)
+ if err != nil {
+ wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+ }
+ break
+
+ }
+ }()
+}
commit efaa80ea0f18dfeaf344a5250fa37090451f0699
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Dec 20 16:09:32 2018 -0500
14325: Add worker state diagram.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/readme_states.txt b/lib/dispatchcloud/readme_states.txt
new file mode 100644
index 000000000..b654bbfaf
--- /dev/null
+++ b/lib/dispatchcloud/readme_states.txt
@@ -0,0 +1,31 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# cpan -I -T install Graph::Easy
+# (eval `perl -I ~/perl5/lib/perl5 -Mlocal::lib`; cpan -T install Graph::Easy)
+# graph-easy --as=svg < readme_states.txt
+
+[Nonexistent] - appears in cloud list -> [Unknown]
+[Nonexistent] - create() returns ID -> [Booting]
+[Unknown] - create() returns ID -> [Booting]
+[Unknown] - boot timeout -> [Shutdown]
+[Booting] - boot+run probes succeed -> [Idle]
+[Idle] - idle timeout -> [Shutdown]
+[Idle] - probe timeout -> [Shutdown]
+[Idle] - want=drain -> [Shutdown]
+[Idle] - container starts -> [Running]
+[Running] - container ends -> [Idle]
+[Running] - container ends, want=drain -> [Shutdown]
+[Shutdown] - instance disappears from cloud -> [Gone]
+
+# Layouter fails if we add these
+#[Hold] - want=run -> [Booting]
+#[Hold] - want=drain -> [Shutdown]
+#[Running] - container ends, want=hold -> [Hold]
+#[Unknown] - want=hold -> [Hold]
+#[Booting] - want=hold -> [Hold]
+#[Idle] - want=hold -> [Hold]
+
+# Not worth saying?
+#[Booting] - boot probe succeeds, run probe fails -> [Booting]
commit a7da936b6106bff17a7830124b1fd87ff01ad91e
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Dec 18 15:50:17 2018 -0500
14325: Propagate API env vars to crunch-run.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 2415094ac..9a6ccded3 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -127,7 +127,7 @@ func (disp *dispatcher) initialize() {
}
disp.instanceSet = &instanceSetProxy{instanceSet}
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
if disp.Cluster.ManagementToken == "" {
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index b5dba9870..4b2478e94 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -76,12 +76,18 @@ func (exr *Executor) Target() cloud.ExecutorTarget {
// Execute runs cmd on the target. If an existing connection is not
// usable, it sets up a new connection to the current target.
-func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
session, err := exr.newSession()
if err != nil {
return nil, nil, err
}
defer session.Close()
+ for k, v := range env {
+ err = session.Setenv(k, v)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
var stdout, stderr bytes.Buffer
session.Stdin = stdin
session.Stdout = &stdout
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
index 8dabfecad..619e47383 100644
--- a/lib/dispatchcloud/ssh_executor/executor_test.go
+++ b/lib/dispatchcloud/ssh_executor/executor_test.go
@@ -42,7 +42,8 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
for _, exitcode := range []int{0, 1, 2} {
srv := &testTarget{
SSHService: test.SSHService{
- Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ Exec: func(env map[string]string, cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ c.Check(env["TESTVAR"], check.Equals, "test value")
c.Check(cmd, check.Equals, command)
var wg sync.WaitGroup
wg.Add(2)
@@ -78,7 +79,7 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
done := make(chan bool)
go func() {
- stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+ stdout, stderr, err := exr.Execute(map[string]string{"TESTVAR": "test value"}, command, bytes.NewBufferString(stdinData))
if exitcode == 0 {
c.Check(err, check.IsNil)
} else {
diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go
index b1e4e03b1..ed5995f4c 100644
--- a/lib/dispatchcloud/test/ssh_service.go
+++ b/lib/dispatchcloud/test/ssh_service.go
@@ -32,7 +32,7 @@ func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
// An SSHExecFunc handles an "exec" session on a multiplexed SSH
// connection.
-type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
// An SSHService accepts SSH connections on an available TCP port and
// passes clients' "exec" sessions to the provided SSHExecFunc.
@@ -146,22 +146,37 @@ func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
log.Printf("accept channel: %s", err)
return
}
- var execReq struct {
- Command string
- }
+ didExec := false
+ sessionEnv := map[string]string{}
go func() {
for req := range reqs {
- if req.Type == "exec" && execReq.Command == "" {
+ switch {
+ case didExec:
+ // Reject anything after exec
+ req.Reply(false, nil)
+ case req.Type == "exec":
+ var execReq struct {
+ Command string
+ }
req.Reply(true, nil)
ssh.Unmarshal(req.Payload, &execReq)
go func() {
var resp struct {
Status uint32
}
- resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+ resp.Status = ss.Exec(sessionEnv, execReq.Command, ch, ch, ch.Stderr())
ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
ch.Close()
}()
+ didExec = true
+ case req.Type == "env":
+ var envReq struct {
+ Name string
+ Value string
+ }
+ req.Reply(true, nil)
+ ssh.Unmarshal(req.Payload, &envReq)
+ sessionEnv[envReq.Name] = envReq.Value
}
}
}()
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index f738e2066..ab6d079b9 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -157,7 +157,7 @@ func (svm *StubVM) Instance() stubInstance {
}
}
-func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
queue := svm.sis.driver.Queue
uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -173,6 +173,12 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
return 1
}
if strings.HasPrefix(command, "crunch-run --detach ") {
+ for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+ if env[name] == "" {
+ fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+ return 1
+ }
+ }
svm.Lock()
if svm.running == nil {
svm.running = map[string]bool{}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index fc3301d86..25b57c6e5 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -36,7 +36,7 @@ type InstanceView struct {
// An Executor executes shell commands on a remote host.
type Executor interface {
// Run cmd on the current target.
- Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+ Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
// Use the given target for subsequent operations. The new
// target is the same host as the previous target, but it
@@ -75,9 +75,10 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
+ arvClient: arvClient,
instanceSet: instanceSet,
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
@@ -107,6 +108,7 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
type Pool struct {
// configuration
logger logrus.FieldLogger
+ arvClient *arvados.Client
instanceSet cloud.InstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
@@ -411,7 +413,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
"Instance": wkr.instance,
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 7551caff9..5c9fbb38c 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -128,7 +128,7 @@ type stubExecutor struct{}
func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (*stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
return nil, nil, nil
}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index db6bc185b..1adc7c29f 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -86,7 +86,11 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
wkr.starting[ctr.UUID] = struct{}{}
wkr.state = StateRunning
go func() {
- stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+ env := map[string]string{
+ "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+ }
+ stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
@@ -234,7 +238,7 @@ func (wkr *worker) probeAndUpdate() {
func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
cmd := "crunch-run --list"
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
@@ -255,7 +259,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
if cmd == "" {
cmd = "true"
}
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
logger := wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
"stdout": string(stdout),
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list