[ARVADOS] created: 2.1.0-975-g7d1d7e387
Git user
git at public.arvados.org
Thu Jul 1 00:05:29 UTC 2021
at 7d1d7e387dea959ef4fd36e4ab73679c952c9796 (commit)
commit 7d1d7e387dea959ef4fd36e4ab73679c952c9796
Author: Tom Clegg <tom at curii.com>
Date: Wed Jun 30 20:05:13 2021 -0400
17756: Add lsf dispatcher. (work in progress)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/cmd/arvados-server/arvados-dispatch-lsf.service b/cmd/arvados-server/arvados-dispatch-lsf.service
new file mode 100644
index 000000000..f9e73a2c7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-lsf.service
@@ -0,0 +1,30 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=arvados-dispatch-lsf
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-lsf
+# Set a reasonable default for the open file limit
+LimitNOFILE=65536
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index d0aa9da94..4b94a7813 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -15,6 +15,7 @@ import (
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/install"
+ "git.arvados.org/arvados.git/lib/lsf"
"git.arvados.org/arvados.git/lib/recovercollection"
"git.arvados.org/arvados.git/services/ws"
)
@@ -33,6 +34,7 @@ var (
"controller": controller.Command,
"crunch-run": crunchrun.Command,
"dispatch-cloud": dispatchcloud.Command,
+ "dispatch-lsf": lsf.DispatchCommand,
"install": install.Command,
"init": install.InitCommand,
"recover-collection": recovercollection.Command,
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
new file mode 100644
index 000000000..01f892bdf
--- /dev/null
+++ b/lib/lsf/dispatch.go
@@ -0,0 +1,272 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "math"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/lib/service"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/dispatch"
+ "git.arvados.org/arvados.git/sdk/go/health"
+ "github.com/julienschmidt/httprouter"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/sirupsen/logrus"
+)
+
+// DispatchCommand is the command handler for the LSF dispatcher
+// service. It is built by service.Command from newHandler.
+var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
+
+// newHandler builds the service.Handler for the LSF dispatcher. It
+// creates an API client from the cluster config, wraps it in a
+// dispatcher, starts that dispatcher in the background, and returns
+// it. If the client cannot be created, an error handler is returned
+// instead.
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+	client, err := arvados.NewClientFromConfig(cluster)
+	if err != nil {
+		err = fmt.Errorf("error initializing client from cluster config: %s", err)
+		return service.ErrorHandler(ctx, cluster, err)
+	}
+	disp := &dispatcher{
+		Cluster:   cluster,
+		Context:   ctx,
+		ArvClient: client,
+		AuthToken: token,
+		Registry:  reg,
+	}
+	// Start is guarded by a sync.Once, so starting here does not
+	// conflict with the Start calls made by the handler methods.
+	go disp.Start()
+	return disp
+}
+
+// dispatcher submits Arvados containers to LSF and serves the
+// management endpoints (metrics and health check) over HTTP.
+type dispatcher struct {
+ // Exported fields are filled in by newHandler before Start is
+ // called.
+ Cluster *arvados.Cluster
+ Context context.Context
+ ArvClient *arvados.Client
+ AuthToken string
+ Registry *prometheus.Registry
+
+ // Unexported fields below are populated by init.
+ logger logrus.FieldLogger
+ lsfapi lsfapi
+ arvDispatcher *dispatch.Dispatcher
+ httpHandler http.Handler
+
+ // initOnce ensures Start does its work only once.
+ initOnce sync.Once
+ // stop receives a value from Close; it is created with a
+ // buffer of one.
+ stop chan struct{}
+ // stopped is the channel returned by Done and awaited by Close.
+ stopped chan struct{}
+}
+
+// Start initializes the dispatcher and launches its run loop in a
+// separate goroutine. Only the first call does any work; calling it
+// again is harmless.
+func (disp *dispatcher) Start() {
+	startOnce := func() {
+		disp.init()
+		go disp.run()
+	}
+	disp.initOnce.Do(startOnce)
+}
+
+// ServeHTTP implements service.Handler. It makes sure the dispatcher
+// has been started, then delegates the request to the handler that
+// init installed.
+func (disp *dispatcher) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	disp.Start()
+	handler := disp.httpHandler
+	handler.ServeHTTP(resp, req)
+}
+
+// CheckHealth implements service.Handler. It makes sure the
+// dispatcher has been started and then reports healthy.
+//
+// The dispatcher struct has no worker pool (or any other component
+// exposing a health check) to consult, so there is currently nothing
+// further to verify here.
+func (disp *dispatcher) CheckHealth() error {
+	disp.Start()
+	return nil
+}
+
+// Done implements service.Handler. It returns the channel that Close
+// waits on.
+//
+// The stopped channel is created by init, which newHandler triggers
+// asynchronously. Start is called first so that the channel is
+// guaranteed to exist (and is read without a data race) no matter how
+// early Done is invoked.
+func (disp *dispatcher) Done() <-chan struct{} {
+	disp.Start()
+	return disp.stopped
+}
+
+// Close stops dispatching containers and releases resources. It
+// blocks until the stopped channel is signalled. Used by tests.
+func (disp *dispatcher) Close() {
+	disp.Start()
+
+	// Non-blocking send: if a stop request is already sitting in
+	// the channel's buffer there is no need to queue another one.
+	select {
+	case disp.stop <- struct{}{}:
+	default:
+	}
+
+	<-disp.stopped
+}
+
+// init fills in the dispatcher's unexported fields: the logger, the
+// LSF client, the stop/stopped channels, the container dispatcher,
+// and the HTTP handler for the management endpoints. It is invoked
+// once, through Start.
+func (disp *dispatcher) init() {
+	cluster := disp.Cluster
+
+	disp.logger = ctxlog.FromContext(disp.Context)
+	disp.lsfapi = lsfcli{logger: disp.logger}
+	disp.ArvClient.AuthToken = disp.AuthToken
+	// stop is buffered so that Close can leave one pending request
+	// without blocking.
+	disp.stop = make(chan struct{}, 1)
+	disp.stopped = make(chan struct{})
+	disp.arvDispatcher = &dispatch.Dispatcher{
+		Arv:            disp.ArvClient,
+		Logger:         disp.logger,
+		BatchSize:      cluster.API.MaxItemsPerResponse,
+		RunContainer:   disp.runContainer,
+		PollPeriod:     time.Duration(cluster.Containers.CloudVMs.PollInterval),
+		MinRetryPeriod: time.Duration(cluster.Containers.MinRetryPeriod),
+	}
+
+	mgmtToken := cluster.ManagementToken
+	if mgmtToken == "" {
+		// No management token configured: reject every request.
+		disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+		})
+		return
+	}
+
+	router := httprouter.New()
+	metrics := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
+		ErrorLog: disp.logger,
+	})
+	healthCheck := &health.Handler{
+		Token:  mgmtToken,
+		Prefix: "/_health/",
+		Routes: health.Routes{"ping": disp.CheckHealth},
+	}
+	router.Handler("GET", "/metrics", metrics)
+	router.Handler("GET", "/metrics.json", metrics)
+	router.Handler("GET", "/_health/:check", healthCheck)
+	disp.httpHandler = auth.RequireLiteralToken(mgmtToken, router)
+}
+
+// runContainer is installed as the RunContainer callback of the
+// dispatch.Dispatcher created in init. For a container in the Locked
+// state that is not already known to the queue checker, it submits
+// the container to LSF; on submission failure it writes a "dispatch"
+// log entry, unlocks the container and returns. Otherwise it
+// monitors the container until either the queue check reports it
+// gone or the status channel is closed/updated.
+//
+// NOTE(review): sqCheck, lsfqueue, scancel, UpdateState, Unlock, Arv
+// and cluster are used below but are not declared on dispatcher in
+// this file -- confirm they are provided elsewhere or still need to
+// be ported from the slurm dispatcher.
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+ ctx, cancel := context.WithCancel(disp.Context)
+ defer cancel()
+
+ if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
+ disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
+ cmd := []string{disp.cluster.Containers.CrunchRunCommand}
+ cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
+ if err := disp.submit(ctr, cmd); err != nil {
+ var text string
+ switch err := err.(type) {
+ case dispatchcloud.ConstraintsNotSatisfiableError:
+ // The container can never be scheduled: explain
+ // why in the log text and cancel it.
+ var logBuf bytes.Buffer
+ fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
+ if len(err.AvailableTypes) == 0 {
+ fmt.Fprint(&logBuf, "No instance types are configured.\n")
+ } else {
+ fmt.Fprint(&logBuf, "Available instance types:\n")
+ for _, t := range err.AvailableTypes {
+ fmt.Fprintf(&logBuf,
+ "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+ t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
+ )
+ }
+ }
+ text = logBuf.String()
+ disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+ default:
+ text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
+ }
+ disp.logger.Print(text)
+
+ // Record the failure as a "dispatch" log entry attached to
+ // the container. The result of Create is not checked.
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": ctr.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
+ disp.Arv.Create("logs", lr, nil)
+
+ disp.Unlock(ctr.UUID)
+ return
+ }
+ }
+
+ disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
+ defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
+
+ // If the container disappears from the lsf queue, there is
+ // no point in waiting for further dispatch updates: just
+ // clean up and return.
+ //
+ // NOTE(review): the loop body is empty, so this spins unless
+ // HasUUID itself blocks between queue updates -- confirm.
+ go func(uuid string) {
+ for ctx.Err() == nil && disp.lsfqueue.HasUUID(uuid) {
+ }
+ cancel()
+ }(ctr.UUID)
+
+ for {
+ select {
+ case <-ctx.Done():
+ // Disappeared from the LSF queue (or the dispatcher's
+ // context was cancelled). Fetch the latest state and
+ // clean up accordingly.
+ if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+ disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
+ }
+ switch ctr.State {
+ case dispatch.Running:
+ disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+ case dispatch.Locked:
+ disp.Unlock(ctr.UUID)
+ }
+ return
+ case updated, ok := <-status:
+ // NOTE(review): the log messages below still say
+ // "slurm job"; presumably they should say LSF.
+ if !ok {
+ disp.logger.Printf("container %s is done: cancel slurm job", ctr.UUID)
+ disp.scancel(ctr)
+ } else if updated.Priority == 0 {
+ disp.logger.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+ disp.scancel(ctr)
+ } else {
+ p := int64(updated.Priority)
+ if p <= 1000 {
+ // API is providing
+ // user-assigned priority. If
+ // ctrs have equal priority,
+ // run the older one first.
+ p = int64(p)<<50 - (updated.CreatedAt.UnixNano() >> 14)
+ }
+ disp.sqCheck.SetPriority(ctr.UUID, p)
+ }
+ }
+ }
+}
+
+// submit wraps the crunch-run command line (with the container UUID
+// appended) in a shell script and passes it to bsub together with
+// the arguments computed by bsubArgs. It returns any error from
+// bsubArgs or from the bsub call.
+func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
+	// append() here avoids modifying crunchRunCommand's
+	// underlying array, which is shared with other goroutines.
+	crArgs := append([]string(nil), crunchRunCommand...)
+	crArgs = append(crArgs, container.UUID)
+	crScript := execScript(crArgs)
+
+	bsubArgs, err := disp.bsubArgs(container)
+	if err != nil {
+		return err
+	}
+	disp.logger.Infof("running bsub %+q", bsubArgs)
+	return disp.lsfapi.Bsub(crScript, bsubArgs)
+}
+
+// bsubArgs returns the bsub command line arguments for the given
+// container: the site-configured extra arguments, the job name
+// ("-J", set to the container UUID), and the resource requirements
+// from bsubConstraintArgs. The error result is currently always nil.
+func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
+	var args []string
+	// NOTE(review): this reuses the SLURM sbatch argument list as
+	// the extra bsub arguments -- presumably a placeholder until
+	// an LSF-specific config entry exists; confirm.
+	args = append(args, disp.Cluster.Containers.SLURM.SbatchArgumentsList...)
+	args = append(args, "-J", container.UUID)
+	args = append(args, disp.bsubConstraintArgs(container)...)
+	return args, nil
+}
+
+// bsubConstraintArgs returns the "-R" resource requirement arguments
+// for the given container. Memory is the sum of the container's RAM
+// and Keep cache RAM constraints plus the cluster's ReserveExtraRAM;
+// scratch space comes from dispatchcloud.EstimateScratchSpace. Both
+// are divided by 1048576 and rounded up before being formatted as
+// "MB" values. The core count is the container's VCPUs constraint.
+func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
+	// TODO: propagate container.SchedulingParameters.Partitions
+	tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
+	vcpus := container.RuntimeConstraints.VCPUs
+	mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+		container.RuntimeConstraints.KeepCacheRAM+
+		int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
+	return []string{
+		"-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+	}
+}
+
+// execScript returns a /bin/sh script that execs the given command
+// line. Each argument is wrapped in single quotes, and any single
+// quote inside an argument is written as '\'' so the shell passes
+// the argument through unchanged.
+func execScript(args []string) []byte {
+	var script strings.Builder
+	script.WriteString("#!/bin/sh\nexec")
+	for _, arg := range args {
+		script.WriteString(` '`)
+		script.WriteString(strings.ReplaceAll(arg, `'`, `'\''`))
+		script.WriteString(`'`)
+	}
+	script.WriteString("\n")
+	return []byte(script.String())
+}
diff --git a/lib/lsf/lsfapi.go b/lib/lsf/lsfapi.go
new file mode 100644
index 000000000..a58855bbd
--- /dev/null
+++ b/lib/lsf/lsfapi.go
@@ -0,0 +1,65 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+ "errors"
+ "fmt"
+ "os/exec"
+ "strings"
+
+ "github.com/sirupsen/logrus"
+)
+
+// bjobsEntry is one job parsed from a line of bjobs output (see
+// lsfcli.Bjobs, which requests the jobid, stat and job_name columns).
+type bjobsEntry struct {
+ id int
+ name string
+ stat string
+}
+
+// lsfapi is the set of LSF operations available through the
+// dispatcher's lsfapi field. lsfcli is the implementation installed
+// by the dispatcher's init.
+type lsfapi interface {
+ // Bsub takes a job script and a list of bsub arguments.
+ Bsub(script []byte, args []string) error
+ // Bjobs returns the jobs currently reported by LSF.
+ Bjobs() ([]bjobsEntry, error)
+}
+
+// lsfcli implements lsfapi; its Bjobs and Bkill methods run the
+// corresponding LSF command line programs.
+type lsfcli struct {
+ logger logrus.FieldLogger
+}
+
+// Bsub is meant to submit the given script to LSF using the given
+// bsub arguments. Submission is not implemented yet: the request is
+// logged and an "unimplemented" error is always returned.
+func (cli lsfcli) Bsub(script []byte, args []string) error {
+	cli.logger.Infof("Bsub(%q, %q)", script, args)
+	return errors.New("unimplemented")
+}
+
+// Bjobs runs "bjobs" and returns one bjobsEntry per line of its
+// standard output. Empty lines are skipped; lines that cannot be
+// parsed as "<id> <stat> <name>" are logged as warnings and skipped.
+// If the bjobs command itself fails, its error is returned.
+func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
+	cmd := exec.Command("bjobs", "-noheader", "-o", "jobid stat job_name:30")
+	buf, err := cmd.Output()
+	if err != nil {
+		return nil, err
+	}
+	var bjobs []bjobsEntry
+	for _, line := range strings.Split(string(buf), "\n") {
+		if line == "" {
+			continue
+		}
+		var ent bjobsEntry
+		if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
+			cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
+			continue
+		}
+		bjobs = append(bjobs, ent)
+	}
+	return bjobs, nil
+}
+
+// Bkill runs "bkill <id>" to cancel the LSF job with the given ID.
+// If bkill fails but its output says the job has "already finished",
+// that is treated as success and nil is returned; any other failure
+// is returned to the caller.
+func (cli lsfcli) Bkill(id int) error {
+	cmd := exec.Command("bkill", fmt.Sprintf("%d", id))
+	buf, err := cmd.CombinedOutput()
+	if err != nil && strings.Contains(string(buf), "already finished") {
+		return nil
+	}
+	return err
+}
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 403d501b4..e1c24f0fd 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -589,6 +589,7 @@ const (
ServiceNameRailsAPI ServiceName = "arvados-api-server"
ServiceNameController ServiceName = "arvados-controller"
ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+ ServiceNameDispatchLSF ServiceName = "arvados-dispatch-lsf"
ServiceNameHealth ServiceName = "arvados-health"
ServiceNameWorkbench1 ServiceName = "arvados-workbench1"
ServiceNameWorkbench2 ServiceName = "arvados-workbench2"
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list