[ARVADOS] created: cd2645820d630f99a4227b73a084f7b393d03da4
Git user
git at public.curoverse.com
Wed Oct 5 11:08:53 EDT 2016
at cd2645820d630f99a4227b73a084f7b393d03da4 (commit)
commit cd2645820d630f99a4227b73a084f7b393d03da4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Oct 5 11:08:40 2016 -0400
9308: Set task.keepTmpOutput: true when outputDirType is keep_output_dir.
diff --git a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
index 44b1b06..fe3eadd 100644
--- a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
+++ b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
@@ -13,6 +13,29 @@ $graph:
_id: "@type"
_type: "@vocab"
+- name: OutputDirType
+ type: enum
+ symbols:
+ - local_output_dir
+ - keep_output_dir
+ doc:
+ - |
+ local_output_dir: Use regular file system local to the compute node.
+ There must be sufficient local scratch space to store entire output;
+ specify this with `outdirMin` of `ResourceRequirement`. Files are
+ batch uploaded to Keep when the process completes. Most compatible, but
+ upload step can be time consuming for very large files.
+ - |
+ keep_output_dir: Use writable Keep mount. Files are streamed to Keep as
+ they are written. Does not consume local scratch space, but does consume
+ RAM for output buffers (up to 192 MiB per file simultaneously open for
+ writing.) Best suited to processes which produce sequential output of
+ large files (non-sequential writes may produce fragmented file
+ manifests). Supports regular files and directories, does not support
+ special files such as symlinks, hard links, named pipes, named sockets,
+ or device nodes.
+
+
- name: RuntimeConstraints
type: record
doc: |
@@ -31,6 +54,11 @@ $graph:
MiB. Increase this to reduce cache thrashing in situations such as
accessing multiple large (64+ MiB) files at the same time, or
performing random access on a large file.
+ - name: outputDirType
+ type: OutputDirType?
+ doc: |
+ Preferred backing store for output staging. If not specified, the
+ system may choose which one to use.
- name: APIRequirement
type: record
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 0818d5d..4ede88d 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -95,7 +95,13 @@ class ArvadosJob(object):
runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
- runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
+ if "keep_cache" in runtime_req:
+ runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
+ if "outputDirType" in runtime_req:
+ if runtime_req["outputDirType"] == "local_output_dir":
+ script_parameters["task.keepTmpOutput"] = False
+ elif runtime_req["outputDirType"] == "keep_output_dir":
+ script_parameters["task.keepTmpOutput"] = True
filters = [["repository", "=", "arvados"],
["script", "=", "crunchrunner"],
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 7f31520..99e34d3 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -82,7 +82,8 @@ class TestJob(unittest.TestCase):
"tmpdirMin": 4000
}, {
"class": "http://arvados.org/cwl#RuntimeConstraints",
- "keep_cache": 512
+ "keep_cache": 512,
+ "outputDirType": "keep_output_dir"
}, {
"class": "http://arvados.org/cwl#APIRequirement",
}],
@@ -101,6 +102,7 @@ class TestJob(unittest.TestCase):
'script_parameters': {
'tasks': [{
'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
+ 'task.keepTmpOutput': True,
'command': ['ls']
}]
},
commit 0d3dd79bff8665c36e2442a76a9f7bb700702101
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Oct 4 18:30:06 2016 -0400
9308: Add 'keepTmpOutput' option to crunchrunner
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 5c3d65c..114c49c 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -2,9 +2,12 @@ package main
import (
"crypto/x509"
+ "encoding/json"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io"
"io/ioutil"
"log"
"net/http"
@@ -25,6 +28,7 @@ type TaskDef struct {
SuccessCodes []int `json:"task.successCodes"`
PermanentFailCodes []int `json:"task.permanentFailCodes"`
TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
+ keepTmpOutput bool `json:"task.keepTmpOutput"`
}
type Tasks struct {
@@ -50,17 +54,21 @@ type IArvadosClient interface {
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
-func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
+func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
tmpdir = crunchtmpdir + "/tmpdir"
err = os.Mkdir(tmpdir, 0700)
if err != nil {
return "", "", err
}
- outdir = crunchtmpdir + "/outdir"
- err = os.Mkdir(outdir, 0700)
- if err != nil {
- return "", "", err
+ if keepTmp {
+ outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
+ } else {
+ outdir = crunchtmpdir + "/outdir"
+ err = os.Mkdir(outdir, 0700)
+ if err != nil {
+ return "", "", err
+ }
}
return tmpdir, outdir, nil
@@ -81,6 +89,23 @@ func checkOutputFilename(outdir, fn string) error {
return nil
}
+// copyFile copies the contents of the file at src into the file at dst.
+// dst is created if it does not exist and truncated if it does.
+//
+// It returns the first error encountered while opening src, creating dst,
+// copying the data, or closing dst.
+func copyFile(dst, src string) error {
+	in, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer in.Close()
+
+	out, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+
+	if _, err := io.Copy(out, in); err != nil {
+		// Best-effort close; the copy error is the one worth reporting.
+		out.Close()
+		return err
+	}
+	// Close explicitly instead of deferring so that a failure to finish
+	// writing dst is reported to the caller rather than silently dropped.
+	return out.Close()
+}
+
+
func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
if taskp.Vwd != nil {
for k, v := range taskp.Vwd {
@@ -89,7 +114,12 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
if err != nil {
return "", "", "", err
}
- os.Symlink(v, outdir+"/"+k)
+ if taskp.keepTmpOutput {
+ // Is there an os.Copy?
+ copyFile(v, outdir+"/"+k)
+ } else {
+ os.Symlink(v, outdir+"/"+k)
+ }
}
}
@@ -180,6 +210,22 @@ func substitute(inp string, subst map[string]string) string {
return inp
}
+// getKeepTmp reads the ".arvados#collection" file inside outdir, decodes
+// its contents as a JSON-encoded arvados.Collection, and returns that
+// collection's ManifestText.
+//
+// An error is returned if the file cannot be read or if its contents are
+// not valid JSON for a collection record; in both cases manifest is "".
+func getKeepTmp(outdir string) (manifest string, err error) {
+	buf, err := ioutil.ReadFile(outdir + "/" + ".arvados#collection")
+	if err != nil {
+		return "", err
+	}
+
+	var collection arvados.Collection
+	// Report decode failures: otherwise a malformed file would be
+	// indistinguishable from a collection with an empty manifest.
+	if err = json.Unmarshal(buf, &collection); err != nil {
+		return "", err
+	}
+	return collection.ManifestText, nil
+}
+
+
func runner(api IArvadosClient,
kc IKeepClient,
jobUuid, taskUuid, crunchtmpdir, keepmount string,
@@ -218,7 +264,7 @@ func runner(api IArvadosClient,
}
var tmpdir, outdir string
- tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+ tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.keepTmpOutput)
if err != nil {
return TempFail{err}
}
@@ -313,9 +359,14 @@ func runner(api IArvadosClient,
}
// Upload output directory
- manifest, err := WriteTree(kc, outdir)
- if err != nil {
- return TempFail{err}
+ var manifest string
+ if taskp.keepTmpOutput {
+ manifest, err = getKeepTmp(outdir)
+ } else {
+ manifest, err = WriteTree(kc, outdir)
+ if err != nil {
+ return TempFail{err}
+ }
}
// Set status
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list