[ARVADOS] updated: 2.1.0-1036-g0f784d657

Git user git at public.arvados.org
Thu Jul 22 14:46:00 UTC 2021


Summary of changes:
 .../install-dispatch.html.textile.liquid           |  4 +-
 lib/lsf/dispatch.go                                | 48 ++++++----------------
 sdk/go/dispatch/dispatch.go                        | 35 ++++++++++++++--
 .../crunch-dispatch-local/crunch-dispatch-local.go |  5 ++-
 .../crunch-dispatch-local_test.go                  | 12 +++---
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 40 +++---------------
 .../crunch-dispatch-slurm_test.go                  | 35 +++++++---------
 7 files changed, 73 insertions(+), 106 deletions(-)

       via  0f784d657527c998e7cd1d7aee8cbd8f0d75e04a (commit)
       via  7bebcf045b39a4bdecb345c9592c0515795c75aa (commit)
      from  3b63632698de9868a501191e8989f14c23e4e743 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 0f784d657527c998e7cd1d7aee8cbd8f0d75e04a
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jul 22 10:36:14 2021 -0400

    17756: Move "no suitable instance type" reporting to dispatch lib.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index 760002aa0..7461597c4 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -5,7 +5,6 @@
 package lsf
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -162,7 +161,7 @@ func (disp *dispatcher) init() {
 	}
 }
 
-func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
 	ctx, cancel := context.WithCancel(disp.Context)
 	defer cancel()
 
@@ -173,38 +172,9 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 		cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
 		cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
 		cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
-		if err := disp.submit(ctr, cmd); err != nil {
-			var text string
-			switch err := err.(type) {
-			case dispatchcloud.ConstraintsNotSatisfiableError:
-				var logBuf bytes.Buffer
-				fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
-				if len(err.AvailableTypes) == 0 {
-					fmt.Fprint(&logBuf, "No instance types are configured.\n")
-				} else {
-					fmt.Fprint(&logBuf, "Available instance types:\n")
-					for _, t := range err.AvailableTypes {
-						fmt.Fprintf(&logBuf,
-							"Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
-							t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
-						)
-					}
-				}
-				text = logBuf.String()
-				disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
-			default:
-				text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
-			}
-			disp.logger.Print(text)
-
-			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-				"object_uuid": ctr.UUID,
-				"event_type":  "dispatch",
-				"properties":  map[string]string{"text": text}}}
-			disp.arvDispatcher.Arv.Create("logs", lr, nil)
-
-			disp.arvDispatcher.Unlock(ctr.UUID)
-			return
+		err := disp.submit(ctr, cmd)
+		if err != nil {
+			return err
 		}
 	}
 
@@ -237,7 +207,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 			case dispatch.Locked:
 				disp.arvDispatcher.Unlock(ctr.UUID)
 			}
-			return
+			return nil
 		case updated, ok := <-status:
 			if !ok {
 				// status channel is closed, which is
@@ -273,6 +243,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 		}
 		<-ticker.C
 	}
+	return nil
 }
 
 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index df43c2b10..00c75154f 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -7,11 +7,13 @@
 package dispatch
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"sync"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/dispatchcloud"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"github.com/sirupsen/logrus"
@@ -66,7 +68,7 @@ type Dispatcher struct {
 // running, and return.
 //
 // The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
 
 // Run watches the API server's queue for containers that are either
 // ready to run and available to lock, or are already locked by this
@@ -170,9 +172,34 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 	}
 	tracker.updates <- c
 	go func() {
-		d.RunContainer(d, c, tracker.updates)
-		// RunContainer blocks for the lifetime of the container.  When
-		// it returns, the tracker should delete itself.
+		err := d.RunContainer(d, c, tracker.updates)
+		if err != nil {
+			text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+			if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+				var logBuf bytes.Buffer
+				fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+				if len(err.AvailableTypes) == 0 {
+					fmt.Fprint(&logBuf, "No instance types are configured.\n")
+				} else {
+					fmt.Fprint(&logBuf, "Available instance types:\n")
+					for _, t := range err.AvailableTypes {
+						fmt.Fprintf(&logBuf,
+							"Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+							t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+					}
+				}
+				text = logBuf.String()
+				d.UpdateState(c.UUID, Cancelled)
+			}
+			d.Logger.Printf("%s", text)
+			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+				"object_uuid": c.UUID,
+				"event_type":  "dispatch",
+				"properties":  map[string]string{"text": text}}}
+			d.Arv.Create("logs", lr, nil)
+			d.Unlock(c.UUID)
+		}
+
 		d.mtx.Lock()
 		delete(d.trackers, c.UUID)
 		d.mtx.Unlock()
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 2922817b5..148633238 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -140,7 +140,7 @@ type LocalRun struct {
 // crunch-run terminates, mark the container as Cancelled.
 func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
 	container arvados.Container,
-	status <-chan arvados.Container) {
+	status <-chan arvados.Container) error {
 
 	uuid := container.UUID
 
@@ -150,7 +150,7 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
 		case lr.concurrencyLimit <- true:
 			break
 		case <-lr.ctx.Done():
-			return
+			return lr.ctx.Err()
 		}
 
 		defer func() { <-lr.concurrencyLimit }()
@@ -241,4 +241,5 @@ Finish:
 	}
 
 	dispatcher.Logger.Printf("finalized container %v", uuid)
+	return nil
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 5f51134df..6ec31b173 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -81,9 +81,9 @@ func (s *TestSuite) TestIntegration(c *C) {
 		return cmd.Start()
 	}
 
-	dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-		(&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
-		cancel()
+	dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+		defer cancel()
+		return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
 	}
 
 	err = dispatcher.Run(ctx)
@@ -184,9 +184,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		return cmd.Start()
 	}
 
-	dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-		(&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
-		cancel()
+	dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+		defer cancel()
+		return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
 	}
 
 	re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index a5899ce8a..5129495a0 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -7,7 +7,6 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-	"bytes"
 	"context"
 	"flag"
 	"fmt"
@@ -270,7 +269,7 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s
 // already in the queue).  Cancel the slurm job if the container's
 // priority changes to zero or its state indicates it's no longer
 // running.
-func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -278,38 +277,9 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 		log.Printf("Submitting container %s to slurm", ctr.UUID)
 		cmd := []string{disp.cluster.Containers.CrunchRunCommand}
 		cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
-		if err := disp.submit(ctr, cmd); err != nil {
-			var text string
-			switch err := err.(type) {
-			case dispatchcloud.ConstraintsNotSatisfiableError:
-				var logBuf bytes.Buffer
-				fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
-				if len(err.AvailableTypes) == 0 {
-					fmt.Fprint(&logBuf, "No instance types are configured.\n")
-				} else {
-					fmt.Fprint(&logBuf, "Available instance types:\n")
-					for _, t := range err.AvailableTypes {
-						fmt.Fprintf(&logBuf,
-							"Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
-							t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
-						)
-					}
-				}
-				text = logBuf.String()
-				disp.UpdateState(ctr.UUID, dispatch.Cancelled)
-			default:
-				text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
-			}
-			log.Print(text)
-
-			lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-				"object_uuid": ctr.UUID,
-				"event_type":  "dispatch",
-				"properties":  map[string]string{"text": text}}}
-			disp.Arv.Create("logs", lr, nil)
-
-			disp.Unlock(ctr.UUID)
-			return
+		err := disp.submit(ctr, cmd)
+		if err != nil {
+			return err
 		}
 	}
 
@@ -338,7 +308,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 			case dispatch.Locked:
 				disp.Unlock(ctr.UUID)
 			}
-			return
+			return nil
 		case updated, ok := <-status:
 			if !ok {
 				log.Printf("container %s is done: cancel slurm job", ctr.UUID)
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 480434de6..e7a89db23 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -104,7 +104,7 @@ func (sf *slurmFake) Cancel(name string) error {
 
 func (s *IntegrationSuite) integrationTest(c *C,
 	expectBatch [][]string,
-	runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+	runContainer func(*dispatch.Dispatcher, arvados.Container)) (arvados.Container, error) {
 	arvadostest.ResetEnv()
 
 	arv, err := arvadosclient.MakeArvadosClient()
@@ -123,18 +123,21 @@ func (s *IntegrationSuite) integrationTest(c *C,
 
 	ctx, cancel := context.WithCancel(context.Background())
 	doneRun := make(chan struct{})
+	doneDispatch := make(chan error)
 
 	s.disp.Dispatcher = &dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Second,
-		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
 			go func() {
 				runContainer(disp, ctr)
 				s.slurm.queue = ""
 				doneRun <- struct{}{}
 			}()
-			s.disp.runContainer(disp, ctr, status)
+			err := s.disp.runContainer(disp, ctr, status)
 			cancel()
+			doneDispatch <- err
+			return nil
 		},
 	}
 
@@ -148,6 +151,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
 	err = s.disp.Dispatcher.Run(ctx)
 	<-doneRun
 	c.Assert(err, Equals, context.Canceled)
+	errDispatch := <-doneDispatch
 
 	s.disp.sqCheck.Stop()
 
@@ -162,12 +166,12 @@ func (s *IntegrationSuite) integrationTest(c *C,
 	var container arvados.Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
-	return container
+	return container, errDispatch
 }
 
 func (s *IntegrationSuite) TestNormal(c *C) {
 	s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
-	container := s.integrationTest(c,
+	container, _ := s.integrationTest(c,
 		nil,
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -181,7 +185,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
 	s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
 	readyToCancel := make(chan bool)
 	s.slurm.onCancel = func() { <-readyToCancel }
-	container := s.integrationTest(c,
+	container, _ := s.integrationTest(c,
 		nil,
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -199,7 +203,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
 }
 
 func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
-	container := s.integrationTest(c,
+	container, _ := s.integrationTest(c,
 		[][]string{{
 			fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
 			fmt.Sprintf("--nice=%d", 10000),
@@ -218,24 +222,14 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
 
 func (s *IntegrationSuite) TestSbatchFail(c *C) {
 	s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
-	container := s.integrationTest(c,
+	container, err := s.integrationTest(c,
 		[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
 		func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
 			dispatcher.UpdateState(container.UUID, dispatch.Running)
 			dispatcher.UpdateState(container.UUID, dispatch.Complete)
 		})
 	c.Check(container.State, Equals, arvados.ContainerStateComplete)
-
-	arv, err := arvadosclient.MakeArvadosClient()
-	c.Assert(err, IsNil)
-
-	var ll arvados.LogList
-	err = arv.List("logs", arvadosclient.Dict{"filters": [][]string{
-		{"object_uuid", "=", container.UUID},
-		{"event_type", "=", "dispatch"},
-	}}, &ll)
-	c.Assert(err, IsNil)
-	c.Assert(len(ll.Items), Equals, 1)
+	c.Check(err, ErrorMatches, `something terrible happened`)
 }
 
 type StubbedSuite struct {
@@ -280,7 +274,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Second,
-		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+		RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
 			go func() {
 				time.Sleep(time.Second)
 				disp.UpdateState(ctr.UUID, dispatch.Running)
@@ -288,6 +282,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
 			}()
 			s.disp.runContainer(disp, ctr, status)
 			cancel()
+			return nil
 		},
 	}
 

commit 7bebcf045b39a4bdecb345c9592c0515795c75aa
Author: Tom Clegg <tom at curii.com>
Date:   Fri Jul 16 16:42:46 2021 -0400

    17756: Review feedback.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
index 0dd6a8b26..d8b6aefab 100644
--- a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
+++ b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
@@ -62,9 +62,7 @@ When arvados-dispatch-lsf invokes @bsub@, you can add arguments to the command b
 <notextile>
 <pre>    Containers:
       LSF:
-        <code class="userinput">BsubArgumentsList:
-          - <b>"-C"</b>
-          - <b>"0"</b></code>
+        <code class="userinput">BsubArgumentsList: <b>["-C", "0"]</b></code>
 </pre>
 </notextile>
 
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index b7032dc73..760002aa0 100644
--- a/lib/lsf/dispatch.go
+++ b/lib/lsf/dispatch.go
@@ -240,6 +240,11 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 			return
 		case updated, ok := <-status:
 			if !ok {
+				// status channel is closed, which is
+				// how arvDispatcher tells us to stop
+				// touching the container record, kill
+				// off any remaining LSF processes,
+				// etc.
 				done = true
 				break
 			}
@@ -247,7 +252,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 				disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
 			}
 			ctr = updated
-			if ctr.Priority == 0 {
+			if ctr.Priority < 1 {
 				disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
 				disp.bkill(ctr)
 			} else {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list