[ARVADOS] created: c78765cfbf09eea3d475f115750aecf143d313fb

Git user git at public.curoverse.com
Tue Jul 26 12:29:23 EDT 2016


        at  c78765cfbf09eea3d475f115750aecf143d313fb (commit)


commit c78765cfbf09eea3d475f115750aecf143d313fb
Author: radhika <radhika at curoverse.com>
Date:   Tue Jul 26 12:28:27 2016 -0400

    9581: add json config file handling to slurm dispatcher.

diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 84a3bff..ebb5992 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -18,8 +18,8 @@ const (
 	Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
 )
 
-// A valid manifest designed to test various edge cases and parsing
-// requirements
+// PathologicalManifest : A valid manifest designed to test
+// various edge cases and parsing requirements
 const PathologicalManifest = ". acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3+Z+K at xyzzy acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:zero at 0 0:1:f 1:0:zero at 1 1:4:ooba 4:0:zero at 4 5:1:r 5:4:rbaz 9:0:zero at 9\n" +
 	"./overlapReverse acbd18db4cc2f85cedef654fccc4a4d8+3 acbd18db4cc2f85cedef654fccc4a4d8+3 5:1:o 4:2:oo 2:4:ofoo\n" +
 	"./segmented acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:1:frob 5:1:frob 1:1:frob 1:2:oof 0:1:oof 5:0:frob 3:1:frob\n" +
@@ -37,4 +37,5 @@ var (
 	MD5CollisionMD5 = "cee9a457e790cf20d4bdaa6d69f01e41"
 )
 
+// BlobSigningKey used by the test servers
 const BlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
diff --git a/sdk/go/arvadostest/run_servers.go b/sdk/go/arvadostest/run_servers.go
index c61b68b..7edc482 100644
--- a/sdk/go/arvadostest/run_servers.go
+++ b/sdk/go/arvadostest/run_servers.go
@@ -3,21 +3,26 @@ package arvadostest
 import (
 	"bufio"
 	"bytes"
+	"fmt"
+	"io/ioutil"
 	"log"
 	"os"
 	"os/exec"
+	"path"
 	"strconv"
 	"strings"
 )
 
 var authSettings = make(map[string]string)
 
+// ResetEnv resets test env
 func ResetEnv() {
 	for k, v := range authSettings {
 		os.Setenv(k, v)
 	}
 }
 
+// ParseAuthSettings parses auth settings from given input
 func ParseAuthSettings(authScript []byte) {
 	scanner := bufio.NewScanner(bytes.NewReader(authScript))
 	for scanner.Scan() {
@@ -36,7 +41,7 @@ func ParseAuthSettings(authScript []byte) {
 	log.Printf("authSettings: %v", authSettings)
 }
 
-var pythonTestDir string = ""
+var pythonTestDir string
 
 func chdirToPythonTests() {
 	if pythonTestDir != "" {
@@ -59,6 +64,7 @@ func chdirToPythonTests() {
 	}
 }
 
+// StartAPI starts test API server
 func StartAPI() {
 	cwd, _ := os.Getwd()
 	defer os.Chdir(cwd)
@@ -76,6 +82,7 @@ func StartAPI() {
 	ResetEnv()
 }
 
+// StopAPI stops test API server
 func StopAPI() {
 	cwd, _ := os.Getwd()
 	defer os.Chdir(cwd)
@@ -132,3 +139,24 @@ func bgRun(cmd *exec.Cmd) {
 		log.Fatalf("%+v: %s", cmd.Args, err)
 	}
 }
+
+// CreateBadPath creates a tmp dir, appends given string and returns that path
+// This will guarantee that the path being returned does not exist
+func CreateBadPath() (badpath string, err error) {
+	tempdir, err := ioutil.TempDir("", "bad")
+	if err != nil {
+		return "", fmt.Errorf("Could not create temporary directory for bad path: %v", err)
+	}
+	badpath = path.Join(tempdir, "bad")
+	return badpath, nil
+}
+
+// DestroyBadPath deletes the tmp dir created by the previous CreateBadPath call
+func DestroyBadPath(badpath string) error {
+	tempdir := path.Join(badpath, "..")
+	err := os.Remove(tempdir)
+	if err != nil {
+		return fmt.Errorf("Could not remove bad path temporary directory %v: %v", tempdir, err)
+	}
+	return nil
+}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index c78a2f6..a6a1e0a 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,6 +3,7 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
+	"encoding/json"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -17,6 +18,13 @@ import (
 	"time"
 )
 
+// Config used by crunch-dispatch-slurm
+type Config struct {
+	SbatchArguments  []string
+	PollPeriod       *time.Duration
+	CrunchRunCommand *string
+}
+
 func main() {
 	err := doMain()
 	if err != nil {
@@ -25,19 +33,26 @@ func main() {
 }
 
 var (
-	crunchRunCommand *string
-	squeueUpdater    Squeue
+	config        Config
+	squeueUpdater Squeue
 )
 
+const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/config.json"
+
 func doMain() error {
 	flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
 
-	pollInterval := flags.Int(
+	configPath := flags.String(
+		"config",
+		defaultConfigPath,
+		"`path` to json configuration file")
+
+	config.PollPeriod = flags.Duration(
 		"poll-interval",
-		10,
-		"Interval in seconds to poll for queued containers")
+		10*time.Second,
+		"Time duration to poll for queued containers")
 
-	crunchRunCommand = flags.String(
+	config.CrunchRunCommand = flags.String(
 		"crunch-run-command",
 		"/usr/bin/crunch-run",
 		"Crunch command to run container")
@@ -45,6 +60,12 @@ func doMain() error {
 	// Parse args; omit the first arg which is the command name
 	flags.Parse(os.Args[1:])
 
+	err := readConfig(config.SbatchArguments, *configPath)
+	if err != nil {
+		log.Printf("Error reading configuration: %v", err)
+		return err
+	}
+
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Printf("Error making Arvados client: %v", err)
@@ -52,13 +73,13 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
-	squeueUpdater.StartMonitor(time.Duration(*pollInterval) * time.Second)
+	squeueUpdater.StartMonitor(*config.PollPeriod * time.Second)
 	defer squeueUpdater.Done()
 
 	dispatcher := dispatch.Dispatcher{
 		Arv:            arv,
 		RunContainer:   run,
-		PollInterval:   time.Duration(*pollInterval) * time.Second,
+		PollInterval:   *config.PollPeriod * time.Second,
 		DoneProcessing: make(chan struct{})}
 
 	err = dispatcher.RunDispatcher()
@@ -72,10 +93,17 @@ func doMain() error {
 // sbatchCmd
 func sbatchFunc(container arvados.Container) *exec.Cmd {
 	memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
-	return exec.Command("sbatch", "--share",
-		fmt.Sprintf("--job-name=%s", container.UUID),
-		fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
-		fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
+
+	var sbatchArgs []string
+	sbatchArgs = append(sbatchArgs, "--share")
+	for i := range config.SbatchArguments {
+		sbatchArgs = append(sbatchArgs, config.SbatchArguments[i])
+	}
+	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
+	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
+	sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
+
+	return exec.Command("sbatch", sbatchArgs...)
 }
 
 // scancelCmd
@@ -189,7 +217,7 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co
 
 			log.Printf("About to submit queued container %v", container.UUID)
 
-			if err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+			if err := submit(dispatcher, container, *config.CrunchRunCommand); err != nil {
 				log.Printf("Error submitting container %s to slurm: %v",
 					container.UUID, err)
 				// maybe sbatch is broken, put it back to queued
@@ -267,3 +295,18 @@ func run(dispatcher *dispatch.Dispatcher,
 	}
 	monitorDone = true
 }
+
+func readConfig(dst interface{}, path string) error {
+	if buf, err := ioutil.ReadFile(path); err != nil && strings.Contains(err.Error(), "no such file") {
+		if path == defaultConfigPath || path == "" {
+			log.Printf("Config not specified. Continue with default configuration.")
+		} else {
+			return fmt.Errorf("Config file not found %q: %v", path, err)
+		}
+	} else if err != nil {
+		return fmt.Errorf("Error reading config %q: %v", path, err)
+	} else if err = json.Unmarshal(buf, dst); err != nil {
+		return fmt.Errorf("Error decoding config %q: %v", path, err)
+	}
+	return nil
+}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index af27832..5528cb7 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -8,6 +8,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"io"
+	"io/ioutil"
 	"log"
 	"net/http"
 	"net/http/httptest"
@@ -143,7 +144,7 @@ func (s *TestSuite) integrationTest(c *C,
 	c.Check(len(containers.Items), Equals, 1)
 
 	echo := "echo"
-	crunchRunCommand = &echo
+	config.CrunchRunCommand = &echo
 
 	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
@@ -205,7 +206,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	log.SetOutput(io.MultiWriter(buf, os.Stderr))
 	defer log.SetOutput(os.Stderr)
 
-	crunchRunCommand = &crunchCmd
+	config.CrunchRunCommand = &crunchCmd
 
 	doneProcessing := make(chan struct{})
 	dispatcher := dispatch.Dispatcher{
@@ -236,3 +237,62 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
 	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
+
+func (s *MockArvadosServerSuite) Test_EmptyConfigFile(c *C) {
+	var config Config
+
+	err := readConfig(&config.SbatchArguments, "")
+	c.Assert(err, IsNil)
+}
+
+func (s *MockArvadosServerSuite) Test_NoSuchConfigFile(c *C) {
+	var config Config
+
+	badpath, err := arvadostest.CreateBadPath()
+	if err != nil {
+		c.Fatalf(err.Error())
+	}
+	defer func() {
+		err = arvadostest.DestroyBadPath(badpath)
+		if err != nil {
+			c.Fatalf(err.Error())
+		}
+	}()
+
+	err = readConfig(&config.SbatchArguments, badpath)
+	c.Assert(err, NotNil)
+}
+
+func (s *MockArvadosServerSuite) Test_BadConfig(c *C) {
+	var config Config
+
+	tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
+	c.Check(err, IsNil)
+	defer os.Remove(tmpfile.Name())
+
+	_, err = tmpfile.Write([]byte("fileContent"))
+	c.Check(err, IsNil)
+
+	err = readConfig(&config.SbatchArguments, tmpfile.Name())
+	c.Assert(err, NotNil)
+}
+
+func (s *MockArvadosServerSuite) Test_ReadConfig(c *C) {
+	var config Config
+
+	tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
+	c.Check(err, IsNil)
+	defer os.Remove(tmpfile.Name())
+
+	args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
+	argsS := `["--arg1=v1",  "--arg2", "--arg3=v3"]`
+	_, err = tmpfile.Write([]byte(argsS))
+	c.Check(err, IsNil)
+
+	err = readConfig(&config.SbatchArguments, tmpfile.Name())
+	c.Assert(err, IsNil)
+	c.Check(3, Equals, len(config.SbatchArguments))
+	for i := range args {
+		c.Check(args[i], Equals, config.SbatchArguments[i])
+	}
+}
diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go
index a99ec6b..0ff6b77 100644
--- a/services/datamanager/datamanager_test.go
+++ b/services/datamanager/datamanager_test.go
@@ -538,32 +538,31 @@ func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
 	testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
 }
 
-func createBadPath(t *testing.T) (badpath string) {
-	tempdir, err := ioutil.TempDir("", "bad")
-	if err != nil {
-		t.Fatalf("Could not create temporary directory for bad path: %v", err)
-	}
-	badpath = path.Join(tempdir, "bad")
-	return
-}
-
-func destroyBadPath(t *testing.T, badpath string) {
-	tempdir := path.Join(badpath, "..")
-	err := os.Remove(tempdir)
+func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
+	badpath, err := arvadostest.CreateBadPath()
 	if err != nil {
-		t.Fatalf("Could not remove bad path temporary directory %v: %v", tempdir, err)
+		t.Fatalf(err.Error())
 	}
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
-	badpath := createBadPath(t)
-	defer destroyBadPath(t, badpath)
+	defer func() {
+		err = arvadostest.DestroyBadPath(badpath)
+		if err != nil {
+			t.Fatalf(err.Error())
+		}
+	}()
 	testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
 }
 
 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
-	badpath := createBadPath(t)
-	defer destroyBadPath(t, badpath)
+	badpath, err := arvadostest.CreateBadPath()
+	if err != nil {
+		t.Fatalf(err.Error())
+	}
+	defer func() {
+		err = arvadostest.DestroyBadPath(badpath)
+		if err != nil {
+			t.Fatalf(err.Error())
+		}
+	}()
 	testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list