[ARVADOS] created: 1.3.0-539-gfe8ba4d73
Git user
git at public.curoverse.com
Tue Mar 19 21:02:43 UTC 2019
at fe8ba4d738d1680f40b28ea01d08efea4a16574c (commit)
commit fe8ba4d738d1680f40b28ea01d08efea4a16574c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Mar 19 17:02:05 2019 -0400
14807: Report queue metrics.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index 4e807a12a..af17aaf39 100644
--- a/lib/dispatchcloud/container/queue.go
+++ b/lib/dispatchcloud/container/queue.go
@@ -53,7 +53,6 @@ func (c *QueueEnt) String() string {
// cache up to date.
type Queue struct {
logger logrus.FieldLogger
- reg *prometheus.Registry
chooseType typeChooser
client APIClient
@@ -79,14 +78,17 @@ type Queue struct {
// Arvados cluster's queue during Update, chooseType will be called to
// assign an appropriate arvados.InstanceType for the queue entry.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
- return &Queue{
+ cq := &Queue{
logger: logger,
- reg: reg,
chooseType: chooseType,
client: client,
current: map[string]QueueEnt{},
subscribers: map[<-chan struct{}]chan struct{}{},
}
+ if reg != nil { // metrics are optional: with a nil registry no metrics goroutine is started
+ go cq.runMetrics(reg) // runs in the background; NewQueue returns without waiting for it
+ }
+ return cq
}
// Subscribe returns a channel that becomes ready to receive when an
@@ -487,3 +489,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
}
return results, nil
}
+
+// runMetrics registers the "queue_entries" gauge with reg, then
+// recomputes it every time the channel returned by Subscribe
+// delivers a value. Entries are counted per (container state,
+// instance type name) pair, and each pair is exported as one
+// labelled gauge value.
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+ type entKey struct {
+ state arvados.ContainerState
+ inst string
+ }
+
+ gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "queue_entries",
+ Help: "Number of active container entries in the controller database.",
+ }, []string{"state", "instance_type"})
+ reg.MustRegister(gauge)
+
+ updates := cq.Subscribe()
+ defer cq.Unsubscribe(updates)
+
+ tally := map[entKey]int{}
+ for range updates {
+ // Zero the known keys rather than dropping them, so a
+ // state/type pair that no longer has any entries is
+ // reported as 0 instead of keeping its last value.
+ for key := range tally {
+ tally[key] = 0
+ }
+ entries, _ := cq.Entries()
+ for _, ent := range entries {
+ key := entKey{state: ent.Container.State, inst: ent.InstanceType.Name}
+ tally[key]++
+ }
+ for key, n := range tally {
+ gauge.WithLabelValues(string(key.state), key.inst).Set(float64(n))
+ }
+ }
+}
commit 2986810337995e8cd4876eb20096da915cea9edf
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Mar 19 13:40:10 2019 -0400
14807: Add "kill instance" management API.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index adf1028b3..52ddbf56b 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -39,6 +39,7 @@ type pool interface {
scheduler.WorkerPool
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+ KillInstance(id cloud.InstanceID, reason string) error
Stop()
}
@@ -147,6 +148,7 @@ func (disp *dispatcher) initialize() {
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
@@ -212,6 +214,20 @@ func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
}
+// Management API: shutdown/destroy specified instance now.
+//
+// The instance is selected by the "instance_id" form value. A
+// missing instance_id yields 400 Bad Request; an error from the pool
+// yields 404 Not Found. The optional "reason" form value is passed
+// along to the pool as part of the kill reason.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
+ // KillInstance takes (id, reason); supply a reason so the call
+ // matches the pool interface.
+ err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ return
+ }
+}
+
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
id := cloud.InstanceID(r.FormValue("instance_id"))
if id == "" {
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 81a658535..f3ac4b7c1 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -691,6 +691,21 @@ func (wp *Pool) Instances() []InstanceView {
return r
}
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+//
+// reason is included in the "shutting down" log entry, alongside
+// the instance being shut down.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+ // NOTE(review): wp.workers is read here without taking a lock;
+ // confirm whether callers are expected to hold one.
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("instance not found")
+ }
+ // NOTE(review): a bare "logger" is not declared in this method;
+ // this assumes Pool carries its logger in a "logger" field --
+ // confirm against the Pool struct definition.
+ wp.logger.WithFields(logrus.Fields{
+ "Instance": wkr.instance,
+ "Reason": reason,
+ }).Info("shutting down")
+ wkr.shutdown()
+ return nil
+}
+
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
commit fad0a6c5a1c0675f9c52d3d18ce1573847d66c0f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Mar 18 17:27:40 2019 -0400
14807: Configurable rate limit for cloud provider API calls.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 44d5a0ae7..d1ee4b135 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -49,12 +49,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
s.cluster = &arvados.Cluster{
CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(150 * time.Millisecond),
- TimeoutBooting: arvados.Duration(150 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
},
Dispatch: arvados.Dispatch{
PrivateKey: string(dispatchprivraw),
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index 0343f85b9..eb1e48737 100644
--- a/lib/dispatchcloud/driver.go
+++ b/lib/dispatchcloud/driver.go
@@ -6,12 +6,14 @@ package dispatchcloud
import (
"fmt"
+ "time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/cloud/azure"
"git.curoverse.com/arvados.git/lib/cloud/ec2"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
var drivers = map[string]cloud.Driver{
@@ -24,5 +26,33 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger
if !ok {
return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
}
- return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+ is = &rateLimitedInstanceSet{
+ InstanceSet: is,
+ ticker: time.NewTicker(time.Second / time.Duration(maxops)),
+ }
+ }
+ return is, err
+}
+
+// rateLimitedInstanceSet wraps a cloud.InstanceSet so that each
+// Create call first waits for a tick from ticker. All other
+// InstanceSet methods are promoted from the embedded value
+// unchanged.
+type rateLimitedInstanceSet struct {
+ cloud.InstanceSet
+ ticker *time.Ticker
+}
+
+// Create blocks until the next tick, then creates an instance using
+// the wrapped InstanceSet. On success the returned instance is
+// wrapped in a rateLimitedInstance sharing the same ticker, so its
+// Destroy method waits on that ticker too. On failure it returns a
+// nil instance together with the error, rather than a non-nil
+// wrapper around a nil instance.
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+ <-is.ticker.C
+ inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+ if err != nil {
+ return nil, err
+ }
+ return &rateLimitedInstance{inst, is.ticker}, nil
+}
+
+// rateLimitedInstance wraps a cloud.Instance so that Destroy first
+// waits for a tick from ticker. All other Instance methods are
+// promoted from the embedded value unchanged.
+type rateLimitedInstance struct {
+ cloud.Instance
+ ticker *time.Ticker
+}
+
+// Destroy blocks until the ticker delivers its next tick, then
+// destroys the wrapped instance and returns the resulting error.
+func (inst *rateLimitedInstance) Destroy() error {
+ tick := inst.ticker.C
+ <-tick
+ err := inst.Instance.Destroy()
+ return err
}
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 73addb739..7c87ff029 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -154,6 +154,9 @@ type CloudVMs struct {
// Time after shutdown to retry shutdown
TimeoutShutdown Duration
+ // Maximum create/destroy-instance operations per second
+ MaxCloudOpsPerSecond int
+
ImageID string
Driver string
commit 7858b49698e1d2c1d619b19b6db2978f2ab067be
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Mar 18 16:32:58 2019 -0400
14807: Drain instances that crunch-run reports broken.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 7268f106a..44d5a0ae7 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -157,6 +157,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
stubvm.CrunchRunMissing = true
default:
stubvm.CrunchRunCrashRate = 0.1
+ stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)+200) * time.Millisecond)
}
}
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index a4521eab7..02346a970 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -181,6 +181,7 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
type StubVM struct {
Boot time.Time
Broken time.Time
+ ReportBroken time.Time
CrunchRunMissing bool
CrunchRunCrashRate float64
CrunchRunDetachDelay time.Duration
@@ -314,6 +315,9 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
for uuid := range svm.running {
fmt.Fprintf(stdout, "%s\n", uuid)
}
+ if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+ fmt.Fprintln(stdout, "broken")
+ }
return 0
}
if strings.HasPrefix(command, "crunch-run --kill ") {
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 41117c1d4..49c5057b3 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -5,7 +5,6 @@
package worker
import (
- "bytes"
"fmt"
"strings"
"sync"
@@ -215,11 +214,16 @@ func (wkr *worker) probeAndUpdate() {
logger.Info("instance booted; will try probeRunning")
}
}
+ reportedBroken := false
if booted || wkr.state == StateUnknown {
- ctrUUIDs, ok = wkr.probeRunning()
+ ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
}
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
+ if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+ logger.Info("probe reported broken instance")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+ }
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
@@ -313,7 +317,7 @@ func (wkr *worker) probeAndUpdate() {
go wkr.wp.notify()
}
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
cmd := "crunch-run --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
@@ -325,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return nil, false
+ return
}
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true
+ ok = true
+ for _, s := range strings.Split(string(stdout), "\n") {
+ if s == "broken" {
+ reportsBroken = true
+ } else if s != "" {
+ running = append(running, s)
+ }
}
- return strings.Split(string(stdout), "\n"), true
+ return
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go
index 933692bdc..852ccb6ec 100644
--- a/services/crunch-run/background.go
+++ b/services/crunch-run/background.go
@@ -20,6 +20,7 @@ var (
lockdir = "/var/lock"
lockprefix = "crunch-run-"
locksuffix = ".lock"
+ brokenfile = "crunch-run-broken"
)
// procinfo is saved in each process's lockfile.
@@ -146,7 +147,10 @@ func ListProcesses(stdout, stderr io.Writer) int {
if info.IsDir() && path != walkdir {
return filepath.SkipDir
}
- if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+ if name := info.Name(); name == brokenfile {
+ fmt.Fprintln(stdout, "broken")
+ return nil
+ } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
return nil
}
if info.Size() == 0 {
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 0576337aa..3925b0b7b 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -222,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
func (runner *ContainerRunner) runBrokenNodeHook() {
if *brokenNodeHook == "" {
- runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ path := filepath.Join(lockdir, brokenfile)
+ runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+ f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+ return
+ }
+ f.Close()
} else {
runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
// run killme script
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 17e5e1458..60729c019 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -2049,7 +2049,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
c.Check(api.CalledWith("container.state", "Queued"), NotNil)
c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
- c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
}
func (s *TestSuite) TestFullBrokenDocker3(c *C) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list