Subject: [ARVADOS] created: 2.1.0-1028-ga58fb1d04
From: Git user <git at public.arvados.org>
Date: Sun Jul 4 17:34:48 UTC 2021
at a58fb1d04bf2994de007e5347bd06d500ddbd68a (commit)
commit a58fb1d04bf2994de007e5347bd06d500ddbd68a
Author: Tom Clegg <tom at curii.com>
Date: Sun Jul 4 13:34:16 2021 -0400
17756: Build arvados-dispatch-lsf package.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/build/run-build-packages-one-target.sh b/build/run-build-packages-one-target.sh
index 8365fecad..056a1fbb1 100755
--- a/build/run-build-packages-one-target.sh
+++ b/build/run-build-packages-one-target.sh
@@ -194,6 +194,7 @@ if test -z "$packages" ; then
arvados-client
arvados-controller
arvados-dispatch-cloud
+ arvados-dispatch-lsf
arvados-docker-cleaner
arvados-git-httpd
arvados-health
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index e231a83df..ba6811986 100755
--- a/build/run-build-packages.sh
+++ b/build/run-build-packages.sh
@@ -277,6 +277,8 @@ package_go_binary cmd/arvados-server arvados-controller \
"Arvados cluster controller daemon"
package_go_binary cmd/arvados-server arvados-dispatch-cloud \
"Arvados cluster cloud dispatch"
+package_go_binary cmd/arvados-server arvados-dispatch-lsf \
+ "Dispatch Arvados containers to an LSF cluster"
package_go_binary services/arv-git-httpd arvados-git-httpd \
"Provide authenticated http access to Arvados-hosted git repositories"
package_go_binary services/crunch-dispatch-local crunch-dispatch-local \
commit 5c8d598d480fb866b67f778cecbd93a6d4be148d
Author: Tom Clegg <tom at curii.com>
Date: Fri Jul 2 17:08:08 2021 -0400
17756: Add checkLsfQueueForOrphans.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index ff95d0db2..b7032dc73 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -11,6 +11,7 @@ import (
"fmt"
"math"
"net/http"
+ "regexp"
"strings"
"sync"
"time"
@@ -315,8 +316,23 @@ func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string
}
}
+// Check the next bjobs report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel existing
+// Arvados LSF jobs (started by a previous dispatch process) that
+// never released their LSF job allocations even though their
+// container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
func (disp *dispatcher) checkLsfQueueForOrphans() {
- disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+ containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+ for _, uuid := range disp.lsfqueue.All() {
+ if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+ continue
+ }
+ err := disp.arvDispatcher.TrackContainer(uuid)
+ if err != nil {
+ disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+ }
+ }
}
func execScript(args []string) []byte {
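For illustration (not part of the commit), here is the filtering rule above in miniature: a queue entry is tracked only if it is shaped like a container UUID and carries this cluster's 5-character prefix. The job names and cluster ID below are hypothetical.

    package main

    import (
        "fmt"
        "regexp"
        "strings"
    )

    func main() {
        containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
        clusterID := "zzzzz" // hypothetical cluster ID
        for _, name := range []string{
            "zzzzz-dz642-queuedcontainer", // container UUID on this cluster: tracked
            "yyyyy-dz642-othercluster123", // another cluster's container: skipped
            "some-unrelated-lsf-job",      // not a container UUID at all: skipped
        } {
            if containerUuidPattern.MatchString(name) && strings.HasPrefix(name, clusterID) {
                fmt.Println("track", name)
            } else {
                fmt.Println("skip", name)
            }
        }
    }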
commit 86d0b4d05d9f6d89e281cd992584e729c4ad63bd
Author: Tom Clegg <tom at curii.com>
Date: Fri Jul 2 17:07:27 2021 -0400
17756: Rewrite lsfqueue "wait for next update".
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 65f38690c..3c4fc4cb8 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -16,66 +16,93 @@ type lsfqueue struct {
period time.Duration
lsfcli *lsfcli
- initOnce sync.Once
- mutex sync.Mutex
- needUpdate chan bool
- updated *sync.Cond
- latest map[string]bjobsEntry
+ initOnce sync.Once
+ mutex sync.Mutex
+ nextReady chan (<-chan struct{})
+ updated *sync.Cond
+ latest map[string]bjobsEntry
}
// JobID waits for the next queue update (so even a job that was only
// submitted a nanosecond ago will show up) and then returns the LSF
// job ID corresponding to the given container UUID.
func (q *lsfqueue) JobID(uuid string) (int, bool) {
- q.initOnce.Do(q.init)
- q.mutex.Lock()
- defer q.mutex.Unlock()
- select {
- case q.needUpdate <- true:
- default:
- // an update is already pending
- }
- q.updated.Wait()
- ent, ok := q.latest[uuid]
- q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
+ ent, ok := q.getNext()[uuid]
return ent.id, ok
}
+// All waits for the next queue update, then returns the names of all
+// jobs in the queue. Used by checkLsfQueueForOrphans().
+func (q *lsfqueue) All() []string {
+ latest := q.getNext()
+ names := make([]string, 0, len(latest))
+ for name := range latest {
+ names = append(names, name)
+ }
+ return names
+}
+
func (q *lsfqueue) SetPriority(uuid string, priority int64) {
q.initOnce.Do(q.init)
q.logger.Debug("SetPriority is not implemented")
}
+func (q *lsfqueue) getNext() map[string]bjobsEntry {
+ q.initOnce.Do(q.init)
+ <-(<-q.nextReady)
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ return q.latest
+}
+
func (q *lsfqueue) init() {
q.updated = sync.NewCond(&q.mutex)
- q.needUpdate = make(chan bool, 1)
+ q.nextReady = make(chan (<-chan struct{}))
ticker := time.NewTicker(time.Second)
go func() {
- for range q.needUpdate {
- q.logger.Debug("running bjobs")
- ents, err := q.lsfcli.Bjobs()
- if err != nil {
- q.logger.Warnf("bjobs: %s", err)
- // Retry on the next tick, don't wait
- // for another new call to JobID().
+ for range ticker.C {
+ // Send a new "next update ready" channel to
+ // the next goroutine that wants one (and any
+ // others that have already queued up since
+ // the first one started waiting).
+ //
+ // Below, when we get a new update, we'll
+ // signal that to the other goroutines by
+ // closing the ready chan.
+ ready := make(chan struct{})
+ q.nextReady <- ready
+ for {
select {
- case q.needUpdate <- true:
+ case q.nextReady <- ready:
+ continue
default:
}
+ break
+ }
+ // Run bjobs repeatedly if needed, until we
+ // get valid output.
+ var ents []bjobsEntry
+ for {
+ q.logger.Debug("running bjobs")
+ var err error
+ ents, err = q.lsfcli.Bjobs()
+ if err == nil {
+ break
+ }
+ q.logger.Warnf("bjobs: %s", err)
<-ticker.C
- continue
}
next := make(map[string]bjobsEntry, len(ents))
for _, ent := range ents {
next[ent.name] = ent
}
+ // Replace q.latest and notify all the
+ // goroutines that the "next update" they
+ // asked for is now ready.
q.mutex.Lock()
q.latest = next
- q.updated.Broadcast()
- q.logger.Debugf("waking up waiters with latest %v", q.latest)
q.mutex.Unlock()
- // Limit "bjobs" invocations to 1 per second
- <-ticker.C
+ close(ready)
}
}()
}
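For illustration (not part of the commit), the synchronization idiom above in miniature: the updater loop hands each waiter a "ready" channel representing the next update via nextReady, then closes that channel once the update lands, waking every goroutine that fetched it. This is what guarantees each getNext() caller a queue snapshot taken after it started waiting. Names below are simplified.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        nextReady := make(chan (<-chan struct{}))

        // Updater: offer a ready chan to all current waiters, do the
        // (simulated) poll, then close the chan to wake them at once.
        go func() {
            for {
                ready := make(chan struct{})
                nextReady <- ready // at least one waiter gets it
                for {
                    select {
                    case nextReady <- ready: // any others already queued up
                        continue
                    default:
                    }
                    break
                }
                time.Sleep(10 * time.Millisecond) // stands in for bjobs
                close(ready)
            }
        }()

        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func(i int) {
                defer wg.Done()
                <-(<-nextReady) // wait for the *next* full update
                fmt.Println("waiter", i, "saw a fresh update")
            }(i)
        }
        wg.Wait()
    }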
commit ffa1fd1fdf584c71e248e9bb7d523f788a517510
Author: Tom Clegg <tom at curii.com>
Date: Fri Jul 2 13:27:46 2021 -0400
17756: Add lsf dispatcher.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/cmd/arvados-server/arvados-dispatch-lsf.service b/cmd/arvados-server/arvados-dispatch-lsf.service
new file mode 100644
index 000000000..f9e73a2c7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-lsf.service
@@ -0,0 +1,30 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=arvados-dispatch-lsf
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-lsf
+# Set a reasonable default for the open file limit
+LimitNOFILE=65536
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index d0aa9da94..4b94a7813 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -15,6 +15,7 @@ import (
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/install"
+ "git.arvados.org/arvados.git/lib/lsf"
"git.arvados.org/arvados.git/lib/recovercollection"
"git.arvados.org/arvados.git/services/ws"
)
@@ -33,6 +34,7 @@ var (
"controller": controller.Command,
"crunch-run": crunchrun.Command,
"dispatch-cloud": dispatchcloud.Command,
+ "dispatch-lsf": lsf.DispatchCommand,
"install": install.Command,
"init": install.InitCommand,
"recover-collection": recovercollection.Command,
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index e28d5cbb7..975b4cd0f 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -52,6 +52,9 @@ Clusters:
DispatchCloud:
InternalURLs: {SAMPLE: {}}
ExternalURL: "-"
+ DispatchLSF:
+ InternalURLs: {SAMPLE: {}}
+ ExternalURL: "-"
Keepproxy:
InternalURLs: {SAMPLE: {}}
ExternalURL: ""
@@ -1012,6 +1015,15 @@ Clusters:
# (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
AssignNodeHostname: "compute%<slot_number>d"
+ LSF:
+ # Additional arguments to bsub when submitting Arvados
+ # containers as LSF jobs.
+ BsubArgumentsList: []
+
+ # Use sudo to switch to this user account when submitting LSF
+ # jobs.
+ BsubSudoUser: "crunch"
+
JobsAPI:
# Enable the legacy 'jobs' API (crunch v1). This value must be a string.
#
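For illustration, enabling the new dispatcher in /etc/arvados/config.yml might look like the following; the cluster ID, listen port, and bsub arguments are hypothetical, not defaults shipped by this commit.

    Clusters:
      zzzzz:
        Services:
          DispatchLSF:
            InternalURLs:
              "http://localhost:9009": {}
            ExternalURL: "-"
        Containers:
          LSF:
            BsubArgumentsList: ["-q", "bigmem"]
            BsubSudoUser: "crunch"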
diff --git a/lib/config/export.go b/lib/config/export.go
index 8753b52f2..7adb50ec3 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -120,6 +120,7 @@ var whitelist = map[string]bool{
"Containers.JobsAPI.GitInternalDir": false,
"Containers.Logging": false,
"Containers.LogReuseDecisions": false,
+ "Containers.LSF": false,
"Containers.MaxComputeVMs": false,
"Containers.MaxDispatchAttempts": false,
"Containers.MaxRetryAttempts": true,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index b15bf7eeb..dec250558 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -58,6 +58,9 @@ Clusters:
DispatchCloud:
InternalURLs: {SAMPLE: {}}
ExternalURL: "-"
+ DispatchLSF:
+ InternalURLs: {SAMPLE: {}}
+ ExternalURL: "-"
Keepproxy:
InternalURLs: {SAMPLE: {}}
ExternalURL: ""
@@ -1018,6 +1021,15 @@ Clusters:
# (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
AssignNodeHostname: "compute%<slot_number>d"
+ LSF:
+ # Additional arguments to bsub when submitting Arvados
+ # containers as LSF jobs.
+ BsubArgumentsList: []
+
+ # Use sudo to switch to this user account when submitting LSF
+ # jobs.
+ BsubSudoUser: "crunch"
+
JobsAPI:
# Enable the legacy 'jobs' API (crunch v1). This value must be a string.
#
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
new file mode 100644
index 000000000..ff95d0db2
--- /dev/null
+++ b/lib/lsf/dispatch.go
@@ -0,0 +1,330 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "math"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/lib/service"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/dispatch"
+ "git.arvados.org/arvados.git/sdk/go/health"
+ "github.com/julienschmidt/httprouter"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/sirupsen/logrus"
+)
+
+var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+ ac, err := arvados.NewClientFromConfig(cluster)
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+ }
+ d := &dispatcher{
+ Cluster: cluster,
+ Context: ctx,
+ ArvClient: ac,
+ AuthToken: token,
+ Registry: reg,
+ }
+ go d.Start()
+ return d
+}
+
+type dispatcher struct {
+ Cluster *arvados.Cluster
+ Context context.Context
+ ArvClient *arvados.Client
+ AuthToken string
+ Registry *prometheus.Registry
+
+ logger logrus.FieldLogger
+ lsfcli lsfcli
+ lsfqueue lsfqueue
+ arvDispatcher *dispatch.Dispatcher
+ httpHandler http.Handler
+
+ initOnce sync.Once
+ stop chan struct{}
+ stopped chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+ disp.initOnce.Do(func() {
+ disp.init()
+ go func() {
+ disp.checkLsfQueueForOrphans()
+ err := disp.arvDispatcher.Run(disp.Context)
+ if err != nil {
+ disp.logger.Error(err)
+ disp.Close()
+ }
+ }()
+ })
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ disp.Start()
+ disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+ disp.Start()
+ select {
+ case <-disp.stopped:
+ return errors.New("stopped")
+ default:
+ return nil
+ }
+}
+
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+ return disp.stopped
+}
+
+// Stop dispatching containers and release resources. Used by tests.
+func (disp *dispatcher) Close() {
+ disp.Start()
+ select {
+ case disp.stop <- struct{}{}:
+ default:
+ }
+ <-disp.stopped
+}
+
+func (disp *dispatcher) init() {
+ disp.logger = ctxlog.FromContext(disp.Context)
+ disp.lsfcli.logger = disp.logger
+ disp.lsfqueue = lsfqueue{
+ logger: disp.logger,
+ period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+ lsfcli: &disp.lsfcli,
+ }
+ disp.ArvClient.AuthToken = disp.AuthToken
+ disp.stop = make(chan struct{}, 1)
+ disp.stopped = make(chan struct{})
+
+ arv, err := arvadosclient.New(disp.ArvClient)
+ if err != nil {
+ disp.logger.Fatalf("Error making Arvados client: %v", err)
+ }
+ arv.Retries = 25
+ arv.ApiToken = disp.AuthToken
+ disp.arvDispatcher = &dispatch.Dispatcher{
+ Arv: arv,
+ Logger: disp.logger,
+ BatchSize: disp.Cluster.API.MaxItemsPerResponse,
+ RunContainer: disp.runContainer,
+ PollPeriod: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+ MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
+ }
+
+ if disp.Cluster.ManagementToken == "" {
+ disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+ })
+ } else {
+ mux := httprouter.New()
+ metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
+ ErrorLog: disp.logger,
+ })
+ mux.Handler("GET", "/metrics", metricsH)
+ mux.Handler("GET", "/metrics.json", metricsH)
+ mux.Handler("GET", "/_health/:check", &health.Handler{
+ Token: disp.Cluster.ManagementToken,
+ Prefix: "/_health/",
+ Routes: health.Routes{"ping": disp.CheckHealth},
+ })
+ disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+ }
+}
+
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+ ctx, cancel := context.WithCancel(disp.Context)
+ defer cancel()
+
+ if ctr.State != dispatch.Locked {
+ // already started by prior invocation
+ } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
+ cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
+ cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
+ cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
+ if err := disp.submit(ctr, cmd); err != nil {
+ var text string
+ switch err := err.(type) {
+ case dispatchcloud.ConstraintsNotSatisfiableError:
+ var logBuf bytes.Buffer
+ fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
+ if len(err.AvailableTypes) == 0 {
+ fmt.Fprint(&logBuf, "No instance types are configured.\n")
+ } else {
+ fmt.Fprint(&logBuf, "Available instance types:\n")
+ for _, t := range err.AvailableTypes {
+ fmt.Fprintf(&logBuf,
+ "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+ t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
+ )
+ }
+ }
+ text = logBuf.String()
+ disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+ default:
+ text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
+ }
+ disp.logger.Print(text)
+
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": ctr.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
+ disp.arvDispatcher.Arv.Create("logs", lr, nil)
+
+ disp.arvDispatcher.Unlock(ctr.UUID)
+ return
+ }
+ }
+
+ disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
+ defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
+
+ // If the container disappears from the lsf queue, there is
+ // no point in waiting for further dispatch updates: just
+ // clean up and return.
+ go func(uuid string) {
+ for ctx.Err() == nil {
+ if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+ disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
+ cancel()
+ return
+ }
+ }
+ }(ctr.UUID)
+
+ for done := false; !done; {
+ select {
+ case <-ctx.Done():
+ // Disappeared from lsf queue
+ if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+ disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
+ }
+ switch ctr.State {
+ case dispatch.Running:
+ disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+ case dispatch.Locked:
+ disp.arvDispatcher.Unlock(ctr.UUID)
+ }
+ return
+ case updated, ok := <-status:
+ if !ok {
+ done = true
+ break
+ }
+ if updated.State != ctr.State {
+ disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
+ }
+ ctr = updated
+ if ctr.Priority == 0 {
+ disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
+ disp.bkill(ctr)
+ } else {
+ disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
+ }
+ }
+ }
+ disp.logger.Printf("container %s is done", ctr.UUID)
+
+ // Try "bkill" every few seconds until the LSF job disappears
+ // from the queue.
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+ for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
+ err := disp.lsfcli.Bkill(jobid)
+ if err != nil {
+ disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ }
+ <-ticker.C
+ }
+}
+
+func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
+ // Start with an empty slice here to ensure append() doesn't
+ // modify crunchRunCommand's underlying array
+ var crArgs []string
+ crArgs = append(crArgs, crunchRunCommand...)
+ crArgs = append(crArgs, container.UUID)
+ crScript := execScript(crArgs)
+
+ bsubArgs, err := disp.bsubArgs(container)
+ if err != nil {
+ return err
+ }
+ return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
+}
+
+func (disp *dispatcher) bkill(ctr arvados.Container) {
+ if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
+ } else if err := disp.lsfcli.Bkill(jobid); err != nil {
+ disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ }
+}
+
+func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
+ args := []string{"bsub"}
+ args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
+ args = append(args, "-J", container.UUID)
+ args = append(args, disp.bsubConstraintArgs(container)...)
+ if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
+ args = append([]string{"sudo", "-E", "-u", u}, args...)
+ }
+ return args, nil
+}
+
+func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
+ // TODO: propagate container.SchedulingParameters.Partitions
+ tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
+ vcpus := container.RuntimeConstraints.VCPUs
+ mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+ container.RuntimeConstraints.KeepCacheRAM+
+ int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
+ return []string{
+ "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+ }
+}
+
+func (disp *dispatcher) checkLsfQueueForOrphans() {
+ disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+}
+
+func execScript(args []string) []byte {
+ s := "#!/bin/sh\nexec"
+ for _, w := range args {
+ s += ` '`
+ s += strings.Replace(w, `'`, `'\''`, -1)
+ s += `'`
+ }
+ return []byte(s + "\n")
+}
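For illustration (not part of the commit), execScript wraps the crunch-run invocation in a one-line POSIX shell script, single-quoting every argument and escaping embedded quotes with the standard '\'' idiom. A hypothetical invocation:

    package main

    import (
        "fmt"
        "strings"
    )

    // execScript as defined above in lib/lsf/dispatch.go.
    func execScript(args []string) []byte {
        s := "#!/bin/sh\nexec"
        for _, w := range args {
            s += ` '`
            s += strings.Replace(w, `'`, `'\''`, -1)
            s += `'`
        }
        return []byte(s + "\n")
    }

    func main() {
        // Hypothetical crunch-run command and container UUID.
        fmt.Printf("%s", execScript([]string{
            "crunch-run", "--runtime-engine=docker", "zzzzz-dz642-queuedcontainer",
        }))
        // Output:
        // #!/bin/sh
        // exec 'crunch-run' '--runtime-engine=docker' 'zzzzz-dz642-queuedcontainer'
    }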
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
new file mode 100644
index 000000000..25db95e10
--- /dev/null
+++ b/lib/lsf/dispatch_test.go
@@ -0,0 +1,145 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "os/exec"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
+ "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+ disp *dispatcher
+}
+
+func (s *suite) TearDownTest(c *check.C) {
+ arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+}
+
+func (s *suite) SetUpTest(c *check.C) {
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+ s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
+ s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
+ return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
+ }
+}
+
+type lsfstub struct {
+ errorRate float64
+}
+
+func (stub lsfstub) stubCommand(c *check.C) func(prog string, args ...string) *exec.Cmd {
+ mtx := sync.Mutex{}
+ nextjobid := 100
+ fakejobq := map[int]string{}
+ return func(prog string, args ...string) *exec.Cmd {
+ c.Logf("stubCommand: %q %q", prog, args)
+ if rand.Float64() < stub.errorRate {
+ return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
+ }
+ switch prog {
+ case "bsub":
+ c.Assert(args, check.HasLen, 4)
+ c.Check(args[0], check.Equals, "-J")
+ switch args[1] {
+ case arvadostest.LockedContainerUUID:
+ c.Check(args, check.DeepEquals, []string{"-J", arvadostest.LockedContainerUUID, "-R", "rusage[mem=11701MB:tmp=0MB] affinity[core(4)]"})
+ mtx.Lock()
+ fakejobq[nextjobid] = args[1]
+ nextjobid++
+ mtx.Unlock()
+ case arvadostest.QueuedContainerUUID:
+ c.Check(args, check.DeepEquals, []string{"-J", arvadostest.QueuedContainerUUID, "-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"})
+ mtx.Lock()
+ fakejobq[nextjobid] = args[1]
+ nextjobid++
+ mtx.Unlock()
+ default:
+ c.Errorf("unexpected uuid passed to bsub: args %q", args)
+ return exec.Command("false")
+ }
+ return exec.Command("echo", "submitted job")
+ case "bjobs":
+ c.Check(args, check.DeepEquals, []string{"-noheader", "-o", "jobid stat job_name:30"})
+ out := ""
+ for jobid, uuid := range fakejobq {
+ out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+ }
+ c.Logf("bjobs out: %q", out)
+ return exec.Command("printf", out)
+ case "bkill":
+ killid, _ := strconv.Atoi(args[0])
+ if uuid, ok := fakejobq[killid]; !ok {
+ return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
+ } else if uuid == "" {
+ return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
+ } else {
+ go func() {
+ time.Sleep(time.Millisecond)
+ mtx.Lock()
+ delete(fakejobq, killid)
+ mtx.Unlock()
+ }()
+ return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
+ }
+ default:
+ return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
+ }
+ }
+}
+
+func (s *suite) TestSubmit(c *check.C) {
+ s.disp.lsfcli.stubCommand = lsfstub{errorRate: 0.1}.stubCommand(c)
+ s.disp.Start()
+ deadline := time.Now().Add(20 * time.Second)
+ for range time.NewTicker(time.Second).C {
+ if time.Now().After(deadline) {
+ c.Error("timed out")
+ break
+ }
+ // "queuedcontainer" should be running
+ if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+ continue
+ }
+ // "lockedcontainer" should be cancelled because it
+ // has priority 0 (no matching container requests)
+ if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+ continue
+ }
+ var ctr arvados.Container
+ if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
+ c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
+ continue
+ }
+ if ctr.State != arvados.ContainerStateQueued {
+ c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
+ continue
+ }
+ c.Log("reached desired state")
+ break
+ }
+}
diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go
new file mode 100644
index 000000000..9d712ee97
--- /dev/null
+++ b/lib/lsf/lsfcli.go
@@ -0,0 +1,92 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
+)
+
+type bjobsEntry struct {
+ id int
+ name string
+ stat string
+}
+
+type lsfcli struct {
+ logger logrus.FieldLogger
+ // (for testing) if non-nil, call stubCommand() instead of
+ // exec.Command() when running lsf command line programs.
+ stubCommand func(string, ...string) *exec.Cmd
+}
+
+func (cli lsfcli) command(prog string, args ...string) *exec.Cmd {
+ if f := cli.stubCommand; f != nil {
+ return f(prog, args...)
+ } else {
+ return exec.Command(prog, args...)
+ }
+}
+
+func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error {
+ cli.logger.Infof("bsub command %q script %q", args, script)
+ cmd := cli.command(args[0], args[1:]...)
+ cmd.Env = append([]string(nil), os.Environ()...)
+ cmd.Env = append(cmd.Env, "ARVADOS_API_HOST="+arv.APIHost)
+ cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arv.AuthToken)
+ if arv.Insecure {
+ cmd.Env = append(cmd.Env, "ARVADOS_API_HOST_INSECURE=1")
+ }
+ cmd.Stdin = bytes.NewReader(script)
+ out, err := cmd.Output()
+ cli.logger.WithField("stdout", string(out)).Infof("bsub finished")
+ return errWithStderr(err)
+}
+
+func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
+ cli.logger.Debugf("Bjobs()")
+ cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30")
+ buf, err := cmd.Output()
+ if err != nil {
+ return nil, errWithStderr(err)
+ }
+ var bjobs []bjobsEntry
+ for _, line := range strings.Split(string(buf), "\n") {
+ if line == "" {
+ continue
+ }
+ var ent bjobsEntry
+ if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
+ cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
+ continue
+ }
+ bjobs = append(bjobs, ent)
+ }
+ return bjobs, nil
+}
+
+func (cli lsfcli) Bkill(id int) error {
+ cli.logger.Infof("Bkill(%d)", id)
+ cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+ buf, err := cmd.CombinedOutput()
+ if err == nil || strings.Index(string(buf), "already finished") >= 0 {
+ return nil
+ } else {
+ return fmt.Errorf("%s (%q)", err, buf)
+ }
+}
+
+func errWithStderr(err error) error {
+ if err, ok := err.(*exec.ExitError); ok {
+ return fmt.Errorf("%s (%q)", err, err.Stderr)
+ }
+ return err
+}
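For illustration (not part of the commit), the Bjobs parsing loop in miniature, fed with made-up bjobs output in the "jobid stat job_name" column order requested above:

    package main

    import (
        "fmt"
        "strings"
    )

    type bjobsEntry struct {
        id   int
        name string
        stat string
    }

    func main() {
        // Hypothetical "bjobs -u all -noheader -o ..." output.
        out := "100 RUN zzzzz-dz642-queuedcontainer\n101 PEND zzzzz-dz642-lockedcontainer\n"
        for _, line := range strings.Split(out, "\n") {
            if line == "" {
                continue
            }
            var ent bjobsEntry
            if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
                fmt.Printf("ignoring unparsed line: %q\n", line)
                continue
            }
            fmt.Printf("job %d (%s) -> %s\n", ent.id, ent.stat, ent.name)
        }
    }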
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
new file mode 100644
index 000000000..65f38690c
--- /dev/null
+++ b/lib/lsf/lsfqueue.go
@@ -0,0 +1,81 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+)
+
+type lsfqueue struct {
+ logger logrus.FieldLogger
+ period time.Duration
+ lsfcli *lsfcli
+
+ initOnce sync.Once
+ mutex sync.Mutex
+ needUpdate chan bool
+ updated *sync.Cond
+ latest map[string]bjobsEntry
+}
+
+// JobID waits for the next queue update (so even a job that was only
+// submitted a nanosecond ago will show up) and then returns the LSF
+// job ID corresponding to the given container UUID.
+func (q *lsfqueue) JobID(uuid string) (int, bool) {
+ q.initOnce.Do(q.init)
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ select {
+ case q.needUpdate <- true:
+ default:
+ // an update is already pending
+ }
+ q.updated.Wait()
+ ent, ok := q.latest[uuid]
+ q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
+ return ent.id, ok
+}
+
+func (q *lsfqueue) SetPriority(uuid string, priority int64) {
+ q.initOnce.Do(q.init)
+ q.logger.Debug("SetPriority is not implemented")
+}
+
+func (q *lsfqueue) init() {
+ q.updated = sync.NewCond(&q.mutex)
+ q.needUpdate = make(chan bool, 1)
+ ticker := time.NewTicker(time.Second)
+ go func() {
+ for range q.needUpdate {
+ q.logger.Debug("running bjobs")
+ ents, err := q.lsfcli.Bjobs()
+ if err != nil {
+ q.logger.Warnf("bjobs: %s", err)
+ // Retry on the next tick, don't wait
+ // for another new call to JobID().
+ select {
+ case q.needUpdate <- true:
+ default:
+ }
+ <-ticker.C
+ continue
+ }
+ next := make(map[string]bjobsEntry, len(ents))
+ for _, ent := range ents {
+ next[ent.name] = ent
+ }
+ q.mutex.Lock()
+ q.latest = next
+ q.updated.Broadcast()
+ q.logger.Debugf("waking up waiters with latest %v", q.latest)
+ q.mutex.Unlock()
+ // Limit "bjobs" invocations to 1 per second
+ <-ticker.C
+ }
+ }()
+}
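For illustration (not part of the commit), the condition-variable idiom this first version uses, in miniature: a caller requests a poll through a one-slot buffered channel and blocks on the sync.Cond until the background loop broadcasts that a fresh snapshot is in place. (The later commit above replaces this with the channel-close scheme.) Names below are simplified.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var (
            mutex      sync.Mutex
            updated    = sync.NewCond(&mutex)
            needUpdate = make(chan bool, 1)
            latest     int
        )
        // Updater: wake all waiters after each (simulated) poll.
        go func() {
            for range needUpdate {
                time.Sleep(10 * time.Millisecond) // stands in for bjobs
                mutex.Lock()
                latest++
                updated.Broadcast()
                mutex.Unlock()
            }
        }()
        // Waiter: request an update, then block until one arrives.
        mutex.Lock()
        select {
        case needUpdate <- true:
        default: // an update is already pending
        }
        updated.Wait()
        fmt.Println("saw update", latest)
        mutex.Unlock()
    }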
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6e59828a3..844991f41 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -329,6 +329,7 @@ type Services struct {
Composer Service
Controller Service
DispatchCloud Service
+ DispatchLSF Service
GitHTTP Service
GitSSH Service
Health Service
@@ -461,6 +462,10 @@ type ContainersConfig struct {
AssignNodeHostname string
}
}
+ LSF struct {
+ BsubSudoUser string
+ BsubArgumentsList []string
+ }
}
type CloudVMsConfig struct {
@@ -597,6 +602,7 @@ const (
ServiceNameRailsAPI ServiceName = "arvados-api-server"
ServiceNameController ServiceName = "arvados-controller"
ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+ ServiceNameDispatchLSF ServiceName = "arvados-dispatch-lsf"
ServiceNameHealth ServiceName = "arvados-health"
ServiceNameWorkbench1 ServiceName = "arvados-workbench1"
ServiceNameWorkbench2 ServiceName = "arvados-workbench2"
@@ -614,6 +620,7 @@ func (svcs Services) Map() map[ServiceName]Service {
ServiceNameRailsAPI: svcs.RailsAPI,
ServiceNameController: svcs.Controller,
ServiceNameDispatchCloud: svcs.DispatchCloud,
+ ServiceNameDispatchLSF: svcs.DispatchLSF,
ServiceNameHealth: svcs.Health,
ServiceNameWorkbench1: svcs.Workbench1,
ServiceNameWorkbench2: svcs.Workbench2,
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 4b7ad6dd5..3de4225d5 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -45,6 +45,8 @@ const (
QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
QueuedContainerUUID = "zzzzz-dz642-queuedcontainer"
+ LockedContainerUUID = "zzzzz-dz642-lockedcontainer"
+
RunningContainerUUID = "zzzzz-dz642-runningcontainr"
CompletedContainerUUID = "zzzzz-dz642-compltcontainer"