[ARVADOS] created: 1.3.0-613-g451183bfa

Git user git at public.curoverse.com
Fri Mar 29 20:27:40 UTC 2019


        at  451183bfac95afc747a662fe98a3d52899dcdc53 (commit)


commit 451183bfac95afc747a662fe98a3d52899dcdc53
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Mar 29 16:27:04 2019 -0400

    15050: After waking up, check if the container was cancelled
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 62935447d..ae09c52f2 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -155,6 +155,17 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
 
 		defer func() { <-lr.concurrencyLimit }()
 
+		select {
+		case c := <-status:
+			// Check for state updates after possibly
+			// waiting to be ready-to-run
+			if c.Priority == 0 {
+				goto Finish
+			}
+		default:
+			break
+		}
+
 		waitGroup.Add(1)
 		defer waitGroup.Done()
 
@@ -212,6 +223,8 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
 		}
 	}
 
+Finish:
+
 	// If the container is not finalized, then change it to "Cancelled".
 	err := dispatcher.Arv.Get("containers", uuid, nil, &container)
 	if err != nil {

commit bcf51c36cb5044b6abc32cacf4b66391245cb919
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Mar 29 14:28:01 2019 -0400

    15050: Need to make sure to release the concurrency semaphore when we're done
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 36d149272..62935447d 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -153,7 +153,10 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
 			return
 		}
 
+		defer func() { <-lr.concurrencyLimit }()
+
 		waitGroup.Add(1)
+		defer waitGroup.Done()
 
 		cmd := exec.Command(*crunchRunCommand, uuid)
 		cmd.Stdin = nil
@@ -207,7 +210,6 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
 			delete(runningCmds, uuid)
 			runningCmdsMutex.Unlock()
 		}
-		waitGroup.Done()
 	}
 
 	// If the container is not finalized, then change it to "Cancelled".

commit 1f2bdbd78e4dad9d831297c6ac280dc085733b5e
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Mar 29 14:06:22 2019 -0400

    15050: Limit number of containers that crunch-dispatch-local runs at a time
    
    So I can test submitting a huge number of containers without
    melting my machine.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index dcd54e896..36d149272 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -85,14 +85,15 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
+	ctx, cancel := context.WithCancel(context.Background())
+
 	dispatcher := dispatch.Dispatcher{
 		Logger:       logger,
 		Arv:          arv,
-		RunContainer: run,
+		RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx}).run,
 		PollPeriod:   time.Duration(*pollInterval) * time.Second,
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
 	err = dispatcher.Run(ctx)
 	if err != nil {
 		return err
@@ -123,7 +124,11 @@ func startFunc(container arvados.Container, cmd *exec.Cmd) error {
 	return cmd.Start()
 }
 
-var startCmd = startFunc
+type LocalRun struct {
+	startCmd         func(container arvados.Container, cmd *exec.Cmd) error
+	concurrencyLimit chan bool
+	ctx              context.Context
+}
 
 // Run a container.
 //
@@ -133,13 +138,21 @@ var startCmd = startFunc
 //
 // If the container is in any other state, or is not Complete/Cancelled after
 // crunch-run terminates, mark the container as Cancelled.
-func run(dispatcher *dispatch.Dispatcher,
+func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
 	container arvados.Container,
 	status <-chan arvados.Container) {
 
 	uuid := container.UUID
 
 	if container.State == dispatch.Locked {
+
+		select {
+		case lr.concurrencyLimit <- true:
+			break
+		case <-lr.ctx.Done():
+			return
+		}
+
 		waitGroup.Add(1)
 
 		cmd := exec.Command(*crunchRunCommand, uuid)
@@ -153,7 +166,7 @@ func run(dispatcher *dispatch.Dispatcher,
 		// succeed in starting crunch-run.
 
 		runningCmdsMutex.Lock()
-		if err := startCmd(container, cmd); err != nil {
+		if err := lr.startCmd(container, cmd); err != nil {
 			runningCmdsMutex.Unlock()
 			dispatcher.Logger.Warnf("error starting %q for %s: %s", *crunchRunCommand, uuid, err)
 			dispatcher.UpdateState(uuid, dispatch.Cancelled)
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 6bae1f409..41357403f 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -73,18 +73,19 @@ func (s *TestSuite) TestIntegration(c *C) {
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Second,
-		RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-			run(d, c, s)
-			cancel()
-		},
 	}
 
-	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
+	startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
 		dispatcher.UpdateState(container.UUID, "Complete")
 		return cmd.Start()
 	}
 
+	dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+		(&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+		cancel()
+	}
+
 	err = dispatcher.Run(ctx)
 	c.Assert(err, Equals, context.Canceled)
 
@@ -175,18 +176,19 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	dispatcher := dispatch.Dispatcher{
 		Arv:        arv,
 		PollPeriod: time.Second / 20,
-		RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-			run(d, c, s)
-			cancel()
-		},
 	}
 
-	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
+	startCmd := func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
 		dispatcher.UpdateState(container.UUID, "Complete")
 		return cmd.Start()
 	}
 
+	dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+		(&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+		cancel()
+	}
+
 	re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
 	go func() {
 		for i := 0; i < 80 && !re.MatchString(buf.String()); i++ {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list