[ARVADOS] created: c78765cfbf09eea3d475f115750aecf143d313fb
Git user
git at public.curoverse.com
Tue Jul 26 12:29:23 EDT 2016
at c78765cfbf09eea3d475f115750aecf143d313fb (commit)
commit c78765cfbf09eea3d475f115750aecf143d313fb
Author: radhika <radhika at curoverse.com>
Date: Tue Jul 26 12:28:27 2016 -0400
9581: add json config file handling to slurm dispatcher.
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 84a3bff..ebb5992 100644
--- a/sdk/go/arvadostest/fixtures.go
+++ b/sdk/go/arvadostest/fixtures.go
@@ -18,8 +18,8 @@ const (
Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
)
-// A valid manifest designed to test various edge cases and parsing
-// requirements
+// PathologicalManifest is a valid manifest designed to test
+// various edge cases and parsing requirements.
const PathologicalManifest = ". acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3+Z+K at xyzzy acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:zero at 0 0:1:f 1:0:zero at 1 1:4:ooba 4:0:zero at 4 5:1:r 5:4:rbaz 9:0:zero at 9\n" +
"./overlapReverse acbd18db4cc2f85cedef654fccc4a4d8+3 acbd18db4cc2f85cedef654fccc4a4d8+3 5:1:o 4:2:oo 2:4:ofoo\n" +
"./segmented acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:1:frob 5:1:frob 1:1:frob 1:2:oof 0:1:oof 5:0:frob 3:1:frob\n" +
@@ -37,4 +37,5 @@ var (
MD5CollisionMD5 = "cee9a457e790cf20d4bdaa6d69f01e41"
)
+// BlobSigningKey used by the test servers
const BlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
diff --git a/sdk/go/arvadostest/run_servers.go b/sdk/go/arvadostest/run_servers.go
index c61b68b..7edc482 100644
--- a/sdk/go/arvadostest/run_servers.go
+++ b/sdk/go/arvadostest/run_servers.go
@@ -3,21 +3,26 @@ package arvadostest
import (
"bufio"
"bytes"
+ "fmt"
+ "io/ioutil"
"log"
"os"
"os/exec"
+ "path"
"strconv"
"strings"
)
var authSettings = make(map[string]string)
+// ResetEnv sets each environment variable recorded in authSettings to its stored value
func ResetEnv() {
for k, v := range authSettings {
os.Setenv(k, v)
}
}
+// ParseAuthSettings parses auth settings from given input
func ParseAuthSettings(authScript []byte) {
scanner := bufio.NewScanner(bytes.NewReader(authScript))
for scanner.Scan() {
@@ -36,7 +41,7 @@ func ParseAuthSettings(authScript []byte) {
log.Printf("authSettings: %v", authSettings)
}
-var pythonTestDir string = ""
+var pythonTestDir string
func chdirToPythonTests() {
if pythonTestDir != "" {
@@ -59,6 +64,7 @@ func chdirToPythonTests() {
}
}
+// StartAPI starts test API server
func StartAPI() {
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
@@ -76,6 +82,7 @@ func StartAPI() {
ResetEnv()
}
+// StopAPI stops test API server
func StopAPI() {
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
@@ -132,3 +139,24 @@ func bgRun(cmd *exec.Cmd) {
log.Fatalf("%+v: %s", cmd.Args, err)
}
}
+
+// CreateBadPath creates a temporary directory and returns the path of an
+// entry named "bad" inside it, without creating that entry.
+// This guarantees that the path being returned does not exist
+func CreateBadPath() (badpath string, err error) {
+ tempdir, err := ioutil.TempDir("", "bad")
+ if err != nil {
+ return "", fmt.Errorf("Could not create temporary directory for bad path: %v", err)
+ }
+ badpath = path.Join(tempdir, "bad")
+ return badpath, nil
+}
+
+// DestroyBadPath removes the parent directory of badpath, i.e. the tmp dir created by the CreateBadPath call that returned it
+func DestroyBadPath(badpath string) error {
+ tempdir := path.Join(badpath, "..")
+ err := os.Remove(tempdir)
+ if err != nil {
+ return fmt.Errorf("Could not remove bad path temporary directory %v: %v", tempdir, err)
+ }
+ return nil
+}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index c78a2f6..a6a1e0a 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -3,6 +3,7 @@ package main
// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
+ "encoding/json"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -17,6 +18,13 @@ import (
"time"
)
+// Config used by crunch-dispatch-slurm
+type Config struct {
+ SbatchArguments []string
+ PollPeriod *time.Duration
+ CrunchRunCommand *string
+}
+
func main() {
err := doMain()
if err != nil {
@@ -25,19 +33,26 @@ func main() {
}
var (
- crunchRunCommand *string
- squeueUpdater Squeue
+ config Config
+ squeueUpdater Squeue
)
+const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/config.json"
+
func doMain() error {
flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
- pollInterval := flags.Int(
+ configPath := flags.String(
+ "config",
+ defaultConfigPath,
+ "`path` to json configuration file")
+
+ config.PollPeriod = flags.Duration(
"poll-interval",
- 10,
- "Interval in seconds to poll for queued containers")
+ 10*time.Second,
+ "Time duration to poll for queued containers")
- crunchRunCommand = flags.String(
+ config.CrunchRunCommand = flags.String(
"crunch-run-command",
"/usr/bin/crunch-run",
"Crunch command to run container")
@@ -45,6 +60,12 @@ func doMain() error {
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
+ err := readConfig(config.SbatchArguments, *configPath)
+ if err != nil {
+ log.Printf("Error reading configuration: %v", err)
+ return err
+ }
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("Error making Arvados client: %v", err)
@@ -52,13 +73,13 @@ func doMain() error {
}
arv.Retries = 25
- squeueUpdater.StartMonitor(time.Duration(*pollInterval) * time.Second)
+ squeueUpdater.StartMonitor(*config.PollPeriod * time.Second)
defer squeueUpdater.Done()
dispatcher := dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
- PollInterval: time.Duration(*pollInterval) * time.Second,
+ PollInterval: *config.PollPeriod * time.Second,
DoneProcessing: make(chan struct{})}
err = dispatcher.RunDispatcher()
@@ -72,10 +93,17 @@ func doMain() error {
// sbatchCmd
func sbatchFunc(container arvados.Container) *exec.Cmd {
memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
- return exec.Command("sbatch", "--share",
- fmt.Sprintf("--job-name=%s", container.UUID),
- fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
- fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
+
+ var sbatchArgs []string
+ sbatchArgs = append(sbatchArgs, "--share")
+ for i := range config.SbatchArguments {
+ sbatchArgs = append(sbatchArgs, config.SbatchArguments[i])
+ }
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
+
+ return exec.Command("sbatch", sbatchArgs...)
}
// scancelCmd
@@ -189,7 +217,7 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co
log.Printf("About to submit queued container %v", container.UUID)
- if err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+ if err := submit(dispatcher, container, *config.CrunchRunCommand); err != nil {
log.Printf("Error submitting container %s to slurm: %v",
container.UUID, err)
// maybe sbatch is broken, put it back to queued
@@ -267,3 +295,18 @@ func run(dispatcher *dispatch.Dispatcher,
}
monitorDone = true
}
+
+func readConfig(dst interface{}, path string) error {
+ if buf, err := ioutil.ReadFile(path); err != nil && strings.Contains(err.Error(), "no such file") {
+ if path == defaultConfigPath || path == "" {
+ log.Printf("Config not specified. Continue with default configuration.")
+ } else {
+ return fmt.Errorf("Config file not found %q: %v", path, err)
+ }
+ } else if err != nil {
+ return fmt.Errorf("Error reading config %q: %v", path, err)
+ } else if err = json.Unmarshal(buf, dst); err != nil {
+ return fmt.Errorf("Error decoding config %q: %v", path, err)
+ }
+ return nil
+}
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index af27832..5528cb7 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -8,6 +8,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"io"
+ "io/ioutil"
"log"
"net/http"
"net/http/httptest"
@@ -143,7 +144,7 @@ func (s *TestSuite) integrationTest(c *C,
c.Check(len(containers.Items), Equals, 1)
echo := "echo"
- crunchRunCommand = &echo
+ config.CrunchRunCommand = &echo
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
@@ -205,7 +206,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
- crunchRunCommand = &crunchCmd
+ config.CrunchRunCommand = &crunchCmd
doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
@@ -236,3 +237,62 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
+
+func (s *MockArvadosServerSuite) Test_EmptyConfigFile(c *C) {
+ var config Config
+
+ err := readConfig(&config.SbatchArguments, "")
+ c.Assert(err, IsNil)
+}
+
+func (s *MockArvadosServerSuite) Test_NoSuchConfigFile(c *C) {
+ var config Config
+
+ badpath, err := arvadostest.CreateBadPath()
+ if err != nil {
+ c.Fatalf(err.Error())
+ }
+ defer func() {
+ err = arvadostest.DestroyBadPath(badpath)
+ if err != nil {
+ c.Fatalf(err.Error())
+ }
+ }()
+
+ err = readConfig(&config.SbatchArguments, badpath)
+ c.Assert(err, NotNil)
+}
+
+func (s *MockArvadosServerSuite) Test_BadConfig(c *C) {
+ var config Config
+
+ tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
+ c.Check(err, IsNil)
+ defer os.Remove(tmpfile.Name())
+
+ _, err = tmpfile.Write([]byte("fileContent"))
+ c.Check(err, IsNil)
+
+ err = readConfig(&config.SbatchArguments, tmpfile.Name())
+ c.Assert(err, NotNil)
+}
+
+func (s *MockArvadosServerSuite) Test_ReadConfig(c *C) {
+ var config Config
+
+ tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
+ c.Check(err, IsNil)
+ defer os.Remove(tmpfile.Name())
+
+ args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
+ argsS := `["--arg1=v1", "--arg2", "--arg3=v3"]`
+ _, err = tmpfile.Write([]byte(argsS))
+ c.Check(err, IsNil)
+
+ err = readConfig(&config.SbatchArguments, tmpfile.Name())
+ c.Assert(err, IsNil)
+ c.Check(3, Equals, len(config.SbatchArguments))
+ for i := range args {
+ c.Check(args[i], Equals, config.SbatchArguments[i])
+ }
+}
diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go
index a99ec6b..0ff6b77 100644
--- a/services/datamanager/datamanager_test.go
+++ b/services/datamanager/datamanager_test.go
@@ -538,32 +538,31 @@ func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
}
-func createBadPath(t *testing.T) (badpath string) {
- tempdir, err := ioutil.TempDir("", "bad")
- if err != nil {
- t.Fatalf("Could not create temporary directory for bad path: %v", err)
- }
- badpath = path.Join(tempdir, "bad")
- return
-}
-
-func destroyBadPath(t *testing.T, badpath string) {
- tempdir := path.Join(badpath, "..")
- err := os.Remove(tempdir)
+func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
+ badpath, err := arvadostest.CreateBadPath()
if err != nil {
- t.Fatalf("Could not remove bad path temporary directory %v: %v", tempdir, err)
+ t.Fatalf(err.Error())
}
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
- badpath := createBadPath(t)
- defer destroyBadPath(t, badpath)
+ defer func() {
+ err = arvadostest.DestroyBadPath(badpath)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ }()
testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
}
func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
- badpath := createBadPath(t)
- defer destroyBadPath(t, badpath)
+ badpath, err := arvadostest.CreateBadPath()
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ defer func() {
+ err = arvadostest.DestroyBadPath(badpath)
+ if err != nil {
+ t.Fatalf(err.Error())
+ }
+ }()
testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list