[ARVADOS] created: b2f2eee87c7c5e93b9e439f32aa0dddc2b1a8296

Git user git at public.curoverse.com
Thu Jun 9 16:33:10 EDT 2016


        at  b2f2eee87c7c5e93b9e439f32aa0dddc2b1a8296 (commit)


commit b2f2eee87c7c5e93b9e439f32aa0dddc2b1a8296
Merge: 3333173 84b538b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 16:23:00 2016 -0400

    Merge branch 'master' into 8442-cwl-crunch2


commit 3333173032fffece1a6b092ae307ae3fe6e8a2b0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 15:16:09 2016 -0400

    8442: raise UnsupportedRequirement for unsupported features in the conformance
    tests.  Bump cwltool dependency.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 171d92d..af74808 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -31,7 +31,7 @@ from .arvcontainer import ArvadosContainer
 from .arvjob import ArvadosJob
 from .arvdocker import arv_docker_get_image
 
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
+from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -477,7 +477,7 @@ class ArvCwlRunner(object):
         if kwargs.get("submit"):
             runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
 
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
@@ -543,11 +543,13 @@ class ArvCwlRunner(object):
                     self.cond.wait(1)
 
                 events.close()
+            except UnsupportedRequirement:
+                raise
             except:
                 if sys.exc_info()[0] is KeyboardInterrupt:
                     logger.error("Interrupted, marking pipeline as failed")
                 else:
-                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
+                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                 if self.pipeline:
                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 0cb885f..17fe8cb 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -4,6 +4,7 @@ from cwltool.process import get_feature, adjustFiles
 from .arvdocker import arv_docker_get_image
 from . import done
 from cwltool.errors import WorkflowException
+from cwltool.process import UnsupportedRequirement
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -42,6 +43,8 @@ class ArvadosContainer(object):
             }
 
         if self.generatefiles:
+            raise UnsupportedRequirement("Stdin redirection currently not suppported")
+
             vwd = arvados.collection.Collection()
             container_request["task.vwd"] = {}
             for t in self.generatefiles:
@@ -60,13 +63,12 @@ class ArvadosContainer(object):
         if self.environment:
             container_request["environment"].update(self.environment)
 
-        # TODO, not supported by crunchv2 yet
-        #if self.stdin:
-        #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+        if self.stdin:
+            raise UnsupportedRequirement("Stdin redirection currently not suppported")
 
         if self.stdout:
             mounts["stdout"] = {"kind": "file",
-                                "path": self.stdout}
+                                "path": "/var/spool/cwl/%s" % (self.stdout)}
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if not docker_req:
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 591bdde..b1ff7f3 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool==1.0.20160519182434',
+          'cwltool==1.0.20160609160402',
           'arvados-python-client>=0.1.20160322001610'
       ],
       data_files=[

commit 9cb3fb78406631a68533a85e8b1c8364d5ed900f
Merge: 1bcd619 427da80
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 10:58:00 2016 -0400

    Merge branch 'master' into 8442-cwl-crunch2


commit 1bcd61945d2265dae09cb402b393f46bcc800be3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 08:21:34 2016 -0400

    8442: Setting up mount points works.  Capturing output works.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 387bc4a..171d92d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,6 +28,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 import urlparse
 from .arvcontainer import ArvadosContainer
+from .arvjob import ArvadosJob
 from .arvdocker import arv_docker_get_image
 
 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
@@ -36,10 +37,6 @@ from arvados.api import OrderedJsonModel
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
@@ -98,194 +95,6 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
         else:
             return os.path.exists(self._abs(fn))
 
-class ArvadosJob(object):
-    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-
-    def __init__(self, runner):
-        self.arvrunner = runner
-        self.running = False
-
-    def run(self, dry_run=False, pull_image=True, **kwargs):
-        script_parameters = {
-            "command": self.command_line
-        }
-        runtime_constraints = {}
-
-        if self.generatefiles:
-            vwd = arvados.collection.Collection()
-            script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t])
-            vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
-        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
-        if self.environment:
-            script_parameters["task.env"].update(self.environment)
-
-        if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
-        if self.stdout:
-            script_parameters["task.stdout"] = self.stdout
-
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-        if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
-        else:
-            runtime_constraints["docker_image"] = "arvados/jobs"
-
-        resources = self.builder.resources
-        if resources is not None:
-            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
-            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
-            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
-        filters = [["repository", "=", "arvados"],
-                   ["script", "=", "crunchrunner"],
-                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
-        if not self.arvrunner.ignore_docker_for_reuse:
-            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-
-        try:
-            response = self.arvrunner.api.jobs().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "script": "crunchrunner",
-                    "repository": "arvados",
-                    "script_version": "master",
-                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                    "script_parameters": {"tasks": [script_parameters]},
-                    "runtime_constraints": runtime_constraints
-                },
-                filters=filters,
-                find_or_create=kwargs.get("enable_reuse", True)
-            ).execute(num_retries=self.arvrunner.num_retries)
-
-            self.arvrunner.jobs[response["uuid"]] = self
-
-            self.update_pipeline_component(response)
-
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
-        except Exception as e:
-            logger.error("Got error %s" % str(e))
-            self.output_callback({}, "permanentFail")
-
-    def update_pipeline_component(self, record):
-        if self.arvrunner.pipeline:
-            self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-                                                                                 body={
-                                                                                    "components": self.arvrunner.pipeline["components"]
-                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
-        if self.arvrunner.uuid:
-            try:
-                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
-                if job:
-                    components = job["components"]
-                    components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
-                        body={
-                            "components": components
-                        }).execute(num_retries=self.arvrunner.num_retries)
-            except Exception as e:
-                logger.info("Error adding to components: %s", e)
-
-    def done(self, record):
-        try:
-            self.update_pipeline_component(record)
-        except:
-            pass
-
-        try:
-            if record["state"] == "Complete":
-                processStatus = "success"
-            else:
-                processStatus = "permanentFail"
-
-            try:
-                outputs = {}
-                if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will different runs, and we need to know about the
-                        # final run that actually produced output.
-
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-
-                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
-                    # check if collection already exists with same owner, name and content
-                    collection_exists = self.arvrunner.api.collections().list(
-                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                 ['portable_data_hash', '=', record["output"]],
-                                 ["name", "=", colname]]
-                    ).execute(num_retries=self.arvrunner.num_retries)
-
-                    if not collection_exists["items"]:
-                        # Create a collection located in the same project as the
-                        # pipeline with the contents of the output.
-                        # First, get output record.
-                        collections = self.arvrunner.api.collections().list(
-                            limit=1,
-                            filters=[['portable_data_hash', '=', record["output"]]],
-                            select=["manifest_text"]
-                        ).execute(num_retries=self.arvrunner.num_retries)
-
-                        if not collections["items"]:
-                            raise WorkflowException(
-                                "Job output '%s' cannot be found on API server" % (
-                                    record["output"]))
-
-                        # Create new collection in the parent project
-                        # with the output contents.
-                        self.arvrunner.api.collections().create(body={
-                            "owner_uuid": self.arvrunner.project_uuid,
-                            "name": colname,
-                            "portable_data_hash": record["output"],
-                            "manifest_text": collections["items"][0]["manifest_text"]
-                        }, ensure_unique_name=True).execute(
-                            num_retries=self.arvrunner.num_retries)
-
-                    self.builder.outdir = outdir
-                    self.builder.pathmapper.keepdir = keepdir
-                    outputs = self.collect_outputs("keep:" + record["output"])
-            except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
-                processStatus = "permanentFail"
-            except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
-                processStatus = "permanentFail"
-
-            self.output_callback(outputs, processStatus)
-        finally:
-            del self.arvrunner.jobs[record["uuid"]]
 
 
 class RunnerJob(object):
@@ -574,10 +383,16 @@ class ArvadosCommandTool(CommandLineTool):
             return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                             "$(task.keep)/%s",
-                             "$(task.keep)/%s/%s",
-                             **kwargs)
+        if self.crunch2:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "/keep/%s",
+                                 "/keep/%s/%s",
+                                 **kwargs)
+        else:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "$(task.keep)/%s",
+                                 "$(task.keep)/%s/%s",
+                                 **kwargs)
 
 
 class ArvCwlRunner(object):
@@ -676,7 +491,10 @@ class ArvCwlRunner(object):
                 runnerjob.run()
                 return runnerjob.uuid
 
-        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        if self.crunch2:
+            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
+        else:
+            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 2179398..0cb885f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,7 +1,9 @@
 import logging
 import arvados.collection
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles
 from .arvdocker import arv_docker_get_image
+from . import done
+from cwltool.errors import WorkflowException
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -12,6 +14,9 @@ class ArvadosContainer(object):
         self.arvrunner = runner
         self.running = False
 
+    def update_pipeline_component(self, r):
+        pass
+
     def run(self, dry_run=False, pull_image=True, **kwargs):
         container_request = {
             "command": self.command_line,
@@ -26,13 +31,15 @@ class ArvadosContainer(object):
         mounts = {
             "/var/spool/cwl": {
                 "kind": "tmp"
-            },
-            "/tmp": {
-                "kind": "tmp"
             }
         }
 
-        # TODO mount normal inputs...
+        for f in self.pathmapper.files():
+            _, p = self.pathmapper.mapper(f)
+            mounts[p] = {
+                "kind": "collection",
+                "portable_data_hash": p[6:]
+            }
 
         if self.generatefiles:
             vwd = arvados.collection.Collection()
@@ -53,7 +60,7 @@ class ArvadosContainer(object):
         if self.environment:
             container_request["environment"].update(self.environment)
 
-        # TODO, not supported
+        # TODO, not supported by crunchv2 yet
         #if self.stdin:
         #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
 
@@ -84,11 +91,11 @@ class ArvadosContainer(object):
                 body=container_request
             ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.jobs[response["uuid"]] = self
+            self.arvrunner.jobs[response["container_uuid"]] = self
 
-            logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+            logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])
 
-            if response["state"] in ("Complete", "Cancelled"):
+            if response["state"] == "Final":
                 self.done(response)
         except Exception as e:
             logger.error("Got error %s" % str(e))
@@ -104,69 +111,9 @@ class ArvadosContainer(object):
             try:
                 outputs = {}
                 if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will different runs, and we need to know about the
-                        # final run that actually produced output.
-
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-
-                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
-                    # check if collection already exists with same owner, name and content
-                    collection_exists = self.arvrunner.api.collections().list(
-                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                 ['portable_data_hash', '=', record["output"]],
-                                 ["name", "=", colname]]
-                    ).execute(num_retries=self.arvrunner.num_retries)
-
-                    if not collection_exists["items"]:
-                        # Create a collection located in the same project as the
-                        # pipeline with the contents of the output.
-                        # First, get output record.
-                        collections = self.arvrunner.api.collections().list(
-                            limit=1,
-                            filters=[['portable_data_hash', '=', record["output"]]],
-                            select=["manifest_text"]
-                        ).execute(num_retries=self.arvrunner.num_retries)
-
-                        if not collections["items"]:
-                            raise WorkflowException(
-                                "Job output '%s' cannot be found on API server" % (
-                                    record["output"]))
-
-                        # Create new collection in the parent project
-                        # with the output contents.
-                        self.arvrunner.api.collections().create(body={
-                            "owner_uuid": self.arvrunner.project_uuid,
-                            "name": colname,
-                            "portable_data_hash": record["output"],
-                            "manifest_text": collections["items"][0]["manifest_text"]
-                        }, ensure_unique_name=True).execute(
-                            num_retries=self.arvrunner.num_retries)
-
-                    self.builder.outdir = outdir
-                    self.builder.pathmapper.keepdir = keepdir
-                    outputs = self.collect_outputs("keep:" + record["output"])
+                    outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
             except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
                 logger.exception("Got unknown exception while collecting job outputs:")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
new file mode 100644
index 0000000..88a8eeb
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -0,0 +1,166 @@
+import logging
+import re
+from . import done
+from .arvdocker import arv_docker_get_image
+from cwltool.process import get_feature
+from cwltool.errors import WorkflowException
+import arvados.collection
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+
+class ArvadosJob(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+    def __init__(self, runner):
+        self.arvrunner = runner
+        self.running = False
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        script_parameters = {
+            "command": self.command_line
+        }
+        runtime_constraints = {}
+
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            script_parameters["task.vwd"] = {}
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            for t in self.generatefiles:
+                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
+        if self.environment:
+            script_parameters["task.env"].update(self.environment)
+
+        if self.stdin:
+            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+        if self.stdout:
+            script_parameters["task.stdout"] = self.stdout
+
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if docker_req and kwargs.get("use_container") is not False:
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
+            runtime_constraints["docker_image"] = "arvados/jobs"
+
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+        filters = [["repository", "=", "arvados"],
+                   ["script", "=", "crunchrunner"],
+                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+        if not self.arvrunner.ignore_docker_for_reuse:
+            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+
+        try:
+            response = self.arvrunner.api.jobs().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "script": "crunchrunner",
+                    "repository": "arvados",
+                    "script_version": "master",
+                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                    "script_parameters": {"tasks": [script_parameters]},
+                    "runtime_constraints": runtime_constraints
+                },
+                filters=filters,
+                find_or_create=kwargs.get("enable_reuse", True)
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            self.arvrunner.jobs[response["uuid"]] = self
+
+            self.update_pipeline_component(response)
+
+            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+            if response["state"] in ("Complete", "Failed", "Cancelled"):
+                self.done(response)
+        except Exception as e:
+            logger.error("Got error %s" % str(e))
+            self.output_callback({}, "permanentFail")
+
+    def update_pipeline_component(self, record):
+        if self.arvrunner.pipeline:
+            self.arvrunner.pipeline["components"][self.name] = {"job": record}
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+                                                                                 body={
+                                                                                    "components": self.arvrunner.pipeline["components"]
+                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
+        if self.arvrunner.uuid:
+            try:
+                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+                if job:
+                    components = job["components"]
+                    components[self.name] = record["uuid"]
+                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
+                        body={
+                            "components": components
+                        }).execute(num_retries=self.arvrunner.num_retries)
+            except Exception as e:
+                logger.info("Error adding to components: %s", e)
+
+    def done(self, record):
+        try:
+            self.update_pipeline_component(record)
+        except:
+            pass
+
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+
+            outputs = {}
+            try:
+                if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will different runs, and we need to know about the
+                        # final run that actually produced output.
+
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+
+                    outputs = done.done(self, record, tmpdir, outdir, keepdir)
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
+            except Exception as e:
+                logger.exception("Got unknown exception while collecting job outputs:")
+                processStatus = "permanentFail"
+
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
new file mode 100644
index 0000000..8a6fc9d
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -0,0 +1,38 @@
+def done(self, record, tmpdir, outdir, keepdir):
+    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+    # check if collection already exists with same owner, name and content
+    collection_exists = self.arvrunner.api.collections().list(
+        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                 ['portable_data_hash', '=', record["output"]],
+                 ["name", "=", colname]]
+    ).execute(num_retries=self.arvrunner.num_retries)
+
+    if not collection_exists["items"]:
+        # Create a collection located in the same project as the
+        # pipeline with the contents of the output.
+        # First, get output record.
+        collections = self.arvrunner.api.collections().list(
+            limit=1,
+            filters=[['portable_data_hash', '=', record["output"]]],
+            select=["manifest_text"]
+        ).execute(num_retries=self.arvrunner.num_retries)
+
+        if not collections["items"]:
+            raise WorkflowException(
+                "Job output '%s' cannot be found on API server" % (
+                    record["output"]))
+
+        # Create new collection in the parent project
+        # with the output contents.
+        self.arvrunner.api.collections().create(body={
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": colname,
+            "portable_data_hash": record["output"],
+            "manifest_text": collections["items"][0]["manifest_text"]
+        }, ensure_unique_name=True).execute(
+            num_retries=self.arvrunner.num_retries)
+
+    self.builder.outdir = outdir
+    self.builder.pathmapper.keepdir = keepdir
+    return self.collect_outputs("keep:" + record["output"])

commit 2e2efff6b6e7bff2880996953ed23a17e8757b7a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 8 11:45:00 2016 -0400

    8442: Return PDH for Docker container.  Working on setting up mount points.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5a1ff07..387bc4a 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,9 +40,6 @@ tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 
-
-
-
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4ea63a0..2179398 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -23,7 +23,16 @@ class ArvadosContainer(object):
             "state": "Committed"
         }
         runtime_constraints = {}
-        mounts = {}
+        mounts = {
+            "/var/spool/cwl": {
+                "kind": "tmp"
+            },
+            "/tmp": {
+                "kind": "tmp"
+            }
+        }
+
+        # TODO mount normal inputs...
 
         if self.generatefiles:
             vwd = arvados.collection.Collection()
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 8f76bbf..253df99 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -26,6 +26,11 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
-    # XXX return PDH instead
+    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                            image_name=image_name,
+                                                            image_tag=image_tag)
+
+    #return dockerRequirement["dockerImageId"]
 
-    return dockerRequirement["dockerImageId"]
+    pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+    return pdh

commit 79838b34419f3091e1562aabca9ee8a9b29c61db
Merge: 2f7418e 3c4bcfb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 8 11:26:11 2016 -0400

    Merge branch '9187-requeued-containers' into 8442-cwl-crunch2


commit 2f7418e07bae597216c8d69a93f3059ef51f972a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 8 11:15:58 2016 -0400

    8442: Submit containers Work in progess.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0878998..5a1ff07 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -27,6 +27,8 @@ import threading
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 import urlparse
+from .arvcontainer import ArvadosContainer
+from .arvdocker import arv_docker_get_image
 
 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
 from arvados.api import OrderedJsonModel
@@ -39,31 +41,6 @@ outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 
 
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
-    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
-
-    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
-        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
-    sp = dockerRequirement["dockerImageId"].split(":")
-    image_name = sp[0]
-    image_tag = sp[1] if len(sp) > 1 else None
-
-    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
-                                                            image_name=image_name,
-                                                            image_tag=image_tag)
-
-    if not images:
-        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = ["--project-uuid="+project_uuid, image_name]
-        if image_tag:
-            args.append(image_tag)
-        logger.info("Uploading Docker image %s", ":".join(args[1:]))
-        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
-
-    # XXX return PDH instead
-
-    return dockerRequirement["dockerImageId"]
 
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
@@ -588,12 +565,13 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
 
-    def __init__(self, arvrunner, toolpath_object, **kwargs):
+    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
+        self.crunch2 = crunch2
 
     def makeJobRunner(self):
-        if kwargs.get("crunch2"):
+        if self.crunch2:
             return ArvadosContainer(self.arvrunner)
         else:
             return ArvadosJob(self.arvrunner)
@@ -609,7 +587,7 @@ class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
     complete, and report output."""
 
-    def __init__(self, api_client):
+    def __init__(self, api_client, crunch2):
         self.api = api_client
         self.jobs = {}
         self.lock = threading.Lock()
@@ -618,10 +596,11 @@ class ArvCwlRunner(object):
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
+        self.crunch2 = crunch2
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, **kwargs)
+            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
@@ -709,7 +688,7 @@ class ArvCwlRunner(object):
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
 
-        if kwargs.get("crunch2"):
+        if self.crunch2:
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
         else:
@@ -849,7 +828,7 @@ def main(args, stdout, stderr, api_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client)
+        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
     except Exception as e:
         logger.error(e)
         return 1
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index fe0f7ca..4ea63a0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,3 +1,10 @@
+import logging
+import arvados.collection
+from cwltool.process import get_feature
+from .arvdocker import arv_docker_get_image
+
+logger = logging.getLogger('arvados.cwl-runner')
+
 class ArvadosContainer(object):
     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
 
@@ -7,12 +14,13 @@ class ArvadosContainer(object):
 
     def run(self, dry_run=False, pull_image=True, **kwargs):
         container_request = {
-            "command": self.command_line
+            "command": self.command_line,
             "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
-            "output_path", "/var/spool/cwl",
-            "cwd", "/var/spool/cwl",
-            "priority": 1
+            "output_path": "/var/spool/cwl",
+            "cwd": "/var/spool/cwl",
+            "priority": 1,
+            "state": "Committed"
         }
         runtime_constraints = {}
         mounts = {}
@@ -46,7 +54,7 @@ class ArvadosContainer(object):
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if not docker_req:
-            docker_req = "arvados/jobs"
+            docker_req = {"dockerImageId": "arvados/jobs"}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                      docker_req,
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
new file mode 100644
index 0000000..8f76bbf
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -0,0 +1,31 @@
+import logging
+import cwltool.docker
+import arvados.commands.keepdocker
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+
+    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+
+    sp = dockerRequirement["dockerImageId"].split(":")
+    image_name = sp[0]
+    image_tag = sp[1] if len(sp) > 1 else None
+
+    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                            image_name=image_name,
+                                                            image_tag=image_tag)
+
+    if not images:
+        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
+        args = ["--project-uuid="+project_uuid, image_name]
+        if image_tag:
+            args.append(image_tag)
+        logger.info("Uploading Docker image %s", ":".join(args[1:]))
+        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+
+    # XXX return PDH instead
+
+    return dockerRequirement["dockerImageId"]

commit 268250130d4927d49dbac3497affb98fda158645
Merge: c46dd60 b7db50d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jun 7 16:59:39 2016 -0400

    Merge branch 'master' into 8442-cwl-crunch2


commit c46dd60f1dceac0e55da7bb328c3ad6313066993
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 15:54:35 2016 -0400

    8442: Add --crunch1/--crunch2 switch

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 9b97dfc..f774b66 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -806,6 +806,15 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--crunch1", action="store_false",
+                        default=False, dest="crunch2",
+                        help="Use Crunch v1 Jobs API")
+
+    exgroup.add_argument("--crunch2", action="store_true",
+                        default=False, dest="crunch2",
+                        help="Use Crunch v2 Containers API")
+
     parser.add_argument("workflow", type=str, nargs="?", default=None)
     parser.add_argument("job_order", nargs=argparse.REMAINDER)
 

commit 4503c419a8c752f4ff6c0c57c343989405cd8ebb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 15:24:50 2016 -0400

    8442: CWL create crunch2 containers WIP

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 1740a90..9b97dfc 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -61,6 +61,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
+    # XXX return PDH instead
+
     return dockerRequirement["dockerImageId"]
 
 
@@ -574,7 +576,10 @@ class ArvadosCommandTool(CommandLineTool):
         self.arvrunner = arvrunner
 
     def makeJobRunner(self):
-        return ArvadosJob(self.arvrunner)
+        if kwargs.get("crunch2"):
+            return ArvadosContainer(self.arvrunner)
+        else:
+            return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, **kwargs):
         return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
@@ -675,8 +680,12 @@ class ArvCwlRunner(object):
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
 
-        kwargs["outdir"] = "$(task.outdir)"
-        kwargs["tmpdir"] = "$(task.tmpdir)"
+        if kwargs.get("crunch2"):
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+        else:
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
 
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, **kwargs)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
new file mode 100644
index 0000000..fe0f7ca
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -0,0 +1,160 @@
+class ArvadosContainer(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+    def __init__(self, runner):
+        self.arvrunner = runner
+        self.running = False
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        container_request = {
+            "command": self.command_line
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": self.name,
+            "output_path", "/var/spool/cwl",
+            "cwd", "/var/spool/cwl",
+            "priority": 1
+        }
+        runtime_constraints = {}
+        mounts = {}
+
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            container_request["task.vwd"] = {}
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            # TODO
+            # for t in self.generatefiles:
+            #     container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+        container_request["environment"] = {"TMPDIR": "/tmp"}
+        if self.environment:
+            container_request["environment"].update(self.environment)
+
+        # TODO, not supported
+        #if self.stdin:
+        #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+        if self.stdout:
+            mounts["stdout"] = {"kind": "file",
+                                "path": self.stdout}
+
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if not docker_req:
+            docker_req = "arvados/jobs"
+
+        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
+                                                                     docker_req,
+                                                                     pull_image,
+                                                                     self.arvrunner.project_uuid)
+
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["vcpus"] = resources.get("cores", 1)
+            runtime_constraints["ram"] = resources.get("ram") * 2**20
+            #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+        container_request["mounts"] = mounts
+        container_request["runtime_constraints"] = runtime_constraints
+
+        try:
+            response = self.arvrunner.api.container_requests().create(
+                body=container_request
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            self.arvrunner.jobs[response["uuid"]] = self
+
+            logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+            if response["state"] in ("Complete", "Cancelled"):
+                self.done(response)
+        except Exception as e:
+            logger.error("Got error %s" % str(e))
+            self.output_callback({}, "permanentFail")
+
+    def done(self, record):
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+
+            try:
+                outputs = {}
+                if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will different runs, and we need to know about the
+                        # final run that actually produced output.
+
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+
+                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+                    # check if collection already exists with same owner, name and content
+                    collection_exists = self.arvrunner.api.collections().list(
+                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                 ['portable_data_hash', '=', record["output"]],
+                                 ["name", "=", colname]]
+                    ).execute(num_retries=self.arvrunner.num_retries)
+
+                    if not collection_exists["items"]:
+                        # Create a collection located in the same project as the
+                        # pipeline with the contents of the output.
+                        # First, get output record.
+                        collections = self.arvrunner.api.collections().list(
+                            limit=1,
+                            filters=[['portable_data_hash', '=', record["output"]]],
+                            select=["manifest_text"]
+                        ).execute(num_retries=self.arvrunner.num_retries)
+
+                        if not collections["items"]:
+                            raise WorkflowException(
+                                "Job output '%s' cannot be found on API server" % (
+                                    record["output"]))
+
+                        # Create new collection in the parent project
+                        # with the output contents.
+                        self.arvrunner.api.collections().create(body={
+                            "owner_uuid": self.arvrunner.project_uuid,
+                            "name": colname,
+                            "portable_data_hash": record["output"],
+                            "manifest_text": collections["items"][0]["manifest_text"]
+                        }, ensure_unique_name=True).execute(
+                            num_retries=self.arvrunner.num_retries)
+
+                    self.builder.outdir = outdir
+                    self.builder.pathmapper.keepdir = keepdir
+                    outputs = self.collect_outputs("keep:" + record["output"])
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
+            except Exception as e:
+                logger.exception("Got unknown exception while collecting job outputs:")
+                processStatus = "permanentFail"
+
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list