[ARVADOS] created: 2.1.0-975-ga4521a3bd

Git user git at public.arvados.org
Thu Jul 1 20:26:26 UTC 2021


        at  a4521a3bd22c6e794eed8bfc0bf70f3dc1d51a16 (commit)


commit a4521a3bd22c6e794eed8bfc0bf70f3dc1d51a16
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jul 1 16:26:13 2021 -0400

    17756: Add lsf dispatcher. (work in progress)
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/cmd/arvados-server/arvados-dispatch-lsf.service b/cmd/arvados-server/arvados-dispatch-lsf.service
new file mode 100644
index 000000000..f9e73a2c7
--- /dev/null
+++ b/cmd/arvados-server/arvados-dispatch-lsf.service
@@ -0,0 +1,30 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=arvados-dispatch-lsf
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-lsf
+# Set a reasonable default for the open file limit
+LimitNOFILE=65536
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index d0aa9da94..4b94a7813 100644
--- a/cmd/arvados-server/cmd.go
+++ b/cmd/arvados-server/cmd.go
@@ -15,6 +15,7 @@ import (
 	"git.arvados.org/arvados.git/lib/crunchrun"
 	"git.arvados.org/arvados.git/lib/dispatchcloud"
 	"git.arvados.org/arvados.git/lib/install"
+	"git.arvados.org/arvados.git/lib/lsf"
 	"git.arvados.org/arvados.git/lib/recovercollection"
 	"git.arvados.org/arvados.git/services/ws"
 )
@@ -33,6 +34,7 @@ var (
 		"controller":         controller.Command,
 		"crunch-run":         crunchrun.Command,
 		"dispatch-cloud":     dispatchcloud.Command,
+		"dispatch-lsf":       lsf.DispatchCommand,
 		"install":            install.Command,
 		"init":               install.InitCommand,
 		"recover-collection": recovercollection.Command,
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
new file mode 100644
index 000000000..732fe625b
--- /dev/null
+++ b/lib/lsf/dispatch.go
@@ -0,0 +1,307 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"math"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/dispatchcloud"
+	"git.arvados.org/arvados.git/lib/service"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+	"git.arvados.org/arvados.git/sdk/go/auth"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"git.arvados.org/arvados.git/sdk/go/dispatch"
+	"git.arvados.org/arvados.git/sdk/go/health"
+	"github.com/julienschmidt/httprouter"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/sirupsen/logrus"
+)
+
+var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+	ac, err := arvados.NewClientFromConfig(cluster)
+	if err != nil {
+		return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+	}
+	d := &dispatcher{
+		Cluster:   cluster,
+		Context:   ctx,
+		ArvClient: ac,
+		AuthToken: token,
+		Registry:  reg,
+	}
+	go d.Start()
+	return d
+}
+
+type dispatcher struct {
+	Cluster   *arvados.Cluster
+	Context   context.Context
+	ArvClient *arvados.Client
+	AuthToken string
+	Registry  *prometheus.Registry
+
+	logger        logrus.FieldLogger
+	lsfcli        lsfcli
+	lsfqueue      lsfqueue
+	arvDispatcher *dispatch.Dispatcher
+	httpHandler   http.Handler
+
+	initOnce sync.Once
+	stop     chan struct{}
+	stopped  chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+	disp.initOnce.Do(func() {
+		disp.init()
+		go disp.checkLsfQueueForOrphans()
+		go disp.arvDispatcher.Run(disp.Context)
+	})
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	disp.Start()
+	disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+	disp.Start()
+	return nil
+}
+
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+	return disp.stopped
+}
+
+// Close stops dispatching containers and releases resources. Used by tests.
+func (disp *dispatcher) Close() {
+	disp.Start()
+	select {
+	case disp.stop <- struct{}{}:
+	default:
+	}
+	<-disp.stopped
+}
+
+func (disp *dispatcher) init() {
+	disp.logger = ctxlog.FromContext(disp.Context)
+	disp.lsfcli.logger = disp.logger
+	disp.lsfqueue = lsfqueue{
+		logger: disp.logger,
+		period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+		lsfcli: &disp.lsfcli,
+	}
+	disp.ArvClient.AuthToken = disp.AuthToken
+	disp.stop = make(chan struct{}, 1)
+	disp.stopped = make(chan struct{})
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		disp.logger.Fatalf("Error making Arvados client: %v", err)
+	}
+	arv.Retries = 25
+	arv.ApiToken = disp.AuthToken
+	disp.arvDispatcher = &dispatch.Dispatcher{
+		Arv:            arv,
+		Logger:         disp.logger,
+		BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
+		RunContainer:   disp.runContainer,
+		PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+		MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
+	}
+
+	if disp.Cluster.ManagementToken == "" {
+		disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+		})
+	} else {
+		mux := httprouter.New()
+		metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
+			ErrorLog: disp.logger,
+		})
+		mux.Handler("GET", "/metrics", metricsH)
+		mux.Handler("GET", "/metrics.json", metricsH)
+		mux.Handler("GET", "/_health/:check", &health.Handler{
+			Token:  disp.Cluster.ManagementToken,
+			Prefix: "/_health/",
+			Routes: health.Routes{"ping": disp.CheckHealth},
+		})
+		disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+	}
+}
+
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+	ctx, cancel := context.WithCancel(disp.Context)
+	defer cancel()
+
+	if ctr.State != dispatch.Locked {
+		// already started by prior invocation
+	} else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+		disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
+		cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
+		cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
+		if err := disp.submit(ctr, cmd); err != nil {
+			var text string
+			switch err := err.(type) {
+			case dispatchcloud.ConstraintsNotSatisfiableError:
+				var logBuf bytes.Buffer
+				fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
+				if len(err.AvailableTypes) == 0 {
+					fmt.Fprint(&logBuf, "No instance types are configured.\n")
+				} else {
+					fmt.Fprint(&logBuf, "Available instance types:\n")
+					for _, t := range err.AvailableTypes {
+						fmt.Fprintf(&logBuf,
+							"Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+							t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
+						)
+					}
+				}
+				text = logBuf.String()
+				disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+			default:
+				text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
+			}
+			disp.logger.Print(text)
+
+			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+				"object_uuid": ctr.UUID,
+				"event_type":  "dispatch",
+				"properties":  map[string]string{"text": text}}}
+			disp.arvDispatcher.Arv.Create("logs", lr, nil)
+
+			disp.arvDispatcher.Unlock(ctr.UUID)
+			return
+		}
+	}
+
+	disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
+	defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
+
+	// If the container disappears from the LSF queue, there is no
+	// point waiting for further dispatch updates: clean up and
+	// return. (JobID blocks until the next poll; not a busy loop.)
+	go func(uuid string) {
+		for ctx.Err() == nil {
+			if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+				disp.logger.Printf("Container %s: job disappeared from LSF queue", uuid)
+				cancel()
+				return
+			}
+		}
+	}(ctr.UUID)
+
+	for {
+		select {
+		case <-ctx.Done():
+			// Disappeared from the LSF queue
+			if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+				disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
+			}
+			switch ctr.State {
+			case dispatch.Running:
+				disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+			case dispatch.Locked:
+				disp.arvDispatcher.Unlock(ctr.UUID)
+			}
+			return
+		case updated, ok := <-status:
+			if !ok {
+				disp.logger.Printf("container %s is done: cancel lsf job", ctr.UUID)
+				disp.bkill(ctr)
+			} else if updated.Priority == 0 {
+				disp.logger.Printf("container %s has state %q, priority %d: cancel lsf job", ctr.UUID, updated.State, updated.Priority)
+				disp.bkill(ctr)
+			} else {
+				p := int64(updated.Priority)
+				if p <= 1000 {
+					// API is providing
+					// user-assigned priority. If
+					// ctrs have equal priority,
+					// run the older one first.
+					p = p<<50 - (updated.CreatedAt.UnixNano() >> 14)
+				}
+				disp.lsfqueue.SetPriority(ctr.UUID, p)
+			}
+		}
+	}
+}
+
+func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
+	// Start with an empty slice here to ensure append() doesn't
+	// modify crunchRunCommand's underlying array
+	var crArgs []string
+	crArgs = append(crArgs, crunchRunCommand...)
+	crArgs = append(crArgs, container.UUID)
+	crScript := execScript(crArgs)
+
+	bsubArgs, err := disp.bsubArgs(container)
+	if err != nil {
+		return err
+	}
+	return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
+}
+
+func (disp *dispatcher) bkill(ctr arvados.Container) {
+	if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok {
+		disp.lsfcli.Bkill(jobid)
+	} else {
+		disp.logger.Infof("bkill(%s): job not in queue", ctr.UUID)
+	}
+}
+
+func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
+	// Start with an empty slice so append() doesn't modify the
+	// underlying array of SbatchArgumentsList (reused from SLURM for now)
+	var args []string
+	args = append(args, disp.Cluster.Containers.SLURM.SbatchArgumentsList...)
+	// -J "job name"
+	args = append(args, "-J", container.UUID)
+	args = append(args, disp.bsubConstraintArgs(container)...)
+	return args, nil
+}
+
+func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
+	// TODO: propagate container.SchedulingParameters.Partitions
+	tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
+	vcpus := container.RuntimeConstraints.VCPUs
+	mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+		container.RuntimeConstraints.KeepCacheRAM+
+		int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
+	return []string{
+		"-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+	}
+}
+
+func (disp *dispatcher) checkLsfQueueForOrphans() {
+	disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+}
+
+func execScript(args []string) []byte {
+	s := "#!/bin/sh\nexec"
+	for _, w := range args {
+		s += ` '`
+		s += strings.Replace(w, `'`, `'\''`, -1)
+		s += `'`
+	}
+	return []byte(s + "\n")
+}
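A note on the priority mapping in runContainer above: user-assigned priorities (<= 1000 when provided by the API) are shifted into the high bits, and a coarsened creation timestamp is subtracted, so containers with equal priority run oldest-first while a one-step priority difference always dominates the timestamp term. A small self-contained sketch of that ordering, using the same shift constants as the code above:

    package main

    import (
    	"fmt"
    	"time"
    )

    // encode mirrors the dispatcher's mapping: priority<<50 leaves
    // room below for the timestamp term, and UnixNano()>>14 coarsens
    // the creation time (~16µs units) so it stays below 2^50 and can
    // never outweigh a one-step priority difference.
    func encode(priority int64, createdAt time.Time) int64 {
    	return priority<<50 - (createdAt.UnixNano() >> 14)
    }

    func main() {
    	older := time.Now().Add(-time.Hour)
    	newer := time.Now()
    	fmt.Println(encode(5, older) > encode(5, newer)) // true: tie broken by age
    	fmt.Println(encode(6, newer) > encode(5, older)) // true: higher priority dominates
    }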
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
new file mode 100644
index 000000000..25db95e10
--- /dev/null
+++ b/lib/lsf/dispatch_test.go
@@ -0,0 +1,150 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"os/exec"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/prometheus/client_golang/prometheus"
+	"gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+	disp *dispatcher
+}
+
+func (s *suite) TearDownTest(c *check.C) {
+	arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+}
+
+func (s *suite) SetUpTest(c *check.C) {
+	cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+	c.Assert(err, check.IsNil)
+	cluster, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+	s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
+	s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
+		return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
+	}
+}
+
+type lsfstub struct {
+	errorRate float64
+}
+
+func (stub lsfstub) stubCommand(c *check.C) func(prog string, args ...string) *exec.Cmd {
+	mtx := sync.Mutex{}
+	nextjobid := 100
+	fakejobq := map[int]string{}
+	return func(prog string, args ...string) *exec.Cmd {
+		c.Logf("stubCommand: %q %q", prog, args)
+		if rand.Float64() < stub.errorRate {
+			return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
+		}
+		switch prog {
+		case "bsub":
+			c.Assert(args, check.HasLen, 4)
+			c.Check(args[0], check.Equals, "-J")
+			switch args[1] {
+			case arvadostest.LockedContainerUUID:
+				c.Check(args, check.DeepEquals, []string{"-J", arvadostest.LockedContainerUUID, "-R", "rusage[mem=11701MB:tmp=0MB] affinity[core(4)]"})
+				mtx.Lock()
+				fakejobq[nextjobid] = args[1]
+				nextjobid++
+				mtx.Unlock()
+			case arvadostest.QueuedContainerUUID:
+				c.Check(args, check.DeepEquals, []string{"-J", arvadostest.QueuedContainerUUID, "-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"})
+				mtx.Lock()
+				fakejobq[nextjobid] = args[1]
+				nextjobid++
+				mtx.Unlock()
+			default:
+				c.Errorf("unexpected uuid passed to bsub: args %q", args)
+				return exec.Command("false")
+			}
+			return exec.Command("echo", "submitted job")
+		case "bjobs":
+			c.Check(args, check.DeepEquals, []string{"-noheader", "-o", "jobid stat job_name:30"})
+			out := ""
+			for jobid, uuid := range fakejobq {
+				out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+			}
+			c.Logf("bjobs out: %q", out)
+			return exec.Command("printf", out)
+		case "bkill":
+			killid, _ := strconv.Atoi(args[0])
+			mtx.Lock()
+			uuid, ok := fakejobq[killid]
+			mtx.Unlock()
+			if !ok {
+				return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
+			} else if uuid == "" {
+				return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
+			} else {
+				go func() {
+					time.Sleep(time.Millisecond)
+					mtx.Lock()
+					delete(fakejobq, killid)
+					mtx.Unlock()
+				}()
+				return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
+			}
+		default:
+			return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
+		}
+	}
+}
+
+func (s *suite) TestSubmit(c *check.C) {
+	s.disp.lsfcli.stubCommand = lsfstub{errorRate: 0.1}.stubCommand(c)
+	s.disp.Start()
+	deadline := time.Now().Add(20 * time.Second)
+	for range time.NewTicker(time.Second).C {
+		if time.Now().After(deadline) {
+			c.Error("timed out")
+			break
+		}
+		// "queuedcontainer" should be running
+		if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+			continue
+		}
+		// "lockedcontainer" should be cancelled because it
+		// has priority 0 (no matching container requests)
+		if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+			continue
+		}
+		var ctr arvados.Container
+		if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
+			c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
+			continue
+		}
+		if ctr.State != arvados.ContainerStateQueued {
+			c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
+			continue
+		}
+		c.Log("reached desired state")
+		break
+	}
+}
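The test above works by swapping lsfcli's command constructor: stubCommand returns exec.Cmd values that print canned bsub/bjobs/bkill output instead of invoking real LSF tools. A self-contained sketch of that seam (the main function here is illustrative, not part of the test suite):

    package main

    import (
    	"fmt"
    	"os/exec"
    )

    // cli runs external programs, but tests may inject a stub factory.
    type cli struct {
    	stubCommand func(prog string, args ...string) *exec.Cmd
    }

    func (c cli) command(prog string, args ...string) *exec.Cmd {
    	if c.stubCommand != nil {
    		return c.stubCommand(prog, args...)
    	}
    	return exec.Command(prog, args...)
    }

    func main() {
    	c := cli{stubCommand: func(prog string, args ...string) *exec.Cmd {
    		// fake every program with a canned echo
    		return exec.Command("echo", "stubbed:", prog)
    	}}
    	out, err := c.command("bjobs", "-noheader").Output()
    	fmt.Printf("%q %v\n", out, err) // "stubbed: bjobs\n" <nil>
    }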
diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go
new file mode 100644
index 000000000..6786374a6
--- /dev/null
+++ b/lib/lsf/lsfcli.go
@@ -0,0 +1,82 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"os/exec"
+	"strings"
+
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
+)
+
+type bjobsEntry struct {
+	id   int
+	name string
+	stat string
+}
+
+type lsfcli struct {
+	logger logrus.FieldLogger
+	// (for testing) if non-nil, call stubCommand() instead of
+	// exec.Command() when running lsf command line programs.
+	stubCommand func(string, ...string) *exec.Cmd
+}
+
+func (cli lsfcli) command(prog string, args ...string) *exec.Cmd {
+	if f := cli.stubCommand; f != nil {
+		return f(prog, args...)
+	} else {
+		return exec.Command(prog, args...)
+	}
+}
+
+func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error {
+	cli.logger.Infof("Bsub(%q, %q)", script, args)
+	cmd := cli.command("bsub", args...)
+	cmd.Env = append([]string(nil), os.Environ()...)
+	cmd.Env = append(cmd.Env, "ARVADOS_API_HOST="+arv.APIHost)
+	cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arv.AuthToken)
+	if arv.Insecure {
+		cmd.Env = append(cmd.Env, "ARVADOS_API_HOST_INSECURE=1")
+	}
+	cmd.Stdin = bytes.NewReader(script)
+	return cmd.Run()
+}
+
+func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
+	cli.logger.Debugf("Bjobs()")
+	cmd := cli.command("bjobs", "-noheader", "-o", "jobid stat job_name:30")
+	buf, err := cmd.Output()
+	if err != nil {
+		return nil, err
+	}
+	var bjobs []bjobsEntry
+	for _, line := range strings.Split(string(buf), "\n") {
+		if line == "" {
+			continue
+		}
+		var ent bjobsEntry
+		if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
+			cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
+			continue
+		}
+		bjobs = append(bjobs, ent)
+	}
+	return bjobs, nil
+}
+
+func (cli lsfcli) Bkill(id int) error {
+	cli.logger.Infof("Bkill(%d)", id)
+	cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+	buf, err := cmd.CombinedOutput()
+	if err != nil && strings.Contains(string(buf), "already finished") {
+		return nil
+	}
+	return err
+}
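Bjobs leans on fmt.Sscan's whitespace splitting to pull the three columns out of each `bjobs -noheader -o "jobid stat job_name:30"` output line. A quick standalone demonstration of that parse (sample line invented for illustration):

    package main

    import "fmt"

    func main() {
    	line := "100 RUN zzzzz-dz642-queuedcontainer"
    	var id int
    	var stat, name string
    	// Sscan splits on whitespace and converts each field in turn.
    	if _, err := fmt.Sscan(line, &id, &stat, &name); err != nil {
    		fmt.Println("unparsed:", err)
    		return
    	}
    	fmt.Println(id, stat, name) // 100 RUN zzzzz-dz642-queuedcontainer
    }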
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
new file mode 100644
index 000000000..021015d44
--- /dev/null
+++ b/lib/lsf/lsfqueue.go
@@ -0,0 +1,81 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+)
+
+type lsfqueue struct {
+	logger logrus.FieldLogger
+	period time.Duration
+	lsfcli *lsfcli
+
+	initOnce   sync.Once
+	mutex      sync.Mutex
+	needUpdate chan bool
+	updated    *sync.Cond
+	latest     map[string]bjobsEntry
+}
+
+// JobID waits for the next queue update (so even a job that was only
+// submitted a nanosecond ago will show up) and then returns the LSF
+// job ID corresponding to the given container UUID.
+func (q *lsfqueue) JobID(uuid string) (int, bool) {
+	q.initOnce.Do(q.init)
+	q.mutex.Lock()
+	defer q.mutex.Unlock()
+	select {
+	case q.needUpdate <- true:
+	default:
+		// an update is already pending
+	}
+	q.updated.Wait()
+	ent, ok := q.latest[uuid]
+	q.logger.Infof("JobID(%q) == %d", uuid, ent.id)
+	return ent.id, ok
+}
+
+func (q *lsfqueue) SetPriority(uuid string, priority int64) {
+	q.initOnce.Do(q.init)
+	q.logger.Debug("SetPriority is not implemented")
+}
+
+func (q *lsfqueue) init() {
+	q.updated = sync.NewCond(&q.mutex)
+	q.needUpdate = make(chan bool, 1)
+	ticker := time.NewTicker(time.Second)
+	go func() {
+		for range q.needUpdate {
+			q.logger.Info("running bjobs")
+			ents, err := q.lsfcli.Bjobs()
+			if err != nil {
+				q.logger.Warnf("bjobs: %s", err)
+				// Retry on the next tick, don't wait
+				// for another new call to JobID().
+				select {
+				case q.needUpdate <- true:
+				default:
+				}
+				<-ticker.C
+				continue
+			}
+			next := make(map[string]bjobsEntry, len(ents))
+			for _, ent := range ents {
+				next[ent.name] = ent
+			}
+			q.mutex.Lock()
+			q.latest = next
+			q.updated.Broadcast()
+			q.logger.Infof("waking up waiters with latest %v", q.latest)
+			q.mutex.Unlock()
+			// Limit "bjobs" invocations to 1 per second
+			<-ticker.C
+		}
+	}()
+}
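lsfqueue coalesces concurrent JobID callers onto a single bjobs invocation: the 1-buffered needUpdate channel ensures at most one refresh request is pending, and sync.Cond.Broadcast wakes every waiter once a fresh snapshot lands. A minimal standalone sketch of that pattern (the fetch function is a placeholder for Bjobs):

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    type poller struct {
    	mu     sync.Mutex
    	cond   *sync.Cond
    	need   chan bool
    	latest map[string]int
    }

    func newPoller(fetch func() map[string]int) *poller {
    	p := &poller{need: make(chan bool, 1)}
    	p.cond = sync.NewCond(&p.mu)
    	go func() {
    		for range p.need {
    			snap := fetch() // one fetch answers every pending waiter
    			p.mu.Lock()
    			p.latest = snap
    			p.cond.Broadcast()
    			p.mu.Unlock()
    		}
    	}()
    	return p
    }

    // Lookup requests a refresh (coalesced if one is already pending)
    // and waits for the next snapshot, mirroring lsfqueue.JobID above.
    func (p *poller) Lookup(key string) (int, bool) {
    	p.mu.Lock()
    	defer p.mu.Unlock()
    	select {
    	case p.need <- true:
    	default: // an update is already pending
    	}
    	p.cond.Wait()
    	v, ok := p.latest[key]
    	return v, ok
    }

    func main() {
    	p := newPoller(func() map[string]int {
    		time.Sleep(10 * time.Millisecond) // simulate a slow bjobs call
    		return map[string]int{"zzzzz-dz642-queuedcontainer": 100}
    	})
    	fmt.Println(p.Lookup("zzzzz-dz642-queuedcontainer")) // 100 true
    }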
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 403d501b4..e1c24f0fd 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -589,6 +589,7 @@ const (
 	ServiceNameRailsAPI      ServiceName = "arvados-api-server"
 	ServiceNameController    ServiceName = "arvados-controller"
 	ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+	ServiceNameDispatchLSF   ServiceName = "arvados-dispatch-lsf"
 	ServiceNameHealth        ServiceName = "arvados-health"
 	ServiceNameWorkbench1    ServiceName = "arvados-workbench1"
 	ServiceNameWorkbench2    ServiceName = "arvados-workbench2"
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index a4d7e88b2..f426a4ebb 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -42,6 +42,8 @@ const (
 	QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
 	QueuedContainerUUID        = "zzzzz-dz642-queuedcontainer"
 
+	LockedContainerUUID = "zzzzz-dz642-lockedcontainer"
+
 	RunningContainerUUID = "zzzzz-dz642-runningcontainr"
 
 	CompletedContainerUUID         = "zzzzz-dz642-compltcontainer"

-----------------------------------------------------------------------


hooks/post-receive
-- 



