[ARVADOS] created: c7137167b17cb07b2d2aca325fe028c54006741d
git at public.curoverse.com
git at public.curoverse.com
Mon Oct 26 15:53:16 EDT 2015
at c7137167b17cb07b2d2aca325fe028c54006741d (commit)
commit c7137167b17cb07b2d2aca325fe028c54006741d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 26 15:53:07 2015 -0400
7593: Don't upload the same files more than once. Fix handling "./" in glob paths.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index e2e9270..d1e53fa 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -14,6 +14,7 @@ import fnmatch
import logging
import re
import os
+import copy
from cwltool.process import get_feature
logger = logging.getLogger('arvados.cwl-runner')
@@ -56,6 +57,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
def _match(self, collection, patternsegments, parent):
ret = []
+ if len(patternsegments) == 0:
+ return ret
for filename in collection:
if fnmatch.fnmatch(filename, patternsegments[0]):
cur = os.path.join(parent, filename)
@@ -63,6 +66,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
ret.append(cur)
else:
ret.extend(self._match(collection[filename], patternsegments[1:], cur))
+ elif patternsegments[0] == '.':
+ ret.extend(self._match(collection, patternsegments[1:], parent))
return ret
def glob(self, pattern):
@@ -149,7 +154,7 @@ class ArvadosJob(object):
outputs = {}
outputs = self.collect_outputs(record["output"])
except Exception as e:
- logger.warn(str(e))
+ logger.exception("Got exception while collecting job outputs:")
processStatus = "permanentFail"
self.output_callback(outputs, processStatus)
@@ -158,7 +163,7 @@ class ArvadosJob(object):
class ArvPathMapper(cwltool.pathmapper.PathMapper):
def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
- self._pathmap = {}
+ self._pathmap = copy.copy(arvrunner.uploaded)
uploadfiles = []
pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/.+')
@@ -166,6 +171,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
for src in referenced_files:
if isinstance(src, basestring) and pdh_path.match(src):
self._pathmap[src] = (src, "$(task.keep)/%s" % src)
+ if src in self._pathmap:
+ pass
else:
ab = cwltool.pathmapper.abspath(src, basedir)
st = arvados.commands.run.statfile("", ab)
@@ -186,6 +193,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
fnPattern="$(task.keep)/%s/%s")
for src, ab, st in uploadfiles:
+ arvrunner.uploaded[src] = (ab, st.fn)
self._pathmap[src] = (ab, st.fn)
@@ -209,6 +217,7 @@ class ArvCwlRunner(object):
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
self.final_output = None
+ self.uploaded = {}
def arvMakeTool(self, toolpath_object, **kwargs):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
commit e9153135c39388bf403ea94896f935ce80309b01
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Oct 26 14:13:45 2015 -0400
7582: Passes draft-2 conformance tests.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 213496d..e2e9270 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -100,7 +100,7 @@ class ArvadosJob(object):
script_parameters["task.vwd"] = {}
for t in self.generatefiles:
if isinstance(self.generatefiles[t], dict):
- src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"][6:])
+ src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"][13:])
vwd.copy(rest, t, source_collection=src)
else:
with vwd.open(t, "w") as f:
commit 07ad618c10f03f9d24970670f991791d4bd22b62
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 23 17:43:38 2015 -0400
7582: Fixup to work with latest cwltool. Runs jobs with Go crunchrunner.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index f3298ec..213496d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,7 +40,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
return dockerRequirement["dockerImageId"]
-class CollectionFsAccess(cwltool.draft2tool.StdFsAccess):
+class CollectionFsAccess(cwltool.process.StdFsAccess):
def __init__(self, basedir):
self.collections = {}
self.basedir = basedir
@@ -97,6 +97,7 @@ class ArvadosJob(object):
if self.generatefiles:
vwd = arvados.collection.Collection()
+ script_parameters["task.vwd"] = {}
for t in self.generatefiles:
if isinstance(self.generatefiles[t], dict):
src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"][6:])
@@ -105,7 +106,8 @@ class ArvadosJob(object):
with vwd.open(t, "w") as f:
f.write(self.generatefiles[t])
vwd.save_new()
- script_parameters["task.vwd"] = vwd.portable_data_hash()
+ for t in self.generatefiles:
+ script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
if self.environment:
@@ -120,13 +122,12 @@ class ArvadosJob(object):
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
- runtime_constraints["arvados_sdk_version"] = "master"
response = self.arvrunner.api.jobs().create(body={
- "script": "run-command",
- "repository": "arvados",
+ "script": "crunchrunner",
+ "repository": "peteramstutz/cr",
"script_version": "master",
- "script_parameters": script_parameters,
+ "script_parameters": {"tasks": [script_parameters]},
"runtime_constraints": runtime_constraints
}, find_or_create=kwargs.get("enable_reuse", True)).execute()
@@ -164,9 +165,9 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
for src in referenced_files:
if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, "/keep/%s" % src)
+ self._pathmap[src] = (src, "$(task.keep)/%s" % src)
else:
- ab = src if os.path.isabs(src) else os.path.join(basedir, src)
+ ab = cwltool.pathmapper.abspath(src, basedir)
st = arvados.commands.run.statfile("", ab)
if kwargs.get("conformance_test"):
self._pathmap[src] = (src, ab)
@@ -178,7 +179,11 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
if uploadfiles:
- arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3)
+ arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
+ arvrunner.api,
+ dry_run=kwargs.get("dry_run"),
+ num_retries=3,
+ fnPattern="$(task.keep)/%s/%s")
for src, ab, st in uploadfiles:
self._pathmap[src] = (ab, st.fn)
@@ -187,7 +192,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+ super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
self.arvrunner = arvrunner
def makeJobRunner(self):
@@ -282,7 +287,7 @@ class ArvCwlRunner(object):
def main(args, stdout, stderr, api_client=None):
runner = ArvCwlRunner(api_client=arvados.api('v1'))
- args.append("--leave-outputs")
+ args.insert(0, "--leave-outputs")
parser = cwltool.main.arg_parser()
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-reuse", action="store_true",
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 4812252..187cce6 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -29,7 +29,7 @@ setup(name='arvados-cwl-runner',
'bin/cwl-runner'
],
install_requires=[
- 'cwltool==1.0.20150722144138',
+ 'cwltool>=1.0.20151014024436',
'arvados-python-client'
],
zip_safe=True,
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 8df945a..8815565 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -101,7 +101,7 @@ def statfile(prefix, fn):
return prefix+fn
-def uploadfiles(files, api, dry_run=False, num_retries=0, project=None):
+def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)"):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
# until all file paths no longer have a common parent.
@@ -153,7 +153,7 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None):
logger.info("Uploaded to %s", item["uuid"])
for c in files:
- c.fn = "$(file %s/%s)" % (pdh, c.fn)
+ c.fn = fnPattern % (pdh, c.fn)
os.chdir(orgdir)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list