[ARVADOS] created: 2.1.0-1639-gf6e8d7c2c
Git user
git at public.arvados.org
Thu Nov 18 21:44:23 UTC 2021
at f6e8d7c2cada1570bac3e98f0712ad8651b8d9fd (commit)
commit f6e8d7c2cada1570bac3e98f0712ad8651b8d9fd
Author: Tom Clegg <tom at curii.com>
Date: Thu Nov 18 16:43:33 2021 -0500
18298: Use bjobs select[] args, cancel on "no suitable host".
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 411a79650..7813db4f0 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1089,7 +1089,7 @@ Clusters:
# in /tmp on the compute node each time an Arvados container
# runs. Ensure you have something in place to delete old files
# from /tmp, or adjust the "-o" and "-e" arguments accordingly.
- BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+ BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
# Use sudo to switch to this user account when submitting LSF
# jobs.
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index f8553c3eb..39e4ae00c 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -1095,7 +1095,7 @@ Clusters:
# in /tmp on the compute node each time an Arvados container
# runs. Ensure you have something in place to delete old files
# from /tmp, or adjust the "-o" and "-e" arguments accordingly.
- BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+ BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
# Use sudo to switch to this user account when submitting LSF
# jobs.
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index 6e35b7de9..537d52a07 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -167,7 +167,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
if ctr.State != dispatch.Locked {
// already started by prior invocation
- } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
@@ -181,16 +181,38 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
- // If the container disappears from the lsf queue, there is
- // no point in waiting for further dispatch updates: just
- // clean up and return.
go func(uuid string) {
+ cancelled := false
for ctx.Err() == nil {
- if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+ qent, ok := disp.lsfqueue.Lookup(uuid)
+ if !ok {
+ // If the container disappears from
+ // the lsf queue, there is no point in
+ // waiting for further dispatch
+ // updates: just clean up and return.
disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
cancel()
return
}
+ if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
+ disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
+ err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
+ "container": map[string]interface{}{
+ "runtime_status": map[string]string{
+ "error": qent.PendReason,
+ },
+ },
+ }, nil)
+ if err != nil {
+ disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
+ continue // retry
+ }
+ err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
+ if err != nil {
+ continue // retry (UpdateState() already logged the error)
+ }
+ cancelled = true
+ }
}
}(ctr.UUID)
@@ -236,10 +258,10 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
// from the queue.
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
- for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
- err := disp.lsfcli.Bkill(jobid)
+ for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+ err := disp.lsfcli.Bkill(qent.ID)
if err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
<-ticker.C
}
@@ -262,10 +284,10 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
}
func (disp *dispatcher) bkill(ctr arvados.Container) {
- if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
- } else if err := disp.lsfcli.Bkill(jobid); err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
}
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
index bb3b1b9df..c044df09f 100644
--- a/lib/lsf/dispatch_test.go
+++ b/lib/lsf/dispatch_test.go
@@ -30,7 +30,8 @@ func Test(t *testing.T) {
var _ = check.Suite(&suite{})
type suite struct {
- disp *dispatcher
+ disp *dispatcher
+ crTooBig arvados.ContainerRequest
}
func (s *suite) TearDownTest(c *check.C) {
@@ -47,6 +48,22 @@ func (s *suite) SetUpTest(c *check.C) {
s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
}
+ err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "runtime_constraints": arvados.RuntimeConstraints{
+ RAM: 1000000000000,
+ VCPUs: 1,
+ },
+ "container_image": arvadostest.DockerImage112PDH,
+ "command": []string{"sleep", "1"},
+ "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+ "output_path": "/mnt/out",
+ "state": arvados.ContainerRequestStateCommitted,
+ "priority": 1,
+ "container_count_max": 1,
+ },
+ })
+ c.Assert(err, check.IsNil)
}
type lsfstub struct {
@@ -83,7 +100,10 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
"-J", arvadostest.LockedContainerUUID,
"-n", "4",
"-D", "11701MB",
- "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
+ "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=0MB]",
+ "-R", "select[ncpus>=4]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
@@ -93,7 +113,23 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
"-J", arvadostest.QueuedContainerUUID,
"-n", "4",
"-D", "11701MB",
- "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
+ "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=45777MB]",
+ "-R", "select[ncpus>=4]"})
+ mtx.Lock()
+ fakejobq[nextjobid] = args[1]
+ nextjobid++
+ mtx.Unlock()
+ case s.crTooBig.ContainerUUID:
+ c.Check(args, check.DeepEquals, []string{
+ "-J", s.crTooBig.ContainerUUID,
+ "-n", "1",
+ "-D", "954187MB",
+ "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
+ "-R", "select[mem>=954187MB]",
+ "-R", "select[tmp>=256MB]",
+ "-R", "select[ncpus>=1]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
@@ -107,11 +143,16 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
var records []map[string]interface{}
for jobid, uuid := range fakejobq {
+ stat, reason := "RUN", ""
+ if uuid == s.crTooBig.ContainerUUID {
+ // The real bjobs output includes a trailing ';' here:
+ stat, reason = "PEND", "There are no suitable hosts for the job;"
+ }
records = append(records, map[string]interface{}{
"JOBID": fmt.Sprintf("%d", jobid),
- "STAT": "RUN",
+ "STAT": stat,
"JOB_NAME": uuid,
- "PEND_REASON": "",
+ "PEND_REASON": reason,
})
}
out, err := json.Marshal(map[string]interface{}{
@@ -151,6 +192,7 @@ func (s *suite) TestSubmit(c *check.C) {
sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser,
}.stubCommand(s, c)
s.disp.Start()
+
deadline := time.Now().Add(20 * time.Second)
for range time.NewTicker(time.Second).C {
if time.Now().After(deadline) {
@@ -158,23 +200,37 @@ func (s *suite) TestSubmit(c *check.C) {
break
}
// "queuedcontainer" should be running
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+ if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
continue
}
// "lockedcontainer" should be cancelled because it
// has priority 0 (no matching container requests)
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+ if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+ continue
+ }
+ // "crTooBig" should be cancelled because lsf stub
+ // reports there is no suitable instance type
+ if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
continue
}
var ctr arvados.Container
if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
continue
- }
- if ctr.State != arvados.ContainerStateQueued {
+ } else if ctr.State != arvados.ContainerStateQueued {
c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
continue
}
+
+ if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
+ c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
+ continue
+ } else if ctr.State != arvados.ContainerStateCancelled {
+ c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
+ continue
+ } else {
+ c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
+ }
c.Log("reached desired state")
break
}
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 971bdd421..3ed4d0c18 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -23,12 +23,12 @@ type lsfqueue struct {
latest map[string]bjobsEntry
}
-// JobID waits for the next queue update (so even a job that was only
+// Lookup waits for the next queue update (so even a job that was only
// submitted a nanosecond ago will show up) and then returns the LSF
-// job ID corresponding to the given container UUID.
-func (q *lsfqueue) JobID(uuid string) (string, bool) {
+// queue information corresponding to the given container UUID.
+func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) {
ent, ok := q.getNext()[uuid]
- return ent.ID, ok
+ return ent, ok
}
// All waits for the next queue update, then returns the names of all
commit fa74a8d2176d115f21554e1e929a35729173a3c3
Author: Tom Clegg <tom at curii.com>
Date: Wed Nov 17 16:42:13 2021 -0500
18298: Request pend_reason field when polling bjobs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
index 641453e54..bb3b1b9df 100644
--- a/lib/lsf/dispatch_test.go
+++ b/lib/lsf/dispatch_test.go
@@ -6,6 +6,7 @@ package lsf
import (
"context"
+ "encoding/json"
"fmt"
"math/rand"
"os/exec"
@@ -103,13 +104,26 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
}
return exec.Command("echo", "submitted job")
case "bjobs":
- c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
- out := ""
+ c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
+ var records []map[string]interface{}
for jobid, uuid := range fakejobq {
- out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+ records = append(records, map[string]interface{}{
+ "JOBID": fmt.Sprintf("%d", jobid),
+ "STAT": "RUN",
+ "JOB_NAME": uuid,
+ "PEND_REASON": "",
+ })
}
- c.Logf("bjobs out: %q", out)
- return exec.Command("printf", out)
+ out, err := json.Marshal(map[string]interface{}{
+ "COMMAND": "bjobs",
+ "JOBS": len(fakejobq),
+ "RECORDS": records,
+ })
+ if err != nil {
+ panic(err)
+ }
+ c.Logf("bjobs out: %s", out)
+ return exec.Command("printf", string(out))
case "bkill":
killid, _ := strconv.Atoi(args[0])
if uuid, ok := fakejobq[killid]; !ok {
diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go
index 9d712ee97..d17559568 100644
--- a/lib/lsf/lsfcli.go
+++ b/lib/lsf/lsfcli.go
@@ -6,6 +6,7 @@ package lsf
import (
"bytes"
+ "encoding/json"
"fmt"
"os"
"os/exec"
@@ -16,9 +17,10 @@ import (
)
type bjobsEntry struct {
- id int
- name string
- stat string
+ ID string `json:"JOBID"`
+ Name string `json:"JOB_NAME"`
+ Stat string `json:"STAT"`
+ PendReason string `json:"PEND_REASON"`
}
type lsfcli struct {
@@ -53,29 +55,21 @@ func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error
func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
cli.logger.Debugf("Bjobs()")
- cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30")
+ cmd := cli.command("bjobs", "-u", "all", "-o", "jobid stat job_name pend_reason", "-json")
buf, err := cmd.Output()
if err != nil {
return nil, errWithStderr(err)
}
- var bjobs []bjobsEntry
- for _, line := range strings.Split(string(buf), "\n") {
- if line == "" {
- continue
- }
- var ent bjobsEntry
- if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
- cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
- continue
- }
- bjobs = append(bjobs, ent)
+ var resp struct {
+ Records []bjobsEntry `json:"RECORDS"`
}
- return bjobs, nil
+ err = json.Unmarshal(buf, &resp)
+ return resp.Records, err
}
-func (cli lsfcli) Bkill(id int) error {
- cli.logger.Infof("Bkill(%d)", id)
- cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+func (cli lsfcli) Bkill(id string) error {
+ cli.logger.Infof("Bkill(%s)", id)
+ cmd := cli.command("bkill", id)
buf, err := cmd.CombinedOutput()
if err == nil || strings.Index(string(buf), "already finished") >= 0 {
return nil
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 3c4fc4cb8..971bdd421 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -26,9 +26,9 @@ type lsfqueue struct {
// JobID waits for the next queue update (so even a job that was only
// submitted a nanosecond ago will show up) and then returns the LSF
// job ID corresponding to the given container UUID.
-func (q *lsfqueue) JobID(uuid string) (int, bool) {
+func (q *lsfqueue) JobID(uuid string) (string, bool) {
ent, ok := q.getNext()[uuid]
- return ent.id, ok
+ return ent.ID, ok
}
// All waits for the next queue update, then returns the names of all
@@ -94,7 +94,7 @@ func (q *lsfqueue) init() {
}
next := make(map[string]bjobsEntry, len(ents))
for _, ent := range ents {
- next[ent.name] = ent
+ next[ent.Name] = ent
}
// Replace q.latest and notify all the
// goroutines that the "next update" they
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list