[ARVADOS] created: 27816b602e9da83a2565e6fe8f87f250555b1ba5

Git user git at public.curoverse.com
Tue Jul 12 16:39:15 EDT 2016


        at  27816b602e9da83a2565e6fe8f87f250555b1ba5 (commit)


commit 27816b602e9da83a2565e6fe8f87f250555b1ba5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 12 15:55:34 2016 -0400

    9570: Support Directory and file literal features.  Support
    success/temporary/permanent exit codes.

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 5d253ca..6fb0fb9 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -51,18 +51,6 @@ class ArvadosJob(object):
                 if p.type == "CreateFile":
                     script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
 
-        # if self.generatefiles:
-        #     for t in self.generatefiles:
-        #         if isinstance(self.generatefiles[t], dict):
-        #             src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-        #             vwd.copy(rest, t, source_collection=src)
-        #         else:
-        #             with vwd.open(t, "w") as f:
-        #                 f.write(t.encode('utf-8'))
-        #     vwd.save_new()
-        #     for t in self.generatefiles:
-        #         script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
         script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
         if self.environment:
             script_parameters["task.env"].update(self.environment)
@@ -76,6 +64,13 @@ class ArvadosJob(object):
         if self.stderr:
             script_parameters["task.stderr"] = self.stderr
 
+        if self.successCodes:
+            script_parameters["task.successCodes"] = self.successCodes
+        if self.temporaryFailCodes:
+            script_parameters["task.temporaryFailCodes"] = self.temporaryFailCodes
+        if self.permanentFailCodes:
+            script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
+
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 06edac9..548b166 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -49,6 +49,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
 
     def glob(self, pattern):
         collection, rest = self.get_collection(pattern)
+        if collection and not rest:
+            return [pattern]
         patternsegments = rest.split("/")
         return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
 
@@ -69,21 +71,35 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
     def isfile(self, fn):  # type: (unicode) -> bool
         collection, rest = self.get_collection(fn)
         if collection:
-            return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+            if rest:
+                return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+            else:
+                return False
         else:
             return super(CollectionFsAccess, self).isfile(fn)
 
     def isdir(self, fn):  # type: (unicode) -> bool
         collection, rest = self.get_collection(fn)
         if collection:
-            return isinstance(collection.find(rest), arvados.arvfile.Collection)
+            if rest:
+                return isinstance(collection.find(rest), arvados.collection.Collection)
+            else:
+                return True
         else:
             return super(CollectionFsAccess, self).isdir(fn)
 
     def listdir(self, fn):  # type: (unicode) -> List[unicode]
         collection, rest = self.get_collection(fn)
-        dir = collection.find(rest)
+        if rest:
+            dir = collection.find(rest)
+        else:
+            dir = collection
         if collection:
             return [abspath(l, fn) for l in dir.keys()]
         else:
             return super(CollectionFsAccess, self).listdir(fn)
+
+    def join(self, path, *paths): # type: (unicode, *unicode) -> unicode
+        if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
+            return paths[-1]
+        return os.path.join(path, *paths)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 0a11925..12f7cd7 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -1,13 +1,19 @@
 import re
+import logging
 
 import arvados.commands.run
 import arvados.collection
+
 from cwltool.pathmapper import PathMapper, MapperEnt, abspath
+from cwltool.workflow import WorkflowException
+
+logger = logging.getLogger('arvados.cwl-runner')
 
 class ArvPathMapper(PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
-    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
+    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.+)?$')
 
     def __init__(self, arvrunner, referenced_files, input_basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
@@ -18,43 +24,109 @@ class ArvPathMapper(PathMapper):
         self.name = name
         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
 
+    def visit(self, srcobj, uploadfiles):
+        src = srcobj["location"]
+        if srcobj["class"] == "File":
+            if "#" in src:
+                src = src[:src.index("#")]
+            if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
+                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
+            if src not in self._pathmap:
+                # Local FS ref, may need to be uploaded or may be on keep
+                # mount.
+                ab = abspath(src, self.input_basedir)
+                st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+                if isinstance(st, arvados.commands.run.UploadFile):
+                    uploadfiles.add((src, ab, st))
+                elif isinstance(st, arvados.commands.run.ArvFile):
+                    self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+                elif src.startswith("_:") and "contents" in srcobj:
+                    pass
+                else:
+                    raise WorkflowException("Input file path '%s' is invalid" % st)
+            if "secondaryFiles" in srcobj:
+                for l in srcobj["secondaryFiles"]:
+                    self.visit(l, uploadfiles)
+        elif srcobj["class"] == "Directory":
+            if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
+            else:
+                for l in srcobj["listing"]:
+                    self.visit(l, uploadfiles)
+
+    def addentry(self, obj, c, path, subdirs):
+        if obj["location"] in self._pathmap:
+            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
+            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
+            for l in obj.get("secondaryFiles", []):
+                self.addentry(l, c, path, subdirs)
+        elif obj["class"] == "Directory":
+            for l in obj["listing"]:
+                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
+            subdirs.append((obj["location"], path + "/" + obj["basename"]))
+        elif obj["location"].startswith("_:") and "contents" in obj:
+            with c.open(path + "/" + obj["basename"], "w") as f:
+                f.write(obj["contents"].encode("utf-8"))
+        else:
+            raise WorkflowException("Don't know what to do with '%s'" % obj["location"])
+
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
         self._pathmap = self.arvrunner.get_uploaded()
         uploadfiles = set()
 
         for srcobj in referenced_files:
-            if srcobj["class"] == "File":
-                src = srcobj["location"]
-                if "#" in src:
-                    src = src[:src.index("#")]
-                if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
-                    self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
-                if src not in self._pathmap:
-                    # Local FS ref, may need to be uploaded or may be on keep
-                    # mount.
-                    ab = abspath(src, self.input_basedir)
-                    st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
-                    if isinstance(st, arvados.commands.run.UploadFile):
-                        uploadfiles.add((src, ab, st))
-                    elif isinstance(st, arvados.commands.run.ArvFile):
-                        self._pathmap[src] = MapperEnt(ab, st.fn, "File")
-                    else:
-                        raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+            self.visit(srcobj, uploadfiles)
 
         if uploadfiles:
             arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                              self.arvrunner.api,
                                              dry_run=False,
-                                             num_retries=3,
+                                             num_retries=self.arvrunner.num_retries,
                                              fnPattern=self.file_pattern,
                                              name=self.name,
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+            self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
+        for srcobj in referenced_files:
+            if srcobj["class"] == "Directory":
+                if srcobj["location"] not in self._pathmap:
+                    c = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                      num_retries=self.arvrunner.num_retries)
+                    subdirs = []
+                    for l in srcobj["listing"]:
+                        self.addentry(l, c, ".", subdirs)
+
+                    check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+                    if not check["items"]:
+                        c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+                    ab = self.collection_pattern % c.portable_data_hash()
+                    self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
+                    for loc, sub in subdirs:
+                        ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+                        self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
+                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
+
+                c = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                  num_retries=self.arvrunner.num_retries                                                  )
+                subdirs = []
+                self.addentry(srcobj, c, ".", subdirs)
+
+                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+                if not check["items"]:
+                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
+                self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+                for loc, sub in subdirs:
+                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+                    self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+
         self.keepdir = None
 
     def reversemap(self, target):
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d60dfd7..fe451f6 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -3,7 +3,9 @@ import urlparse
 from functools import partial
 import logging
 import json
+import re
 
+import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import get_feature, scandeps, UnsupportedRequirement
@@ -17,6 +19,8 @@ from .pathmapper import ArvPathMapper
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index d74a9c1..cf797f0 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,9 +30,8 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool==1.0.20160708190014',
-          'arvados-python-client>=0.1.20160322001610',
-          'ruamel.yaml==0.11.11', # this should be declared by schema_salad instead, but see #9567
+          'cwltool==1.0.20160712154127',
+          'arvados-python-client>=0.1.20160322001610'
       ],
       data_files=[
           ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 5d29c45..54df452 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -171,6 +171,7 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPatter
         pdh = item["portable_data_hash"]
 
     for c in files:
+        c.keepref = "%s/%s" % (pdh, c.fn)
         c.fn = fnPattern % (pdh, c.fn)
 
     os.chdir(orgdir)

commit 72ecf5e060b51c0b8c559329e713f8327a2e8c87
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jul 11 16:02:49 2016 -0400

    9570: Pass tests except related to directory features

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 962a690..9f1c577 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -303,6 +303,7 @@ def main(args, stdout, stderr, api_client=None):
         return 1
 
     arvargs.conformance_test = None
+    arvargs.use_container = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 33ce86c..5d253ca 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -7,6 +7,7 @@ from cwltool.errors import WorkflowException
 from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
+from cwltool.pathmapper import PathMapper
 
 import arvados.collection
 
@@ -37,14 +38,18 @@ class ArvadosJob(object):
         if self.generatefiles["listing"]:
             vwd = arvados.collection.Collection()
             script_parameters["task.vwd"] = {}
-            for f, p in self.generatefiles["listing"]:
-                if p.type == "File":
-                    pass
-                if p.type == "File" or "WritableFile":
-                    pass
-                elif p.type == "CreateFile":
-                    with open(p.target, "w") as n:
+            generatemapper = PathMapper([self.generatefiles], self.outdir,
+                                        ".", separateDirs=False)
+            for f, p in generatemapper.items():
+                if p.type == "CreateFile":
+                    with vwd.open(p.target, "w") as n:
                         n.write(p.resolved.encode("utf-8"))
+            vwd.save_new()
+            for f, p in generatemapper.items():
+                if p.type == "File":
+                    script_parameters["task.vwd"][p.target] = self.pathmapper.mapper(f).target
+                if p.type == "CreateFile":
+                    script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
 
         # if self.generatefiles:
         #     for t in self.generatefiles:

commit e34ddeba03a6a9b4a1f9913ff5871938e83ee1d2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jul 11 15:31:45 2016 -0400

    9570: CWL v1.0 support wip

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 9bf93e7..5ad87d1 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -3,7 +3,8 @@ import json
 import os
 
 from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
+from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.pathmapper import adjustFiles
 
 import arvados.collection
 
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index f129dfa..33ce86c 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -4,7 +4,7 @@ import copy
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 
@@ -34,30 +34,43 @@ class ArvadosJob(object):
         }
         runtime_constraints = {}
 
-        if self.generatefiles:
+        if self.generatefiles["listing"]:
             vwd = arvados.collection.Collection()
             script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t].encode('utf-8'))
-            vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+            for f, p in self.generatefiles["listing"]:
+                if p.type == "File":
+                    pass
+                if p.type == "File" or "WritableFile":
+                    pass
+                elif p.type == "CreateFile":
+                    with open(p.target, "w") as n:
+                        n.write(p.resolved.encode("utf-8"))
+
+        # if self.generatefiles:
+        #     for t in self.generatefiles:
+        #         if isinstance(self.generatefiles[t], dict):
+        #             src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+        #             vwd.copy(rest, t, source_collection=src)
+        #         else:
+        #             with vwd.open(t, "w") as f:
+        #                 f.write(t.encode('utf-8'))
+        #     vwd.save_new()
+        #     for t in self.generatefiles:
+        #         script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
 
         script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
         if self.environment:
             script_parameters["task.env"].update(self.environment)
 
         if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+            script_parameters["task.stdin"] = self.stdin
 
         if self.stdout:
             script_parameters["task.stdout"] = self.stdout
 
+        if self.stderr:
+            script_parameters["task.stderr"] = self.stderr
+
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
@@ -82,7 +95,7 @@ class ArvadosJob(object):
                     "owner_uuid": self.arvrunner.project_uuid,
                     "script": "crunchrunner",
                     "repository": "arvados",
-                    "script_version": "master",
+                    "script_version": "9570-cwl-v1.0",
                     "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                     "script_parameters": {"tasks": [script_parameters]},
                     "runtime_constraints": runtime_constraints
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index a2dffa6..021365c 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -17,7 +17,8 @@ class ArvadosCommandTool(CommandLineTool):
         elif self.work_api == "jobs":
             return ArvadosJob(self.arvrunner)
 
-    def makePathMapper(self, reffiles, **kwargs):
+    def makePathMapper(self, reffiles, stagedir, **kwargs):
+        # type: (List[Any], unicode, **Any) -> PathMapper
         if self.work_api == "containers":
             return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                                  "/keep/%s",
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 28b0fee..06edac9 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -2,9 +2,11 @@ import fnmatch
 import os
 
 import cwltool.process
+from cwltool.pathmapper import abspath
 
 import arvados.util
 import arvados.collection
+import arvados.arvfile
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
@@ -55,11 +57,33 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
         if collection:
             return collection.open(rest, mode)
         else:
-            return open(self._abs(fn), mode)
+            return super(CollectionFsAccess, self).open(self._abs(fn), mode)
 
     def exists(self, fn):
         collection, rest = self.get_collection(fn)
         if collection:
             return collection.exists(rest)
         else:
-            return os.path.exists(self._abs(fn))
+            return super(CollectionFsAccess, self).exists(fn)
+
+    def isfile(self, fn):  # type: (unicode) -> bool
+        collection, rest = self.get_collection(fn)
+        if collection:
+            return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+        else:
+            return super(CollectionFsAccess, self).isfile(fn)
+
+    def isdir(self, fn):  # type: (unicode) -> bool
+        collection, rest = self.get_collection(fn)
+        if collection:
+            return isinstance(collection.find(rest), arvados.arvfile.Collection)
+        else:
+            return super(CollectionFsAccess, self).isdir(fn)
+
+    def listdir(self, fn):  # type: (unicode) -> List[unicode]
+        collection, rest = self.get_collection(fn)
+        dir = collection.find(rest)
+        if collection:
+            return [abspath(l, fn) for l in dir.keys()]
+        else:
+            return super(CollectionFsAccess, self).listdir(fn)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 9538a91..0a11925 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -2,47 +2,58 @@ import re
 
 import arvados.commands.run
 import arvados.collection
-import cwltool.pathmapper
+from cwltool.pathmapper import PathMapper, MapperEnt, abspath
 
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
+class ArvPathMapper(PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
+    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+
     def __init__(self, arvrunner, referenced_files, input_basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
-        self._pathmap = arvrunner.get_uploaded()
+        self.arvrunner = arvrunner
+        self.input_basedir = input_basedir
+        self.collection_pattern = collection_pattern
+        self.file_pattern = file_pattern
+        self.name = name
+        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
+
+    def setup(self, referenced_files, basedir):
+        # type: (List[Any], unicode) -> None
+        self._pathmap = self.arvrunner.get_uploaded()
         uploadfiles = set()
 
-        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
-        for src in referenced_files:
-            if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, collection_pattern % src[5:])
-            if "#" in src:
-                src = src[:src.index("#")]
-            if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
-                if kwargs.get("conformance_test"):
-                    self._pathmap[src] = (src, ab)
-                elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.add((src, ab, st))
-                elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
-                else:
-                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+        for srcobj in referenced_files:
+            if srcobj["class"] == "File":
+                src = srcobj["location"]
+                if "#" in src:
+                    src = src[:src.index("#")]
+                if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
+                    self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
+                if src not in self._pathmap:
+                    # Local FS ref, may need to be uploaded or may be on keep
+                    # mount.
+                    ab = abspath(src, self.input_basedir)
+                    st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+                    if isinstance(st, arvados.commands.run.UploadFile):
+                        uploadfiles.add((src, ab, st))
+                    elif isinstance(st, arvados.commands.run.ArvFile):
+                        self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+                    else:
+                        raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
 
         if uploadfiles:
             arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
-                                             arvrunner.api,
-                                             dry_run=kwargs.get("dry_run"),
+                                             self.arvrunner.api,
+                                             dry_run=False,
                                              num_retries=3,
-                                             fnPattern=file_pattern,
-                                             name=name,
-                                             project=arvrunner.project_uuid)
+                                             fnPattern=self.file_pattern,
+                                             name=self.name,
+                                             project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            arvrunner.add_uploaded(src, (ab, st.fn))
-            self._pathmap[src] = (ab, st.fn)
+            self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+            self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         self.keepdir = None
 
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 629b104..d60dfd7 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -6,8 +6,9 @@ import json
 
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement
 from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFiles
 
 import arvados.collection
 
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 4e72091..d74a9c1 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool==1.0.20160630171631',
+          'cwltool==1.0.20160708190014',
           'arvados-python-client>=0.1.20160322001610',
           'ruamel.yaml==0.11.11', # this should be declared by schema_salad instead, but see #9567
       ],

commit a206b8127df92039645d5ad3f9e144380cc55613
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jul 11 15:30:47 2016 -0400

    9570: Add support to crunchrunner for redirecting stderr to support CWL 1.0.

diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index 14c75af..040d7c2 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -20,6 +20,7 @@ type TaskDef struct {
 	Env                map[string]string `json:"task.env"`
 	Stdin              string            `json:"task.stdin"`
 	Stdout             string            `json:"task.stdout"`
+	Stderr             string            `json:"task.stderr"`
 	Vwd                map[string]string `json:"task.vwd"`
 	SuccessCodes       []int             `json:"task.successCodes"`
 	PermanentFailCodes []int             `json:"task.permanentFailCodes"`
@@ -80,13 +81,13 @@ func checkOutputFilename(outdir, fn string) error {
 	return nil
 }
 
-func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
 	if taskp.Vwd != nil {
 		for k, v := range taskp.Vwd {
 			v = substitute(v, replacements)
 			err = checkOutputFilename(outdir, k)
 			if err != nil {
-				return "", "", err
+				return "", "", "", err
 			}
 			os.Symlink(v, outdir+"/"+k)
 		}
@@ -97,26 +98,39 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
 		stdin = substitute(taskp.Stdin, replacements)
 		cmd.Stdin, err = os.Open(stdin)
 		if err != nil {
-			return "", "", err
+			return "", "", "", err
 		}
 	}
 
 	if taskp.Stdout != "" {
 		err = checkOutputFilename(outdir, taskp.Stdout)
 		if err != nil {
-			return "", "", err
+			return "", "", "", err
 		}
 		// Set up stdout redirection
 		stdout = outdir + "/" + taskp.Stdout
 		cmd.Stdout, err = os.Create(stdout)
 		if err != nil {
-			return "", "", err
+			return "", "", "", err
 		}
 	} else {
 		cmd.Stdout = os.Stdout
 	}
 
-	cmd.Stderr = os.Stderr
+	if taskp.Stderr != "" {
+		err = checkOutputFilename(outdir, taskp.Stderr)
+		if err != nil {
+			return "", "", "", err
+		}
+		// Set up stderr redirection
+		stderr = outdir + "/" + taskp.Stderr
+		cmd.Stderr, err = os.Create(stderr)
+		if err != nil {
+			return "", "", "", err
+		}
+	} else {
+		cmd.Stderr = os.Stderr
+	}
 
 	if taskp.Env != nil {
 		// Set up subprocess environment
@@ -126,7 +140,7 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
 			cmd.Env = append(cmd.Env, k+"="+v)
 		}
 	}
-	return stdin, stdout, nil
+	return stdin, stdout, stderr, nil
 }
 
 // Set up signal handlers.  Go sends signal notifications to a "signal
@@ -227,8 +241,8 @@ func runner(api IArvadosClient,
 
 	cmd.Dir = outdir
 
-	var stdin, stdout string
-	stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+	var stdin, stdout, stderr string
+	stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
 	if err != nil {
 		return err
 	}
@@ -240,7 +254,10 @@ func runner(api IArvadosClient,
 	if stdout != "" {
 		stdout = " > " + stdout
 	}
-	log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+	if stderr != "" {
+		stderr = " 2> " + stderr
+	}
+	log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
 
 	var caughtSignal os.Signal
 	sigChan := setupSignals(cmd)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list