[ARVADOS] created: 2435a72a0a0c0058d2825ea0840836e95ebe5f25

Git user git at public.curoverse.com
Wed Jun 15 16:10:05 EDT 2016

        at  2435a72a0a0c0058d2825ea0840836e95ebe5f25 (commit)

commit 2435a72a0a0c0058d2825ea0840836e95ebe5f25
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 15 16:09:22 2016 -0400

    8442: Generatefiles encode utf-8

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 397b6d5..a816441 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -42,7 +42,7 @@ class ArvadosJob(object):
                     vwd.copy(rest, t, source_collection=src)
                     with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t])
+                        f.write(self.generatefiles[t].encode('utf-8'))
             for t in self.generatefiles:
                 script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

commit b507fa11a9acfd15217d0972b0d6b41fd41b9958
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 15 14:51:14 2016 -0400

    8442: Submit CommandLineTool containers directly without an intermediate
    workflow runner.  Handle unsupported feature gracefully.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 050b7b9..e14eb4b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -37,6 +37,7 @@ class ArvCwlRunner(object):
         self.lock = threading.Lock()
         self.cond = threading.Condition(self.lock)
         self.final_output = None
+        self.final_status = None
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
@@ -106,9 +107,29 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
+        self.debug = kwargs.get("debug")
+        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+        self.fs_access = CollectionFsAccess(kwargs["basedir"])
+        kwargs["fs_access"] = self.fs_access
+        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+        if self.crunch2:
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+        else:
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
+        runnerjob = None
         if kwargs.get("submit"):
             if self.crunch2:
-                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+                if tool.tool["class"] == "CommandLineTool":
+                    runnerjob = tool.job(job_order,
+                                         self.output_callback,
+                                         **kwargs).next()
+                else:
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
@@ -122,30 +143,16 @@ class ArvCwlRunner(object):
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
-        if kwargs.get("submit") and not kwargs.get("wait"):
-                runnerjob.run()
-                return runnerjob.uuid
+        if runnerjob and not kwargs.get("wait"):
+            runnerjob.run()
+            return runnerjob.uuid
         if self.crunch2:
             events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
             events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
-        self.debug = kwargs.get("debug")
-        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.fs_access = CollectionFsAccess(kwargs["basedir"])
-        kwargs["fs_access"] = self.fs_access
-        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-        if self.crunch2:
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["tmpdir"] = "/tmp"
-        else:
-            kwargs["outdir"] = "$(task.outdir)"
-            kwargs["tmpdir"] = "$(task.tmpdir)"
-        if kwargs.get("submit"):
+        if runnerjob:
             jobiter = iter((runnerjob,))
             if "cwl_runner_job" in kwargs:
@@ -185,7 +192,7 @@ class ArvCwlRunner(object):
             if self.pipeline:
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            if runnerjob and self.crunch2:
+            if runnerjob and runnerjob.uuid and self.crunch2:
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 002c0ca..639426a 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -23,6 +23,7 @@ class Runner(object):
         self.job_order = job_order
         self.running = False
         self.enable_reuse = enable_reuse
+        self.uuid = None
     def update_pipeline_component(self, record):
@@ -51,8 +52,14 @@ class Runner(object):
             return path
         document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+        loaded = set()
         def loadref(b, u):
-            return document_loader.fetch(urlparse.urljoin(b, u))
+            joined = urlparse.urljoin(b, u)
+            if joined not in loaded:
+                loaded.add(joined)
+                return document_loader.fetch(urlparse.urljoin(b, u))
+            else:
+                return {}
         sc = scandeps(uri, workflowobj,
                       set(("$import", "run")),

commit 032239408242c641b08df74f6a91984cbab610cd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 15 11:23:47 2016 -0400

    8442: Tweak internal handling of keep: paths, examine exit codes to determine

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index ba26816..050b7b9 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -31,7 +31,7 @@ class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
     complete, and report output."""
-    def __init__(self, api_client, crunch2):
+    def __init__(self, api_client, crunch2=False):
         self.api = api_client
         self.jobs = {}
         self.lock = threading.Lock()
@@ -54,12 +54,12 @@ class ArvCwlRunner(object):
             if self.pipeline:
                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
             logger.warn("Overall job status is %s", processStatus)
             if self.pipeline:
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+        self.final_status = processStatus
         self.final_output = out
     def on_message(self, event):
@@ -191,6 +191,9 @@ class ArvCwlRunner(object):
+        if self.final_status == "UnsupportedRequirement":
+            raise UnsupportedRequirement("Check log for details.")
         if self.final_output is None:
             raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 414ce63..8b5ac5a 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -111,7 +111,17 @@ class ArvadosContainer(object):
     def done(self, record):
             if record["state"] == "Complete":
-                processStatus = "success"
+                rcode = record["exit_code"]
+                if self.successCodes and rcode in self.successCodes:
+                    processStatus = "success"
+                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
+                    processStatus = "temporaryFail"
+                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
+                    processStatus = "permanentFail"
+                elif rcode == 0:
+                    processStatus = "success"
+                else:
+                    processStatus = "permanentFail"
                 processStatus = "permanentFail"
@@ -152,7 +162,7 @@ class RunnerContainer(Runner):
         workflowname = os.path.basename(self.tool.tool["id"])
         workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
         workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[:workflowcollection.index('/')]
+        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
         container_image = arv_docker_get_image(self.arvrunner.api,
@@ -195,6 +205,7 @@ class RunnerContainer(Runner):
     def run(self, *args, **kwargs):
+        kwargs["keepprefix"] = "keep:"
         job_spec = self.arvados_job_spec(*args, **kwargs)
         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index f754348..002c0ca 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -6,7 +6,7 @@ import json
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
 from cwltool.load_tool import fetch_document
 import arvados.collection
@@ -61,15 +61,16 @@ class Runner(object):
         adjustFiles(sc, partial(visitFiles, workflowfiles))
         adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+        keepprefix = kwargs.get("keepprefix", "")
         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
-                                       "%s",
-                                       "%s/%s",
+                                       keepprefix+"%s",
+                                       keepprefix+"%s/%s",
         jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
-                                  "%s",
-                                  "%s/%s",
+                                  keepprefix+"%s",
+                                  keepprefix+"%s/%s",
                                   name=os.path.basename(self.job_order.get("id", "#")),
@@ -83,7 +84,15 @@ class Runner(object):
     def done(self, record):
         if record["state"] == "Complete":
-            processStatus = "success"
+            if record.get("exit_code") is not None:
+                if record["exit_code"] == 33:
+                    processStatus = "UnsupportedRequirement"
+                elif record["exit_code"] == 0:
+                    processStatus = "success"
+                else:
+                    processStatus = "permanentFail"
+            else:
+                processStatus = "success"
             processStatus = "permanentFail"
@@ -96,6 +105,8 @@ class Runner(object):
                 def keepify(path):
                     if not path.startswith("keep:"):
                         return "keep:%s/%s" % (record["output"], path)
+                    else:
+                        return path
                 adjustFiles(outputs, keepify)
             except Exception as e:
                 logger.error("While getting final output object: %s", e)

commit a2bed2f92f8aab7f41fef7b6cbd5d377fc2a78d5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 16:42:37 2016 -0400

    8442: more import fixups, import workflow file correctly.

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c80896..414ce63 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -152,7 +152,7 @@ class RunnerContainer(Runner):
         workflowname = os.path.basename(self.tool.tool["id"])
         workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
         workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[workflowcollection.index('/')]
+        workflowcollection = workflowcollection[:workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
         container_image = arv_docker_get_image(self.arvrunner.api,
@@ -170,7 +170,7 @@ class RunnerContainer(Runner):
             "state": "Committed",
             "container_image": container_image,
             "mounts": {
-                workflowpath: {
+                "/var/lib/cwl/workflow": {
                     "kind": "collection",
                     "portable_data_hash": "%s" % workflowcollection
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index d48c93a..28b0fee 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -1,6 +1,11 @@
 import fnmatch
+import os
 import cwltool.process
+import arvados.util
+import arvados.collection
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""

commit 3a0ae61f658a7473219991bd3279b492b24ed875
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 16:28:45 2016 -0400

    8442: import fixups

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 0cc23ab..f754348 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -1,15 +1,21 @@
 import os
 import urlparse
 from functools import partial
+import logging
+import json
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import get_feature, scandeps, adjustFiles
 from cwltool.load_tool import fetch_document
+import arvados.collection
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
+logger = logging.getLogger('arvados.cwl-runner')
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner

commit 49043f14cb72a6eb5825aea529e3477b73e297c7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 16:17:45 2016 -0400

    8442: Debugging container --submit with crunch2

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 136d5ae..ba26816 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -19,6 +19,7 @@ import arvados.events
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess
 from cwltool.process import shortname, UnsupportedRequirement
 from arvados.api import OrderedJsonModel
@@ -184,6 +185,9 @@ class ArvCwlRunner(object):
             if self.pipeline:
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            if runnerjob and self.crunch2:
+                self.api.container_requests().update(uuid=runnerjob.uuid,
+                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index be11404..4c80896 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -152,7 +152,7 @@ class RunnerContainer(Runner):
         workflowname = os.path.basename(self.tool.tool["id"])
         workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
         workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+        workflowcollection = workflowcollection[workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
         container_image = arv_docker_get_image(self.arvrunner.api,
@@ -181,11 +181,16 @@ class RunnerContainer(Runner):
                 "stdout": {
                     "kind": "file",
                     "path": "/var/spool/cwl/cwl.output.json"
+                },
+                "/var/spool/cwl": {
+                    "kind": "collection",
+                    "writable": True
             "runtime_constraints": {
                 "vcpus": 1,
-                "ram": 1024*1024*256
+                "ram": 1024*1024*256,
+                "API": True
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index c911895..d48c93a 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -1,4 +1,5 @@
 import fnmatch
+import cwltool.process
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""

commit 6c43be47cb3756a0e6ffc924572259d1a1c8f2c3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 15:45:29 2016 -0400

    8442: Adding --submit support with --crunch2.  General refactoring into more/smaller files.

diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index 1b177ec..2618e47 100644
--- a/apps/workbench/Gemfile.lock
+++ b/apps/workbench/Gemfile.lock
@@ -309,6 +309,3 @@ DEPENDENCIES
   uglifier (>= 1.0.3)
-   1.12.1
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index af74808..136d5ae 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -3,397 +3,28 @@
 # Implement cwl-runner interface for submitting and running jobs on Arvados.
 import argparse
-import arvados
-import arvados.collection
-import arvados.commands.keepdocker
-import arvados.commands.run
-import arvados.events
-import arvados.util
-import copy
-import cwltool.docker
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
-from cwltool.errors import WorkflowException
-import cwltool.main
-import cwltool.workflow
-import fnmatch
-from functools import partial
-import json
 import logging
 import os
-import pkg_resources  # part of setuptools
-import re
 import sys
 import threading
-from cwltool.load_tool import fetch_document
-from cwltool.builder import Builder
-import urlparse
-from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
-from .arvdocker import arv_docker_get_image
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
-from arvados.api import OrderedJsonModel
-logger = logging.getLogger('arvados.cwl-runner')
-class CollectionFsAccess(cwltool.process.StdFsAccess):
-    """Implement the cwltool FsAccess interface for Arvados Collections."""
-    def __init__(self, basedir):
-        super(CollectionFsAccess, self).__init__(basedir)
-        self.collections = {}
-    def get_collection(self, path):
-        p = path.split("/")
-        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
-            pdh = p[0][5:]
-            if pdh not in self.collections:
-                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
-            return (self.collections[pdh], "/".join(p[1:]))
-        else:
-            return (None, path)
-    def _match(self, collection, patternsegments, parent):
-        if not patternsegments:
-            return []
-        if not isinstance(collection, arvados.collection.RichCollectionBase):
-            return []
-        ret = []
-        # iterate over the files and subcollections in 'collection'
-        for filename in collection:
-            if patternsegments[0] == '.':
-                # Pattern contains something like "./foo" so just shift
-                # past the "./"
-                ret.extend(self._match(collection, patternsegments[1:], parent))
-            elif fnmatch.fnmatch(filename, patternsegments[0]):
-                cur = os.path.join(parent, filename)
-                if len(patternsegments) == 1:
-                    ret.append(cur)
-                else:
-                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
-        return ret
-    def glob(self, pattern):
-        collection, rest = self.get_collection(pattern)
-        patternsegments = rest.split("/")
-        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-    def open(self, fn, mode):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.open(rest, mode)
-        else:
-            return open(self._abs(fn), mode)
-    def exists(self, fn):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.exists(rest)
-        else:
-            return os.path.exists(self._abs(fn))
-class RunnerJob(object):
-    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
-    def __init__(self, runner, tool, job_order, enable_reuse):
-        self.arvrunner = runner
-        self.tool = tool
-        self.job_order = job_order
-        self.running = False
-        self.enable_reuse = enable_reuse
-    def update_pipeline_component(self, record):
-        pass
-    def upload_docker(self, tool):
-        if isinstance(tool, CommandLineTool):
-            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
-            if docker_req:
-                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
-        elif isinstance(tool, cwltool.workflow.Workflow):
-            for s in tool.steps:
-                self.upload_docker(s.embedded_tool)
-    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
-        """Create an Arvados job specification for this workflow.
-        The returned dict can be used to create a job (i.e., passed as
-        the +body+ argument to jobs().create()), or as a component in
-        a pipeline template or pipeline instance.
-        """
-        self.upload_docker(self.tool)
-        workflowfiles = set()
-        jobfiles = set()
-        workflowfiles.add(self.tool.tool["id"])
-        self.name = os.path.basename(self.tool.tool["id"])
-        def visitFiles(files, path):
-            files.add(path)
-            return path
-        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
-        def loadref(b, u):
-            return document_loader.fetch(urlparse.urljoin(b, u))
-        sc = scandeps(uri, workflowobj,
-                      set(("$import", "run")),
-                      set(("$include", "$schemas", "path")),
-                      loadref)
-        adjustFiles(sc, partial(visitFiles, workflowfiles))
-        adjustFiles(self.job_order, partial(visitFiles, jobfiles))
-        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
-                                       "%s",
-                                       "%s/%s",
-                                       name=self.name,
-                                       **kwargs)
-        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
-                                  "%s",
-                                  "%s/%s",
-                                  name=os.path.basename(self.job_order.get("id", "#")),
-                                  **kwargs)
-        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
-        if "id" in self.job_order:
-            del self.job_order["id"]
-        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
-        return {
-            "script": "cwl-runner",
-            "script_version": "master",
-            "repository": "arvados",
-            "script_parameters": self.job_order,
-            "runtime_constraints": {
-                "docker_image": "arvados/jobs"
-            }
-        }
-    def run(self, *args, **kwargs):
-        job_spec = self.arvados_job_spec(*args, **kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
-        self.uuid = response["uuid"]
-        self.arvrunner.jobs[self.uuid] = self
-        logger.info("Submitted job %s", response["uuid"])
-        if kwargs.get("submit"):
-            self.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(response)
-    def done(self, record):
-        if record["state"] == "Complete":
-            processStatus = "success"
-        else:
-            processStatus = "permanentFail"
-        outputs = None
-        try:
-            try:
-                outc = arvados.collection.Collection(record["output"])
-                with outc.open("cwl.output.json") as f:
-                    outputs = json.load(f)
-                def keepify(path):
-                    if not path.startswith("keep:"):
-                        return "keep:%s/%s" % (record["output"], path)
-                adjustFiles(outputs, keepify)
-            except Exception as e:
-                logger.error("While getting final output object: %s", e)
-            self.arvrunner.output_callback(outputs, processStatus)
-        finally:
-            del self.arvrunner.jobs[record["uuid"]]
-class RunnerTemplate(object):
-    """An Arvados pipeline template that invokes a CWL workflow."""
-    type_to_dataclass = {
-        'boolean': 'boolean',
-        'File': 'File',
-        'float': 'number',
-        'int': 'number',
-        'string': 'text',
-    }
-    def __init__(self, runner, tool, job_order, enable_reuse):
-        self.runner = runner
-        self.tool = tool
-        self.job = RunnerJob(
-            runner=runner,
-            tool=tool,
-            job_order=job_order,
-            enable_reuse=enable_reuse)
-    def pipeline_component_spec(self):
-        """Return a component that Workbench and a-r-p-i will understand.
-        Specifically, translate CWL input specs to Arvados pipeline
-        format, like {"dataclass":"File","value":"xyz"}.
-        """
-        spec = self.job.arvados_job_spec()
-        # Most of the component spec is exactly the same as the job
-        # spec (script, script_version, etc.).
-        # spec['script_parameters'] isn't right, though. A component
-        # spec's script_parameters hash is a translation of
-        # self.tool.tool['inputs'] with defaults/overrides taken from
-        # the job order. So we move the job parameters out of the way
-        # and build a new spec['script_parameters'].
-        job_params = spec['script_parameters']
-        spec['script_parameters'] = {}
-        for param in self.tool.tool['inputs']:
-            param = copy.deepcopy(param)
-            # Data type and "required" flag...
-            types = param['type']
-            if not isinstance(types, list):
-                types = [types]
-            param['required'] = 'null' not in types
-            non_null_types = set(types) - set(['null'])
-            if len(non_null_types) == 1:
-                the_type = [c for c in non_null_types][0]
-                dataclass = self.type_to_dataclass.get(the_type)
-                if dataclass:
-                    param['dataclass'] = dataclass
-            # Note: If we didn't figure out a single appropriate
-            # dataclass, we just left that attribute out.  We leave
-            # the "type" attribute there in any case, which might help
-            # downstream.
-            # Title and description...
-            title = param.pop('label', '')
-            descr = param.pop('description', '').rstrip('\n')
-            if title:
-                param['title'] = title
-            if descr:
-                param['description'] = descr
-            # Fill in the value from the current job order, if any.
-            param_id = shortname(param.pop('id'))
-            value = job_params.get(param_id)
-            if value is None:
-                pass
-            elif not isinstance(value, dict):
-                param['value'] = value
-            elif param.get('dataclass') == 'File' and value.get('path'):
-                param['value'] = value['path']
-            spec['script_parameters'][param_id] = param
-        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
-        return spec
-    def save(self):
-        job_spec = self.pipeline_component_spec()
-        response = self.runner.api.pipeline_templates().create(body={
-            "components": {
-                self.job.name: job_spec,
-            },
-            "name": self.job.name,
-            "owner_uuid": self.runner.project_uuid,
-        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
-        self.uuid = response["uuid"]
-        logger.info("Created template %s", self.uuid)
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    """Convert container-local paths to and from Keep collection ids."""
-    def __init__(self, arvrunner, referenced_files, input_basedir,
-                 collection_pattern, file_pattern, name=None, **kwargs):
-        self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = set()
-        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-        for src in referenced_files:
-            if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, collection_pattern % src[5:])
-            if "#" in src:
-                src = src[:src.index("#")]
-            if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
-                if kwargs.get("conformance_test"):
-                    self._pathmap[src] = (src, ab)
-                elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.add((src, ab, st))
-                elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
-                else:
-                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-        if uploadfiles:
-            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
-                                             arvrunner.api,
-                                             dry_run=kwargs.get("dry_run"),
-                                             num_retries=3,
-                                             fnPattern=file_pattern,
-                                             name=name,
-                                             project=arvrunner.project_uuid)
-        for src, ab, st in uploadfiles:
-            arvrunner.add_uploaded(src, (ab, st.fn))
-            self._pathmap[src] = (ab, st.fn)
-        self.keepdir = None
-    def reversemap(self, target):
-        if target.startswith("keep:"):
-            return (target, target)
-        elif self.keepdir and target.startswith(self.keepdir):
-            return (target, "keep:" + target[len(self.keepdir)+1:])
-        else:
-            return super(ArvPathMapper, self).reversemap(target)
+import pkg_resources  # part of setuptools
-class ArvadosCommandTool(CommandLineTool):
-    """Wrap cwltool CommandLineTool to override selected methods."""
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
-    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
-        self.arvrunner = arvrunner
-        self.crunch2 = crunch2
+import arvados
+import arvados.events
-    def makeJobRunner(self):
-        if self.crunch2:
-            return ArvadosContainer(self.arvrunner)
-        else:
-            return ArvadosJob(self.arvrunner)
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvtool import ArvadosCommandTool
-    def makePathMapper(self, reffiles, **kwargs):
-        if self.crunch2:
-            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                                 "/keep/%s",
-                                 "/keep/%s/%s",
-                                 **kwargs)
-        else:
-            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                                 "$(task.keep)/%s",
-                                 "$(task.keep)/%s/%s",
-                                 **kwargs)
+from cwltool.process import shortname, UnsupportedRequirement
+from arvados.api import OrderedJsonModel
+logger = logging.getLogger('arvados.cwl-runner')
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
@@ -475,7 +106,10 @@ class ArvCwlRunner(object):
             return tmpl.uuid
         if kwargs.get("submit"):
-            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+            if self.crunch2:
+                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+            else:
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
             # Create pipeline for local run
@@ -510,56 +144,54 @@ class ArvCwlRunner(object):
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
-        if kwargs.get("conformance_test"):
-            return cwltool.main.single_job_executor(tool, job_order, **kwargs)
+        if kwargs.get("submit"):
+            jobiter = iter((runnerjob,))
-            if kwargs.get("submit"):
-                jobiter = iter((runnerjob,))
-            else:
-                if "cwl_runner_job" in kwargs:
-                    self.uuid = kwargs.get("cwl_runner_job").get('uuid')
-                jobiter = tool.job(job_order,
-                                   self.output_callback,
-                                   docker_outdir="$(task.outdir)",
-                                   **kwargs)
-            try:
-                self.cond.acquire()
-                # Will continue to hold the lock for the duration of this code
-                # except when in cond.wait(), at which point on_message can update
-                # job state and process output callbacks.
-                for runnable in jobiter:
-                    if runnable:
-                        runnable.run(**kwargs)
-                    else:
-                        if self.jobs:
-                            self.cond.wait(1)
-                        else:
-                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
-                            break
-                while self.jobs:
-                    self.cond.wait(1)
-                events.close()
-            except UnsupportedRequirement:
-                raise
-            except:
-                if sys.exc_info()[0] is KeyboardInterrupt:
-                    logger.error("Interrupted, marking pipeline as failed")
+            if "cwl_runner_job" in kwargs:
+                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+            jobiter = tool.job(job_order,
+                               self.output_callback,
+                               docker_outdir="$(task.outdir)",
+                               **kwargs)
+        try:
+            self.cond.acquire()
+            # Will continue to hold the lock for the duration of this code
+            # except when in cond.wait(), at which point on_message can update
+            # job state and process output callbacks.
+            for runnable in jobiter:
+                if runnable:
+                    runnable.run(**kwargs)
-                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-                if self.pipeline:
-                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            finally:
-                self.cond.release()
+                    if self.jobs:
+                        self.cond.wait(1)
+                    else:
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        break
+            while self.jobs:
+                self.cond.wait(1)
+            events.close()
+        except UnsupportedRequirement:
+            raise
+        except:
+            if sys.exc_info()[0] is KeyboardInterrupt:
+                logger.error("Interrupted, marking pipeline as failed")
+            else:
+                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+        finally:
+            self.cond.release()
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+        if self.final_output is None:
+            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+        return self.final_output
-            return self.final_output
 def versionstring():
     """Print version string of key packages for provenance and debugging."""
@@ -572,6 +204,7 @@ def versionstring():
                                     "arvados-python-client", arvpkg[0].version,
                                     "cwltool", cwlpkg[0].version)
 def arg_parser():  # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
@@ -634,6 +267,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     return parser
 def main(args, stdout, stderr, api_client=None):
     parser = arg_parser()
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index b0e2c1f..be11404 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,10 +1,15 @@
 import logging
+import json
+import os
+from cwltool.errors import WorkflowException
+from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
 import arvados.collection
-from cwltool.process import get_feature, adjustFiles
 from .arvdocker import arv_docker_get_image
 from . import done
-from cwltool.errors import WorkflowException
-from cwltool.process import UnsupportedRequirement
+from .runner import Runner
 logger = logging.getLogger('arvados.cwl-runner')
@@ -45,7 +50,7 @@ class ArvadosContainer(object):
         if self.generatefiles:
             raise UnsupportedRequirement("Generate files not supported")
-            vwd = arvados.collection.Collection()
+            vwd = arvados.collection.Collection(api_client=self.arvrunner.api_client)
             container_request["task.vwd"] = {}
             for t in self.generatefiles:
                 if isinstance(self.generatefiles[t], dict):
@@ -124,3 +129,78 @@ class ArvadosContainer(object):
             self.output_callback(outputs, processStatus)
             del self.arvrunner.jobs[record["uuid"]]
+class RunnerContainer(Runner):
+    """Submit and manage a container that runs arvados-cwl-runner."""
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
+        workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        with arvados.collection.Collection(api_client=self.arvrunner.api) as jobobj:
+            with jobobj.open("cwl.input.json", "w") as f:
+                json.dump(self.job_order, f, sort_keys=True, indent=4)
+            jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
+        workflowname = os.path.basename(self.tool.tool["id"])
+        workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
+        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+        jobpath = "/var/lib/cwl/job/cwl.input.json"
+        container_image = arv_docker_get_image(self.arvrunner.api,
+                                               {"dockerImageId": "arvados/jobs"},
+                                               pull_image,
+                                               self.arvrunner.project_uuid)
+        return {
+            "command": ["arvados-cwl-runner", "--local", "--crunch2", workflowpath, jobpath],
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": self.name,
+            "output_path": "/var/spool/cwl",
+            "cwd": "/var/spool/cwl",
+            "priority": 1,
+            "state": "Committed",
+            "container_image": container_image,
+            "mounts": {
+                workflowpath: {
+                    "kind": "collection",
+                    "portable_data_hash": "%s" % workflowcollection
+                },
+                jobpath: {
+                    "kind": "collection",
+                    "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
+                },
+                "stdout": {
+                    "kind": "file",
+                    "path": "/var/spool/cwl/cwl.output.json"
+                }
+            },
+            "runtime_constraints": {
+                "vcpus": 1,
+                "ram": 1024*1024*256
+            }
+        }
+    def run(self, *args, **kwargs):
+        job_spec = self.arvados_job_spec(*args, **kwargs)
+        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+        response = self.arvrunner.api.container_requests().create(
+            body=job_spec
+        ).execute(num_retries=self.arvrunner.num_retries)
+        self.uuid = response["uuid"]
+        self.arvrunner.jobs[response["container_uuid"]] = self
+        logger.info("Submitted container %s", response["uuid"])
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 88a8eeb..397b6d5 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -1,11 +1,19 @@
 import logging
 import re
-from . import done
-from .arvdocker import arv_docker_get_image
-from cwltool.process import get_feature
+import copy
+from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
+from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.load_tool import fetch_document
+from cwltool.builder import Builder
 import arvados.collection
+from .arvdocker import arv_docker_get_image
+from .runner import Runner
+from . import done
 logger = logging.getLogger('arvados.cwl-runner')
 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
@@ -164,3 +172,145 @@ class ArvadosJob(object):
             self.output_callback(outputs, processStatus)
             del self.arvrunner.jobs[record["uuid"]]
+class RunnerJob(Runner):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
+        workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+        return {
+            "script": "cwl-runner",
+            "script_version": "master",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }
+    def run(self, *args, **kwargs):
+        job_spec = self.arvados_job_spec(*args, **kwargs)
+        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+        response = self.arvrunner.api.jobs().create(
+            body=job_spec,
+            find_or_create=self.enable_reuse
+        ).execute(num_retries=self.arvrunner.num_retries)
+        self.uuid = response["uuid"]
+        self.arvrunner.jobs[self.uuid] = self
+        logger.info("Submitted job %s", response["uuid"])
+        if kwargs.get("submit"):
+            self.pipeline = self.arvrunner.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "name": shortname(self.tool.tool["id"]),
+                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
+                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+class RunnerTemplate(object):
+    """An Arvados pipeline template that invokes a CWL workflow."""
+    type_to_dataclass = {
+        'boolean': 'boolean',
+        'File': 'File',
+        'float': 'number',
+        'int': 'number',
+        'string': 'text',
+    }
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.runner = runner
+        self.tool = tool
+        self.job = RunnerJob(
+            runner=runner,
+            tool=tool,
+            job_order=job_order,
+            enable_reuse=enable_reuse)
+    def pipeline_component_spec(self):
+        """Return a component that Workbench and a-r-p-i will understand.
+        Specifically, translate CWL input specs to Arvados pipeline
+        format, like {"dataclass":"File","value":"xyz"}.
+        """
+        spec = self.job.arvados_job_spec()
+        # Most of the component spec is exactly the same as the job
+        # spec (script, script_version, etc.).
+        # spec['script_parameters'] isn't right, though. A component
+        # spec's script_parameters hash is a translation of
+        # self.tool.tool['inputs'] with defaults/overrides taken from
+        # the job order. So we move the job parameters out of the way
+        # and build a new spec['script_parameters'].
+        job_params = spec['script_parameters']
+        spec['script_parameters'] = {}
+        for param in self.tool.tool['inputs']:
+            param = copy.deepcopy(param)
+            # Data type and "required" flag...
+            types = param['type']
+            if not isinstance(types, list):
+                types = [types]
+            param['required'] = 'null' not in types
+            non_null_types = set(types) - set(['null'])
+            if len(non_null_types) == 1:
+                the_type = [c for c in non_null_types][0]
+                dataclass = self.type_to_dataclass.get(the_type)
+                if dataclass:
+                    param['dataclass'] = dataclass
+            # Note: If we didn't figure out a single appropriate
+            # dataclass, we just left that attribute out.  We leave
+            # the "type" attribute there in any case, which might help
+            # downstream.
+            # Title and description...
+            title = param.pop('label', '')
+            descr = param.pop('description', '').rstrip('\n')
+            if title:
+                param['title'] = title
+            if descr:
+                param['description'] = descr
+            # Fill in the value from the current job order, if any.
+            param_id = shortname(param.pop('id'))
+            value = job_params.get(param_id)
+            if value is None:
+                pass
+            elif not isinstance(value, dict):
+                param['value'] = value
+            elif param.get('dataclass') == 'File' and value.get('path'):
+                param['value'] = value['path']
+            spec['script_parameters'][param_id] = param
+        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
+        return spec
+    def save(self):
+        job_spec = self.pipeline_component_spec()
+        response = self.runner.api.pipeline_templates().create(body={
+            "components": {
+                self.job.name: job_spec,
+            },
+            "name": self.job.name,
+            "owner_uuid": self.runner.project_uuid,
+        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
+        self.uuid = response["uuid"]
+        logger.info("Created template %s", self.uuid)
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
new file mode 100644
index 0000000..a2c5c9e
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -0,0 +1,30 @@
+from cwltool.draft2tool import CommandLineTool
+from .arvjob import ArvadosJob
+from .arvcontainer import ArvadosContainer
+from .pathmapper import ArvPathMapper
+class ArvadosCommandTool(CommandLineTool):
+    """Wrap cwltool CommandLineTool to override selected methods."""
+    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
+        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+        self.arvrunner = arvrunner
+        self.crunch2 = crunch2
+    def makeJobRunner(self):
+        if self.crunch2:
+            return ArvadosContainer(self.arvrunner)
+        else:
+            return ArvadosJob(self.arvrunner)
+    def makePathMapper(self, reffiles, **kwargs):
+        if self.crunch2:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "/keep/%s",
+                                 "/keep/%s/%s",
+                                 **kwargs)
+        else:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "$(task.keep)/%s",
+                                 "$(task.keep)/%s/%s",
+                                 **kwargs)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
new file mode 100644
index 0000000..c911895
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -0,0 +1,59 @@
+import fnmatch
+class CollectionFsAccess(cwltool.process.StdFsAccess):
+    """Implement the cwltool FsAccess interface for Arvados Collections."""
+    def __init__(self, basedir):
+        super(CollectionFsAccess, self).__init__(basedir)
+        self.collections = {}
+    def get_collection(self, path):
+        p = path.split("/")
+        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
+            pdh = p[0][5:]
+            if pdh not in self.collections:
+                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
+            return (self.collections[pdh], "/".join(p[1:]))
+        else:
+            return (None, path)
+    def _match(self, collection, patternsegments, parent):
+        if not patternsegments:
+            return []
+        if not isinstance(collection, arvados.collection.RichCollectionBase):
+            return []
+        ret = []
+        # iterate over the files and subcollections in 'collection'
+        for filename in collection:
+            if patternsegments[0] == '.':
+                # Pattern contains something like "./foo" so just shift
+                # past the "./"
+                ret.extend(self._match(collection, patternsegments[1:], parent))
+            elif fnmatch.fnmatch(filename, patternsegments[0]):
+                cur = os.path.join(parent, filename)
+                if len(patternsegments) == 1:
+                    ret.append(cur)
+                else:
+                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
+        return ret
+    def glob(self, pattern):
+        collection, rest = self.get_collection(pattern)
+        patternsegments = rest.split("/")
+        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
+    def open(self, fn, mode):
+        collection, rest = self.get_collection(fn)
+        if collection:
+            return collection.open(rest, mode)
+        else:
+            return open(self._abs(fn), mode)
+    def exists(self, fn):
+        collection, rest = self.get_collection(fn)
+        if collection:
+            return collection.exists(rest)
+        else:
+            return os.path.exists(self._abs(fn))
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
new file mode 100644
index 0000000..9538a91
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -0,0 +1,55 @@
+import re
+import arvados.commands.run
+import arvados.collection
+import cwltool.pathmapper
+class ArvPathMapper(cwltool.pathmapper.PathMapper):
+    """Convert container-local paths to and from Keep collection ids."""
+    def __init__(self, arvrunner, referenced_files, input_basedir,
+                 collection_pattern, file_pattern, name=None, **kwargs):
+        self._pathmap = arvrunner.get_uploaded()
+        uploadfiles = set()
+        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+        for src in referenced_files:
+            if isinstance(src, basestring) and pdh_path.match(src):
+                self._pathmap[src] = (src, collection_pattern % src[5:])
+            if "#" in src:
+                src = src[:src.index("#")]
+            if src not in self._pathmap:
+                ab = cwltool.pathmapper.abspath(src, input_basedir)
+                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
+                if kwargs.get("conformance_test"):
+                    self._pathmap[src] = (src, ab)
+                elif isinstance(st, arvados.commands.run.UploadFile):
+                    uploadfiles.add((src, ab, st))
+                elif isinstance(st, arvados.commands.run.ArvFile):
+                    self._pathmap[src] = (ab, st.fn)
+                else:
+                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+        if uploadfiles:
+            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
+                                             arvrunner.api,
+                                             dry_run=kwargs.get("dry_run"),
+                                             num_retries=3,
+                                             fnPattern=file_pattern,
+                                             name=name,
+                                             project=arvrunner.project_uuid)
+        for src, ab, st in uploadfiles:
+            arvrunner.add_uploaded(src, (ab, st.fn))
+            self._pathmap[src] = (ab, st.fn)
+        self.keepdir = None
+    def reversemap(self, target):
+        if target.startswith("keep:"):
+            return (target, target)
+        elif self.keepdir and target.startswith(self.keepdir):
+            return (target, "keep:" + target[len(self.keepdir)+1:])
+        else:
+            return super(ArvPathMapper, self).reversemap(target)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
new file mode 100644
index 0000000..0cc23ab
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -0,0 +1,98 @@
+import os
+import urlparse
+from functools import partial
+from cwltool.draft2tool import CommandLineTool
+import cwltool.workflow
+from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.load_tool import fetch_document
+from .arvdocker import arv_docker_get_image
+from .pathmapper import ArvPathMapper
+class Runner(object):
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.arvrunner = runner
+        self.tool = tool
+        self.job_order = job_order
+        self.running = False
+        self.enable_reuse = enable_reuse
+    def update_pipeline_component(self, record):
+        pass
+    def upload_docker(self, tool):
+        if isinstance(tool, CommandLineTool):
+            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+            if docker_req:
+                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+        elif isinstance(tool, cwltool.workflow.Workflow):
+            for s in tool.steps:
+                self.upload_docker(s.embedded_tool)
+    def arvados_job_spec(self, *args, **kwargs):
+        self.upload_docker(self.tool)
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(self.tool.tool["id"])
+        self.name = os.path.basename(self.tool.tool["id"])
+        def visitFiles(files, path):
+            files.add(path)
+            return path
+        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+        def loadref(b, u):
+            return document_loader.fetch(urlparse.urljoin(b, u))
+        sc = scandeps(uri, workflowobj,
+                      set(("$import", "run")),
+                      set(("$include", "$schemas", "path")),
+                      loadref)
+        adjustFiles(sc, partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=self.name,
+                                       **kwargs)
+        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(self.job_order.get("id", "#")),
+                                  **kwargs)
+        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+        if "id" in self.job_order:
+            del self.job_order["id"]
+        return workflowmapper
+    def done(self, record):
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+        outputs = None
+        try:
+            try:
+                outc = arvados.collection.Collection(record["output"])
+                with outc.open("cwl.output.json") as f:
+                    outputs = json.load(f)
+                def keepify(path):
+                    if not path.startswith("keep:"):
+                        return "keep:%s/%s" % (record["output"], path)
+                adjustFiles(outputs, keepify)
+            except Exception as e:
+                logger.error("While getting final output object: %s", e)
+            self.arvrunner.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_container.py
similarity index 64%
copy from sdk/cwl/tests/test_job.py
copy to sdk/cwl/tests/test_container.py
index dba65b0..0237e80 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_container.py
@@ -10,14 +10,20 @@ if not os.getenv('ARVADOS_DEBUG'):
-class TestJob(unittest.TestCase):
+class TestContainer(unittest.TestCase):
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
-    def test_run(self):
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_run(self, keepdocker):
         runner = mock.MagicMock()
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.ignore_docker_for_reuse = False
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
         tool = {
@@ -25,46 +31,44 @@ class TestJob(unittest.TestCase):
             "outputs": [],
             "baseCommand": "ls"
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names, basedir="")
         arvtool.formatgraph = None
-        for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run"):
-            runner.api.jobs().create.assert_called_with(
+            runner.api.container_requests().create.assert_called_with(
-                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
-                    'runtime_constraints': {},
-                    'script_parameters': {
-                        'tasks': [{
-                            'task.env': {'TMPDIR': '$(task.tmpdir)'},
-                            'command': ['ls']
-                        }],
+                    'environment': {
+                        'TMPDIR': '/tmp'
-                    'script_version': 'master',
-                    'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
-                    'repository': 'arvados',
-                    'script': 'crunchrunner',
+                    'name': 'test_run',
                     'runtime_constraints': {
-                        'docker_image': 'arvados/jobs',
-                        'min_cores_per_node': 1,
-                        'min_ram_mb_per_node': 1024,
-                        'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
-                    }
-                },
-                find_or_create=True,
-                filters=[['repository', '=', 'arvados'],
-                         ['script', '=', 'crunchrunner'],
-                         ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
-                         ['docker_image_locator', 'in docker', 'arvados/jobs']]
-            )
+                        'vcpus': 1,
+                        'ram': 1073741824
+                    }, 'priority': 1,
+                    'mounts': {
+                        '/var/spool/cwl': {'kind': 'tmp'}
+                    },
+                    'state': 'Committed',
+                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                    'output_path': '/var/spool/cwl',
+                    'container_image': '99999999999999999999999999999993+99',
+                    'command': ['ls'],
+                    'cwd': '/var/spool/cwl'
+                })
     # The test passes some fields in builder.resources
     # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
-    def test_resource_requirements(self):
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_resource_requirements(self, keepdocker):
         runner = mock.MagicMock()
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.ignore_docker_for_reuse = False
         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
         tool = {
             "inputs": [],
             "outputs": [],
@@ -76,36 +80,31 @@ class TestJob(unittest.TestCase):
             "baseCommand": "ls"
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names)
         arvtool.formatgraph = None
-        for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements"):
-        runner.api.jobs().create.assert_called_with(
+        runner.api.container_requests().create.assert_called_with(
-                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
-                'runtime_constraints': {},
-                'script_parameters': {
-                    'tasks': [{
-                        'task.env': {'TMPDIR': '$(task.tmpdir)'},
-                        'command': ['ls']
-                    }]
-            },
-            'script_version': 'master',
-                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
-                'repository': 'arvados',
-                'script': 'crunchrunner',
+                'environment': {
+                    'TMPDIR': '/tmp'
+                },
+                'name': 'test_resource_requirements',
                 'runtime_constraints': {
-                    'docker_image': 'arvados/jobs',
-                    'min_cores_per_node': 3,
-                    'min_ram_mb_per_node': 3000,
-                    'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize
-                }
-            },
-            find_or_create=True,
-            filters=[['repository', '=', 'arvados'],
-                     ['script', '=', 'crunchrunner'],
-                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
-                     ['docker_image_locator', 'in docker', 'arvados/jobs']])
+                    'vcpus': 3,
+                    'ram': 3145728000
+                }, 'priority': 1,
+                'mounts': {
+                    '/var/spool/cwl': {'kind': 'tmp'}
+                },
+                'state': 'Committed',
+                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                'output_path': '/var/spool/cwl',
+                'container_image': '99999999999999999999999999999993+99',
+                'command': ['ls'],
+                'cwd': '/var/spool/cwl'
+            })
     def test_done(self, col):
@@ -121,7 +120,7 @@ class TestJob(unittest.TestCase):
         api.collections().list().execute.side_effect = ({"items": []},
                                                         {"items": [{"manifest_text": "XYZ"}]})
-        arvjob = arvados_cwl.ArvadosJob(runner)
+        arvjob = arvados_cwl.ArvadosContainer(runner)
         arvjob.name = "testjob"
         arvjob.builder = mock.MagicMock()
         arvjob.output_callback = mock.MagicMock()
@@ -163,7 +162,7 @@ class TestJob(unittest.TestCase):
         col().open.return_value = []
         api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
-        arvjob = arvados_cwl.ArvadosJob(runner)
+        arvjob = arvados_cwl.ArvadosContainer(runner)
         arvjob.name = "testjob"
         arvjob.builder = mock.MagicMock()
         arvjob.output_callback = mock.MagicMock()
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index dba65b0..701afcb 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -25,7 +25,7 @@ class TestJob(unittest.TestCase):
             "outputs": [],
             "baseCommand": "ls"
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names, basedir="")
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir=""):
@@ -76,7 +76,7 @@ class TestJob(unittest.TestCase):
             "baseCommand": "ls"
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names)
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir=""):
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 38741eb..48e6ed2 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -30,7 +30,7 @@ def stubs(func):
             return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
         stubs.KeepClient().put.side_effect = putstub
-        stubs.keepdocker.return_value = True
+        stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
         stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
         stubs.api = mock.MagicMock()
@@ -44,12 +44,28 @@ def stubs(func):
         }, {
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
             "portable_data_hash": "99999999999999999999999999999992+99",
+        },
+        {
+            "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4",
+            "portable_data_hash": "99999999999999999999999999999994+99",
+            "manifest_text": ""
+        stubs.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
         stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         stubs.api.jobs().create().execute.return_value = {
             "uuid": stubs.expect_job_uuid,
             "state": "Queued",
+        stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
+        stubs.api.container_requests().create().execute.return_value = {
+            "uuid": stubs.expect_container_request_uuid,
+            "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
         stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
         stubs.api.pipeline_templates().create().execute.return_value = {
             "uuid": stubs.expect_pipeline_template_uuid,
@@ -70,6 +86,35 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
+        stubs.expect_container_spec = {
+            'priority': 1,
+            'mounts': {
+                'stdout': {
+                    'path': '/var/spool/cwl/cwl.output.json',
+                    'kind': 'file'
+                },
+                '/var/lib/cwl/workflow/submit_wf.cwl': {
+                    'portable_data_hash': '999999999999999999999999991+99',
+                    'kind': 'collection'
+                },
+                '/var/lib/cwl/job/cwl.input.json': {
+                    'portable_data_hash': '102435082199e5229f99b01165b67096+60/cwl.input.json',
+                    'kind': 'collection'
+                }
+            },
+            'state': 'Committed',
+            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+            'command': ['arvados-cwl-runner', '--local', '--crunch2', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
+            'name': 'submit_wf.cwl',
+            'container_image': '99999999999999999999999999999993+99',
+            'output_path': '/var/spool/cwl',
+            'cwd': '/var/spool/cwl',
+            'runtime_constraints': {
+                'vcpus': 1,
+                'ram': 268435456
+            }
+        }
         return func(self, stubs, *args, **kwargs)
     return wrapped
@@ -128,6 +173,41 @@ class TestSubmit(unittest.TestCase):
+    @stubs
+    def test_submit_container(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--crunch2", "--debug",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+        stubs.api.collections().create.assert_has_calls([
+            mock.call(),
+            mock.call(body={
+                'manifest_text':
+                './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
+                '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
+                'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'name': 'submit_wf.cwl',
+            }, ensure_unique_name=True),
+            mock.call().execute(),
+            mock.call(body={
+                'manifest_text':
+                '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'name': '#',
+            }, ensure_unique_name=True),
+            mock.call().execute()])
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
 class TestCreateTemplate(unittest.TestCase):
diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock
index 7be4e0f..3715718 100644
--- a/services/api/Gemfile.lock
+++ b/services/api/Gemfile.lock
@@ -257,6 +257,3 @@ DEPENDENCIES
   uglifier (>= 1.0.3)
-   1.12.1

commit 92ce0b4ffb51c957ddc0a6e36f7dfba2c819b38f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 10:36:05 2016 -0400

    8442: Fix message

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 17fe8cb..b0e2c1f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -43,7 +43,7 @@ class ArvadosContainer(object):
         if self.generatefiles:
-            raise UnsupportedRequirement("Stdin redirection currently not suppported")
+            raise UnsupportedRequirement("Generate files not supported")
             vwd = arvados.collection.Collection()
             container_request["task.vwd"] = {}

commit bea445d5d02adb035a126582e5c0358ec5db5c75
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 15:16:09 2016 -0400

    8442: raise UnsupportedRequirement for unsupported features in the conformance
    tests.  Bump cwltool dependency.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 171d92d..af74808 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -31,7 +31,7 @@ from .arvcontainer import ArvadosContainer
 from .arvjob import ArvadosJob
 from .arvdocker import arv_docker_get_image
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
+from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
 from arvados.api import OrderedJsonModel
 logger = logging.getLogger('arvados.cwl-runner')
@@ -477,7 +477,7 @@ class ArvCwlRunner(object):
         if kwargs.get("submit"):
             runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
@@ -543,11 +543,13 @@ class ArvCwlRunner(object):
+            except UnsupportedRequirement:
+                raise
                 if sys.exc_info()[0] is KeyboardInterrupt:
                     logger.error("Interrupted, marking pipeline as failed")
-                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
+                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                 if self.pipeline:
                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 0cb885f..17fe8cb 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -4,6 +4,7 @@ from cwltool.process import get_feature, adjustFiles
 from .arvdocker import arv_docker_get_image
 from . import done
 from cwltool.errors import WorkflowException
+from cwltool.process import UnsupportedRequirement
 logger = logging.getLogger('arvados.cwl-runner')
@@ -42,6 +43,8 @@ class ArvadosContainer(object):
         if self.generatefiles:
+            raise UnsupportedRequirement("Stdin redirection currently not suppported")
             vwd = arvados.collection.Collection()
             container_request["task.vwd"] = {}
             for t in self.generatefiles:
@@ -60,13 +63,12 @@ class ArvadosContainer(object):
         if self.environment:
-        # TODO, not supported by crunchv2 yet
-        #if self.stdin:
-        #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+        if self.stdin:
+            raise UnsupportedRequirement("Stdin redirection currently not suppported")
         if self.stdout:
             mounts["stdout"] = {"kind": "file",
-                                "path": self.stdout}
+                                "path": "/var/spool/cwl/%s" % (self.stdout)}
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if not docker_req:
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 591bdde..b1ff7f3 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
-          'cwltool==1.0.20160519182434',
+          'cwltool==1.0.20160609160402',

commit a82231ef8bdf251a5f243461a4fafaf3b3ad5579
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 08:21:34 2016 -0400

    8442: Setting up mount points works.  Capturing output works.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 387bc4a..171d92d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,6 +28,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 import urlparse
 from .arvcontainer import ArvadosContainer
+from .arvjob import ArvadosJob
 from .arvdocker import arv_docker_get_image
 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
@@ -36,10 +37,6 @@ from arvados.api import OrderedJsonModel
 logger = logging.getLogger('arvados.cwl-runner')
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
@@ -98,194 +95,6 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
             return os.path.exists(self._abs(fn))
-class ArvadosJob(object):
-    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-    def __init__(self, runner):
-        self.arvrunner = runner
-        self.running = False
-    def run(self, dry_run=False, pull_image=True, **kwargs):
-        script_parameters = {
-            "command": self.command_line
-        }
-        runtime_constraints = {}
-        if self.generatefiles:
-            vwd = arvados.collection.Collection()
-            script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t])
-            vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
-        if self.environment:
-            script_parameters["task.env"].update(self.environment)
-        if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-        if self.stdout:
-            script_parameters["task.stdout"] = self.stdout
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-        if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
-        else:
-            runtime_constraints["docker_image"] = "arvados/jobs"
-        resources = self.builder.resources
-        if resources is not None:
-            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
-            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
-            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-        filters = [["repository", "=", "arvados"],
-                   ["script", "=", "crunchrunner"],
-                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
-        if not self.arvrunner.ignore_docker_for_reuse:
-            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-        try:
-            response = self.arvrunner.api.jobs().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "script": "crunchrunner",
-                    "repository": "arvados",
-                    "script_version": "master",
-                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                    "script_parameters": {"tasks": [script_parameters]},
-                    "runtime_constraints": runtime_constraints
-                },
-                filters=filters,
-                find_or_create=kwargs.get("enable_reuse", True)
-            ).execute(num_retries=self.arvrunner.num_retries)
-            self.arvrunner.jobs[response["uuid"]] = self
-            self.update_pipeline_component(response)
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
-        except Exception as e:
-            logger.error("Got error %s" % str(e))
-            self.output_callback({}, "permanentFail")
-    def update_pipeline_component(self, record):
-        if self.arvrunner.pipeline:
-            self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-                                                                                 body={
-                                                                                    "components": self.arvrunner.pipeline["components"]
-                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
-        if self.arvrunner.uuid:
-            try:
-                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
-                if job:
-                    components = job["components"]
-                    components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
-                        body={
-                            "components": components
-                        }).execute(num_retries=self.arvrunner.num_retries)
-            except Exception as e:
-                logger.info("Error adding to components: %s", e)
-    def done(self, record):
-        try:
-            self.update_pipeline_component(record)
-        except:
-            pass
-        try:
-            if record["state"] == "Complete":
-                processStatus = "success"
-            else:
-                processStatus = "permanentFail"
-            try:
-                outputs = {}
-                if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will different runs, and we need to know about the
-                        # final run that actually produced output.
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
-                    # check if collection already exists with same owner, name and content
-                    collection_exists = self.arvrunner.api.collections().list(
-                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                 ['portable_data_hash', '=', record["output"]],
-                                 ["name", "=", colname]]
-                    ).execute(num_retries=self.arvrunner.num_retries)
-                    if not collection_exists["items"]:
-                        # Create a collection located in the same project as the
-                        # pipeline with the contents of the output.
-                        # First, get output record.
-                        collections = self.arvrunner.api.collections().list(
-                            limit=1,
-                            filters=[['portable_data_hash', '=', record["output"]]],
-                            select=["manifest_text"]
-                        ).execute(num_retries=self.arvrunner.num_retries)
-                        if not collections["items"]:
-                            raise WorkflowException(
-                                "Job output '%s' cannot be found on API server" % (
-                                    record["output"]))
-                        # Create new collection in the parent project
-                        # with the output contents.
-                        self.arvrunner.api.collections().create(body={
-                            "owner_uuid": self.arvrunner.project_uuid,
-                            "name": colname,
-                            "portable_data_hash": record["output"],
-                            "manifest_text": collections["items"][0]["manifest_text"]
-                        }, ensure_unique_name=True).execute(
-                            num_retries=self.arvrunner.num_retries)
-                    self.builder.outdir = outdir
-                    self.builder.pathmapper.keepdir = keepdir
-                    outputs = self.collect_outputs("keep:" + record["output"])
-            except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
-                processStatus = "permanentFail"
-            except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
-                processStatus = "permanentFail"
-            self.output_callback(outputs, processStatus)
-        finally:
-            del self.arvrunner.jobs[record["uuid"]]
 class RunnerJob(object):
@@ -574,10 +383,16 @@ class ArvadosCommandTool(CommandLineTool):
             return ArvadosJob(self.arvrunner)
     def makePathMapper(self, reffiles, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                             "$(task.keep)/%s",
-                             "$(task.keep)/%s/%s",
-                             **kwargs)
+        if self.crunch2:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "/keep/%s",
+                                 "/keep/%s/%s",
+                                 **kwargs)
+        else:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "$(task.keep)/%s",
+                                 "$(task.keep)/%s/%s",
+                                 **kwargs)
 class ArvCwlRunner(object):
@@ -676,7 +491,10 @@ class ArvCwlRunner(object):
                 return runnerjob.uuid
-        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        if self.crunch2:
+            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
+        else:
+            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 2179398..0cb885f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,7 +1,9 @@
 import logging
 import arvados.collection
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles
 from .arvdocker import arv_docker_get_image
+from . import done
+from cwltool.errors import WorkflowException
 logger = logging.getLogger('arvados.cwl-runner')
@@ -12,6 +14,9 @@ class ArvadosContainer(object):
         self.arvrunner = runner
         self.running = False
+    def update_pipeline_component(self, r):
+        pass
     def run(self, dry_run=False, pull_image=True, **kwargs):
         container_request = {
             "command": self.command_line,
@@ -26,13 +31,15 @@ class ArvadosContainer(object):
         mounts = {
             "/var/spool/cwl": {
                 "kind": "tmp"
-            },
-            "/tmp": {
-                "kind": "tmp"
-        # TODO mount normal inputs...
+        for f in self.pathmapper.files():
+            _, p = self.pathmapper.mapper(f)
+            mounts[p] = {
+                "kind": "collection",
+                "portable_data_hash": p[6:]
+            }
         if self.generatefiles:
             vwd = arvados.collection.Collection()
@@ -53,7 +60,7 @@ class ArvadosContainer(object):
         if self.environment:
-        # TODO, not supported
+        # TODO, not supported by crunchv2 yet
         #if self.stdin:
         #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
@@ -84,11 +91,11 @@ class ArvadosContainer(object):
-            self.arvrunner.jobs[response["uuid"]] = self
+            self.arvrunner.jobs[response["container_uuid"]] = self
-            logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+            logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])
-            if response["state"] in ("Complete", "Cancelled"):
+            if response["state"] == "Final":
         except Exception as e:
             logger.error("Got error %s" % str(e))
@@ -104,69 +111,9 @@ class ArvadosContainer(object):
                 outputs = {}
                 if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will different runs, and we need to know about the
-                        # final run that actually produced output.
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
-                    # check if collection already exists with same owner, name and content
-                    collection_exists = self.arvrunner.api.collections().list(
-                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                 ['portable_data_hash', '=', record["output"]],
-                                 ["name", "=", colname]]
-                    ).execute(num_retries=self.arvrunner.num_retries)
-                    if not collection_exists["items"]:
-                        # Create a collection located in the same project as the
-                        # pipeline with the contents of the output.
-                        # First, get output record.
-                        collections = self.arvrunner.api.collections().list(
-                            limit=1,
-                            filters=[['portable_data_hash', '=', record["output"]]],
-                            select=["manifest_text"]
-                        ).execute(num_retries=self.arvrunner.num_retries)
-                        if not collections["items"]:
-                            raise WorkflowException(
-                                "Job output '%s' cannot be found on API server" % (
-                                    record["output"]))
-                        # Create new collection in the parent project
-                        # with the output contents.
-                        self.arvrunner.api.collections().create(body={
-                            "owner_uuid": self.arvrunner.project_uuid,
-                            "name": colname,
-                            "portable_data_hash": record["output"],
-                            "manifest_text": collections["items"][0]["manifest_text"]
-                        }, ensure_unique_name=True).execute(
-                            num_retries=self.arvrunner.num_retries)
-                    self.builder.outdir = outdir
-                    self.builder.pathmapper.keepdir = keepdir
-                    outputs = self.collect_outputs("keep:" + record["output"])
+                    outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
             except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
                 logger.exception("Got unknown exception while collecting job outputs:")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
new file mode 100644
index 0000000..88a8eeb
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -0,0 +1,166 @@
+import logging
+import re
+from . import done
+from .arvdocker import arv_docker_get_image
+from cwltool.process import get_feature
+from cwltool.errors import WorkflowException
+import arvados.collection
+logger = logging.getLogger('arvados.cwl-runner')
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+class ArvadosJob(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+    def __init__(self, runner):
+        self.arvrunner = runner
+        self.running = False
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        script_parameters = {
+            "command": self.command_line
+        }
+        runtime_constraints = {}
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            script_parameters["task.vwd"] = {}
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            for t in self.generatefiles:
+                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
+        if self.environment:
+            script_parameters["task.env"].update(self.environment)
+        if self.stdin:
+            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+        if self.stdout:
+            script_parameters["task.stdout"] = self.stdout
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if docker_req and kwargs.get("use_container") is not False:
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
+            runtime_constraints["docker_image"] = "arvados/jobs"
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+        filters = [["repository", "=", "arvados"],
+                   ["script", "=", "crunchrunner"],
+                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+        if not self.arvrunner.ignore_docker_for_reuse:
+            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+        try:
+            response = self.arvrunner.api.jobs().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "script": "crunchrunner",
+                    "repository": "arvados",
+                    "script_version": "master",
+                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                    "script_parameters": {"tasks": [script_parameters]},
+                    "runtime_constraints": runtime_constraints
+                },
+                filters=filters,
+                find_or_create=kwargs.get("enable_reuse", True)
+            ).execute(num_retries=self.arvrunner.num_retries)
+            self.arvrunner.jobs[response["uuid"]] = self
+            self.update_pipeline_component(response)
+            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+            if response["state"] in ("Complete", "Failed", "Cancelled"):
+                self.done(response)
+        except Exception as e:
+            logger.error("Got error %s" % str(e))
+            self.output_callback({}, "permanentFail")
+    def update_pipeline_component(self, record):
+        if self.arvrunner.pipeline:
+            self.arvrunner.pipeline["components"][self.name] = {"job": record}
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+                                                                                 body={
+                                                                                    "components": self.arvrunner.pipeline["components"]
+                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
+        if self.arvrunner.uuid:
+            try:
+                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+                if job:
+                    components = job["components"]
+                    components[self.name] = record["uuid"]
+                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
+                        body={
+                            "components": components
+                        }).execute(num_retries=self.arvrunner.num_retries)
+            except Exception as e:
+                logger.info("Error adding to components: %s", e)
+    def done(self, record):
+        try:
+            self.update_pipeline_component(record)
+        except:
+            pass
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+            outputs = {}
+            try:
+                if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will different runs, and we need to know about the
+                        # final run that actually produced output.
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+                    outputs = done.done(self, record, tmpdir, outdir, keepdir)
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
+            except Exception as e:
+                logger.exception("Got unknown exception while collecting job outputs:")
+                processStatus = "permanentFail"
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
new file mode 100644
index 0000000..8a6fc9d
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -0,0 +1,38 @@
+def done(self, record, tmpdir, outdir, keepdir):
+    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+    # check if collection already exists with same owner, name and content
+    collection_exists = self.arvrunner.api.collections().list(
+        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                 ['portable_data_hash', '=', record["output"]],
+                 ["name", "=", colname]]
+    ).execute(num_retries=self.arvrunner.num_retries)
+    if not collection_exists["items"]:
+        # Create a collection located in the same project as the
+        # pipeline with the contents of the output.
+        # First, get output record.
+        collections = self.arvrunner.api.collections().list(
+            limit=1,
+            filters=[['portable_data_hash', '=', record["output"]]],
+            select=["manifest_text"]
+        ).execute(num_retries=self.arvrunner.num_retries)
+        if not collections["items"]:
+            raise WorkflowException(
+                "Job output '%s' cannot be found on API server" % (
+                    record["output"]))
+        # Create new collection in the parent project
+        # with the output contents.
+        self.arvrunner.api.collections().create(body={
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": colname,
+            "portable_data_hash": record["output"],
+            "manifest_text": collections["items"][0]["manifest_text"]
+        }, ensure_unique_name=True).execute(
+            num_retries=self.arvrunner.num_retries)
+    self.builder.outdir = outdir
+    self.builder.pathmapper.keepdir = keepdir
+    return self.collect_outputs("keep:" + record["output"])

commit ad8c74a419972e173bd5fd75595d9893b58fc154
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 8 11:45:00 2016 -0400

    8442: Return PDH for Docker container.  Working on setting up mount points.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5a1ff07..387bc4a 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,9 +40,6 @@ tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4ea63a0..2179398 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -23,7 +23,16 @@ class ArvadosContainer(object):
             "state": "Committed"
         runtime_constraints = {}
-        mounts = {}
+        mounts = {
+            "/var/spool/cwl": {
+                "kind": "tmp"
+            },
+            "/tmp": {
+                "kind": "tmp"
+            }
+        }
+        # TODO mount normal inputs...
         if self.generatefiles:
             vwd = arvados.collection.Collection()
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 8f76bbf..253df99 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -26,6 +26,11 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
-    # XXX return PDH instead
+    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                            image_name=image_name,
+                                                            image_tag=image_tag)
+    #return dockerRequirement["dockerImageId"]
-    return dockerRequirement["dockerImageId"]
+    pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+    return pdh

commit 01a3a3cd88f56635a0f5d7938c70a3884c6b85bb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 8 11:15:58 2016 -0400

    8442: Submit containers Work in progess.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0878998..5a1ff07 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -27,6 +27,8 @@ import threading
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 import urlparse
+from .arvcontainer import ArvadosContainer
+from .arvdocker import arv_docker_get_image
 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
 from arvados.api import OrderedJsonModel
@@ -39,31 +41,6 @@ outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
-    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
-    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
-        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-    sp = dockerRequirement["dockerImageId"].split(":")
-    image_name = sp[0]
-    image_tag = sp[1] if len(sp) > 1 else None
-    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
-                                                            image_name=image_name,
-                                                            image_tag=image_tag)
-    if not images:
-        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = ["--project-uuid="+project_uuid, image_name]
-        if image_tag:
-            args.append(image_tag)
-        logger.info("Uploading Docker image %s", ":".join(args[1:]))
-        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
-    # XXX return PDH instead
-    return dockerRequirement["dockerImageId"]
 class CollectionFsAccess(cwltool.process.StdFsAccess):
@@ -588,12 +565,13 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
-    def __init__(self, arvrunner, toolpath_object, **kwargs):
+    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
+        self.crunch2 = crunch2
     def makeJobRunner(self):
-        if kwargs.get("crunch2"):
+        if self.crunch2:
             return ArvadosContainer(self.arvrunner)
             return ArvadosJob(self.arvrunner)
@@ -609,7 +587,7 @@ class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
     complete, and report output."""
-    def __init__(self, api_client):
+    def __init__(self, api_client, crunch2):
         self.api = api_client
         self.jobs = {}
         self.lock = threading.Lock()
@@ -618,10 +596,11 @@ class ArvCwlRunner(object):
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
+        self.crunch2 = crunch2
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, **kwargs)
+            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
@@ -709,7 +688,7 @@ class ArvCwlRunner(object):
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-        if kwargs.get("crunch2"):
+        if self.crunch2:
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
@@ -849,7 +828,7 @@ def main(args, stdout, stderr, api_client=None):
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client)
+        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
     except Exception as e:
         return 1
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index fe0f7ca..4ea63a0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,3 +1,10 @@
+import logging
+import arvados.collection
+from cwltool.process import get_feature
+from .arvdocker import arv_docker_get_image
+logger = logging.getLogger('arvados.cwl-runner')
 class ArvadosContainer(object):
     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
@@ -7,12 +14,13 @@ class ArvadosContainer(object):
     def run(self, dry_run=False, pull_image=True, **kwargs):
         container_request = {
-            "command": self.command_line
+            "command": self.command_line,
             "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
-            "output_path", "/var/spool/cwl",
-            "cwd", "/var/spool/cwl",
-            "priority": 1
+            "output_path": "/var/spool/cwl",
+            "cwd": "/var/spool/cwl",
+            "priority": 1,
+            "state": "Committed"
         runtime_constraints = {}
         mounts = {}
@@ -46,7 +54,7 @@ class ArvadosContainer(object):
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if not docker_req:
-            docker_req = "arvados/jobs"
+            docker_req = {"dockerImageId": "arvados/jobs"}
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
new file mode 100644
index 0000000..8f76bbf
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -0,0 +1,31 @@
+import logging
+import cwltool.docker
+import arvados.commands.keepdocker
+logger = logging.getLogger('arvados.cwl-runner')
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+    sp = dockerRequirement["dockerImageId"].split(":")
+    image_name = sp[0]
+    image_tag = sp[1] if len(sp) > 1 else None
+    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                            image_name=image_name,
+                                                            image_tag=image_tag)
+    if not images:
+        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
+        args = ["--project-uuid="+project_uuid, image_name]
+        if image_tag:
+            args.append(image_tag)
+        logger.info("Uploading Docker image %s", ":".join(args[1:]))
+        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+    # XXX return PDH instead
+    return dockerRequirement["dockerImageId"]

commit b70924ee743d537a0c2c31202b9a730dea555856
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 15:54:35 2016 -0400

    8442: Add --crunch1/--crunch2 switch

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index bf7ccc0..0878998 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -824,6 +824,15 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--crunch1", action="store_false",
+                        default=False, dest="crunch2",
+                        help="Use Crunch v1 Jobs API")
+    exgroup.add_argument("--crunch2", action="store_true",
+                        default=False, dest="crunch2",
+                        help="Use Crunch v2 Containers API")
     parser.add_argument("workflow", type=str, nargs="?", default=None)
     parser.add_argument("job_order", nargs=argparse.REMAINDER)

commit 6cd1dd7a2a55a80dd207b70fbb10a72e68bc7ea4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 15:24:50 2016 -0400

    8442: CWL create crunch2 containers WIP

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 371dd4f..bf7ccc0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -61,6 +61,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+    # XXX return PDH instead
     return dockerRequirement["dockerImageId"]
@@ -591,7 +593,10 @@ class ArvadosCommandTool(CommandLineTool):
         self.arvrunner = arvrunner
     def makeJobRunner(self):
-        return ArvadosJob(self.arvrunner)
+        if kwargs.get("crunch2"):
+            return ArvadosContainer(self.arvrunner)
+        else:
+            return ArvadosJob(self.arvrunner)
     def makePathMapper(self, reffiles, **kwargs):
         return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
@@ -704,8 +709,12 @@ class ArvCwlRunner(object):
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-        kwargs["outdir"] = "$(task.outdir)"
-        kwargs["tmpdir"] = "$(task.tmpdir)"
+        if kwargs.get("crunch2"):
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+        else:
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, **kwargs)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
new file mode 100644
index 0000000..fe0f7ca
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -0,0 +1,160 @@
+class ArvadosContainer(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+    def __init__(self, runner):
+        self.arvrunner = runner
+        self.running = False
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        container_request = {
+            "command": self.command_line
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": self.name,
+            "output_path", "/var/spool/cwl",
+            "cwd", "/var/spool/cwl",
+            "priority": 1
+        }
+        runtime_constraints = {}
+        mounts = {}
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            container_request["task.vwd"] = {}
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            # TODO
+            # for t in self.generatefiles:
+            #     container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+        container_request["environment"] = {"TMPDIR": "/tmp"}
+        if self.environment:
+            container_request["environment"].update(self.environment)
+        # TODO, not supported
+        #if self.stdin:
+        #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+        if self.stdout:
+            mounts["stdout"] = {"kind": "file",
+                                "path": self.stdout}
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if not docker_req:
+            docker_req = "arvados/jobs"
+        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
+                                                                     docker_req,
+                                                                     pull_image,
+                                                                     self.arvrunner.project_uuid)
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["vcpus"] = resources.get("cores", 1)
+            runtime_constraints["ram"] = resources.get("ram") * 2**20
+            #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+        container_request["mounts"] = mounts
+        container_request["runtime_constraints"] = runtime_constraints
+        try:
+            response = self.arvrunner.api.container_requests().create(
+                body=container_request
+            ).execute(num_retries=self.arvrunner.num_retries)
+            self.arvrunner.jobs[response["uuid"]] = self
+            logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+            if response["state"] in ("Complete", "Cancelled"):
+                self.done(response)
+        except Exception as e:
+            logger.error("Got error %s" % str(e))
+            self.output_callback({}, "permanentFail")
+    def done(self, record):
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+            try:
+                outputs = {}
+                if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will different runs, and we need to know about the
+                        # final run that actually produced output.
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+                    # check if collection already exists with same owner, name and content
+                    collection_exists = self.arvrunner.api.collections().list(
+                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                 ['portable_data_hash', '=', record["output"]],
+                                 ["name", "=", colname]]
+                    ).execute(num_retries=self.arvrunner.num_retries)
+                    if not collection_exists["items"]:
+                        # Create a collection located in the same project as the
+                        # pipeline with the contents of the output.
+                        # First, get output record.
+                        collections = self.arvrunner.api.collections().list(
+                            limit=1,
+                            filters=[['portable_data_hash', '=', record["output"]]],
+                            select=["manifest_text"]
+                        ).execute(num_retries=self.arvrunner.num_retries)
+                        if not collections["items"]:
+                            raise WorkflowException(
+                                "Job output '%s' cannot be found on API server" % (
+                                    record["output"]))
+                        # Create new collection in the parent project
+                        # with the output contents.
+                        self.arvrunner.api.collections().create(body={
+                            "owner_uuid": self.arvrunner.project_uuid,
+                            "name": colname,
+                            "portable_data_hash": record["output"],
+                            "manifest_text": collections["items"][0]["manifest_text"]
+                        }, ensure_unique_name=True).execute(
+                            num_retries=self.arvrunner.num_retries)
+                    self.builder.outdir = outdir
+                    self.builder.pathmapper.keepdir = keepdir
+                    outputs = self.collect_outputs("keep:" + record["output"])
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
+            except Exception as e:
+                logger.exception("Got unknown exception while collecting job outputs:")
+                processStatus = "permanentFail"
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]



More information about the arvados-commits mailing list