[ARVADOS] created: 2.1.0-1639-gf6e8d7c2c

Git user git at public.arvados.org
Thu Nov 18 21:44:23 UTC 2021


        at  f6e8d7c2cada1570bac3e98f0712ad8651b8d9fd (commit)


commit f6e8d7c2cada1570bac3e98f0712ad8651b8d9fd
Author: Tom Clegg <tom at curii.com>
Date:   Thu Nov 18 16:43:33 2021 -0500

    18298: Use bsub select[] args, cancel on "no suitable host".
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 411a79650..7813db4f0 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1089,7 +1089,7 @@ Clusters:
         # in /tmp on the compute node each time an Arvados container
         # runs. Ensure you have something in place to delete old files
         # from /tmp, or adjust the "-o" and "-e" arguments accordingly.
-        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
 
         # Use sudo to switch to this user account when submitting LSF
         # jobs.
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index f8553c3eb..39e4ae00c 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -1095,7 +1095,7 @@ Clusters:
         # in /tmp on the compute node each time an Arvados container
         # runs. Ensure you have something in place to delete old files
         # from /tmp, or adjust the "-o" and "-e" arguments accordingly.
-        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
 
         # Use sudo to switch to this user account when submitting LSF
         # jobs.
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index 6e35b7de9..537d52a07 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -167,7 +167,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 
 	if ctr.State != dispatch.Locked {
 		// already started by prior invocation
-	} else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+	} else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
 		disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
 		cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
 		cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
@@ -181,16 +181,38 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 	disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
 	defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
 
-	// If the container disappears from the lsf queue, there is
-	// no point in waiting for further dispatch updates: just
-	// clean up and return.
 	go func(uuid string) {
+		cancelled := false
 		for ctx.Err() == nil {
-			if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+			qent, ok := disp.lsfqueue.Lookup(uuid)
+			if !ok {
+				// If the container disappears from
+				// the lsf queue, there is no point in
+				// waiting for further dispatch
+				// updates: just clean up and return.
 				disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
 				cancel()
 				return
 			}
+			if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
+				disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
+				err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
+					"container": map[string]interface{}{
+						"runtime_status": map[string]string{
+							"error": qent.PendReason,
+						},
+					},
+				}, nil)
+				if err != nil {
+					disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
+					continue // retry
+				}
+				err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
+				if err != nil {
+					continue // retry (UpdateState() already logged the error)
+				}
+				cancelled = true
+			}
 		}
 	}(ctr.UUID)
 
@@ -236,10 +258,10 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 	// from the queue.
 	ticker := time.NewTicker(5 * time.Second)
 	defer ticker.Stop()
-	for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
-		err := disp.lsfcli.Bkill(jobid)
+	for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+		err := disp.lsfcli.Bkill(qent.ID)
 		if err != nil {
-			disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+			disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
 		}
 		<-ticker.C
 	}
@@ -262,10 +284,10 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
 }
 
 func (disp *dispatcher) bkill(ctr arvados.Container) {
-	if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+	if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
 		disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
-	} else if err := disp.lsfcli.Bkill(jobid); err != nil {
-		disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+	} else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+		disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
 	}
 }
 
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
index bb3b1b9df..c044df09f 100644
--- a/lib/lsf/dispatch_test.go
+++ b/lib/lsf/dispatch_test.go
@@ -30,7 +30,8 @@ func Test(t *testing.T) {
 var _ = check.Suite(&suite{})
 
 type suite struct {
-	disp *dispatcher
+	disp     *dispatcher
+	crTooBig arvados.ContainerRequest
 }
 
 func (s *suite) TearDownTest(c *check.C) {
@@ -47,6 +48,22 @@ func (s *suite) SetUpTest(c *check.C) {
 	s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
 		return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
 	}
+	err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+		"container_request": map[string]interface{}{
+			"runtime_constraints": arvados.RuntimeConstraints{
+				RAM:   1000000000000,
+				VCPUs: 1,
+			},
+			"container_image":     arvadostest.DockerImage112PDH,
+			"command":             []string{"sleep", "1"},
+			"mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+			"output_path":         "/mnt/out",
+			"state":               arvados.ContainerRequestStateCommitted,
+			"priority":            1,
+			"container_count_max": 1,
+		},
+	})
+	c.Assert(err, check.IsNil)
 }
 
 type lsfstub struct {
@@ -83,7 +100,10 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
 					"-J", arvadostest.LockedContainerUUID,
 					"-n", "4",
 					"-D", "11701MB",
-					"-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
+					"-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
+					"-R", "select[mem>=11701MB]",
+					"-R", "select[tmp>=0MB]",
+					"-R", "select[ncpus>=4]"})
 				mtx.Lock()
 				fakejobq[nextjobid] = args[1]
 				nextjobid++
@@ -93,7 +113,23 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
 					"-J", arvadostest.QueuedContainerUUID,
 					"-n", "4",
 					"-D", "11701MB",
-					"-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
+					"-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
+					"-R", "select[mem>=11701MB]",
+					"-R", "select[tmp>=45777MB]",
+					"-R", "select[ncpus>=4]"})
+				mtx.Lock()
+				fakejobq[nextjobid] = args[1]
+				nextjobid++
+				mtx.Unlock()
+			case s.crTooBig.ContainerUUID:
+				c.Check(args, check.DeepEquals, []string{
+					"-J", s.crTooBig.ContainerUUID,
+					"-n", "1",
+					"-D", "954187MB",
+					"-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
+					"-R", "select[mem>=954187MB]",
+					"-R", "select[tmp>=256MB]",
+					"-R", "select[ncpus>=1]"})
 				mtx.Lock()
 				fakejobq[nextjobid] = args[1]
 				nextjobid++
@@ -107,11 +143,16 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
 			c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
 			var records []map[string]interface{}
 			for jobid, uuid := range fakejobq {
+				stat, reason := "RUN", ""
+				if uuid == s.crTooBig.ContainerUUID {
+					// The real bjobs output includes a trailing ';' here:
+					stat, reason = "PEND", "There are no suitable hosts for the job;"
+				}
 				records = append(records, map[string]interface{}{
 					"JOBID":       fmt.Sprintf("%d", jobid),
-					"STAT":        "RUN",
+					"STAT":        stat,
 					"JOB_NAME":    uuid,
-					"PEND_REASON": "",
+					"PEND_REASON": reason,
 				})
 			}
 			out, err := json.Marshal(map[string]interface{}{
@@ -151,6 +192,7 @@ func (s *suite) TestSubmit(c *check.C) {
 		sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
 	}.stubCommand(s, c)
 	s.disp.Start()
+
 	deadline := time.Now().Add(20 * time.Second)
 	for range time.NewTicker(time.Second).C {
 		if time.Now().After(deadline) {
@@ -158,23 +200,37 @@ func (s *suite) TestSubmit(c *check.C) {
 			break
 		}
 		// "queuedcontainer" should be running
-		if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+		if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
 			continue
 		}
 		// "lockedcontainer" should be cancelled because it
 		// has priority 0 (no matching container requests)
-		if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+		if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+			continue
+		}
+		// "crTooBig" should be cancelled because lsf stub
+		// reports there is no suitable instance type
+		if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
 			continue
 		}
 		var ctr arvados.Container
 		if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
 			c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
 			continue
-		}
-		if ctr.State != arvados.ContainerStateQueued {
+		} else if ctr.State != arvados.ContainerStateQueued {
 			c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
 			continue
 		}
+
+		if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
+			c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
+			continue
+		} else if ctr.State != arvados.ContainerStateCancelled {
+			c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
+			continue
+		} else {
+			c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
+		}
 		c.Log("reached desired state")
 		break
 	}
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 971bdd421..3ed4d0c18 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -23,12 +23,12 @@ type lsfqueue struct {
 	latest    map[string]bjobsEntry
 }
 
-// JobID waits for the next queue update (so even a job that was only
+// Lookup waits for the next queue update (so even a job that was only
 // submitted a nanosecond ago will show up) and then returns the LSF
-// job ID corresponding to the given container UUID.
-func (q *lsfqueue) JobID(uuid string) (string, bool) {
+// queue information corresponding to the given container UUID.
+func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) {
 	ent, ok := q.getNext()[uuid]
-	return ent.ID, ok
+	return ent, ok
 }
 
 // All waits for the next queue update, then returns the names of all

commit fa74a8d2176d115f21554e1e929a35729173a3c3
Author: Tom Clegg <tom at curii.com>
Date:   Wed Nov 17 16:42:13 2021 -0500

    18298: Request pend_reason field when polling bjobs.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
index 641453e54..bb3b1b9df 100644
--- a/lib/lsf/dispatch_test.go
+++ b/lib/lsf/dispatch_test.go
@@ -6,6 +6,7 @@ package lsf
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"math/rand"
 	"os/exec"
@@ -103,13 +104,26 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
 			}
 			return exec.Command("echo", "submitted job")
 		case "bjobs":
-			c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
-			out := ""
+			c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
+			var records []map[string]interface{}
 			for jobid, uuid := range fakejobq {
-				out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+				records = append(records, map[string]interface{}{
+					"JOBID":       fmt.Sprintf("%d", jobid),
+					"STAT":        "RUN",
+					"JOB_NAME":    uuid,
+					"PEND_REASON": "",
+				})
 			}
-			c.Logf("bjobs out: %q", out)
-			return exec.Command("printf", out)
+			out, err := json.Marshal(map[string]interface{}{
+				"COMMAND": "bjobs",
+				"JOBS":    len(fakejobq),
+				"RECORDS": records,
+			})
+			if err != nil {
+				panic(err)
+			}
+			c.Logf("bjobs out: %s", out)
+			return exec.Command("printf", string(out))
 		case "bkill":
 			killid, _ := strconv.Atoi(args[0])
 			if uuid, ok := fakejobq[killid]; !ok {
diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go
index 9d712ee97..d17559568 100644
--- a/lib/lsf/lsfcli.go
+++ b/lib/lsf/lsfcli.go
@@ -6,6 +6,7 @@ package lsf
 
 import (
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"os"
 	"os/exec"
@@ -16,9 +17,10 @@ import (
 )
 
 type bjobsEntry struct {
-	id   int
-	name string
-	stat string
+	ID         string `json:"JOBID"`
+	Name       string `json:"JOB_NAME"`
+	Stat       string `json:"STAT"`
+	PendReason string `json:"PEND_REASON"`
 }
 
 type lsfcli struct {
@@ -53,29 +55,21 @@ func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error
 
 func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
 	cli.logger.Debugf("Bjobs()")
-	cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30")
+	cmd := cli.command("bjobs", "-u", "all", "-o", "jobid stat job_name pend_reason", "-json")
 	buf, err := cmd.Output()
 	if err != nil {
 		return nil, errWithStderr(err)
 	}
-	var bjobs []bjobsEntry
-	for _, line := range strings.Split(string(buf), "\n") {
-		if line == "" {
-			continue
-		}
-		var ent bjobsEntry
-		if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
-			cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
-			continue
-		}
-		bjobs = append(bjobs, ent)
+	var resp struct {
+		Records []bjobsEntry `json:"RECORDS"`
 	}
-	return bjobs, nil
+	err = json.Unmarshal(buf, &resp)
+	return resp.Records, err
 }
 
-func (cli lsfcli) Bkill(id int) error {
-	cli.logger.Infof("Bkill(%d)", id)
-	cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+func (cli lsfcli) Bkill(id string) error {
+	cli.logger.Infof("Bkill(%s)", id)
+	cmd := cli.command("bkill", id)
 	buf, err := cmd.CombinedOutput()
 	if err == nil || strings.Index(string(buf), "already finished") >= 0 {
 		return nil
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 3c4fc4cb8..971bdd421 100644
--- a/lib/lsf/lsfqueue.go
+++ b/lib/lsf/lsfqueue.go
@@ -26,9 +26,9 @@ type lsfqueue struct {
 // JobID waits for the next queue update (so even a job that was only
 // submitted a nanosecond ago will show up) and then returns the LSF
 // job ID corresponding to the given container UUID.
-func (q *lsfqueue) JobID(uuid string) (int, bool) {
+func (q *lsfqueue) JobID(uuid string) (string, bool) {
 	ent, ok := q.getNext()[uuid]
-	return ent.id, ok
+	return ent.ID, ok
 }
 
 // All waits for the next queue update, then returns the names of all
@@ -94,7 +94,7 @@ func (q *lsfqueue) init() {
 			}
 			next := make(map[string]bjobsEntry, len(ents))
 			for _, ent := range ents {
-				next[ent.name] = ent
+				next[ent.Name] = ent
 			}
 			// Replace q.latest and notify all the
 			// goroutines that the "next update" they

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list