[ARVADOS] created: 683f5374b0fc516579c1d6dc3379fc900d642322

Git user git at public.curoverse.com
Thu Jan 26 00:36:17 EST 2017


        at  683f5374b0fc516579c1d6dc3379fc900d642322 (commit)


commit 683f5374b0fc516579c1d6dc3379fc900d642322
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jan 25 16:37:26 2017 -0500

    10703: Un-pyramid run().

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 19ab5aa..8e61462 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -250,29 +250,30 @@ func run(dispatcher *dispatch.Dispatcher,
 	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
 
 	for container = range status {
-		if container.State == dispatch.Locked || container.State == dispatch.Running {
-			if container.Priority == 0 {
-				log.Printf("Canceling container %s", container.UUID)
-
-				// Mutex between squeue sync and running sbatch or scancel.
-				squeueUpdater.SlurmLock.Lock()
-				cmd := scancelCmd(container)
-				msg, err := cmd.CombinedOutput()
-				squeueUpdater.SlurmLock.Unlock()
-
-				if err != nil {
-					log.Printf("Error stopping container %s with %v %v: %v %v",
-						container.UUID, cmd.Path, cmd.Args, err, string(msg))
-					if squeueUpdater.CheckSqueue(container.UUID) {
-						log.Printf("Container %s is still in squeue after scancel.",
-							container.UUID)
-						continue
-					}
-				}
-
-				err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+		if !(container.State == dispatch.Locked || container.State == dispatch.Running) {
+			continue
+		}
+		if container.Priority != 0 {
+			continue
+		}
+		log.Printf("Canceling container %s", container.UUID)
+
+		// Mutex between squeue sync and running sbatch or scancel.
+		squeueUpdater.SlurmLock.Lock()
+		cmd := scancelCmd(container)
+		msg, err := cmd.CombinedOutput()
+		squeueUpdater.SlurmLock.Unlock()
+
+		if err != nil {
+			log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+			if squeueUpdater.CheckSqueue(container.UUID) {
+				log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+				continue
 			}
 		}
+
+		// Ignore errors; if necessary, we'll try again next time
+		dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
 	}
 	monitorDone = true
 }

commit 318dd887d108e0664ef22a8d38a34fa18e1b2657
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jan 25 16:04:30 2017 -0500

    10701: Remove unneeded complexity in squeue invocation.

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index ae70800..19ab5aa 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,6 +3,7 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
+	"bytes"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -10,8 +11,6 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/config"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"github.com/coreos/go-systemd/daemon"
-	"io"
-	"io/ioutil"
 	"log"
 	"math"
 	"os"
@@ -154,70 +153,31 @@ func submit(dispatcher *dispatch.Dispatcher,
 		}
 	}()
 
-	// Create the command and attach to stdin/stdout
 	cmd := sbatchCmd(container)
-	stdinWriter, stdinerr := cmd.StdinPipe()
-	if stdinerr != nil {
-		submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
-		return
-	}
 
-	stdoutReader, stdoutErr := cmd.StdoutPipe()
-	if stdoutErr != nil {
-		submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
-		return
-	}
+	// Send a tiny script on stdin to execute the crunch-run
+	// command (slurm requires this to be a #! script)
+	cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
 
-	stderrReader, stderrErr := cmd.StderrPipe()
-	if stderrErr != nil {
-		submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
-		return
-	}
+	var stdout, stderr bytes.Buffer
+	cmd.Stdout = &stdout
+	cmd.Stderr = &stderr
 
 	// Mutex between squeue sync and running sbatch or scancel.
 	squeueUpdater.SlurmLock.Lock()
 	defer squeueUpdater.SlurmLock.Unlock()
 
-	log.Printf("sbatch starting: %+q", cmd.Args)
-	err := cmd.Start()
-	if err != nil {
-		submitErr = fmt.Errorf("Error starting sbatch: %v", err)
-		return
+	log.Printf("exec sbatch %+q", cmd.Args)
+	err := cmd.Run()
+	switch err.(type) {
+	case nil:
+		log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
+		return nil
+	case *exec.ExitError:
+		return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr)
+	default:
+		return fmt.Errorf("exec failed: %v", err)
 	}
-
-	stdoutChan := make(chan []byte)
-	go func() {
-		b, _ := ioutil.ReadAll(stdoutReader)
-		stdoutReader.Close()
-		stdoutChan <- b
-		close(stdoutChan)
-	}()
-
-	stderrChan := make(chan []byte)
-	go func() {
-		b, _ := ioutil.ReadAll(stderrReader)
-		stderrReader.Close()
-		stderrChan <- b
-		close(stderrChan)
-	}()
-
-	// Send a tiny script on stdin to execute the crunch-run command
-	// slurm actually enforces that this must be a #! script
-	io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
-	stdinWriter.Close()
-
-	stdoutMsg := <-stdoutChan
-	stderrmsg := <-stderrChan
-
-	err = cmd.Wait()
-
-	if err != nil {
-		submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
-		return
-	}
-
-	log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
-	return
 }
 
 // If the container is marked as Locked, check if it is already in the slurm

commit e356309e05714cd65d88456c563cea606f820394
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Jan 25 10:59:48 2017 -0500

    10703: Do not catch signals in crunch-dispatch-slurm. Simplify "stop dispatcher loop" API.

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 4987c01..4129b24 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -7,10 +7,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"log"
-	"os"
-	"os/signal"
 	"sync"
-	"syscall"
 	"time"
 )
 
@@ -44,13 +41,11 @@ type Dispatcher struct {
 	// Amount of time to wait between polling for updates.
 	PollInterval time.Duration
 
-	// Channel used to signal that RunDispatcher loop should exit.
-	DoneProcessing chan struct{}
+	mineMutex sync.Mutex
+	mineMap   map[string]chan arvados.Container
+	Auth      arvados.APIClientAuthorization
 
-	mineMutex  sync.Mutex
-	mineMap    map[string]chan arvados.Container
-	Auth       arvados.APIClientAuthorization
-	containers chan arvados.Container
+	stop chan struct{}
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
@@ -110,12 +105,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
 	}
 	for _, container := range containers.Items {
 		touched[container.UUID] = true
-		dispatcher.containers <- container
+		dispatcher.handleUpdate(container)
 	}
 }
 
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
 	ticker := time.NewTicker(dispatcher.PollInterval)
+	defer ticker.Stop()
 
 	paramsQ := arvadosclient.Dict{
 		"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
@@ -126,26 +122,24 @@ func (dispatcher *Dispatcher) pollContainers() {
 		"limit":   "1000"}
 
 	for {
+		touched := make(map[string]bool)
+		dispatcher.getContainers(paramsQ, touched)
+		dispatcher.getContainers(paramsP, touched)
+		dispatcher.mineMutex.Lock()
+		var monitored []string
+		for k := range dispatcher.mineMap {
+			if _, ok := touched[k]; !ok {
+				monitored = append(monitored, k)
+			}
+		}
+		dispatcher.mineMutex.Unlock()
+		if monitored != nil {
+			dispatcher.getContainers(arvadosclient.Dict{
+				"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+		}
 		select {
 		case <-ticker.C:
-			touched := make(map[string]bool)
-			dispatcher.getContainers(paramsQ, touched)
-			dispatcher.getContainers(paramsP, touched)
-			dispatcher.mineMutex.Lock()
-			var monitored []string
-			for k := range dispatcher.mineMap {
-				if _, ok := touched[k]; !ok {
-					monitored = append(monitored, k)
-				}
-			}
-			dispatcher.mineMutex.Unlock()
-			if monitored != nil {
-				dispatcher.getContainers(arvadosclient.Dict{
-					"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
-			}
-		case <-dispatcher.DoneProcessing:
-			close(dispatcher.containers)
-			ticker.Stop()
+		case <-stop:
 			return
 		}
 	}
@@ -221,10 +215,18 @@ func (dispatcher *Dispatcher) Unlock(uuid string) error {
 	return err
 }
 
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
+	if dispatcher.stop == nil {
+		// already stopped
+		return
+	}
+	close(dispatcher.stop)
+	dispatcher.stop = nil
+}
+
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
 	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
 	if err != nil {
 		log.Printf("Error getting my token UUID: %v", err)
@@ -232,26 +234,7 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
 	}
 
 	dispatcher.mineMap = make(map[string]chan arvados.Container)
-	dispatcher.containers = make(chan arvados.Container)
-
-	// Graceful shutdown on signal
-	sigChan := make(chan os.Signal)
-	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-	go func(sig <-chan os.Signal) {
-		for sig := range sig {
-			log.Printf("Caught signal: %v", sig)
-			dispatcher.DoneProcessing <- struct{}{}
-		}
-	}(sigChan)
-
-	defer close(sigChan)
-	defer signal.Stop(sigChan)
-
-	go dispatcher.pollContainers()
-	for container := range dispatcher.containers {
-		dispatcher.handleUpdate(container)
-	}
-
+	dispatcher.stop = make(chan struct{})
+	dispatcher.pollContainers(dispatcher.stop)
 	return nil
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index 0ca7651..cfb0c7d 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -10,7 +10,9 @@ import (
 	"log"
 	"os"
 	"os/exec"
+	"os/signal"
 	"sync"
+	"syscall"
 	"time"
 )
 
@@ -54,16 +56,24 @@ func doMain() error {
 	arv.Retries = 25
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:            arv,
-		RunContainer:   run,
-		PollInterval:   time.Duration(*pollInterval) * time.Second,
-		DoneProcessing: make(chan struct{})}
+		Arv:          arv,
+		RunContainer: run,
+		PollInterval: time.Duration(*pollInterval) * time.Second,
+	}
 
-	err = dispatcher.RunDispatcher()
+	err = dispatcher.Run()
 	if err != nil {
 		return err
 	}
 
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+	sig := <-c
+	log.Printf("Received %s, shutting down", sig)
+	signal.Stop(c)
+
+	dispatcher.Stop()
+
 	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
 	for _, cmd := range runningCmds {
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index bcb406e..0454730 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -62,7 +62,6 @@ func (s *TestSuite) TestIntegration(c *C) {
 	echo := "echo"
 	crunchRunCommand = &echo
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Second,
@@ -70,9 +69,9 @@ func (s *TestSuite) TestIntegration(c *C) {
 			container arvados.Container,
 			status chan arvados.Container) {
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
@@ -80,7 +79,7 @@ func (s *TestSuite) TestIntegration(c *C) {
 		return cmd.Start()
 	}
 
-	err = dispatcher.RunDispatcher()
+	err = dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	// Wait for all running crunch jobs to complete / terminate
@@ -166,7 +165,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	*crunchRunCommand = crunchCmd
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
@@ -174,9 +172,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 			container arvados.Container,
 			status chan arvados.Container) {
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
 		dispatcher.UpdateState(container.UUID, "Running")
@@ -188,10 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		dispatcher.DoneProcessing <- struct{}{}
+		dispatcher.Stop()
 	}()
 
-	err := dispatcher.RunDispatcher()
+	err := dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	// Wait for all running crunch jobs to complete / terminate
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index e768b50..ae70800 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -100,21 +100,16 @@ func doMain() error {
 	defer squeueUpdater.Done()
 
 	dispatcher := dispatch.Dispatcher{
-		Arv:            arv,
-		RunContainer:   run,
-		PollInterval:   time.Duration(theConfig.PollPeriod),
-		DoneProcessing: make(chan struct{})}
+		Arv:          arv,
+		RunContainer: run,
+		PollInterval: time.Duration(theConfig.PollPeriod),
+	}
 
 	if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
 		log.Printf("Error notifying init daemon: %v", err)
 	}
 
-	err = dispatcher.RunDispatcher()
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return dispatcher.Run()
 }
 
 // sbatchCmd
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 4046103..86844dc 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -146,7 +146,6 @@ func (s *TestSuite) integrationTest(c *C,
 
 	theConfig.CrunchRunCommand = []string{"echo"}
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
@@ -155,13 +154,13 @@ func (s *TestSuite) integrationTest(c *C,
 			status chan arvados.Container) {
 			go runContainer(dispatcher, container)
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
 
-	err = dispatcher.RunDispatcher()
+	err = dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	squeueUpdater.Done()
@@ -208,7 +207,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	theConfig.CrunchRunCommand = []string{crunchCmd}
 
-	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
 		Arv:          arv,
 		PollInterval: time.Duration(1) * time.Second,
@@ -221,18 +219,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 				dispatcher.UpdateState(container.UUID, dispatch.Complete)
 			}()
 			run(dispatcher, container, status)
-			doneProcessing <- struct{}{}
+			dispatcher.Stop()
 		},
-		DoneProcessing: doneProcessing}
+	}
 
 	go func() {
 		for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
 			time.Sleep(100 * time.Millisecond)
 		}
-		dispatcher.DoneProcessing <- struct{}{}
+		dispatcher.Stop()
 	}()
 
-	err := dispatcher.RunDispatcher()
+	err := dispatcher.Run()
 	c.Assert(err, IsNil)
 
 	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list