[ARVADOS] created: 5d80321ca20152859653a24ce1ddfe85d40fcb3b

Git user git at public.curoverse.com
Tue Jun 14 16:21:09 EDT 2016


        at  5d80321ca20152859653a24ce1ddfe85d40fcb3b (commit)


commit 5d80321ca20152859653a24ce1ddfe85d40fcb3b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 16:42:37 2016 -0400

    8442: more import fixups, import workflow file correctly.

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c80896..414ce63 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -152,7 +152,7 @@ class RunnerContainer(Runner):
         workflowname = os.path.basename(self.tool.tool["id"])
         workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
         workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[workflowcollection.index('/')]
+        workflowcollection = workflowcollection[:workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
 
         container_image = arv_docker_get_image(self.arvrunner.api,
@@ -170,7 +170,7 @@ class RunnerContainer(Runner):
             "state": "Committed",
             "container_image": container_image,
             "mounts": {
-                workflowpath: {
+                "/var/lib/cwl/workflow": {
                     "kind": "collection",
                     "portable_data_hash": "%s" % workflowcollection
                 },
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index d48c93a..28b0fee 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -1,6 +1,11 @@
 import fnmatch
+import os
+
 import cwltool.process
 
+import arvados.util
+import arvados.collection
+
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 

commit 5a2c15eef8b984c03628e74266e08d9118266e79
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 16:28:45 2016 -0400

    8442: import fixups

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 0cc23ab..f754348 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -1,15 +1,21 @@
 import os
 import urlparse
 from functools import partial
+import logging
+import json
 
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import get_feature, scandeps, adjustFiles
 from cwltool.load_tool import fetch_document
 
+import arvados.collection
+
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
 
+logger = logging.getLogger('arvados.cwl-runner')
+
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner

commit c05ed1b87e4e792fba00edb0372ce29caa9b4453
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 16:22:46 2016 -0400

    8442: Introduce a RuntimeConstraints struct instead of using a map

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 54d596f..c0e4f14 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -31,13 +31,19 @@ type apiClientAuthorizationList struct {
 	Items []apiClientAuthorization `json:"items"`
 }
 
+type RuntimeConstraints struct {
+	API   *bool `json:"API"`
+	Ram   int64 `json:"ram"`
+	Vcpus int64 `json:"vcpus"`
+}
+
 // Represents an Arvados container record
 type Container struct {
-	UUID               string           `json:"uuid"`
-	State              string           `json:"state"`
-	Priority           int              `json:"priority"`
-	RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-	LockedByUUID       string           `json:"locked_by_uuid"`
+	UUID               string             `json:"uuid"`
+	State              string             `json:"state"`
+	Priority           int                `json:"priority"`
+	RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+	LockedByUUID       string             `json:"locked_by_uuid"`
 }
 
 // ContainerList is a list of the containers from api
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index f718fbc..9b2cf0c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -70,11 +70,11 @@ func doMain() error {
 
 // sbatchCmd
 func sbatchFunc(container dispatch.Container) *exec.Cmd {
-	memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+	memPerCPU := math.Ceil((float64(container.RuntimeConstraints.Ram)) / (float64(container.RuntimeConstraints.Vcpus * 1048576)))
 	return exec.Command("sbatch", "--share", "--parsable",
 		fmt.Sprintf("--job-name=%s", container.UUID),
 		fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
-		fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+		fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints.Vcpus)),
 		fmt.Sprintf("--priority=%d", container.Priority))
 }
 

commit 8d865875100f11748d8d862c753119f8002ddb4a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 16:17:45 2016 -0400

    8442: Debugging container --submit with crunch2

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 136d5ae..ba26816 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -19,6 +19,7 @@ import arvados.events
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess
 
 from cwltool.process import shortname, UnsupportedRequirement
 from arvados.api import OrderedJsonModel
@@ -184,6 +185,9 @@ class ArvCwlRunner(object):
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            if runnerjob and self.crunch2:
+                self.api.container_requests().update(uuid=runnerjob.uuid,
+                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
 
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index be11404..4c80896 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -152,7 +152,7 @@ class RunnerContainer(Runner):
         workflowname = os.path.basename(self.tool.tool["id"])
         workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
         workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+        workflowcollection = workflowcollection[workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
 
         container_image = arv_docker_get_image(self.arvrunner.api,
@@ -181,11 +181,16 @@ class RunnerContainer(Runner):
                 "stdout": {
                     "kind": "file",
                     "path": "/var/spool/cwl/cwl.output.json"
+                },
+                "/var/spool/cwl": {
+                    "kind": "collection",
+                    "writable": True
                 }
             },
             "runtime_constraints": {
                 "vcpus": 1,
-                "ram": 1024*1024*256
+                "ram": 1024*1024*256,
+                "API": True
             }
         }
 
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index c911895..d48c93a 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -1,4 +1,5 @@
 import fnmatch
+import cwltool.process
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""

commit 3bd797155c7631d75c76069c427f24cb0e1e1413
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 15:45:29 2016 -0400

    8442: Adding --submit support with --crunch2.  General refactoring into more/smaller files.

diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index 1b177ec..2618e47 100644
--- a/apps/workbench/Gemfile.lock
+++ b/apps/workbench/Gemfile.lock
@@ -309,6 +309,3 @@ DEPENDENCIES
   therubyracer
   uglifier (>= 1.0.3)
   wiselinks
-
-BUNDLED WITH
-   1.12.1
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index af74808..136d5ae 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -3,397 +3,28 @@
 # Implement cwl-runner interface for submitting and running jobs on Arvados.
 
 import argparse
-import arvados
-import arvados.collection
-import arvados.commands.keepdocker
-import arvados.commands.run
-import arvados.events
-import arvados.util
-import copy
-import cwltool.docker
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
-from cwltool.errors import WorkflowException
-import cwltool.main
-import cwltool.workflow
-import fnmatch
-from functools import partial
-import json
 import logging
 import os
-import pkg_resources  # part of setuptools
-import re
 import sys
 import threading
-from cwltool.load_tool import fetch_document
-from cwltool.builder import Builder
-import urlparse
-from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
-from .arvdocker import arv_docker_get_image
-
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
-from arvados.api import OrderedJsonModel
-
-logger = logging.getLogger('arvados.cwl-runner')
-logger.setLevel(logging.INFO)
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
-    """Implement the cwltool FsAccess interface for Arvados Collections."""
-
-    def __init__(self, basedir):
-        super(CollectionFsAccess, self).__init__(basedir)
-        self.collections = {}
-
-    def get_collection(self, path):
-        p = path.split("/")
-        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
-            pdh = p[0][5:]
-            if pdh not in self.collections:
-                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
-            return (self.collections[pdh], "/".join(p[1:]))
-        else:
-            return (None, path)
-
-    def _match(self, collection, patternsegments, parent):
-        if not patternsegments:
-            return []
-
-        if not isinstance(collection, arvados.collection.RichCollectionBase):
-            return []
-
-        ret = []
-        # iterate over the files and subcollections in 'collection'
-        for filename in collection:
-            if patternsegments[0] == '.':
-                # Pattern contains something like "./foo" so just shift
-                # past the "./"
-                ret.extend(self._match(collection, patternsegments[1:], parent))
-            elif fnmatch.fnmatch(filename, patternsegments[0]):
-                cur = os.path.join(parent, filename)
-                if len(patternsegments) == 1:
-                    ret.append(cur)
-                else:
-                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
-        return ret
-
-    def glob(self, pattern):
-        collection, rest = self.get_collection(pattern)
-        patternsegments = rest.split("/")
-        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
-    def open(self, fn, mode):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.open(rest, mode)
-        else:
-            return open(self._abs(fn), mode)
-
-    def exists(self, fn):
-        collection, rest = self.get_collection(fn)
-        if collection:
-            return collection.exists(rest)
-        else:
-            return os.path.exists(self._abs(fn))
-
-
-
-class RunnerJob(object):
-    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
-
-    def __init__(self, runner, tool, job_order, enable_reuse):
-        self.arvrunner = runner
-        self.tool = tool
-        self.job_order = job_order
-        self.running = False
-        self.enable_reuse = enable_reuse
-
-    def update_pipeline_component(self, record):
-        pass
-
-    def upload_docker(self, tool):
-        if isinstance(tool, CommandLineTool):
-            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
-            if docker_req:
-                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
-        elif isinstance(tool, cwltool.workflow.Workflow):
-            for s in tool.steps:
-                self.upload_docker(s.embedded_tool)
-
-    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
-        """Create an Arvados job specification for this workflow.
-
-        The returned dict can be used to create a job (i.e., passed as
-        the +body+ argument to jobs().create()), or as a component in
-        a pipeline template or pipeline instance.
-        """
-        self.upload_docker(self.tool)
-
-        workflowfiles = set()
-        jobfiles = set()
-        workflowfiles.add(self.tool.tool["id"])
-
-        self.name = os.path.basename(self.tool.tool["id"])
-
-        def visitFiles(files, path):
-            files.add(path)
-            return path
-
-        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
-        def loadref(b, u):
-            return document_loader.fetch(urlparse.urljoin(b, u))
-
-        sc = scandeps(uri, workflowobj,
-                      set(("$import", "run")),
-                      set(("$include", "$schemas", "path")),
-                      loadref)
-        adjustFiles(sc, partial(visitFiles, workflowfiles))
-        adjustFiles(self.job_order, partial(visitFiles, jobfiles))
-
-        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
-                                       "%s",
-                                       "%s/%s",
-                                       name=self.name,
-                                       **kwargs)
-
-        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
-                                  "%s",
-                                  "%s/%s",
-                                  name=os.path.basename(self.job_order.get("id", "#")),
-                                  **kwargs)
-
-        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
-
-        if "id" in self.job_order:
-            del self.job_order["id"]
-
-        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
-        return {
-            "script": "cwl-runner",
-            "script_version": "master",
-            "repository": "arvados",
-            "script_parameters": self.job_order,
-            "runtime_constraints": {
-                "docker_image": "arvados/jobs"
-            }
-        }
-
-    def run(self, *args, **kwargs):
-        job_spec = self.arvados_job_spec(*args, **kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
-        response = self.arvrunner.api.jobs().create(
-            body=job_spec,
-            find_or_create=self.enable_reuse
-        ).execute(num_retries=self.arvrunner.num_retries)
-
-        self.uuid = response["uuid"]
-        self.arvrunner.jobs[self.uuid] = self
-
-        logger.info("Submitted job %s", response["uuid"])
-
-        if kwargs.get("submit"):
-            self.pipeline = self.arvrunner.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "name": shortname(self.tool.tool["id"]),
-                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
-                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
-            self.done(response)
-
-    def done(self, record):
-        if record["state"] == "Complete":
-            processStatus = "success"
-        else:
-            processStatus = "permanentFail"
-
-        outputs = None
-        try:
-            try:
-                outc = arvados.collection.Collection(record["output"])
-                with outc.open("cwl.output.json") as f:
-                    outputs = json.load(f)
-                def keepify(path):
-                    if not path.startswith("keep:"):
-                        return "keep:%s/%s" % (record["output"], path)
-                adjustFiles(outputs, keepify)
-            except Exception as e:
-                logger.error("While getting final output object: %s", e)
-            self.arvrunner.output_callback(outputs, processStatus)
-        finally:
-            del self.arvrunner.jobs[record["uuid"]]
-
-
-class RunnerTemplate(object):
-    """An Arvados pipeline template that invokes a CWL workflow."""
-
-    type_to_dataclass = {
-        'boolean': 'boolean',
-        'File': 'File',
-        'float': 'number',
-        'int': 'number',
-        'string': 'text',
-    }
-
-    def __init__(self, runner, tool, job_order, enable_reuse):
-        self.runner = runner
-        self.tool = tool
-        self.job = RunnerJob(
-            runner=runner,
-            tool=tool,
-            job_order=job_order,
-            enable_reuse=enable_reuse)
-
-    def pipeline_component_spec(self):
-        """Return a component that Workbench and a-r-p-i will understand.
-
-        Specifically, translate CWL input specs to Arvados pipeline
-        format, like {"dataclass":"File","value":"xyz"}.
-        """
-        spec = self.job.arvados_job_spec()
-
-        # Most of the component spec is exactly the same as the job
-        # spec (script, script_version, etc.).
-        # spec['script_parameters'] isn't right, though. A component
-        # spec's script_parameters hash is a translation of
-        # self.tool.tool['inputs'] with defaults/overrides taken from
-        # the job order. So we move the job parameters out of the way
-        # and build a new spec['script_parameters'].
-        job_params = spec['script_parameters']
-        spec['script_parameters'] = {}
-
-        for param in self.tool.tool['inputs']:
-            param = copy.deepcopy(param)
-
-            # Data type and "required" flag...
-            types = param['type']
-            if not isinstance(types, list):
-                types = [types]
-            param['required'] = 'null' not in types
-            non_null_types = set(types) - set(['null'])
-            if len(non_null_types) == 1:
-                the_type = [c for c in non_null_types][0]
-                dataclass = self.type_to_dataclass.get(the_type)
-                if dataclass:
-                    param['dataclass'] = dataclass
-            # Note: If we didn't figure out a single appropriate
-            # dataclass, we just left that attribute out.  We leave
-            # the "type" attribute there in any case, which might help
-            # downstream.
-
-            # Title and description...
-            title = param.pop('label', '')
-            descr = param.pop('description', '').rstrip('\n')
-            if title:
-                param['title'] = title
-            if descr:
-                param['description'] = descr
-
-            # Fill in the value from the current job order, if any.
-            param_id = shortname(param.pop('id'))
-            value = job_params.get(param_id)
-            if value is None:
-                pass
-            elif not isinstance(value, dict):
-                param['value'] = value
-            elif param.get('dataclass') == 'File' and value.get('path'):
-                param['value'] = value['path']
-
-            spec['script_parameters'][param_id] = param
-        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
-        return spec
-
-    def save(self):
-        job_spec = self.pipeline_component_spec()
-        response = self.runner.api.pipeline_templates().create(body={
-            "components": {
-                self.job.name: job_spec,
-            },
-            "name": self.job.name,
-            "owner_uuid": self.runner.project_uuid,
-        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
-        self.uuid = response["uuid"]
-        logger.info("Created template %s", self.uuid)
-
-
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    """Convert container-local paths to and from Keep collection ids."""
-
-    def __init__(self, arvrunner, referenced_files, input_basedir,
-                 collection_pattern, file_pattern, name=None, **kwargs):
-        self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = set()
-
-        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
-        for src in referenced_files:
-            if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, collection_pattern % src[5:])
-            if "#" in src:
-                src = src[:src.index("#")]
-            if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
-                if kwargs.get("conformance_test"):
-                    self._pathmap[src] = (src, ab)
-                elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.add((src, ab, st))
-                elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
-                else:
-                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-
-        if uploadfiles:
-            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
-                                             arvrunner.api,
-                                             dry_run=kwargs.get("dry_run"),
-                                             num_retries=3,
-                                             fnPattern=file_pattern,
-                                             name=name,
-                                             project=arvrunner.project_uuid)
-
-        for src, ab, st in uploadfiles:
-            arvrunner.add_uploaded(src, (ab, st.fn))
-            self._pathmap[src] = (ab, st.fn)
-
-        self.keepdir = None
-
-    def reversemap(self, target):
-        if target.startswith("keep:"):
-            return (target, target)
-        elif self.keepdir and target.startswith(self.keepdir):
-            return (target, "keep:" + target[len(self.keepdir)+1:])
-        else:
-            return super(ArvPathMapper, self).reversemap(target)
-
+import pkg_resources  # part of setuptools
 
-class ArvadosCommandTool(CommandLineTool):
-    """Wrap cwltool CommandLineTool to override selected methods."""
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
 
-    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
-        self.arvrunner = arvrunner
-        self.crunch2 = crunch2
+import arvados
+import arvados.events
 
-    def makeJobRunner(self):
-        if self.crunch2:
-            return ArvadosContainer(self.arvrunner)
-        else:
-            return ArvadosJob(self.arvrunner)
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvtool import ArvadosCommandTool
 
-    def makePathMapper(self, reffiles, **kwargs):
-        if self.crunch2:
-            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                                 "/keep/%s",
-                                 "/keep/%s/%s",
-                                 **kwargs)
-        else:
-            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                                 "$(task.keep)/%s",
-                                 "$(task.keep)/%s/%s",
-                                 **kwargs)
+from cwltool.process import shortname, UnsupportedRequirement
+from arvados.api import OrderedJsonModel
 
+logger = logging.getLogger('arvados.cwl-runner')
+logger.setLevel(logging.INFO)
 
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
@@ -475,7 +106,10 @@ class ArvCwlRunner(object):
             return tmpl.uuid
 
         if kwargs.get("submit"):
-            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+            if self.crunch2:
+                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+            else:
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
             # Create pipeline for local run
@@ -510,56 +144,54 @@ class ArvCwlRunner(object):
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
-        if kwargs.get("conformance_test"):
-            return cwltool.main.single_job_executor(tool, job_order, **kwargs)
+        if kwargs.get("submit"):
+            jobiter = iter((runnerjob,))
         else:
-            if kwargs.get("submit"):
-                jobiter = iter((runnerjob,))
-            else:
-                if "cwl_runner_job" in kwargs:
-                    self.uuid = kwargs.get("cwl_runner_job").get('uuid')
-                jobiter = tool.job(job_order,
-                                   self.output_callback,
-                                   docker_outdir="$(task.outdir)",
-                                   **kwargs)
-
-            try:
-                self.cond.acquire()
-                # Will continue to hold the lock for the duration of this code
-                # except when in cond.wait(), at which point on_message can update
-                # job state and process output callbacks.
-
-                for runnable in jobiter:
-                    if runnable:
-                        runnable.run(**kwargs)
-                    else:
-                        if self.jobs:
-                            self.cond.wait(1)
-                        else:
-                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
-                            break
-
-                while self.jobs:
-                    self.cond.wait(1)
-
-                events.close()
-            except UnsupportedRequirement:
-                raise
-            except:
-                if sys.exc_info()[0] is KeyboardInterrupt:
-                    logger.error("Interrupted, marking pipeline as failed")
+            if "cwl_runner_job" in kwargs:
+                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+            jobiter = tool.job(job_order,
+                               self.output_callback,
+                               docker_outdir="$(task.outdir)",
+                               **kwargs)
+
+        try:
+            self.cond.acquire()
+            # Will continue to hold the lock for the duration of this code
+            # except when in cond.wait(), at which point on_message can update
+            # job state and process output callbacks.
+
+            for runnable in jobiter:
+                if runnable:
+                    runnable.run(**kwargs)
                 else:
-                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
-                if self.pipeline:
-                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            finally:
-                self.cond.release()
+                    if self.jobs:
+                        self.cond.wait(1)
+                    else:
+                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+                        break
+
+            while self.jobs:
+                self.cond.wait(1)
+
+            events.close()
+        except UnsupportedRequirement:
+            raise
+        except:
+            if sys.exc_info()[0] is KeyboardInterrupt:
+                logger.error("Interrupted, marking pipeline as failed")
+            else:
+                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+        finally:
+            self.cond.release()
 
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+        if self.final_output is None:
+            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+        return self.final_output
 
-            return self.final_output
 
 def versionstring():
     """Print version string of key packages for provenance and debugging."""
@@ -572,6 +204,7 @@ def versionstring():
                                     "arvados-python-client", arvpkg[0].version,
                                     "cwltool", cwlpkg[0].version)
 
+
 def arg_parser():  # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
 
@@ -634,6 +267,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     return parser
 
+
 def main(args, stdout, stderr, api_client=None):
     parser = arg_parser()
 
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index b0e2c1f..be11404 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,10 +1,15 @@
 import logging
+import json
+import os
+
+from cwltool.errors import WorkflowException
+from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
+
 import arvados.collection
-from cwltool.process import get_feature, adjustFiles
+
 from .arvdocker import arv_docker_get_image
 from . import done
-from cwltool.errors import WorkflowException
-from cwltool.process import UnsupportedRequirement
+from .runner import Runner
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -45,7 +50,7 @@ class ArvadosContainer(object):
         if self.generatefiles:
             raise UnsupportedRequirement("Generate files not supported")
 
-            vwd = arvados.collection.Collection()
+            vwd = arvados.collection.Collection(api_client=self.arvrunner.api_client)
             container_request["task.vwd"] = {}
             for t in self.generatefiles:
                 if isinstance(self.generatefiles[t], dict):
@@ -124,3 +129,78 @@ class ArvadosContainer(object):
             self.output_callback(outputs, processStatus)
         finally:
             del self.arvrunner.jobs[record["uuid"]]
+
+
+class RunnerContainer(Runner):
+    """Submit and manage a container that runs arvados-cwl-runner."""
+
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
+
+        workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+
+        with arvados.collection.Collection(api_client=self.arvrunner.api) as jobobj:
+            with jobobj.open("cwl.input.json", "w") as f:
+                json.dump(self.job_order, f, sort_keys=True, indent=4)
+            jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+        workflowname = os.path.basename(self.tool.tool["id"])
+        workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
+        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+        jobpath = "/var/lib/cwl/job/cwl.input.json"
+
+        container_image = arv_docker_get_image(self.arvrunner.api,
+                                               {"dockerImageId": "arvados/jobs"},
+                                               pull_image,
+                                               self.arvrunner.project_uuid)
+
+        return {
+            "command": ["arvados-cwl-runner", "--local", "--crunch2", workflowpath, jobpath],
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": self.name,
+            "output_path": "/var/spool/cwl",
+            "cwd": "/var/spool/cwl",
+            "priority": 1,
+            "state": "Committed",
+            "container_image": container_image,
+            "mounts": {
+                workflowpath: {
+                    "kind": "collection",
+                    "portable_data_hash": "%s" % workflowcollection
+                },
+                jobpath: {
+                    "kind": "collection",
+                    "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
+                },
+                "stdout": {
+                    "kind": "file",
+                    "path": "/var/spool/cwl/cwl.output.json"
+                }
+            },
+            "runtime_constraints": {
+                "vcpus": 1,
+                "ram": 1024*1024*256
+            }
+        }
+
+    def run(self, *args, **kwargs):
+        job_spec = self.arvados_job_spec(*args, **kwargs)
+        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+        response = self.arvrunner.api.container_requests().create(
+            body=job_spec
+        ).execute(num_retries=self.arvrunner.num_retries)
+
+        self.uuid = response["uuid"]
+        self.arvrunner.jobs[response["container_uuid"]] = self
+
+        logger.info("Submitted container %s", response["uuid"])
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 88a8eeb..397b6d5 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -1,11 +1,19 @@
 import logging
 import re
-from . import done
-from .arvdocker import arv_docker_get_image
-from cwltool.process import get_feature
+import copy
+
+from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
+from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.load_tool import fetch_document
+from cwltool.builder import Builder
+
 import arvados.collection
 
+from .arvdocker import arv_docker_get_image
+from .runner import Runner
+from . import done
+
 logger = logging.getLogger('arvados.cwl-runner')
 
 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
@@ -164,3 +172,145 @@ class ArvadosJob(object):
             self.output_callback(outputs, processStatus)
         finally:
             del self.arvrunner.jobs[record["uuid"]]
+
+
+class RunnerJob(Runner):
+    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+        """Create an Arvados job specification for this workflow.
+
+        The returned dict can be used to create a job (i.e., passed as
+        the +body+ argument to jobs().create()), or as a component in
+        a pipeline template or pipeline instance.
+        """
+
+        workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+        return {
+            "script": "cwl-runner",
+            "script_version": "master",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }
+
+    def run(self, *args, **kwargs):
+        job_spec = self.arvados_job_spec(*args, **kwargs)
+        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+        response = self.arvrunner.api.jobs().create(
+            body=job_spec,
+            find_or_create=self.enable_reuse
+        ).execute(num_retries=self.arvrunner.num_retries)
+
+        self.uuid = response["uuid"]
+        self.arvrunner.jobs[self.uuid] = self
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        if kwargs.get("submit"):
+            self.pipeline = self.arvrunner.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "name": shortname(self.tool.tool["id"]),
+                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
+                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
+
+class RunnerTemplate(object):
+    """An Arvados pipeline template that invokes a CWL workflow."""
+
+    type_to_dataclass = {
+        'boolean': 'boolean',
+        'File': 'File',
+        'float': 'number',
+        'int': 'number',
+        'string': 'text',
+    }
+
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.runner = runner
+        self.tool = tool
+        self.job = RunnerJob(
+            runner=runner,
+            tool=tool,
+            job_order=job_order,
+            enable_reuse=enable_reuse)
+
+    def pipeline_component_spec(self):
+        """Return a component that Workbench and a-r-p-i will understand.
+
+        Specifically, translate CWL input specs to Arvados pipeline
+        format, like {"dataclass":"File","value":"xyz"}.
+        """
+        spec = self.job.arvados_job_spec()
+
+        # Most of the component spec is exactly the same as the job
+        # spec (script, script_version, etc.).
+        # spec['script_parameters'] isn't right, though. A component
+        # spec's script_parameters hash is a translation of
+        # self.tool.tool['inputs'] with defaults/overrides taken from
+        # the job order. So we move the job parameters out of the way
+        # and build a new spec['script_parameters'].
+        job_params = spec['script_parameters']
+        spec['script_parameters'] = {}
+
+        for param in self.tool.tool['inputs']:
+            param = copy.deepcopy(param)
+
+            # Data type and "required" flag...
+            types = param['type']
+            if not isinstance(types, list):
+                types = [types]
+            param['required'] = 'null' not in types
+            non_null_types = set(types) - set(['null'])
+            if len(non_null_types) == 1:
+                the_type = [c for c in non_null_types][0]
+                dataclass = self.type_to_dataclass.get(the_type)
+                if dataclass:
+                    param['dataclass'] = dataclass
+            # Note: If we didn't figure out a single appropriate
+            # dataclass, we just left that attribute out.  We leave
+            # the "type" attribute there in any case, which might help
+            # downstream.
+
+            # Title and description...
+            title = param.pop('label', '')
+            descr = param.pop('description', '').rstrip('\n')
+            if title:
+                param['title'] = title
+            if descr:
+                param['description'] = descr
+
+            # Fill in the value from the current job order, if any.
+            param_id = shortname(param.pop('id'))
+            value = job_params.get(param_id)
+            if value is None:
+                pass
+            elif not isinstance(value, dict):
+                param['value'] = value
+            elif param.get('dataclass') == 'File' and value.get('path'):
+                param['value'] = value['path']
+
+            spec['script_parameters'][param_id] = param
+        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
+        return spec
+
+    def save(self):
+        job_spec = self.pipeline_component_spec()
+        response = self.runner.api.pipeline_templates().create(body={
+            "components": {
+                self.job.name: job_spec,
+            },
+            "name": self.job.name,
+            "owner_uuid": self.runner.project_uuid,
+        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
+        self.uuid = response["uuid"]
+        logger.info("Created template %s", self.uuid)
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
new file mode 100644
index 0000000..a2c5c9e
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -0,0 +1,30 @@
+from cwltool.draft2tool import CommandLineTool
+from .arvjob import ArvadosJob
+from .arvcontainer import ArvadosContainer
+from .pathmapper import ArvPathMapper
+
+class ArvadosCommandTool(CommandLineTool):
+    """Wrap cwltool CommandLineTool to override selected methods."""
+
+    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
+        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+        self.arvrunner = arvrunner
+        self.crunch2 = crunch2
+
+    def makeJobRunner(self):
+        if self.crunch2:
+            return ArvadosContainer(self.arvrunner)
+        else:
+            return ArvadosJob(self.arvrunner)
+
+    def makePathMapper(self, reffiles, **kwargs):
+        if self.crunch2:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "/keep/%s",
+                                 "/keep/%s/%s",
+                                 **kwargs)
+        else:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "$(task.keep)/%s",
+                                 "$(task.keep)/%s/%s",
+                                 **kwargs)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
new file mode 100644
index 0000000..c911895
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -0,0 +1,59 @@
+import fnmatch
+
+class CollectionFsAccess(cwltool.process.StdFsAccess):
+    """Implement the cwltool FsAccess interface for Arvados Collections."""
+
+    def __init__(self, basedir):
+        super(CollectionFsAccess, self).__init__(basedir)
+        self.collections = {}
+
+    def get_collection(self, path):
+        p = path.split("/")
+        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
+            pdh = p[0][5:]
+            if pdh not in self.collections:
+                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
+            return (self.collections[pdh], "/".join(p[1:]))
+        else:
+            return (None, path)
+
+    def _match(self, collection, patternsegments, parent):
+        if not patternsegments:
+            return []
+
+        if not isinstance(collection, arvados.collection.RichCollectionBase):
+            return []
+
+        ret = []
+        # iterate over the files and subcollections in 'collection'
+        for filename in collection:
+            if patternsegments[0] == '.':
+                # Pattern contains something like "./foo" so just shift
+                # past the "./"
+                ret.extend(self._match(collection, patternsegments[1:], parent))
+            elif fnmatch.fnmatch(filename, patternsegments[0]):
+                cur = os.path.join(parent, filename)
+                if len(patternsegments) == 1:
+                    ret.append(cur)
+                else:
+                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
+        return ret
+
+    def glob(self, pattern):
+        collection, rest = self.get_collection(pattern)
+        patternsegments = rest.split("/")
+        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
+
+    def open(self, fn, mode):
+        collection, rest = self.get_collection(fn)
+        if collection:
+            return collection.open(rest, mode)
+        else:
+            return open(self._abs(fn), mode)
+
+    def exists(self, fn):
+        collection, rest = self.get_collection(fn)
+        if collection:
+            return collection.exists(rest)
+        else:
+            return os.path.exists(self._abs(fn))
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
new file mode 100644
index 0000000..9538a91
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -0,0 +1,55 @@
+import re
+
+import arvados.commands.run
+import arvados.collection
+import cwltool.pathmapper
+
+class ArvPathMapper(cwltool.pathmapper.PathMapper):
+    """Convert container-local paths to and from Keep collection ids."""
+
+    def __init__(self, arvrunner, referenced_files, input_basedir,
+                 collection_pattern, file_pattern, name=None, **kwargs):
+        self._pathmap = arvrunner.get_uploaded()
+        uploadfiles = set()
+
+        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+
+        for src in referenced_files:
+            if isinstance(src, basestring) and pdh_path.match(src):
+                self._pathmap[src] = (src, collection_pattern % src[5:])
+            if "#" in src:
+                src = src[:src.index("#")]
+            if src not in self._pathmap:
+                ab = cwltool.pathmapper.abspath(src, input_basedir)
+                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
+                if kwargs.get("conformance_test"):
+                    self._pathmap[src] = (src, ab)
+                elif isinstance(st, arvados.commands.run.UploadFile):
+                    uploadfiles.add((src, ab, st))
+                elif isinstance(st, arvados.commands.run.ArvFile):
+                    self._pathmap[src] = (ab, st.fn)
+                else:
+                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+
+        if uploadfiles:
+            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
+                                             arvrunner.api,
+                                             dry_run=kwargs.get("dry_run"),
+                                             num_retries=3,
+                                             fnPattern=file_pattern,
+                                             name=name,
+                                             project=arvrunner.project_uuid)
+
+        for src, ab, st in uploadfiles:
+            arvrunner.add_uploaded(src, (ab, st.fn))
+            self._pathmap[src] = (ab, st.fn)
+
+        self.keepdir = None
+
+    def reversemap(self, target):
+        if target.startswith("keep:"):
+            return (target, target)
+        elif self.keepdir and target.startswith(self.keepdir):
+            return (target, "keep:" + target[len(self.keepdir)+1:])
+        else:
+            return super(ArvPathMapper, self).reversemap(target)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
new file mode 100644
index 0000000..0cc23ab
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -0,0 +1,98 @@
+import os
+import urlparse
+from functools import partial
+
+from cwltool.draft2tool import CommandLineTool
+import cwltool.workflow
+from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.load_tool import fetch_document
+
+from .arvdocker import arv_docker_get_image
+from .pathmapper import ArvPathMapper
+
+class Runner(object):
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.arvrunner = runner
+        self.tool = tool
+        self.job_order = job_order
+        self.running = False
+        self.enable_reuse = enable_reuse
+
+    def update_pipeline_component(self, record):
+        pass
+
+    def upload_docker(self, tool):
+        if isinstance(tool, CommandLineTool):
+            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+            if docker_req:
+                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+        elif isinstance(tool, cwltool.workflow.Workflow):
+            for s in tool.steps:
+                self.upload_docker(s.embedded_tool)
+
+
+    def arvados_job_spec(self, *args, **kwargs):
+        self.upload_docker(self.tool)
+
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(self.tool.tool["id"])
+
+        self.name = os.path.basename(self.tool.tool["id"])
+
+        def visitFiles(files, path):
+            files.add(path)
+            return path
+
+        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+        def loadref(b, u):
+            return document_loader.fetch(urlparse.urljoin(b, u))
+
+        sc = scandeps(uri, workflowobj,
+                      set(("$import", "run")),
+                      set(("$include", "$schemas", "path")),
+                      loadref)
+        adjustFiles(sc, partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+
+        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=self.name,
+                                       **kwargs)
+
+        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(self.job_order.get("id", "#")),
+                                  **kwargs)
+
+        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+        if "id" in self.job_order:
+            del self.job_order["id"]
+
+        return workflowmapper
+
+
+    def done(self, record):
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+
+        outputs = None
+        try:
+            try:
+                outc = arvados.collection.Collection(record["output"])
+                with outc.open("cwl.output.json") as f:
+                    outputs = json.load(f)
+                def keepify(path):
+                    if not path.startswith("keep:"):
+                        return "keep:%s/%s" % (record["output"], path)
+                adjustFiles(outputs, keepify)
+            except Exception as e:
+                logger.error("While getting final output object: %s", e)
+            self.arvrunner.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_container.py
similarity index 64%
copy from sdk/cwl/tests/test_job.py
copy to sdk/cwl/tests/test_container.py
index dba65b0..0237e80 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_container.py
@@ -10,14 +10,20 @@ if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
 
-class TestJob(unittest.TestCase):
+class TestContainer(unittest.TestCase):
 
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
-    def test_run(self):
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_run(self, keepdocker):
         runner = mock.MagicMock()
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.ignore_docker_for_reuse = False
+
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
+
         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
 
         tool = {
@@ -25,46 +31,44 @@ class TestJob(unittest.TestCase):
             "outputs": [],
             "baseCommand": "ls"
         }
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names, basedir="")
         arvtool.formatgraph = None
-        for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run"):
             j.run()
-            runner.api.jobs().create.assert_called_with(
+            runner.api.container_requests().create.assert_called_with(
                 body={
-                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
-                    'runtime_constraints': {},
-                    'script_parameters': {
-                        'tasks': [{
-                            'task.env': {'TMPDIR': '$(task.tmpdir)'},
-                            'command': ['ls']
-                        }],
+                    'environment': {
+                        'TMPDIR': '/tmp'
                     },
-                    'script_version': 'master',
-                    'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
-                    'repository': 'arvados',
-                    'script': 'crunchrunner',
+                    'name': 'test_run',
                     'runtime_constraints': {
-                        'docker_image': 'arvados/jobs',
-                        'min_cores_per_node': 1,
-                        'min_ram_mb_per_node': 1024,
-                        'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
-                    }
-                },
-                find_or_create=True,
-                filters=[['repository', '=', 'arvados'],
-                         ['script', '=', 'crunchrunner'],
-                         ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
-                         ['docker_image_locator', 'in docker', 'arvados/jobs']]
-            )
+                        'vcpus': 1,
+                        'ram': 1073741824
+                    }, 'priority': 1,
+                    'mounts': {
+                        '/var/spool/cwl': {'kind': 'tmp'}
+                    },
+                    'state': 'Committed',
+                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                    'output_path': '/var/spool/cwl',
+                    'container_image': '99999999999999999999999999999993+99',
+                    'command': ['ls'],
+                    'cwd': '/var/spool/cwl'
+                })
 
     # The test passes some fields in builder.resources
     # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
-    def test_resource_requirements(self):
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_resource_requirements(self, keepdocker):
         runner = mock.MagicMock()
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.ignore_docker_for_reuse = False
         document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
 
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
+
         tool = {
             "inputs": [],
             "outputs": [],
@@ -76,36 +80,31 @@ class TestJob(unittest.TestCase):
             }],
             "baseCommand": "ls"
         }
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names)
         arvtool.formatgraph = None
-        for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements"):
             j.run()
-        runner.api.jobs().create.assert_called_with(
+
+        runner.api.container_requests().create.assert_called_with(
             body={
-                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
-                'runtime_constraints': {},
-                'script_parameters': {
-                    'tasks': [{
-                        'task.env': {'TMPDIR': '$(task.tmpdir)'},
-                        'command': ['ls']
-                    }]
-            },
-            'script_version': 'master',
-                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
-                'repository': 'arvados',
-                'script': 'crunchrunner',
+                'environment': {
+                    'TMPDIR': '/tmp'
+                },
+                'name': 'test_resource_requirements',
                 'runtime_constraints': {
-                    'docker_image': 'arvados/jobs',
-                    'min_cores_per_node': 3,
-                    'min_ram_mb_per_node': 3000,
-                    'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize
-                }
-            },
-            find_or_create=True,
-            filters=[['repository', '=', 'arvados'],
-                     ['script', '=', 'crunchrunner'],
-                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
-                     ['docker_image_locator', 'in docker', 'arvados/jobs']])
+                    'vcpus': 3,
+                    'ram': 3145728000
+                }, 'priority': 1,
+                'mounts': {
+                    '/var/spool/cwl': {'kind': 'tmp'}
+                },
+                'state': 'Committed',
+                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                'output_path': '/var/spool/cwl',
+                'container_image': '99999999999999999999999999999993+99',
+                'command': ['ls'],
+                'cwd': '/var/spool/cwl'
+            })
 
     @mock.patch("arvados.collection.Collection")
     def test_done(self, col):
@@ -121,7 +120,7 @@ class TestJob(unittest.TestCase):
         api.collections().list().execute.side_effect = ({"items": []},
                                                         {"items": [{"manifest_text": "XYZ"}]})
 
-        arvjob = arvados_cwl.ArvadosJob(runner)
+        arvjob = arvados_cwl.ArvadosContainer(runner)
         arvjob.name = "testjob"
         arvjob.builder = mock.MagicMock()
         arvjob.output_callback = mock.MagicMock()
@@ -163,7 +162,7 @@ class TestJob(unittest.TestCase):
         col().open.return_value = []
         api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
 
-        arvjob = arvados_cwl.ArvadosJob(runner)
+        arvjob = arvados_cwl.ArvadosContainer(runner)
         arvjob.name = "testjob"
         arvjob.builder = mock.MagicMock()
         arvjob.output_callback = mock.MagicMock()
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index dba65b0..701afcb 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -25,7 +25,7 @@ class TestJob(unittest.TestCase):
             "outputs": [],
             "baseCommand": "ls"
         }
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names, basedir="")
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir=""):
             j.run()
@@ -76,7 +76,7 @@ class TestJob(unittest.TestCase):
             }],
             "baseCommand": "ls"
         }
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names)
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir=""):
             j.run()
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 38741eb..48e6ed2 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -30,7 +30,7 @@ def stubs(func):
             return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
         stubs.KeepClient().put.side_effect = putstub
 
-        stubs.keepdocker.return_value = True
+        stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
         stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
 
         stubs.api = mock.MagicMock()
@@ -44,12 +44,28 @@ def stubs(func):
         }, {
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
             "portable_data_hash": "99999999999999999999999999999992+99",
+        },
+        {
+            "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4",
+            "portable_data_hash": "99999999999999999999999999999994+99",
+            "manifest_text": ""
         })
+        stubs.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
+
         stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         stubs.api.jobs().create().execute.return_value = {
             "uuid": stubs.expect_job_uuid,
             "state": "Queued",
         }
+
+        stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
+        stubs.api.container_requests().create().execute.return_value = {
+            "uuid": stubs.expect_container_request_uuid,
+            "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+
         stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
         stubs.api.pipeline_templates().create().execute.return_value = {
             "uuid": stubs.expect_pipeline_template_uuid,
@@ -70,6 +86,35 @@ def stubs(func):
             'script_version': 'master',
             'script': 'cwl-runner'
         }
+
+        stubs.expect_container_spec = {
+            'priority': 1,
+            'mounts': {
+                'stdout': {
+                    'path': '/var/spool/cwl/cwl.output.json',
+                    'kind': 'file'
+                },
+                '/var/lib/cwl/workflow/submit_wf.cwl': {
+                    'portable_data_hash': '999999999999999999999999991+99',
+                    'kind': 'collection'
+                },
+                '/var/lib/cwl/job/cwl.input.json': {
+                    'portable_data_hash': '102435082199e5229f99b01165b67096+60/cwl.input.json',
+                    'kind': 'collection'
+                }
+            },
+            'state': 'Committed',
+            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+            'command': ['arvados-cwl-runner', '--local', '--crunch2', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
+            'name': 'submit_wf.cwl',
+            'container_image': '99999999999999999999999999999993+99',
+            'output_path': '/var/spool/cwl',
+            'cwd': '/var/spool/cwl',
+            'runtime_constraints': {
+                'vcpus': 1,
+                'ram': 268435456
+            }
+        }
         return func(self, stubs, *args, **kwargs)
     return wrapped
 
@@ -128,6 +173,41 @@ class TestSubmit(unittest.TestCase):
             body=expect_body,
             find_or_create=True)
 
+    @stubs
+    def test_submit_container(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--crunch2", "--debug",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.api.collections().create.assert_has_calls([
+            mock.call(),
+            mock.call(body={
+                'manifest_text':
+                './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
+                '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
+                'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'name': 'submit_wf.cwl',
+            }, ensure_unique_name=True),
+            mock.call().execute(),
+            mock.call(body={
+                'manifest_text':
+                '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'name': '#',
+            }, ensure_unique_name=True),
+            mock.call().execute()])
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = stubs.fake_user_uuid
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
 
 class TestCreateTemplate(unittest.TestCase):
     @stubs
diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock
index 7be4e0f..3715718 100644
--- a/services/api/Gemfile.lock
+++ b/services/api/Gemfile.lock
@@ -257,6 +257,3 @@ DEPENDENCIES
   therubyracer
   trollop
   uglifier (>= 1.0.3)
-
-BUNDLED WITH
-   1.12.1

commit c3f8057d79f36b36e297d6015fa951d47739b1e6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 13 10:36:05 2016 -0400

    8442: Fix message

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 17fe8cb..b0e2c1f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -43,7 +43,7 @@ class ArvadosContainer(object):
             }
 
         if self.generatefiles:
-            raise UnsupportedRequirement("Stdin redirection currently not suppported")
+            raise UnsupportedRequirement("Generate files not supported")
 
             vwd = arvados.collection.Collection()
             container_request["task.vwd"] = {}

commit 8e95162337fc38c61c26d6afafcecd9985014fbf
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 10 10:45:19 2016 -0400

    9388: Record every log id sent and don't send duplicates.

diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 4988d59..58188ef 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -7,6 +7,7 @@ require 'oj'
 require 'faye/websocket'
 require 'record_filters'
 require 'load_param'
+require 'set'
 
 # Patch in user, last_log_id and filters fields into the Faye::Websocket class.
 module Faye
@@ -14,6 +15,7 @@ module Faye
     attr_accessor :user
     attr_accessor :last_log_id
     attr_accessor :filters
+    attr_accessor :sent_ids
   end
 end
 
@@ -114,7 +116,9 @@ class EventBus
 
         lastid = nil
         logs.limit(limit).each do |l|
-          ws.send(l.as_api_response.to_json)
+          if ws.sent_ids.add?(l.id) != nil
+            ws.send(l.as_api_response.to_json)
+          end
           lastid = l.id
           count += 1
         end
@@ -225,6 +229,7 @@ class EventBus
     ws.user = current_user
     ws.filters = []
     ws.last_log_id = nil
+    ws.sent_ids = Set.new
 
     # Subscribe to internal postgres notifications through @channel.  This will
     # call push_events when a notification comes through.

commit ae06d06086f675eb312b7aac2bb9f33d6d2b4ac2
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 17:55:37 2016 -0400

    9388: Process each notify individually instead of attempting to batch them up.
    
    Prior to this commit, websockets used to try to send log events out in batches,
    by getting all logs with an id greater than the last log that was sent.
    Unfortunately, under concurrent database writes, logs from uncommitted
    transactions may not appear in the query even if logs with larger ids do
    appear.  This results in the uncommitted log never being sent out because
    subsequent batch sends would not consider logs prior to the last log id that
    was sent (which, in this case, is higher than the log that was missed.)
    
    This commit eliminates the batching behavior.  Because NOTIFY includes the log
    id of a specific record that was committed, consider only the log record with
    that id and process events in the order that the NOTIFY events arrive.  This
    means events may be delivered out of numeric order (although they now more
    closely reflect the "actual" order, i.e. the order in which the events were
    actually committed to the database).
    
    "Catch ups" where the client has specified a last_log_id and needs to have past
    logs replayed continue to be sent in batches.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 347075d..988ec96 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -15,6 +15,7 @@ def main(arguments=None):
     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
     parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
+    parser.add_argument('-i', '--id', type=int, default="", help="Start from given log id.")
 
     group = parser.add_mutually_exclusive_group()
     group.add_argument('--poll-interval', default=15, type=int, help="If websockets is not available, specify the polling interval, default is every 15 seconds")
@@ -67,6 +68,9 @@ def main(arguments=None):
     else:
         last_log_id = None
 
+    if args.id:
+        last_log_id = args.id-1
+
     def on_message(ev):
         global filters
         global ws
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 9bf95f5..4988d59 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -67,11 +67,6 @@ class EventBus
   # push_events call if there are more log rows to send.
   def push_events ws, notify_id
     begin
-      if !notify_id.nil? and !ws.last_log_id.nil? and notify_id <= ws.last_log_id
-        # This notify is for a row we've handled already.
-        return
-      end
-
       # Must have at least one filter set up to receive events
       if ws.filters.length > 0
         # Start with log rows readable by user, sorted in ascending order
@@ -82,13 +77,12 @@ class EventBus
         param_out = []
 
         if !ws.last_log_id.nil?
-          # Client is only interested in log rows that are newer than the
-          # last log row seen by the client.
+          # We are catching up from some starting point.
           cond_id = "logs.id > ?"
           param_out << ws.last_log_id
         elsif !notify_id.nil?
-          # No last log id, so look at rows starting with notify id
-          cond_id = "logs.id >= ?"
+          # Get new row being notified.
+          cond_id = "logs.id = ?"
           param_out << notify_id
         else
           # No log id to start from, nothing to do, return
@@ -118,9 +112,10 @@ class EventBus
         count = 0
         limit = 10
 
+        lastid = nil
         logs.limit(limit).each do |l|
           ws.send(l.as_api_response.to_json)
-          ws.last_log_id = l.id
+          lastid = l.id
           count += 1
         end
 
@@ -128,13 +123,13 @@ class EventBus
           # Number of rows returned was capped by limit(), we need to schedule
           # another query to get more logs (will start from last_log_id
           # reported by current query)
+          ws.last_log_id = lastid
           EventMachine::next_tick do
-            push_events ws, nil
+            push_events ws, notify_id
           end
-        elsif !notify_id.nil? and (ws.last_log_id.nil? or notify_id > ws.last_log_id)
-          # Number of rows returned was less than cap, but the notify id is
-          # higher than the last id visible to the client, so update last_log_id
-          ws.last_log_id = notify_id
+        elsif !ws.last_log_id.nil?
+          # Done catching up
+          ws.last_log_id = nil
         end
       elsif !notify_id.nil?
         # No filters set up, so just record the sequence number

commit 8efd6be8cd2d75d0a3b74a703391affa0e7459f1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 15:16:09 2016 -0400

    8442: raise UnsupportedRequirement for unsupported features in the conformance
    tests.  Bump cwltool dependency.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 171d92d..af74808 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -31,7 +31,7 @@ from .arvcontainer import ArvadosContainer
 from .arvjob import ArvadosJob
 from .arvdocker import arv_docker_get_image
 
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
+from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -477,7 +477,7 @@ class ArvCwlRunner(object):
         if kwargs.get("submit"):
             runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
 
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
@@ -543,11 +543,13 @@ class ArvCwlRunner(object):
                     self.cond.wait(1)
 
                 events.close()
+            except UnsupportedRequirement:
+                raise
             except:
                 if sys.exc_info()[0] is KeyboardInterrupt:
                     logger.error("Interrupted, marking pipeline as failed")
                 else:
-                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
+                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                 if self.pipeline:
                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 0cb885f..17fe8cb 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -4,6 +4,7 @@ from cwltool.process import get_feature, adjustFiles
 from .arvdocker import arv_docker_get_image
 from . import done
 from cwltool.errors import WorkflowException
+from cwltool.process import UnsupportedRequirement
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -42,6 +43,8 @@ class ArvadosContainer(object):
             }
 
         if self.generatefiles:
+            raise UnsupportedRequirement("Stdin redirection currently not suppported")
+
             vwd = arvados.collection.Collection()
             container_request["task.vwd"] = {}
             for t in self.generatefiles:
@@ -60,13 +63,12 @@ class ArvadosContainer(object):
         if self.environment:
             container_request["environment"].update(self.environment)
 
-        # TODO, not supported by crunchv2 yet
-        #if self.stdin:
-        #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+        if self.stdin:
+            raise UnsupportedRequirement("Stdin redirection currently not suppported")
 
         if self.stdout:
             mounts["stdout"] = {"kind": "file",
-                                "path": self.stdout}
+                                "path": "/var/spool/cwl/%s" % (self.stdout)}
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if not docker_req:
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 591bdde..b1ff7f3 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool==1.0.20160519182434',
+          'cwltool==1.0.20160609160402',
           'arvados-python-client>=0.1.20160322001610'
       ],
       data_files=[

commit e9aaeacb806e66af43f69f53ad8d0bb8468d28f7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 9 08:21:34 2016 -0400

    8442: Setting up mount points works.  Capturing output works.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 387bc4a..171d92d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,6 +28,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 import urlparse
 from .arvcontainer import ArvadosContainer
+from .arvjob import ArvadosJob
 from .arvdocker import arv_docker_get_image
 
 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
@@ -36,10 +37,6 @@ from arvados.api import OrderedJsonModel
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
@@ -98,194 +95,6 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
         else:
             return os.path.exists(self._abs(fn))
 
-class ArvadosJob(object):
-    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-
-    def __init__(self, runner):
-        self.arvrunner = runner
-        self.running = False
-
-    def run(self, dry_run=False, pull_image=True, **kwargs):
-        script_parameters = {
-            "command": self.command_line
-        }
-        runtime_constraints = {}
-
-        if self.generatefiles:
-            vwd = arvados.collection.Collection()
-            script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t])
-            vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
-        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
-        if self.environment:
-            script_parameters["task.env"].update(self.environment)
-
-        if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
-        if self.stdout:
-            script_parameters["task.stdout"] = self.stdout
-
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
-        if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
-        else:
-            runtime_constraints["docker_image"] = "arvados/jobs"
-
-        resources = self.builder.resources
-        if resources is not None:
-            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
-            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
-            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
-        filters = [["repository", "=", "arvados"],
-                   ["script", "=", "crunchrunner"],
-                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
-        if not self.arvrunner.ignore_docker_for_reuse:
-            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-
-        try:
-            response = self.arvrunner.api.jobs().create(
-                body={
-                    "owner_uuid": self.arvrunner.project_uuid,
-                    "script": "crunchrunner",
-                    "repository": "arvados",
-                    "script_version": "master",
-                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                    "script_parameters": {"tasks": [script_parameters]},
-                    "runtime_constraints": runtime_constraints
-                },
-                filters=filters,
-                find_or_create=kwargs.get("enable_reuse", True)
-            ).execute(num_retries=self.arvrunner.num_retries)
-
-            self.arvrunner.jobs[response["uuid"]] = self
-
-            self.update_pipeline_component(response)
-
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
-            if response["state"] in ("Complete", "Failed", "Cancelled"):
-                self.done(response)
-        except Exception as e:
-            logger.error("Got error %s" % str(e))
-            self.output_callback({}, "permanentFail")
-
-    def update_pipeline_component(self, record):
-        if self.arvrunner.pipeline:
-            self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
-                                                                                 body={
-                                                                                    "components": self.arvrunner.pipeline["components"]
-                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
-        if self.arvrunner.uuid:
-            try:
-                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
-                if job:
-                    components = job["components"]
-                    components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
-                        body={
-                            "components": components
-                        }).execute(num_retries=self.arvrunner.num_retries)
-            except Exception as e:
-                logger.info("Error adding to components: %s", e)
-
-    def done(self, record):
-        try:
-            self.update_pipeline_component(record)
-        except:
-            pass
-
-        try:
-            if record["state"] == "Complete":
-                processStatus = "success"
-            else:
-                processStatus = "permanentFail"
-
-            try:
-                outputs = {}
-                if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will different runs, and we need to know about the
-                        # final run that actually produced output.
-
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-
-                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
-                    # check if collection already exists with same owner, name and content
-                    collection_exists = self.arvrunner.api.collections().list(
-                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                 ['portable_data_hash', '=', record["output"]],
-                                 ["name", "=", colname]]
-                    ).execute(num_retries=self.arvrunner.num_retries)
-
-                    if not collection_exists["items"]:
-                        # Create a collection located in the same project as the
-                        # pipeline with the contents of the output.
-                        # First, get output record.
-                        collections = self.arvrunner.api.collections().list(
-                            limit=1,
-                            filters=[['portable_data_hash', '=', record["output"]]],
-                            select=["manifest_text"]
-                        ).execute(num_retries=self.arvrunner.num_retries)
-
-                        if not collections["items"]:
-                            raise WorkflowException(
-                                "Job output '%s' cannot be found on API server" % (
-                                    record["output"]))
-
-                        # Create new collection in the parent project
-                        # with the output contents.
-                        self.arvrunner.api.collections().create(body={
-                            "owner_uuid": self.arvrunner.project_uuid,
-                            "name": colname,
-                            "portable_data_hash": record["output"],
-                            "manifest_text": collections["items"][0]["manifest_text"]
-                        }, ensure_unique_name=True).execute(
-                            num_retries=self.arvrunner.num_retries)
-
-                    self.builder.outdir = outdir
-                    self.builder.pathmapper.keepdir = keepdir
-                    outputs = self.collect_outputs("keep:" + record["output"])
-            except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
-                processStatus = "permanentFail"
-            except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
-                processStatus = "permanentFail"
-
-            self.output_callback(outputs, processStatus)
-        finally:
-            del self.arvrunner.jobs[record["uuid"]]
 
 
 class RunnerJob(object):
@@ -574,10 +383,16 @@ class ArvadosCommandTool(CommandLineTool):
             return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                             "$(task.keep)/%s",
-                             "$(task.keep)/%s/%s",
-                             **kwargs)
+        if self.crunch2:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "/keep/%s",
+                                 "/keep/%s/%s",
+                                 **kwargs)
+        else:
+            return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+                                 "$(task.keep)/%s",
+                                 "$(task.keep)/%s/%s",
+                                 **kwargs)
 
 
 class ArvCwlRunner(object):
@@ -676,7 +491,10 @@ class ArvCwlRunner(object):
                 runnerjob.run()
                 return runnerjob.uuid
 
-        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        if self.crunch2:
+            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
+        else:
+            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 2179398..0cb885f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,7 +1,9 @@
 import logging
 import arvados.collection
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles
 from .arvdocker import arv_docker_get_image
+from . import done
+from cwltool.errors import WorkflowException
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -12,6 +14,9 @@ class ArvadosContainer(object):
         self.arvrunner = runner
         self.running = False
 
+    def update_pipeline_component(self, r):
+        pass
+
     def run(self, dry_run=False, pull_image=True, **kwargs):
         container_request = {
             "command": self.command_line,
@@ -26,13 +31,15 @@ class ArvadosContainer(object):
         mounts = {
             "/var/spool/cwl": {
                 "kind": "tmp"
-            },
-            "/tmp": {
-                "kind": "tmp"
             }
         }
 
-        # TODO mount normal inputs...
+        for f in self.pathmapper.files():
+            _, p = self.pathmapper.mapper(f)
+            mounts[p] = {
+                "kind": "collection",
+                "portable_data_hash": p[6:]
+            }
 
         if self.generatefiles:
             vwd = arvados.collection.Collection()
@@ -53,7 +60,7 @@ class ArvadosContainer(object):
         if self.environment:
             container_request["environment"].update(self.environment)
 
-        # TODO, not supported
+        # TODO, not supported by crunchv2 yet
         #if self.stdin:
         #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
 
@@ -84,11 +91,11 @@ class ArvadosContainer(object):
                 body=container_request
             ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.jobs[response["uuid"]] = self
+            self.arvrunner.jobs[response["container_uuid"]] = self
 
-            logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+            logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])
 
-            if response["state"] in ("Complete", "Cancelled"):
+            if response["state"] == "Final":
                 self.done(response)
         except Exception as e:
             logger.error("Got error %s" % str(e))
@@ -104,69 +111,9 @@ class ArvadosContainer(object):
             try:
                 outputs = {}
                 if record["output"]:
-                    logc = arvados.collection.Collection(record["log"])
-                    log = logc.open(logc.keys()[0])
-                    tmpdir = None
-                    outdir = None
-                    keepdir = None
-                    for l in log:
-                        # Determine the tmpdir, outdir and keepdir paths from
-                        # the job run.  Unfortunately, we can't take the first
-                        # values we find (which are expected to be near the
-                        # top) and stop scanning because if the node fails and
-                        # the job restarts on a different node these values
-                        # will different runs, and we need to know about the
-                        # final run that actually produced output.
-
-                        g = tmpdirre.match(l)
-                        if g:
-                            tmpdir = g.group(1)
-                        g = outdirre.match(l)
-                        if g:
-                            outdir = g.group(1)
-                        g = keepre.match(l)
-                        if g:
-                            keepdir = g.group(1)
-
-                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
-                    # check if collection already exists with same owner, name and content
-                    collection_exists = self.arvrunner.api.collections().list(
-                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                 ['portable_data_hash', '=', record["output"]],
-                                 ["name", "=", colname]]
-                    ).execute(num_retries=self.arvrunner.num_retries)
-
-                    if not collection_exists["items"]:
-                        # Create a collection located in the same project as the
-                        # pipeline with the contents of the output.
-                        # First, get output record.
-                        collections = self.arvrunner.api.collections().list(
-                            limit=1,
-                            filters=[['portable_data_hash', '=', record["output"]]],
-                            select=["manifest_text"]
-                        ).execute(num_retries=self.arvrunner.num_retries)
-
-                        if not collections["items"]:
-                            raise WorkflowException(
-                                "Job output '%s' cannot be found on API server" % (
-                                    record["output"]))
-
-                        # Create new collection in the parent project
-                        # with the output contents.
-                        self.arvrunner.api.collections().create(body={
-                            "owner_uuid": self.arvrunner.project_uuid,
-                            "name": colname,
-                            "portable_data_hash": record["output"],
-                            "manifest_text": collections["items"][0]["manifest_text"]
-                        }, ensure_unique_name=True).execute(
-                            num_retries=self.arvrunner.num_retries)
-
-                    self.builder.outdir = outdir
-                    self.builder.pathmapper.keepdir = keepdir
-                    outputs = self.collect_outputs("keep:" + record["output"])
+                    outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
             except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
                 logger.exception("Got unknown exception while collecting job outputs:")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
new file mode 100644
index 0000000..88a8eeb
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -0,0 +1,166 @@
+import logging
+import re
+from . import done
+from .arvdocker import arv_docker_get_image
+from cwltool.process import get_feature
+from cwltool.errors import WorkflowException
+import arvados.collection
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+
+class ArvadosJob(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+    def __init__(self, runner):
+        self.arvrunner = runner
+        self.running = False
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        script_parameters = {
+            "command": self.command_line
+        }
+        runtime_constraints = {}
+
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            script_parameters["task.vwd"] = {}
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            for t in self.generatefiles:
+                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
+        if self.environment:
+            script_parameters["task.env"].update(self.environment)
+
+        if self.stdin:
+            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+        if self.stdout:
+            script_parameters["task.stdout"] = self.stdout
+
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if docker_req and kwargs.get("use_container") is not False:
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
+            runtime_constraints["docker_image"] = "arvados/jobs"
+
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+        filters = [["repository", "=", "arvados"],
+                   ["script", "=", "crunchrunner"],
+                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+        if not self.arvrunner.ignore_docker_for_reuse:
+            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+
+        try:
+            response = self.arvrunner.api.jobs().create(
+                body={
+                    "owner_uuid": self.arvrunner.project_uuid,
+                    "script": "crunchrunner",
+                    "repository": "arvados",
+                    "script_version": "master",
+                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                    "script_parameters": {"tasks": [script_parameters]},
+                    "runtime_constraints": runtime_constraints
+                },
+                filters=filters,
+                find_or_create=kwargs.get("enable_reuse", True)
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            self.arvrunner.jobs[response["uuid"]] = self
+
+            self.update_pipeline_component(response)
+
+            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+            if response["state"] in ("Complete", "Failed", "Cancelled"):
+                self.done(response)
+        except Exception as e:
+            logger.error("Got error %s" % str(e))
+            self.output_callback({}, "permanentFail")
+
+    def update_pipeline_component(self, record):
+        if self.arvrunner.pipeline:
+            self.arvrunner.pipeline["components"][self.name] = {"job": record}
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+                                                                                 body={
+                                                                                    "components": self.arvrunner.pipeline["components"]
+                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
+        if self.arvrunner.uuid:
+            try:
+                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+                if job:
+                    components = job["components"]
+                    components[self.name] = record["uuid"]
+                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
+                        body={
+                            "components": components
+                        }).execute(num_retries=self.arvrunner.num_retries)
+            except Exception as e:
+                logger.info("Error adding to components: %s", e)
+
+    def done(self, record):
+        try:
+            self.update_pipeline_component(record)
+        except:
+            pass
+
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+
+            outputs = {}
+            try:
+                if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will differ between runs, and we need to know about the
+                        # final run that actually produced output.
+
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+
+                    outputs = done.done(self, record, tmpdir, outdir, keepdir)
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
+            except Exception as e:
+                logger.exception("Got unknown exception while collecting job outputs:")
+                processStatus = "permanentFail"
+
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
new file mode 100644
index 0000000..8a6fc9d
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -0,0 +1,38 @@
+def done(self, record, tmpdir, outdir, keepdir):
+    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+    # check if collection already exists with same owner, name and content
+    collection_exists = self.arvrunner.api.collections().list(
+        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                 ['portable_data_hash', '=', record["output"]],
+                 ["name", "=", colname]]
+    ).execute(num_retries=self.arvrunner.num_retries)
+
+    if not collection_exists["items"]:
+        # Create a collection located in the same project as the
+        # pipeline with the contents of the output.
+        # First, get output record.
+        collections = self.arvrunner.api.collections().list(
+            limit=1,
+            filters=[['portable_data_hash', '=', record["output"]]],
+            select=["manifest_text"]
+        ).execute(num_retries=self.arvrunner.num_retries)
+
+        if not collections["items"]:
+            raise WorkflowException(
+                "Job output '%s' cannot be found on API server" % (
+                    record["output"]))
+
+        # Create new collection in the parent project
+        # with the output contents.
+        self.arvrunner.api.collections().create(body={
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": colname,
+            "portable_data_hash": record["output"],
+            "manifest_text": collections["items"][0]["manifest_text"]
+        }, ensure_unique_name=True).execute(
+            num_retries=self.arvrunner.num_retries)
+
+    self.builder.outdir = outdir
+    self.builder.pathmapper.keepdir = keepdir
+    return self.collect_outputs("keep:" + record["output"])

commit 31f4757bd8efe51b27b596ac08845e701c55ea56
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 8 11:45:00 2016 -0400

    8442: Return PDH for Docker container.  Working on setting up mount points.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5a1ff07..387bc4a 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,9 +40,6 @@ tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 
-
-
-
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4ea63a0..2179398 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -23,7 +23,16 @@ class ArvadosContainer(object):
             "state": "Committed"
         }
         runtime_constraints = {}
-        mounts = {}
+        mounts = {
+            "/var/spool/cwl": {
+                "kind": "tmp"
+            },
+            "/tmp": {
+                "kind": "tmp"
+            }
+        }
+
+        # TODO mount normal inputs...
 
         if self.generatefiles:
             vwd = arvados.collection.Collection()
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 8f76bbf..253df99 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -26,6 +26,11 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
-    # XXX return PDH instead
+    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                            image_name=image_name,
+                                                            image_tag=image_tag)
+
+    #return dockerRequirement["dockerImageId"]
 
-    return dockerRequirement["dockerImageId"]
+    pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+    return pdh

commit 70672f1ae9fa85f4c36c082788c5abdea1781ed9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 8 11:15:58 2016 -0400

    8442: Submit containers. Work in progress.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0878998..5a1ff07 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -27,6 +27,8 @@ import threading
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 import urlparse
+from .arvcontainer import ArvadosContainer
+from .arvdocker import arv_docker_get_image
 
 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
 from arvados.api import OrderedJsonModel
@@ -39,31 +41,6 @@ outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 
 
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
-    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
-
-    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
-        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
-    sp = dockerRequirement["dockerImageId"].split(":")
-    image_name = sp[0]
-    image_tag = sp[1] if len(sp) > 1 else None
-
-    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
-                                                            image_name=image_name,
-                                                            image_tag=image_tag)
-
-    if not images:
-        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = ["--project-uuid="+project_uuid, image_name]
-        if image_tag:
-            args.append(image_tag)
-        logger.info("Uploading Docker image %s", ":".join(args[1:]))
-        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
-
-    # XXX return PDH instead
-
-    return dockerRequirement["dockerImageId"]
 
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
@@ -588,12 +565,13 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
 
-    def __init__(self, arvrunner, toolpath_object, **kwargs):
+    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
+        self.crunch2 = crunch2
 
     def makeJobRunner(self):
-        if kwargs.get("crunch2"):
+        if self.crunch2:
             return ArvadosContainer(self.arvrunner)
         else:
             return ArvadosJob(self.arvrunner)
@@ -609,7 +587,7 @@ class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
     complete, and report output."""
 
-    def __init__(self, api_client):
+    def __init__(self, api_client, crunch2):
         self.api = api_client
         self.jobs = {}
         self.lock = threading.Lock()
@@ -618,10 +596,11 @@ class ArvCwlRunner(object):
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
+        self.crunch2 = crunch2
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, **kwargs)
+            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
@@ -709,7 +688,7 @@ class ArvCwlRunner(object):
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
 
-        if kwargs.get("crunch2"):
+        if self.crunch2:
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
         else:
@@ -849,7 +828,7 @@ def main(args, stdout, stderr, api_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client)
+        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
     except Exception as e:
         logger.error(e)
         return 1
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index fe0f7ca..4ea63a0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,3 +1,10 @@
+import logging
+import arvados.collection
+from cwltool.process import get_feature
+from .arvdocker import arv_docker_get_image
+
+logger = logging.getLogger('arvados.cwl-runner')
+
 class ArvadosContainer(object):
     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
 
@@ -7,12 +14,13 @@ class ArvadosContainer(object):
 
     def run(self, dry_run=False, pull_image=True, **kwargs):
         container_request = {
-            "command": self.command_line
+            "command": self.command_line,
             "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
-            "output_path", "/var/spool/cwl",
-            "cwd", "/var/spool/cwl",
-            "priority": 1
+            "output_path": "/var/spool/cwl",
+            "cwd": "/var/spool/cwl",
+            "priority": 1,
+            "state": "Committed"
         }
         runtime_constraints = {}
         mounts = {}
@@ -46,7 +54,7 @@ class ArvadosContainer(object):
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if not docker_req:
-            docker_req = "arvados/jobs"
+            docker_req = {"dockerImageId": "arvados/jobs"}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                      docker_req,
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
new file mode 100644
index 0000000..8f76bbf
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -0,0 +1,31 @@
+import logging
+import cwltool.docker
+import arvados.commands.keepdocker
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+
+    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+
+    sp = dockerRequirement["dockerImageId"].split(":")
+    image_name = sp[0]
+    image_tag = sp[1] if len(sp) > 1 else None
+
+    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                            image_name=image_name,
+                                                            image_tag=image_tag)
+
+    if not images:
+        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
+        args = ["--project-uuid="+project_uuid, image_name]
+        if image_tag:
+            args.append(image_tag)
+        logger.info("Uploading Docker image %s", ":".join(args[1:]))
+        arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+
+    # XXX return PDH instead
+
+    return dockerRequirement["dockerImageId"]

commit 9bff1ecbf7adc10d7cdea7cdc88058579de0a7a0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 15:54:35 2016 -0400

    8442: Add --crunch1/--crunch2 switch

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index bf7ccc0..0878998 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -824,6 +824,15 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--crunch1", action="store_false",
+                        default=False, dest="crunch2",
+                        help="Use Crunch v1 Jobs API")
+
+    exgroup.add_argument("--crunch2", action="store_true",
+                        default=False, dest="crunch2",
+                        help="Use Crunch v2 Containers API")
+
     parser.add_argument("workflow", type=str, nargs="?", default=None)
     parser.add_argument("job_order", nargs=argparse.REMAINDER)
 

commit 6a36c411de668ca921b27a1221eab3d09dbc5be1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 6 15:24:50 2016 -0400

    8442: CWL create crunch2 containers WIP

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 371dd4f..bf7ccc0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -61,6 +61,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
+    # XXX return PDH instead
+
     return dockerRequirement["dockerImageId"]
 
 
@@ -591,7 +593,10 @@ class ArvadosCommandTool(CommandLineTool):
         self.arvrunner = arvrunner
 
     def makeJobRunner(self):
-        return ArvadosJob(self.arvrunner)
+        if kwargs.get("crunch2"):
+            return ArvadosContainer(self.arvrunner)
+        else:
+            return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, **kwargs):
         return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
@@ -704,8 +709,12 @@ class ArvCwlRunner(object):
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
 
-        kwargs["outdir"] = "$(task.outdir)"
-        kwargs["tmpdir"] = "$(task.tmpdir)"
+        if kwargs.get("crunch2"):
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+        else:
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
 
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, **kwargs)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
new file mode 100644
index 0000000..fe0f7ca
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -0,0 +1,160 @@
+class ArvadosContainer(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+    def __init__(self, runner):
+        self.arvrunner = runner
+        self.running = False
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        container_request = {
+            "command": self.command_line
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": self.name,
+            "output_path", "/var/spool/cwl",
+            "cwd", "/var/spool/cwl",
+            "priority": 1
+        }
+        runtime_constraints = {}
+        mounts = {}
+
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            container_request["task.vwd"] = {}
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            # TODO
+            # for t in self.generatefiles:
+            #     container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+        container_request["environment"] = {"TMPDIR": "/tmp"}
+        if self.environment:
+            container_request["environment"].update(self.environment)
+
+        # TODO, not supported
+        #if self.stdin:
+        #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+        if self.stdout:
+            mounts["stdout"] = {"kind": "file",
+                                "path": self.stdout}
+
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if not docker_req:
+            docker_req = "arvados/jobs"
+
+        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
+                                                                     docker_req,
+                                                                     pull_image,
+                                                                     self.arvrunner.project_uuid)
+
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["vcpus"] = resources.get("cores", 1)
+            runtime_constraints["ram"] = resources.get("ram") * 2**20
+            #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+        container_request["mounts"] = mounts
+        container_request["runtime_constraints"] = runtime_constraints
+
+        try:
+            response = self.arvrunner.api.container_requests().create(
+                body=container_request
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            self.arvrunner.jobs[response["uuid"]] = self
+
+            logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+            if response["state"] in ("Complete", "Cancelled"):
+                self.done(response)
+        except Exception as e:
+            logger.error("Got error %s" % str(e))
+            self.output_callback({}, "permanentFail")
+
+    def done(self, record):
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+
+            try:
+                outputs = {}
+                if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will differ between runs, and we need to know about the
+                        # final run that actually produced output.
+
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+
+                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+                    # check if collection already exists with same owner, name and content
+                    collection_exists = self.arvrunner.api.collections().list(
+                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                 ['portable_data_hash', '=', record["output"]],
+                                 ["name", "=", colname]]
+                    ).execute(num_retries=self.arvrunner.num_retries)
+
+                    if not collection_exists["items"]:
+                        # Create a collection located in the same project as the
+                        # pipeline with the contents of the output.
+                        # First, get output record.
+                        collections = self.arvrunner.api.collections().list(
+                            limit=1,
+                            filters=[['portable_data_hash', '=', record["output"]]],
+                            select=["manifest_text"]
+                        ).execute(num_retries=self.arvrunner.num_retries)
+
+                        if not collections["items"]:
+                            raise WorkflowException(
+                                "Job output '%s' cannot be found on API server" % (
+                                    record["output"]))
+
+                        # Create new collection in the parent project
+                        # with the output contents.
+                        self.arvrunner.api.collections().create(body={
+                            "owner_uuid": self.arvrunner.project_uuid,
+                            "name": colname,
+                            "portable_data_hash": record["output"],
+                            "manifest_text": collections["items"][0]["manifest_text"]
+                        }, ensure_unique_name=True).execute(
+                            num_retries=self.arvrunner.num_retries)
+
+                    self.builder.outdir = outdir
+                    self.builder.pathmapper.keepdir = keepdir
+                    outputs = self.collect_outputs("keep:" + record["output"])
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
+            except Exception as e:
+                logger.exception("Got unknown exception while collecting job outputs:")
+                processStatus = "permanentFail"
+
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list