[ARVADOS] updated: 2.1.0-1036-g0f784d657
Git user
git at public.arvados.org
Thu Jul 22 14:46:00 UTC 2021
Summary of changes:
.../install-dispatch.html.textile.liquid | 4 +-
lib/lsf/dispatch.go | 48 ++++++----------------
sdk/go/dispatch/dispatch.go | 35 ++++++++++++++--
.../crunch-dispatch-local/crunch-dispatch-local.go | 5 ++-
.../crunch-dispatch-local_test.go | 12 +++---
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 40 +++---------------
.../crunch-dispatch-slurm_test.go | 35 +++++++---------
7 files changed, 73 insertions(+), 106 deletions(-)
via 0f784d657527c998e7cd1d7aee8cbd8f0d75e04a (commit)
via 7bebcf045b39a4bdecb345c9592c0515795c75aa (commit)
from 3b63632698de9868a501191e8989f14c23e4e743 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 0f784d657527c998e7cd1d7aee8cbd8f0d75e04a
Author: Tom Clegg <tom at curii.com>
Date: Thu Jul 22 10:36:14 2021 -0400
17756: Move "no suitable instance type" reporting to dispatch lib.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index 760002aa0..7461597c4 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -5,7 +5,6 @@
package lsf
import (
- "bytes"
"context"
"errors"
"fmt"
@@ -162,7 +161,7 @@ func (disp *dispatcher) init() {
}
}
-func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
ctx, cancel := context.WithCancel(disp.Context)
defer cancel()
@@ -173,38 +172,9 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
- if err := disp.submit(ctr, cmd); err != nil {
- var text string
- switch err := err.(type) {
- case dispatchcloud.ConstraintsNotSatisfiableError:
- var logBuf bytes.Buffer
- fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
- if len(err.AvailableTypes) == 0 {
- fmt.Fprint(&logBuf, "No instance types are configured.\n")
- } else {
- fmt.Fprint(&logBuf, "Available instance types:\n")
- for _, t := range err.AvailableTypes {
- fmt.Fprintf(&logBuf,
- "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
- t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
- )
- }
- }
- text = logBuf.String()
- disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
- default:
- text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
- }
- disp.logger.Print(text)
-
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": ctr.UUID,
- "event_type": "dispatch",
- "properties": map[string]string{"text": text}}}
- disp.arvDispatcher.Arv.Create("logs", lr, nil)
-
- disp.arvDispatcher.Unlock(ctr.UUID)
- return
+ err := disp.submit(ctr, cmd)
+ if err != nil {
+ return err
}
}
@@ -237,7 +207,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
case dispatch.Locked:
disp.arvDispatcher.Unlock(ctr.UUID)
}
- return
+ return nil
case updated, ok := <-status:
if !ok {
// status channel is closed, which is
@@ -273,6 +243,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
}
<-ticker.C
}
+ return nil
}
func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index df43c2b10..00c75154f 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -7,11 +7,13 @@
package dispatch
import (
+ "bytes"
"context"
"fmt"
"sync"
"time"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"github.com/sirupsen/logrus"
@@ -66,7 +68,7 @@ type Dispatcher struct {
// running, and return.
//
// The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
// Run watches the API server's queue for containers that are either
// ready to run and available to lock, or are already locked by this
@@ -170,9 +172,34 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
}
tracker.updates <- c
go func() {
- d.RunContainer(d, c, tracker.updates)
- // RunContainer blocks for the lifetime of the container. When
- // it returns, the tracker should delete itself.
+ err := d.RunContainer(d, c, tracker.updates)
+ if err != nil {
+ text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+ if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+ var logBuf bytes.Buffer
+ fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+ if len(err.AvailableTypes) == 0 {
+ fmt.Fprint(&logBuf, "No instance types are configured.\n")
+ } else {
+ fmt.Fprint(&logBuf, "Available instance types:\n")
+ for _, t := range err.AvailableTypes {
+ fmt.Fprintf(&logBuf,
+ "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+ t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+ }
+ }
+ text = logBuf.String()
+ d.UpdateState(c.UUID, Cancelled)
+ }
+ d.Logger.Printf("%s", text)
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": c.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
+ d.Arv.Create("logs", lr, nil)
+ d.Unlock(c.UUID)
+ }
+
d.mtx.Lock()
delete(d.trackers, c.UUID)
d.mtx.Unlock()
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 2922817b5..148633238 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -140,7 +140,7 @@ type LocalRun struct {
// crunch-run terminates, mark the container as Cancelled.
func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
container arvados.Container,
- status <-chan arvados.Container) {
+ status <-chan arvados.Container) error {
uuid := container.UUID
@@ -150,7 +150,7 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
case lr.concurrencyLimit <- true:
break
case <-lr.ctx.Done():
- return
+ return lr.ctx.Err()
}
defer func() { <-lr.concurrencyLimit }()
@@ -241,4 +241,5 @@ Finish:
}
dispatcher.Logger.Printf("finalized container %v", uuid)
+ return nil
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 5f51134df..6ec31b173 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -81,9 +81,9 @@ func (s *TestSuite) TestIntegration(c *C) {
return cmd.Start()
}
- dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
- (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
- cancel()
+ dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+ defer cancel()
+ return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
}
err = dispatcher.Run(ctx)
@@ -184,9 +184,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
return cmd.Start()
}
- dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
- (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
- cancel()
+ dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+ defer cancel()
+ return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
}
re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index a5899ce8a..5129495a0 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -7,7 +7,6 @@ package main
// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "bytes"
"context"
"flag"
"fmt"
@@ -270,7 +269,7 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s
// already in the queue). Cancel the slurm job if the container's
// priority changes to zero or its state indicates it's no longer
// running.
-func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -278,38 +277,9 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
log.Printf("Submitting container %s to slurm", ctr.UUID)
cmd := []string{disp.cluster.Containers.CrunchRunCommand}
cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
- if err := disp.submit(ctr, cmd); err != nil {
- var text string
- switch err := err.(type) {
- case dispatchcloud.ConstraintsNotSatisfiableError:
- var logBuf bytes.Buffer
- fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
- if len(err.AvailableTypes) == 0 {
- fmt.Fprint(&logBuf, "No instance types are configured.\n")
- } else {
- fmt.Fprint(&logBuf, "Available instance types:\n")
- for _, t := range err.AvailableTypes {
- fmt.Fprintf(&logBuf,
- "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
- t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
- )
- }
- }
- text = logBuf.String()
- disp.UpdateState(ctr.UUID, dispatch.Cancelled)
- default:
- text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
- }
- log.Print(text)
-
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": ctr.UUID,
- "event_type": "dispatch",
- "properties": map[string]string{"text": text}}}
- disp.Arv.Create("logs", lr, nil)
-
- disp.Unlock(ctr.UUID)
- return
+ err := disp.submit(ctr, cmd)
+ if err != nil {
+ return err
}
}
@@ -338,7 +308,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
case dispatch.Locked:
disp.Unlock(ctr.UUID)
}
- return
+ return nil
case updated, ok := <-status:
if !ok {
log.Printf("container %s is done: cancel slurm job", ctr.UUID)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 480434de6..e7a89db23 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -104,7 +104,7 @@ func (sf *slurmFake) Cancel(name string) error {
func (s *IntegrationSuite) integrationTest(c *C,
expectBatch [][]string,
- runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+ runContainer func(*dispatch.Dispatcher, arvados.Container)) (arvados.Container, error) {
arvadostest.ResetEnv()
arv, err := arvadosclient.MakeArvadosClient()
@@ -123,18 +123,21 @@ func (s *IntegrationSuite) integrationTest(c *C,
ctx, cancel := context.WithCancel(context.Background())
doneRun := make(chan struct{})
+ doneDispatch := make(chan error)
s.disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Second,
- RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+ RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
go func() {
runContainer(disp, ctr)
s.slurm.queue = ""
doneRun <- struct{}{}
}()
- s.disp.runContainer(disp, ctr, status)
+ err := s.disp.runContainer(disp, ctr, status)
cancel()
+ doneDispatch <- err
+ return nil
},
}
@@ -148,6 +151,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
err = s.disp.Dispatcher.Run(ctx)
<-doneRun
c.Assert(err, Equals, context.Canceled)
+ errDispatch := <-doneDispatch
s.disp.sqCheck.Stop()
@@ -162,12 +166,12 @@ func (s *IntegrationSuite) integrationTest(c *C,
var container arvados.Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- return container
+ return container, errDispatch
}
func (s *IntegrationSuite) TestNormal(c *C) {
s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
- container := s.integrationTest(c,
+ container, _ := s.integrationTest(c,
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -181,7 +185,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
readyToCancel := make(chan bool)
s.slurm.onCancel = func() { <-readyToCancel }
- container := s.integrationTest(c,
+ container, _ := s.integrationTest(c,
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -199,7 +203,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
}
func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
- container := s.integrationTest(c,
+ container, _ := s.integrationTest(c,
[][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--nice=%d", 10000),
@@ -218,24 +222,14 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
func (s *IntegrationSuite) TestSbatchFail(c *C) {
s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
- container := s.integrationTest(c,
+ container, err := s.integrationTest(c,
[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
-
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, IsNil)
-
- var ll arvados.LogList
- err = arv.List("logs", arvadosclient.Dict{"filters": [][]string{
- {"object_uuid", "=", container.UUID},
- {"event_type", "=", "dispatch"},
- }}, &ll)
- c.Assert(err, IsNil)
- c.Assert(len(ll.Items), Equals, 1)
+ c.Check(err, ErrorMatches, `something terrible happened`)
}
type StubbedSuite struct {
@@ -280,7 +274,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
dispatcher := dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Second,
- RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+ RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
go func() {
time.Sleep(time.Second)
disp.UpdateState(ctr.UUID, dispatch.Running)
@@ -288,6 +282,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
}()
s.disp.runContainer(disp, ctr, status)
cancel()
+ return nil
},
}
commit 7bebcf045b39a4bdecb345c9592c0515795c75aa
Author: Tom Clegg <tom at curii.com>
Date: Fri Jul 16 16:42:46 2021 -0400
17756: Review feedback.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
index 0dd6a8b26..d8b6aefab 100644
--- a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
+++ b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
@@ -62,9 +62,7 @@ When arvados-dispatch-lsf invokes @bsub@, you can add arguments to the command b
<notextile>
<pre> Containers:
LSF:
- <code class="userinput">BsubArgumentsList:
- - <b>"-C"</b>
- - <b>"0"</b></code>
+ <code class="userinput">BsubArgumentsList: <b>["-C", "0"]</b></code>
</pre>
</notextile>
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index b7032dc73..760002aa0 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -240,6 +240,11 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
return
case updated, ok := <-status:
if !ok {
+ // status channel is closed, which is
+ // how arvDispatcher tells us to stop
+ // touching the container record, kill
+ // off any remaining LSF processes,
+ // etc.
done = true
break
}
@@ -247,7 +252,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
}
ctr = updated
- if ctr.Priority == 0 {
+ if ctr.Priority < 1 {
disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
disp.bkill(ctr)
} else {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list