[ARVADOS] created: 27816b602e9da83a2565e6fe8f87f250555b1ba5
Git user
git at public.curoverse.com
Tue Jul 12 16:39:15 EDT 2016
at 27816b602e9da83a2565e6fe8f87f250555b1ba5 (commit)
commit 27816b602e9da83a2565e6fe8f87f250555b1ba5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jul 12 15:55:34 2016 -0400
9570: Support Directory and file literal features. Support
success/temporary/permanent exit codes.
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 5d253ca..6fb0fb9 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -51,18 +51,6 @@ class ArvadosJob(object):
if p.type == "CreateFile":
script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
- # if self.generatefiles:
- # for t in self.generatefiles:
- # if isinstance(self.generatefiles[t], dict):
- # src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
- # vwd.copy(rest, t, source_collection=src)
- # else:
- # with vwd.open(t, "w") as f:
- # f.write(t.encode('utf-8'))
- # vwd.save_new()
- # for t in self.generatefiles:
- # script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
if self.environment:
script_parameters["task.env"].update(self.environment)
@@ -76,6 +64,13 @@ class ArvadosJob(object):
if self.stderr:
script_parameters["task.stderr"] = self.stderr
+ if self.successCodes:
+ script_parameters["task.successCodes"] = self.successCodes
+ if self.temporaryFailCodes:
+ script_parameters["task.temporaryFailCodes"] = self.temporaryFailCodes
+ if self.permanentFailCodes:
+ script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
+
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 06edac9..548b166 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -49,6 +49,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
def glob(self, pattern):
collection, rest = self.get_collection(pattern)
+ if collection and not rest:
+ return [pattern]
patternsegments = rest.split("/")
return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
@@ -69,21 +71,35 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
def isfile(self, fn): # type: (unicode) -> bool
collection, rest = self.get_collection(fn)
if collection:
- return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+ if rest:
+ return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+ else:
+ return False
else:
return super(CollectionFsAccess, self).isfile(fn)
def isdir(self, fn): # type: (unicode) -> bool
collection, rest = self.get_collection(fn)
if collection:
- return isinstance(collection.find(rest), arvados.arvfile.Collection)
+ if rest:
+ return isinstance(collection.find(rest), arvados.collection.Collection)
+ else:
+ return True
else:
return super(CollectionFsAccess, self).isdir(fn)
def listdir(self, fn): # type: (unicode) -> List[unicode]
collection, rest = self.get_collection(fn)
- dir = collection.find(rest)
+ if rest:
+ dir = collection.find(rest)
+ else:
+ dir = collection
if collection:
return [abspath(l, fn) for l in dir.keys()]
else:
return super(CollectionFsAccess, self).listdir(fn)
+
+ def join(self, path, *paths): # type: (unicode, *unicode) -> unicode
+ if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
+ return paths[-1]
+ return os.path.join(path, *paths)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 0a11925..12f7cd7 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -1,13 +1,19 @@
import re
+import logging
import arvados.commands.run
import arvados.collection
+
from cwltool.pathmapper import PathMapper, MapperEnt, abspath
+from cwltool.workflow import WorkflowException
+
+logger = logging.getLogger('arvados.cwl-runner')
class ArvPathMapper(PathMapper):
"""Convert container-local paths to and from Keep collection ids."""
- pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+ pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
+ pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.+)?$')
def __init__(self, arvrunner, referenced_files, input_basedir,
collection_pattern, file_pattern, name=None, **kwargs):
@@ -18,43 +24,109 @@ class ArvPathMapper(PathMapper):
self.name = name
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
+ def visit(self, srcobj, uploadfiles):
+ src = srcobj["location"]
+ if srcobj["class"] == "File":
+ if "#" in src:
+ src = src[:src.index("#")]
+ if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
+ self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
+ if src not in self._pathmap:
+ # Local FS ref, may need to be uploaded or may be on keep
+ # mount.
+ ab = abspath(src, self.input_basedir)
+ st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+ if isinstance(st, arvados.commands.run.UploadFile):
+ uploadfiles.add((src, ab, st))
+ elif isinstance(st, arvados.commands.run.ArvFile):
+ self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+ elif src.startswith("_:") and "contents" in srcobj:
+ pass
+ else:
+ raise WorkflowException("Input file path '%s' is invalid" % st)
+ if "secondaryFiles" in srcobj:
+ for l in srcobj["secondaryFiles"]:
+ self.visit(l, uploadfiles)
+ elif srcobj["class"] == "Directory":
+ if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+ self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
+ else:
+ for l in srcobj["listing"]:
+ self.visit(l, uploadfiles)
+
+ def addentry(self, obj, c, path, subdirs):
+ if obj["location"] in self._pathmap:
+ src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
+ c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
+ for l in obj.get("secondaryFiles", []):
+ self.addentry(l, c, path, subdirs)
+ elif obj["class"] == "Directory":
+ for l in obj["listing"]:
+ self.addentry(l, c, path + "/" + obj["basename"], subdirs)
+ subdirs.append((obj["location"], path + "/" + obj["basename"]))
+ elif obj["location"].startswith("_:") and "contents" in obj:
+ with c.open(path + "/" + obj["basename"], "w") as f:
+ f.write(obj["contents"].encode("utf-8"))
+ else:
+ raise WorkflowException("Don't know what to do with '%s'" % obj["location"])
+
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
self._pathmap = self.arvrunner.get_uploaded()
uploadfiles = set()
for srcobj in referenced_files:
- if srcobj["class"] == "File":
- src = srcobj["location"]
- if "#" in src:
- src = src[:src.index("#")]
- if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
- self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
- if src not in self._pathmap:
- # Local FS ref, may need to be uploaded or may be on keep
- # mount.
- ab = abspath(src, self.input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
- if isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.add((src, ab, st))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = MapperEnt(ab, st.fn, "File")
- else:
- raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+ self.visit(srcobj, uploadfiles)
if uploadfiles:
arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
self.arvrunner.api,
dry_run=False,
- num_retries=3,
+ num_retries=self.arvrunner.num_retries,
fnPattern=self.file_pattern,
name=self.name,
project=self.arvrunner.project_uuid)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+ self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
self.arvrunner.add_uploaded(src, self._pathmap[src])
+ for srcobj in referenced_files:
+ if srcobj["class"] == "Directory":
+ if srcobj["location"] not in self._pathmap:
+ c = arvados.collection.Collection(api_client=self.arvrunner.api,
+ num_retries=self.arvrunner.num_retries)
+ subdirs = []
+ for l in srcobj["listing"]:
+ self.addentry(l, c, ".", subdirs)
+
+ check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+ if not check["items"]:
+ c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+ ab = self.collection_pattern % c.portable_data_hash()
+ self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
+ for loc, sub in subdirs:
+ ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+ self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+ elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
+ (srcobj["location"].startswith("_:") and "contents" in srcobj)):
+
+ c = arvados.collection.Collection(api_client=self.arvrunner.api,
+ num_retries=self.arvrunner.num_retries )
+ subdirs = []
+ self.addentry(srcobj, c, ".", subdirs)
+
+ check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+ if not check["items"]:
+ c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+ ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
+ self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+ for loc, sub in subdirs:
+ ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+ self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+
self.keepdir = None
def reversemap(self, target):
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d60dfd7..fe451f6 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -3,7 +3,9 @@ import urlparse
from functools import partial
import logging
import json
+import re
+import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement
@@ -17,6 +19,8 @@ from .pathmapper import ArvPathMapper
logger = logging.getLogger('arvados.cwl-runner')
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index d74a9c1..cf797f0 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,9 +30,8 @@ setup(name='arvados-cwl-runner',
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool==1.0.20160708190014',
- 'arvados-python-client>=0.1.20160322001610',
- 'ruamel.yaml==0.11.11', # this should be declared by schema_salad instead, but see #9567
+ 'cwltool==1.0.20160712154127',
+ 'arvados-python-client>=0.1.20160322001610'
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 5d29c45..54df452 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -171,6 +171,7 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPatter
pdh = item["portable_data_hash"]
for c in files:
+ c.keepref = "%s/%s" % (pdh, c.fn)
c.fn = fnPattern % (pdh, c.fn)
os.chdir(orgdir)
commit 72ecf5e060b51c0b8c559329e713f8327a2e8c87
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jul 11 16:02:49 2016 -0400
9570: Pass tests except those related to directory features
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 962a690..9f1c577 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -303,6 +303,7 @@ def main(args, stdout, stderr, api_client=None):
return 1
arvargs.conformance_test = None
+ arvargs.use_container = True
return cwltool.main.main(args=arvargs,
stdout=stdout,
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 33ce86c..5d253ca 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -7,6 +7,7 @@ from cwltool.errors import WorkflowException
from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
+from cwltool.pathmapper import PathMapper
import arvados.collection
@@ -37,14 +38,18 @@ class ArvadosJob(object):
if self.generatefiles["listing"]:
vwd = arvados.collection.Collection()
script_parameters["task.vwd"] = {}
- for f, p in self.generatefiles["listing"]:
- if p.type == "File":
- pass
- if p.type == "File" or "WritableFile":
- pass
- elif p.type == "CreateFile":
- with open(p.target, "w") as n:
+ generatemapper = PathMapper([self.generatefiles], self.outdir,
+ ".", separateDirs=False)
+ for f, p in generatemapper.items():
+ if p.type == "CreateFile":
+ with vwd.open(p.target, "w") as n:
n.write(p.resolved.encode("utf-8"))
+ vwd.save_new()
+ for f, p in generatemapper.items():
+ if p.type == "File":
+ script_parameters["task.vwd"][p.target] = self.pathmapper.mapper(f).target
+ if p.type == "CreateFile":
+ script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
# if self.generatefiles:
# for t in self.generatefiles:
commit e34ddeba03a6a9b4a1f9913ff5871938e83ee1d2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jul 11 15:31:45 2016 -0400
9570: CWL v1.0 support (work in progress)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 9bf93e7..5ad87d1 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -3,7 +3,8 @@ import json
import os
from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
+from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.pathmapper import adjustFiles
import arvados.collection
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index f129dfa..33ce86c 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -4,7 +4,7 @@ import copy
from cwltool.process import get_feature, shortname
from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
@@ -34,30 +34,43 @@ class ArvadosJob(object):
}
runtime_constraints = {}
- if self.generatefiles:
+ if self.generatefiles["listing"]:
vwd = arvados.collection.Collection()
script_parameters["task.vwd"] = {}
- for t in self.generatefiles:
- if isinstance(self.generatefiles[t], dict):
- src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
- vwd.copy(rest, t, source_collection=src)
- else:
- with vwd.open(t, "w") as f:
- f.write(self.generatefiles[t].encode('utf-8'))
- vwd.save_new()
- for t in self.generatefiles:
- script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+ for f, p in self.generatefiles["listing"]:
+ if p.type == "File":
+ pass
+ if p.type == "File" or "WritableFile":
+ pass
+ elif p.type == "CreateFile":
+ with open(p.target, "w") as n:
+ n.write(p.resolved.encode("utf-8"))
+
+ # if self.generatefiles:
+ # for t in self.generatefiles:
+ # if isinstance(self.generatefiles[t], dict):
+ # src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+ # vwd.copy(rest, t, source_collection=src)
+ # else:
+ # with vwd.open(t, "w") as f:
+ # f.write(t.encode('utf-8'))
+ # vwd.save_new()
+ # for t in self.generatefiles:
+ # script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
if self.environment:
script_parameters["task.env"].update(self.environment)
if self.stdin:
- script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+ script_parameters["task.stdin"] = self.stdin
if self.stdout:
script_parameters["task.stdout"] = self.stdout
+ if self.stderr:
+ script_parameters["task.stderr"] = self.stderr
+
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
@@ -82,7 +95,7 @@ class ArvadosJob(object):
"owner_uuid": self.arvrunner.project_uuid,
"script": "crunchrunner",
"repository": "arvados",
- "script_version": "master",
+ "script_version": "9570-cwl-v1.0",
"minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
"script_parameters": {"tasks": [script_parameters]},
"runtime_constraints": runtime_constraints
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index a2dffa6..021365c 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -17,7 +17,8 @@ class ArvadosCommandTool(CommandLineTool):
elif self.work_api == "jobs":
return ArvadosJob(self.arvrunner)
- def makePathMapper(self, reffiles, **kwargs):
+ def makePathMapper(self, reffiles, stagedir, **kwargs):
+ # type: (List[Any], unicode, **Any) -> PathMapper
if self.work_api == "containers":
return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
"/keep/%s",
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 28b0fee..06edac9 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -2,9 +2,11 @@ import fnmatch
import os
import cwltool.process
+from cwltool.pathmapper import abspath
import arvados.util
import arvados.collection
+import arvados.arvfile
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
@@ -55,11 +57,33 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
if collection:
return collection.open(rest, mode)
else:
- return open(self._abs(fn), mode)
+ return super(CollectionFsAccess, self).open(self._abs(fn), mode)
def exists(self, fn):
collection, rest = self.get_collection(fn)
if collection:
return collection.exists(rest)
else:
- return os.path.exists(self._abs(fn))
+ return super(CollectionFsAccess, self).exists(fn)
+
+ def isfile(self, fn): # type: (unicode) -> bool
+ collection, rest = self.get_collection(fn)
+ if collection:
+ return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+ else:
+ return super(CollectionFsAccess, self).isfile(fn)
+
+ def isdir(self, fn): # type: (unicode) -> bool
+ collection, rest = self.get_collection(fn)
+ if collection:
+ return isinstance(collection.find(rest), arvados.arvfile.Collection)
+ else:
+ return super(CollectionFsAccess, self).isdir(fn)
+
+ def listdir(self, fn): # type: (unicode) -> List[unicode]
+ collection, rest = self.get_collection(fn)
+ dir = collection.find(rest)
+ if collection:
+ return [abspath(l, fn) for l in dir.keys()]
+ else:
+ return super(CollectionFsAccess, self).listdir(fn)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 9538a91..0a11925 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -2,47 +2,58 @@ import re
import arvados.commands.run
import arvados.collection
-import cwltool.pathmapper
+from cwltool.pathmapper import PathMapper, MapperEnt, abspath
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
+class ArvPathMapper(PathMapper):
"""Convert container-local paths to and from Keep collection ids."""
+ pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+
def __init__(self, arvrunner, referenced_files, input_basedir,
collection_pattern, file_pattern, name=None, **kwargs):
- self._pathmap = arvrunner.get_uploaded()
+ self.arvrunner = arvrunner
+ self.input_basedir = input_basedir
+ self.collection_pattern = collection_pattern
+ self.file_pattern = file_pattern
+ self.name = name
+ super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
+
+ def setup(self, referenced_files, basedir):
+ # type: (List[Any], unicode) -> None
+ self._pathmap = self.arvrunner.get_uploaded()
uploadfiles = set()
- pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
- for src in referenced_files:
- if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, collection_pattern % src[5:])
- if "#" in src:
- src = src[:src.index("#")]
- if src not in self._pathmap:
- ab = cwltool.pathmapper.abspath(src, input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
- if kwargs.get("conformance_test"):
- self._pathmap[src] = (src, ab)
- elif isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.add((src, ab, st))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = (ab, st.fn)
- else:
- raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+ for srcobj in referenced_files:
+ if srcobj["class"] == "File":
+ src = srcobj["location"]
+ if "#" in src:
+ src = src[:src.index("#")]
+ if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
+ self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
+ if src not in self._pathmap:
+ # Local FS ref, may need to be uploaded or may be on keep
+ # mount.
+ ab = abspath(src, self.input_basedir)
+ st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+ if isinstance(st, arvados.commands.run.UploadFile):
+ uploadfiles.add((src, ab, st))
+ elif isinstance(st, arvados.commands.run.ArvFile):
+ self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+ else:
+ raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
if uploadfiles:
arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
- arvrunner.api,
- dry_run=kwargs.get("dry_run"),
+ self.arvrunner.api,
+ dry_run=False,
num_retries=3,
- fnPattern=file_pattern,
- name=name,
- project=arvrunner.project_uuid)
+ fnPattern=self.file_pattern,
+ name=self.name,
+ project=self.arvrunner.project_uuid)
for src, ab, st in uploadfiles:
- arvrunner.add_uploaded(src, (ab, st.fn))
- self._pathmap[src] = (ab, st.fn)
+ self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+ self.arvrunner.add_uploaded(src, self._pathmap[src])
self.keepdir = None
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 629b104..d60dfd7 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -6,8 +6,9 @@ import json
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement
from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFiles
import arvados.collection
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 4e72091..d74a9c1 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool==1.0.20160630171631',
+ 'cwltool==1.0.20160708190014',
'arvados-python-client>=0.1.20160322001610',
'ruamel.yaml==0.11.11', # this should be declared by schema_salad instead, but see #9567
],
commit a206b8127df92039645d5ad3f9e144380cc55613
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jul 11 15:30:47 2016 -0400
9570: Add stderr redirection support to crunchrunner, as needed for CWL 1.0.
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 14c75af..040d7c2 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -20,6 +20,7 @@ type TaskDef struct {
Env map[string]string `json:"task.env"`
Stdin string `json:"task.stdin"`
Stdout string `json:"task.stdout"`
+ Stderr string `json:"task.stderr"`
Vwd map[string]string `json:"task.vwd"`
SuccessCodes []int `json:"task.successCodes"`
PermanentFailCodes []int `json:"task.permanentFailCodes"`
@@ -80,13 +81,13 @@ func checkOutputFilename(outdir, fn string) error {
return nil
}
-func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
if taskp.Vwd != nil {
for k, v := range taskp.Vwd {
v = substitute(v, replacements)
err = checkOutputFilename(outdir, k)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
os.Symlink(v, outdir+"/"+k)
}
@@ -97,26 +98,39 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
stdin = substitute(taskp.Stdin, replacements)
cmd.Stdin, err = os.Open(stdin)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
}
if taskp.Stdout != "" {
err = checkOutputFilename(outdir, taskp.Stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
// Set up stdout redirection
stdout = outdir + "/" + taskp.Stdout
cmd.Stdout, err = os.Create(stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
} else {
cmd.Stdout = os.Stdout
}
- cmd.Stderr = os.Stderr
+ if taskp.Stderr != "" {
+ err = checkOutputFilename(outdir, taskp.Stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ // Set up stderr redirection
+ stderr = outdir + "/" + taskp.Stderr
+ cmd.Stderr, err = os.Create(stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ } else {
+ cmd.Stderr = os.Stderr
+ }
if taskp.Env != nil {
// Set up subprocess environment
@@ -126,7 +140,7 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
cmd.Env = append(cmd.Env, k+"="+v)
}
}
- return stdin, stdout, nil
+ return stdin, stdout, stderr, nil
}
// Set up signal handlers. Go sends signal notifications to a "signal
@@ -227,8 +241,8 @@ func runner(api IArvadosClient,
cmd.Dir = outdir
- var stdin, stdout string
- stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+ var stdin, stdout, stderr string
+ stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
if err != nil {
return err
}
@@ -240,7 +254,10 @@ func runner(api IArvadosClient,
if stdout != "" {
stdout = " > " + stdout
}
- log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+ if stderr != "" {
+ stderr = " 2> " + stderr
+ }
+ log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
var caughtSignal os.Signal
sigChan := setupSignals(cmd)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list