[ARVADOS] created: 8c46fb908f5c0a816b9255a8c3e0dacc6ecac803
Git user
git at public.curoverse.com
Tue Jun 14 16:25:40 EDT 2016
at 8c46fb908f5c0a816b9255a8c3e0dacc6ecac803 (commit)
commit 8c46fb908f5c0a816b9255a8c3e0dacc6ecac803
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 16:42:37 2016 -0400
8442: more import fixups, import workflow file correctly.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4c80896..414ce63 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -152,7 +152,7 @@ class RunnerContainer(Runner):
workflowname = os.path.basename(self.tool.tool["id"])
workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- workflowcollection = workflowcollection[workflowcollection.index('/')]
+ workflowcollection = workflowcollection[:workflowcollection.index('/')]
jobpath = "/var/lib/cwl/job/cwl.input.json"
container_image = arv_docker_get_image(self.arvrunner.api,
@@ -170,7 +170,7 @@ class RunnerContainer(Runner):
"state": "Committed",
"container_image": container_image,
"mounts": {
- workflowpath: {
+ "/var/lib/cwl/workflow": {
"kind": "collection",
"portable_data_hash": "%s" % workflowcollection
},
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index d48c93a..28b0fee 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -1,6 +1,11 @@
import fnmatch
+import os
+
import cwltool.process
+import arvados.util
+import arvados.collection
+
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
commit b5c2816d74381a55f5d951d1751c08cb4e3fb0a6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 16:28:45 2016 -0400
8442: import fixups
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 0cc23ab..f754348 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -1,15 +1,21 @@
import os
import urlparse
from functools import partial
+import logging
+import json
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, adjustFiles
from cwltool.load_tool import fetch_document
+import arvados.collection
+
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
+logger = logging.getLogger('arvados.cwl-runner')
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
commit 14a3af65ffbdef7d360a804a684a94b13c15697f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 16:22:46 2016 -0400
8442: Introduce a RuntimeConstraints struct instead of using a map
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 54d596f..c0e4f14 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -31,13 +31,19 @@ type apiClientAuthorizationList struct {
Items []apiClientAuthorization `json:"items"`
}
+type RuntimeConstraints struct {
+ API *bool `json:"API"`
+ Ram int64 `json:"ram"`
+ Vcpus int64 `json:"vcpus"`
+}
+
// Represents an Arvados container record
type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
+ UUID string `json:"uuid"`
+ State string `json:"state"`
+ Priority int `json:"priority"`
+ RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of the containers from api
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index f718fbc..9b2cf0c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -70,11 +70,11 @@ func doMain() error {
// sbatchCmd
func sbatchFunc(container dispatch.Container) *exec.Cmd {
- memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+ memPerCPU := math.Ceil((float64(container.RuntimeConstraints.Ram)) / (float64(container.RuntimeConstraints.Vcpus * 1048576)))
return exec.Command("sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", container.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
- fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints.Vcpus)),
fmt.Sprintf("--priority=%d", container.Priority))
}
commit aab3cc9b7db72d5aeb16e6b00fb87c95617981e7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 16:17:45 2016 -0400
8442: Debugging container --submit with crunch2
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 136d5ae..ba26816 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -19,6 +19,7 @@ import arvados.events
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess
from cwltool.process import shortname, UnsupportedRequirement
from arvados.api import OrderedJsonModel
@@ -184,6 +185,9 @@ class ArvCwlRunner(object):
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ if runnerjob and self.crunch2:
+ self.api.container_requests().update(uuid=runnerjob.uuid,
+ body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.cond.release()
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index be11404..4c80896 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -152,7 +152,7 @@ class RunnerContainer(Runner):
workflowname = os.path.basename(self.tool.tool["id"])
workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+ workflowcollection = workflowcollection[workflowcollection.index('/')]
jobpath = "/var/lib/cwl/job/cwl.input.json"
container_image = arv_docker_get_image(self.arvrunner.api,
@@ -181,11 +181,16 @@ class RunnerContainer(Runner):
"stdout": {
"kind": "file",
"path": "/var/spool/cwl/cwl.output.json"
+ },
+ "/var/spool/cwl": {
+ "kind": "collection",
+ "writable": True
}
},
"runtime_constraints": {
"vcpus": 1,
- "ram": 1024*1024*256
+ "ram": 1024*1024*256,
+ "API": True
}
}
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index c911895..d48c93a 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -1,4 +1,5 @@
import fnmatch
+import cwltool.process
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
commit b1554d8015038cbfc736b178f77465382ef61581
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 15:45:29 2016 -0400
8442: Adding --submit support with --crunch2. General refactoring into more/smaller files.
diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index 1b177ec..2618e47 100644
--- a/apps/workbench/Gemfile.lock
+++ b/apps/workbench/Gemfile.lock
@@ -309,6 +309,3 @@ DEPENDENCIES
therubyracer
uglifier (>= 1.0.3)
wiselinks
-
-BUNDLED WITH
- 1.12.1
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index af74808..136d5ae 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -3,397 +3,28 @@
# Implement cwl-runner interface for submitting and running jobs on Arvados.
import argparse
-import arvados
-import arvados.collection
-import arvados.commands.keepdocker
-import arvados.commands.run
-import arvados.events
-import arvados.util
-import copy
-import cwltool.docker
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
-from cwltool.errors import WorkflowException
-import cwltool.main
-import cwltool.workflow
-import fnmatch
-from functools import partial
-import json
import logging
import os
-import pkg_resources # part of setuptools
-import re
import sys
import threading
-from cwltool.load_tool import fetch_document
-from cwltool.builder import Builder
-import urlparse
-from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
-from .arvdocker import arv_docker_get_image
-
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
-from arvados.api import OrderedJsonModel
-
-logger = logging.getLogger('arvados.cwl-runner')
-logger.setLevel(logging.INFO)
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
- """Implement the cwltool FsAccess interface for Arvados Collections."""
-
- def __init__(self, basedir):
- super(CollectionFsAccess, self).__init__(basedir)
- self.collections = {}
-
- def get_collection(self, path):
- p = path.split("/")
- if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
- pdh = p[0][5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh)
- return (self.collections[pdh], "/".join(p[1:]))
- else:
- return (None, path)
-
- def _match(self, collection, patternsegments, parent):
- if not patternsegments:
- return []
-
- if not isinstance(collection, arvados.collection.RichCollectionBase):
- return []
-
- ret = []
- # iterate over the files and subcollections in 'collection'
- for filename in collection:
- if patternsegments[0] == '.':
- # Pattern contains something like "./foo" so just shift
- # past the "./"
- ret.extend(self._match(collection, patternsegments[1:], parent))
- elif fnmatch.fnmatch(filename, patternsegments[0]):
- cur = os.path.join(parent, filename)
- if len(patternsegments) == 1:
- ret.append(cur)
- else:
- ret.extend(self._match(collection[filename], patternsegments[1:], cur))
- return ret
-
- def glob(self, pattern):
- collection, rest = self.get_collection(pattern)
- patternsegments = rest.split("/")
- return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
- def open(self, fn, mode):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.open(rest, mode)
- else:
- return open(self._abs(fn), mode)
-
- def exists(self, fn):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.exists(rest)
- else:
- return os.path.exists(self._abs(fn))
-
-
-
-class RunnerJob(object):
- """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
-
- def __init__(self, runner, tool, job_order, enable_reuse):
- self.arvrunner = runner
- self.tool = tool
- self.job_order = job_order
- self.running = False
- self.enable_reuse = enable_reuse
-
- def update_pipeline_component(self, record):
- pass
-
- def upload_docker(self, tool):
- if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
- if docker_req:
- arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- self.upload_docker(s.embedded_tool)
-
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
- """Create an Arvados job specification for this workflow.
-
- The returned dict can be used to create a job (i.e., passed as
- the +body+ argument to jobs().create()), or as a component in
- a pipeline template or pipeline instance.
- """
- self.upload_docker(self.tool)
-
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
-
- self.name = os.path.basename(self.tool.tool["id"])
-
- def visitFiles(files, path):
- files.add(path)
- return path
-
- document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
- def loadref(b, u):
- return document_loader.fetch(urlparse.urljoin(b, u))
-
- sc = scandeps(uri, workflowobj,
- set(("$import", "run")),
- set(("$include", "$schemas", "path")),
- loadref)
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
-
- workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- "%s",
- "%s/%s",
- name=self.name,
- **kwargs)
-
- jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- "%s",
- "%s/%s",
- name=os.path.basename(self.job_order.get("id", "#")),
- **kwargs)
-
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
-
- if "id" in self.job_order:
- del self.job_order["id"]
-
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
- return {
- "script": "cwl-runner",
- "script_version": "master",
- "repository": "arvados",
- "script_parameters": self.job_order,
- "runtime_constraints": {
- "docker_image": "arvados/jobs"
- }
- }
-
- def run(self, *args, **kwargs):
- job_spec = self.arvados_job_spec(*args, **kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
- response = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.uuid = response["uuid"]
- self.arvrunner.jobs[self.uuid] = self
-
- logger.info("Submitted job %s", response["uuid"])
-
- if kwargs.get("submit"):
- self.pipeline = self.arvrunner.api.pipeline_instances().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": shortname(self.tool.tool["id"]),
- "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
- "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
-
- def done(self, record):
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
-
- outputs = None
- try:
- try:
- outc = arvados.collection.Collection(record["output"])
- with outc.open("cwl.output.json") as f:
- outputs = json.load(f)
- def keepify(path):
- if not path.startswith("keep:"):
- return "keep:%s/%s" % (record["output"], path)
- adjustFiles(outputs, keepify)
- except Exception as e:
- logger.error("While getting final output object: %s", e)
- self.arvrunner.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
-
-
-class RunnerTemplate(object):
- """An Arvados pipeline template that invokes a CWL workflow."""
-
- type_to_dataclass = {
- 'boolean': 'boolean',
- 'File': 'File',
- 'float': 'number',
- 'int': 'number',
- 'string': 'text',
- }
-
- def __init__(self, runner, tool, job_order, enable_reuse):
- self.runner = runner
- self.tool = tool
- self.job = RunnerJob(
- runner=runner,
- tool=tool,
- job_order=job_order,
- enable_reuse=enable_reuse)
-
- def pipeline_component_spec(self):
- """Return a component that Workbench and a-r-p-i will understand.
-
- Specifically, translate CWL input specs to Arvados pipeline
- format, like {"dataclass":"File","value":"xyz"}.
- """
- spec = self.job.arvados_job_spec()
-
- # Most of the component spec is exactly the same as the job
- # spec (script, script_version, etc.).
- # spec['script_parameters'] isn't right, though. A component
- # spec's script_parameters hash is a translation of
- # self.tool.tool['inputs'] with defaults/overrides taken from
- # the job order. So we move the job parameters out of the way
- # and build a new spec['script_parameters'].
- job_params = spec['script_parameters']
- spec['script_parameters'] = {}
-
- for param in self.tool.tool['inputs']:
- param = copy.deepcopy(param)
-
- # Data type and "required" flag...
- types = param['type']
- if not isinstance(types, list):
- types = [types]
- param['required'] = 'null' not in types
- non_null_types = set(types) - set(['null'])
- if len(non_null_types) == 1:
- the_type = [c for c in non_null_types][0]
- dataclass = self.type_to_dataclass.get(the_type)
- if dataclass:
- param['dataclass'] = dataclass
- # Note: If we didn't figure out a single appropriate
- # dataclass, we just left that attribute out. We leave
- # the "type" attribute there in any case, which might help
- # downstream.
-
- # Title and description...
- title = param.pop('label', '')
- descr = param.pop('description', '').rstrip('\n')
- if title:
- param['title'] = title
- if descr:
- param['description'] = descr
-
- # Fill in the value from the current job order, if any.
- param_id = shortname(param.pop('id'))
- value = job_params.get(param_id)
- if value is None:
- pass
- elif not isinstance(value, dict):
- param['value'] = value
- elif param.get('dataclass') == 'File' and value.get('path'):
- param['value'] = value['path']
-
- spec['script_parameters'][param_id] = param
- spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
- return spec
-
- def save(self):
- job_spec = self.pipeline_component_spec()
- response = self.runner.api.pipeline_templates().create(body={
- "components": {
- self.job.name: job_spec,
- },
- "name": self.job.name,
- "owner_uuid": self.runner.project_uuid,
- }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
- self.uuid = response["uuid"]
- logger.info("Created template %s", self.uuid)
-
-
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
- """Convert container-local paths to and from Keep collection ids."""
-
- def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, **kwargs):
- self._pathmap = arvrunner.get_uploaded()
- uploadfiles = set()
-
- pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
- for src in referenced_files:
- if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, collection_pattern % src[5:])
- if "#" in src:
- src = src[:src.index("#")]
- if src not in self._pathmap:
- ab = cwltool.pathmapper.abspath(src, input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
- if kwargs.get("conformance_test"):
- self._pathmap[src] = (src, ab)
- elif isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.add((src, ab, st))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = (ab, st.fn)
- else:
- raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-
- if uploadfiles:
- arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
- arvrunner.api,
- dry_run=kwargs.get("dry_run"),
- num_retries=3,
- fnPattern=file_pattern,
- name=name,
- project=arvrunner.project_uuid)
-
- for src, ab, st in uploadfiles:
- arvrunner.add_uploaded(src, (ab, st.fn))
- self._pathmap[src] = (ab, st.fn)
-
- self.keepdir = None
-
- def reversemap(self, target):
- if target.startswith("keep:"):
- return (target, target)
- elif self.keepdir and target.startswith(self.keepdir):
- return (target, "keep:" + target[len(self.keepdir)+1:])
- else:
- return super(ArvPathMapper, self).reversemap(target)
-
+import pkg_resources # part of setuptools
-class ArvadosCommandTool(CommandLineTool):
- """Wrap cwltool CommandLineTool to override selected methods."""
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
- def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
- super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
- self.arvrunner = arvrunner
- self.crunch2 = crunch2
+import arvados
+import arvados.events
- def makeJobRunner(self):
- if self.crunch2:
- return ArvadosContainer(self.arvrunner)
- else:
- return ArvadosJob(self.arvrunner)
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvtool import ArvadosCommandTool
- def makePathMapper(self, reffiles, **kwargs):
- if self.crunch2:
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "/keep/%s",
- "/keep/%s/%s",
- **kwargs)
- else:
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
+from cwltool.process import shortname, UnsupportedRequirement
+from arvados.api import OrderedJsonModel
+logger = logging.getLogger('arvados.cwl-runner')
+logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit crunch jobs, wait for them to
@@ -475,7 +106,10 @@ class ArvCwlRunner(object):
return tmpl.uuid
if kwargs.get("submit"):
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+ if self.crunch2:
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+ else:
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
# Create pipeline for local run
@@ -510,56 +144,54 @@ class ArvCwlRunner(object):
kwargs["outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- if kwargs.get("conformance_test"):
- return cwltool.main.single_job_executor(tool, job_order, **kwargs)
+ if kwargs.get("submit"):
+ jobiter = iter((runnerjob,))
else:
- if kwargs.get("submit"):
- jobiter = iter((runnerjob,))
- else:
- if "cwl_runner_job" in kwargs:
- self.uuid = kwargs.get("cwl_runner_job").get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- docker_outdir="$(task.outdir)",
- **kwargs)
-
- try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
-
- for runnable in jobiter:
- if runnable:
- runnable.run(**kwargs)
- else:
- if self.jobs:
- self.cond.wait(1)
- else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
- break
-
- while self.jobs:
- self.cond.wait(1)
-
- events.close()
- except UnsupportedRequirement:
- raise
- except:
- if sys.exc_info()[0] is KeyboardInterrupt:
- logger.error("Interrupted, marking pipeline as failed")
+ if "cwl_runner_job" in kwargs:
+ self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ docker_outdir="$(task.outdir)",
+ **kwargs)
+
+ try:
+ self.cond.acquire()
+ # Will continue to hold the lock for the duration of this code
+ # except when in cond.wait(), at which point on_message can update
+ # job state and process output callbacks.
+
+ for runnable in jobiter:
+ if runnable:
+ runnable.run(**kwargs)
else:
- logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- finally:
- self.cond.release()
+ if self.jobs:
+ self.cond.wait(1)
+ else:
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ break
+
+ while self.jobs:
+ self.cond.wait(1)
+
+ events.close()
+ except UnsupportedRequirement:
+ raise
+ except:
+ if sys.exc_info()[0] is KeyboardInterrupt:
+ logger.error("Interrupted, marking pipeline as failed")
+ else:
+ logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ finally:
+ self.cond.release()
- if self.final_output is None:
- raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+ if self.final_output is None:
+ raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+ return self.final_output
- return self.final_output
def versionstring():
"""Print version string of key packages for provenance and debugging."""
@@ -572,6 +204,7 @@ def versionstring():
"arvados-python-client", arvpkg[0].version,
"cwltool", cwlpkg[0].version)
+
def arg_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
@@ -634,6 +267,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
return parser
+
def main(args, stdout, stderr, api_client=None):
parser = arg_parser()
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index b0e2c1f..be11404 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,10 +1,15 @@
import logging
+import json
+import os
+
+from cwltool.errors import WorkflowException
+from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
+
import arvados.collection
-from cwltool.process import get_feature, adjustFiles
+
from .arvdocker import arv_docker_get_image
from . import done
-from cwltool.errors import WorkflowException
-from cwltool.process import UnsupportedRequirement
+from .runner import Runner
logger = logging.getLogger('arvados.cwl-runner')
@@ -45,7 +50,7 @@ class ArvadosContainer(object):
if self.generatefiles:
raise UnsupportedRequirement("Generate files not supported")
- vwd = arvados.collection.Collection()
+ vwd = arvados.collection.Collection(api_client=self.arvrunner.api_client)
container_request["task.vwd"] = {}
for t in self.generatefiles:
if isinstance(self.generatefiles[t], dict):
@@ -124,3 +129,78 @@ class ArvadosContainer(object):
self.output_callback(outputs, processStatus)
finally:
del self.arvrunner.jobs[record["uuid"]]
+
+
+class RunnerContainer(Runner):
+ """Submit and manage a container that runs arvados-cwl-runner."""
+
+ def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ """Create an Arvados job specification for this workflow.
+
+ The returned dict can be used to create a job (i.e., passed as
+ the +body+ argument to jobs().create()), or as a component in
+ a pipeline template or pipeline instance.
+ """
+
+ workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+
+ with arvados.collection.Collection(api_client=self.arvrunner.api) as jobobj:
+ with jobobj.open("cwl.input.json", "w") as f:
+ json.dump(self.job_order, f, sort_keys=True, indent=4)
+ jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+ workflowname = os.path.basename(self.tool.tool["id"])
+ workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+ workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
+ workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+ jobpath = "/var/lib/cwl/job/cwl.input.json"
+
+ container_image = arv_docker_get_image(self.arvrunner.api,
+ {"dockerImageId": "arvados/jobs"},
+ pull_image,
+ self.arvrunner.project_uuid)
+
+ return {
+ "command": ["arvados-cwl-runner", "--local", "--crunch2", workflowpath, jobpath],
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": self.name,
+ "output_path": "/var/spool/cwl",
+ "cwd": "/var/spool/cwl",
+ "priority": 1,
+ "state": "Committed",
+ "container_image": container_image,
+ "mounts": {
+ workflowpath: {
+ "kind": "collection",
+ "portable_data_hash": "%s" % workflowcollection
+ },
+ jobpath: {
+ "kind": "collection",
+ "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
+ },
+ "stdout": {
+ "kind": "file",
+ "path": "/var/spool/cwl/cwl.output.json"
+ }
+ },
+ "runtime_constraints": {
+ "vcpus": 1,
+ "ram": 1024*1024*256
+ }
+ }
+
+ def run(self, *args, **kwargs):
+ job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+ response = self.arvrunner.api.container_requests().create(
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.uuid = response["uuid"]
+ self.arvrunner.jobs[response["container_uuid"]] = self
+
+ logger.info("Submitted container %s", response["uuid"])
+
+ if response["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(response)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 88a8eeb..397b6d5 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -1,11 +1,19 @@
import logging
import re
-from . import done
-from .arvdocker import arv_docker_get_image
-from cwltool.process import get_feature
+import copy
+
+from cwltool.process import get_feature, shortname
from cwltool.errors import WorkflowException
+from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.load_tool import fetch_document
+from cwltool.builder import Builder
+
import arvados.collection
+from .arvdocker import arv_docker_get_image
+from .runner import Runner
+from . import done
+
logger = logging.getLogger('arvados.cwl-runner')
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
@@ -164,3 +172,145 @@ class ArvadosJob(object):
self.output_callback(outputs, processStatus)
finally:
del self.arvrunner.jobs[record["uuid"]]
+
+
+class RunnerJob(Runner):
+ """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+ def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ """Create an Arvados job specification for this workflow.
+
+ The returned dict can be used to create a job (i.e., passed as
+ the +body+ argument to jobs().create()), or as a component in
+ a pipeline template or pipeline instance.
+ """
+
+ workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+
+ self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+ return {
+ "script": "cwl-runner",
+ "script_version": "master",
+ "repository": "arvados",
+ "script_parameters": self.job_order,
+ "runtime_constraints": {
+ "docker_image": "arvados/jobs"
+ }
+ }
+
+ def run(self, *args, **kwargs):
+ job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+ response = self.arvrunner.api.jobs().create(
+ body=job_spec,
+ find_or_create=self.enable_reuse
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.uuid = response["uuid"]
+ self.arvrunner.jobs[self.uuid] = self
+
+ logger.info("Submitted job %s", response["uuid"])
+
+ if kwargs.get("submit"):
+ self.pipeline = self.arvrunner.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": shortname(self.tool.tool["id"]),
+ "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
+ "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
+
+ if response["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(response)
+
+
+class RunnerTemplate(object):
+ """An Arvados pipeline template that invokes a CWL workflow."""
+
+ type_to_dataclass = {
+ 'boolean': 'boolean',
+ 'File': 'File',
+ 'float': 'number',
+ 'int': 'number',
+ 'string': 'text',
+ }
+
+ def __init__(self, runner, tool, job_order, enable_reuse):
+ self.runner = runner
+ self.tool = tool
+ self.job = RunnerJob(
+ runner=runner,
+ tool=tool,
+ job_order=job_order,
+ enable_reuse=enable_reuse)
+
+ def pipeline_component_spec(self):
+ """Return a component that Workbench and a-r-p-i will understand.
+
+ Specifically, translate CWL input specs to Arvados pipeline
+ format, like {"dataclass":"File","value":"xyz"}.
+ """
+ spec = self.job.arvados_job_spec()
+
+ # Most of the component spec is exactly the same as the job
+ # spec (script, script_version, etc.).
+ # spec['script_parameters'] isn't right, though. A component
+ # spec's script_parameters hash is a translation of
+ # self.tool.tool['inputs'] with defaults/overrides taken from
+ # the job order. So we move the job parameters out of the way
+ # and build a new spec['script_parameters'].
+ job_params = spec['script_parameters']
+ spec['script_parameters'] = {}
+
+ for param in self.tool.tool['inputs']:
+ param = copy.deepcopy(param)
+
+ # Data type and "required" flag...
+ types = param['type']
+ if not isinstance(types, list):
+ types = [types]
+ param['required'] = 'null' not in types
+ non_null_types = set(types) - set(['null'])
+ if len(non_null_types) == 1:
+ the_type = [c for c in non_null_types][0]
+ dataclass = self.type_to_dataclass.get(the_type)
+ if dataclass:
+ param['dataclass'] = dataclass
+ # Note: If we didn't figure out a single appropriate
+ # dataclass, we just left that attribute out. We leave
+ # the "type" attribute there in any case, which might help
+ # downstream.
+
+ # Title and description...
+ title = param.pop('label', '')
+ descr = param.pop('description', '').rstrip('\n')
+ if title:
+ param['title'] = title
+ if descr:
+ param['description'] = descr
+
+ # Fill in the value from the current job order, if any.
+ param_id = shortname(param.pop('id'))
+ value = job_params.get(param_id)
+ if value is None:
+ pass
+ elif not isinstance(value, dict):
+ param['value'] = value
+ elif param.get('dataclass') == 'File' and value.get('path'):
+ param['value'] = value['path']
+
+ spec['script_parameters'][param_id] = param
+ spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
+ return spec
+
+ def save(self):
+ job_spec = self.pipeline_component_spec()
+ response = self.runner.api.pipeline_templates().create(body={
+ "components": {
+ self.job.name: job_spec,
+ },
+ "name": self.job.name,
+ "owner_uuid": self.runner.project_uuid,
+ }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
+ self.uuid = response["uuid"]
+ logger.info("Created template %s", self.uuid)
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
new file mode 100644
index 0000000..a2c5c9e
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -0,0 +1,30 @@
+from cwltool.draft2tool import CommandLineTool
+from .arvjob import ArvadosJob
+from .arvcontainer import ArvadosContainer
+from .pathmapper import ArvPathMapper
+
+class ArvadosCommandTool(CommandLineTool):
+ """Wrap cwltool CommandLineTool to override selected methods."""
+
+ def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
+ super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+ self.arvrunner = arvrunner
+ self.crunch2 = crunch2
+
+ def makeJobRunner(self):
+ if self.crunch2:
+ return ArvadosContainer(self.arvrunner)
+ else:
+ return ArvadosJob(self.arvrunner)
+
+ def makePathMapper(self, reffiles, **kwargs):
+ if self.crunch2:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "/keep/%s",
+ "/keep/%s/%s",
+ **kwargs)
+ else:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "$(task.keep)/%s",
+ "$(task.keep)/%s/%s",
+ **kwargs)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
new file mode 100644
index 0000000..c911895
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -0,0 +1,59 @@
+import fnmatch
+
+class CollectionFsAccess(cwltool.process.StdFsAccess):
+ """Implement the cwltool FsAccess interface for Arvados Collections."""
+
+ def __init__(self, basedir):
+ super(CollectionFsAccess, self).__init__(basedir)
+ self.collections = {}
+
+ def get_collection(self, path):
+ p = path.split("/")
+ if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
+ pdh = p[0][5:]
+ if pdh not in self.collections:
+ self.collections[pdh] = arvados.collection.CollectionReader(pdh)
+ return (self.collections[pdh], "/".join(p[1:]))
+ else:
+ return (None, path)
+
+ def _match(self, collection, patternsegments, parent):
+ if not patternsegments:
+ return []
+
+ if not isinstance(collection, arvados.collection.RichCollectionBase):
+ return []
+
+ ret = []
+ # iterate over the files and subcollections in 'collection'
+ for filename in collection:
+ if patternsegments[0] == '.':
+ # Pattern contains something like "./foo" so just shift
+ # past the "./"
+ ret.extend(self._match(collection, patternsegments[1:], parent))
+ elif fnmatch.fnmatch(filename, patternsegments[0]):
+ cur = os.path.join(parent, filename)
+ if len(patternsegments) == 1:
+ ret.append(cur)
+ else:
+ ret.extend(self._match(collection[filename], patternsegments[1:], cur))
+ return ret
+
+ def glob(self, pattern):
+ collection, rest = self.get_collection(pattern)
+ patternsegments = rest.split("/")
+ return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
+
+ def open(self, fn, mode):
+ collection, rest = self.get_collection(fn)
+ if collection:
+ return collection.open(rest, mode)
+ else:
+ return open(self._abs(fn), mode)
+
+ def exists(self, fn):
+ collection, rest = self.get_collection(fn)
+ if collection:
+ return collection.exists(rest)
+ else:
+ return os.path.exists(self._abs(fn))
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
new file mode 100644
index 0000000..9538a91
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -0,0 +1,55 @@
+import re
+
+import arvados.commands.run
+import arvados.collection
+import cwltool.pathmapper
+
+class ArvPathMapper(cwltool.pathmapper.PathMapper):
+ """Convert container-local paths to and from Keep collection ids."""
+
+ def __init__(self, arvrunner, referenced_files, input_basedir,
+ collection_pattern, file_pattern, name=None, **kwargs):
+ self._pathmap = arvrunner.get_uploaded()
+ uploadfiles = set()
+
+ pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+
+ for src in referenced_files:
+ if isinstance(src, basestring) and pdh_path.match(src):
+ self._pathmap[src] = (src, collection_pattern % src[5:])
+ if "#" in src:
+ src = src[:src.index("#")]
+ if src not in self._pathmap:
+ ab = cwltool.pathmapper.abspath(src, input_basedir)
+ st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
+ if kwargs.get("conformance_test"):
+ self._pathmap[src] = (src, ab)
+ elif isinstance(st, arvados.commands.run.UploadFile):
+ uploadfiles.add((src, ab, st))
+ elif isinstance(st, arvados.commands.run.ArvFile):
+ self._pathmap[src] = (ab, st.fn)
+ else:
+ raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+
+ if uploadfiles:
+ arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
+ arvrunner.api,
+ dry_run=kwargs.get("dry_run"),
+ num_retries=3,
+ fnPattern=file_pattern,
+ name=name,
+ project=arvrunner.project_uuid)
+
+ for src, ab, st in uploadfiles:
+ arvrunner.add_uploaded(src, (ab, st.fn))
+ self._pathmap[src] = (ab, st.fn)
+
+ self.keepdir = None
+
+ def reversemap(self, target):
+ if target.startswith("keep:"):
+ return (target, target)
+ elif self.keepdir and target.startswith(self.keepdir):
+ return (target, "keep:" + target[len(self.keepdir)+1:])
+ else:
+ return super(ArvPathMapper, self).reversemap(target)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
new file mode 100644
index 0000000..0cc23ab
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -0,0 +1,98 @@
+import os
+import urlparse
+from functools import partial
+
+from cwltool.draft2tool import CommandLineTool
+import cwltool.workflow
+from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.load_tool import fetch_document
+
+from .arvdocker import arv_docker_get_image
+from .pathmapper import ArvPathMapper
+
+class Runner(object):
+ def __init__(self, runner, tool, job_order, enable_reuse):
+ self.arvrunner = runner
+ self.tool = tool
+ self.job_order = job_order
+ self.running = False
+ self.enable_reuse = enable_reuse
+
+ def update_pipeline_component(self, record):
+ pass
+
+ def upload_docker(self, tool):
+ if isinstance(tool, CommandLineTool):
+ (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+ if docker_req:
+ arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+ elif isinstance(tool, cwltool.workflow.Workflow):
+ for s in tool.steps:
+ self.upload_docker(s.embedded_tool)
+
+
+ def arvados_job_spec(self, *args, **kwargs):
+ self.upload_docker(self.tool)
+
+ workflowfiles = set()
+ jobfiles = set()
+ workflowfiles.add(self.tool.tool["id"])
+
+ self.name = os.path.basename(self.tool.tool["id"])
+
+ def visitFiles(files, path):
+ files.add(path)
+ return path
+
+ document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+ def loadref(b, u):
+ return document_loader.fetch(urlparse.urljoin(b, u))
+
+ sc = scandeps(uri, workflowobj,
+ set(("$import", "run")),
+ set(("$include", "$schemas", "path")),
+ loadref)
+ adjustFiles(sc, partial(visitFiles, workflowfiles))
+ adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+
+ workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+ "%s",
+ "%s/%s",
+ name=self.name,
+ **kwargs)
+
+ jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+ "%s",
+ "%s/%s",
+ name=os.path.basename(self.job_order.get("id", "#")),
+ **kwargs)
+
+ adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+ if "id" in self.job_order:
+ del self.job_order["id"]
+
+ return workflowmapper
+
+
+ def done(self, record):
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ outputs = None
+ try:
+ try:
+ outc = arvados.collection.Collection(record["output"])
+ with outc.open("cwl.output.json") as f:
+ outputs = json.load(f)
+ def keepify(path):
+ if not path.startswith("keep:"):
+ return "keep:%s/%s" % (record["output"], path)
+ adjustFiles(outputs, keepify)
+ except Exception as e:
+ logger.error("While getting final output object: %s", e)
+ self.arvrunner.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_container.py
similarity index 64%
copy from sdk/cwl/tests/test_job.py
copy to sdk/cwl/tests/test_container.py
index dba65b0..0237e80 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_container.py
@@ -10,14 +10,20 @@ if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-class TestJob(unittest.TestCase):
+class TestContainer(unittest.TestCase):
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- def test_run(self):
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_run(self, keepdocker):
runner = mock.MagicMock()
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
tool = {
@@ -25,46 +31,44 @@ class TestJob(unittest.TestCase):
"outputs": [],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names, basedir="")
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run"):
j.run()
- runner.api.jobs().create.assert_called_with(
+ runner.api.container_requests().create.assert_called_with(
body={
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'TMPDIR': '$(task.tmpdir)'},
- 'command': ['ls']
- }],
+ 'environment': {
+ 'TMPDIR': '/tmp'
},
- 'script_version': 'master',
- 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
+ 'name': 'test_run',
'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 1,
- 'min_ram_mb_per_node': 1024,
- 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
- }
- },
- find_or_create=True,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']]
- )
+ 'vcpus': 1,
+ 'ram': 1073741824
+ }, 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'}
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': ['ls'],
+ 'cwd': '/var/spool/cwl'
+ })
# The test passes some fields in builder.resources
# For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- def test_resource_requirements(self):
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_resource_requirements(self, keepdocker):
runner = mock.MagicMock()
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
tool = {
"inputs": [],
"outputs": [],
@@ -76,36 +80,31 @@ class TestJob(unittest.TestCase):
}],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements"):
j.run()
- runner.api.jobs().create.assert_called_with(
+
+ runner.api.container_requests().create.assert_called_with(
body={
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'TMPDIR': '$(task.tmpdir)'},
- 'command': ['ls']
- }]
- },
- 'script_version': 'master',
- 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
+ 'environment': {
+ 'TMPDIR': '/tmp'
+ },
+ 'name': 'test_resource_requirements',
'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 3,
- 'min_ram_mb_per_node': 3000,
- 'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize
- }
- },
- find_or_create=True,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']])
+ 'vcpus': 3,
+ 'ram': 3145728000
+ }, 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'}
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': ['ls'],
+ 'cwd': '/var/spool/cwl'
+ })
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
@@ -121,7 +120,7 @@ class TestJob(unittest.TestCase):
api.collections().list().execute.side_effect = ({"items": []},
{"items": [{"manifest_text": "XYZ"}]})
- arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob = arvados_cwl.ArvadosContainer(runner)
arvjob.name = "testjob"
arvjob.builder = mock.MagicMock()
arvjob.output_callback = mock.MagicMock()
@@ -163,7 +162,7 @@ class TestJob(unittest.TestCase):
col().open.return_value = []
api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
- arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob = arvados_cwl.ArvadosContainer(runner)
arvjob.name = "testjob"
arvjob.builder = mock.MagicMock()
arvjob.output_callback = mock.MagicMock()
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index dba65b0..701afcb 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -25,7 +25,7 @@ class TestJob(unittest.TestCase):
"outputs": [],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names, basedir="")
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir=""):
j.run()
@@ -76,7 +76,7 @@ class TestJob(unittest.TestCase):
}],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names)
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir=""):
j.run()
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 38741eb..48e6ed2 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -30,7 +30,7 @@ def stubs(func):
return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
stubs.KeepClient().put.side_effect = putstub
- stubs.keepdocker.return_value = True
+ stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
stubs.api = mock.MagicMock()
@@ -44,12 +44,28 @@ def stubs(func):
}, {
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"portable_data_hash": "99999999999999999999999999999992+99",
+ },
+ {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4",
+ "portable_data_hash": "99999999999999999999999999999994+99",
+ "manifest_text": ""
})
+ stubs.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
stubs.api.jobs().create().execute.return_value = {
"uuid": stubs.expect_job_uuid,
"state": "Queued",
}
+
+ stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
+ stubs.api.container_requests().create().execute.return_value = {
+ "uuid": stubs.expect_container_request_uuid,
+ "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+
stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
stubs.api.pipeline_templates().create().execute.return_value = {
"uuid": stubs.expect_pipeline_template_uuid,
@@ -70,6 +86,35 @@ def stubs(func):
'script_version': 'master',
'script': 'cwl-runner'
}
+
+ stubs.expect_container_spec = {
+ 'priority': 1,
+ 'mounts': {
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/var/lib/cwl/workflow/submit_wf.cwl': {
+ 'portable_data_hash': '999999999999999999999999991+99',
+ 'kind': 'collection'
+ },
+ '/var/lib/cwl/job/cwl.input.json': {
+ 'portable_data_hash': '102435082199e5229f99b01165b67096+60/cwl.input.json',
+ 'kind': 'collection'
+ }
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'command': ['arvados-cwl-runner', '--local', '--crunch2', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
+ 'name': 'submit_wf.cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'output_path': '/var/spool/cwl',
+ 'cwd': '/var/spool/cwl',
+ 'runtime_constraints': {
+ 'vcpus': 1,
+ 'ram': 268435456
+ }
+ }
return func(self, stubs, *args, **kwargs)
return wrapped
@@ -128,6 +173,41 @@ class TestSubmit(unittest.TestCase):
body=expect_body,
find_or_create=True)
+ @stubs
+ def test_submit_container(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--crunch2", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.api.collections().create.assert_has_calls([
+ mock.call(),
+ mock.call(body={
+ 'manifest_text':
+ './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
+ '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
+ 'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': 'submit_wf.cwl',
+ }, ensure_unique_name=True),
+ mock.call().execute(),
+ mock.call(body={
+ 'manifest_text':
+ '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': '#',
+ }, ensure_unique_name=True),
+ mock.call().execute()])
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["owner_uuid"] = stubs.fake_user_uuid
+ stubs.api.container_requests().create.assert_called_with(
+ body=expect_container)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
class TestCreateTemplate(unittest.TestCase):
@stubs
diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock
index 7be4e0f..3715718 100644
--- a/services/api/Gemfile.lock
+++ b/services/api/Gemfile.lock
@@ -257,6 +257,3 @@ DEPENDENCIES
therubyracer
trollop
uglifier (>= 1.0.3)
-
-BUNDLED WITH
- 1.12.1
commit bcff3bcef60cc9132a376443cf1a8dc8598625be
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 10:36:05 2016 -0400
8442: Fix message
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 17fe8cb..b0e2c1f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -43,7 +43,7 @@ class ArvadosContainer(object):
}
if self.generatefiles:
- raise UnsupportedRequirement("Stdin redirection currently not suppported")
+ raise UnsupportedRequirement("Generate files not supported")
vwd = arvados.collection.Collection()
container_request["task.vwd"] = {}
commit 8a3d7a457a25278c42374e76057e60d1b0d90819
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 9 15:16:09 2016 -0400
8442: raise UnsupportedRequirement for unsupported features in the conformance
tests. Bump cwltool dependency.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 171d92d..af74808 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -31,7 +31,7 @@ from .arvcontainer import ArvadosContainer
from .arvjob import ArvadosJob
from .arvdocker import arv_docker_get_image
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
+from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
@@ -477,7 +477,7 @@ class ArvCwlRunner(object):
if kwargs.get("submit"):
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
@@ -543,11 +543,13 @@ class ArvCwlRunner(object):
self.cond.wait(1)
events.close()
+ except UnsupportedRequirement:
+ raise
except:
if sys.exc_info()[0] is KeyboardInterrupt:
logger.error("Interrupted, marking pipeline as failed")
else:
- logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 0cb885f..17fe8cb 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -4,6 +4,7 @@ from cwltool.process import get_feature, adjustFiles
from .arvdocker import arv_docker_get_image
from . import done
from cwltool.errors import WorkflowException
+from cwltool.process import UnsupportedRequirement
logger = logging.getLogger('arvados.cwl-runner')
@@ -42,6 +43,8 @@ class ArvadosContainer(object):
}
if self.generatefiles:
+ raise UnsupportedRequirement("Stdin redirection currently not suppported")
+
vwd = arvados.collection.Collection()
container_request["task.vwd"] = {}
for t in self.generatefiles:
@@ -60,13 +63,12 @@ class ArvadosContainer(object):
if self.environment:
container_request["environment"].update(self.environment)
- # TODO, not supported by crunchv2 yet
- #if self.stdin:
- # container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+ if self.stdin:
+ raise UnsupportedRequirement("Stdin redirection currently not suppported")
if self.stdout:
mounts["stdout"] = {"kind": "file",
- "path": self.stdout}
+ "path": "/var/spool/cwl/%s" % (self.stdout)}
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if not docker_req:
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 591bdde..b1ff7f3 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool==1.0.20160519182434',
+ 'cwltool==1.0.20160609160402',
'arvados-python-client>=0.1.20160322001610'
],
data_files=[
commit 8cef5b3ebb0f9f2fcf5e0886c719d7fe8c5a1a5c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 9 08:21:34 2016 -0400
8442: Setting up mount points works. Capturing output works.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 387bc4a..171d92d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,6 +28,7 @@ from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
import urlparse
from .arvcontainer import ArvadosContainer
+from .arvjob import ArvadosJob
from .arvdocker import arv_docker_get_image
from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
@@ -36,10 +37,6 @@ from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
@@ -98,194 +95,6 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
else:
return os.path.exists(self._abs(fn))
-class ArvadosJob(object):
- """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-
- def __init__(self, runner):
- self.arvrunner = runner
- self.running = False
-
- def run(self, dry_run=False, pull_image=True, **kwargs):
- script_parameters = {
- "command": self.command_line
- }
- runtime_constraints = {}
-
- if self.generatefiles:
- vwd = arvados.collection.Collection()
- script_parameters["task.vwd"] = {}
- for t in self.generatefiles:
- if isinstance(self.generatefiles[t], dict):
- src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
- vwd.copy(rest, t, source_collection=src)
- else:
- with vwd.open(t, "w") as f:
- f.write(self.generatefiles[t])
- vwd.save_new()
- for t in self.generatefiles:
- script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
- script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
- if self.environment:
- script_parameters["task.env"].update(self.environment)
-
- if self.stdin:
- script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
- if self.stdout:
- script_parameters["task.stdout"] = self.stdout
-
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
- if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
- else:
- runtime_constraints["docker_image"] = "arvados/jobs"
-
- resources = self.builder.resources
- if resources is not None:
- runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
- runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
- runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
- filters = [["repository", "=", "arvados"],
- ["script", "=", "crunchrunner"],
- ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
- if not self.arvrunner.ignore_docker_for_reuse:
- filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-
- try:
- response = self.arvrunner.api.jobs().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "script": "crunchrunner",
- "repository": "arvados",
- "script_version": "master",
- "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
- "script_parameters": {"tasks": [script_parameters]},
- "runtime_constraints": runtime_constraints
- },
- filters=filters,
- find_or_create=kwargs.get("enable_reuse", True)
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.arvrunner.jobs[response["uuid"]] = self
-
- self.update_pipeline_component(response)
-
- logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
- except Exception as e:
- logger.error("Got error %s" % str(e))
- self.output_callback({}, "permanentFail")
-
- def update_pipeline_component(self, record):
- if self.arvrunner.pipeline:
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
- if self.arvrunner.uuid:
- try:
- job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
- if job:
- components = job["components"]
- components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
- body={
- "components": components
- }).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.info("Error adding to components: %s", e)
-
- def done(self, record):
- try:
- self.update_pipeline_component(record)
- except:
- pass
-
- try:
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
-
- try:
- outputs = {}
- if record["output"]:
- logc = arvados.collection.Collection(record["log"])
- log = logc.open(logc.keys()[0])
- tmpdir = None
- outdir = None
- keepdir = None
- for l in log:
- # Determine the tmpdir, outdir and keepdir paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
- # will different runs, and we need to know about the
- # final run that actually produced output.
-
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
- if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
-
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
- # check if collection already exists with same owner, name and content
- collection_exists = self.arvrunner.api.collections().list(
- filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ['portable_data_hash', '=', record["output"]],
- ["name", "=", colname]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collection_exists["items"]:
- # Create a collection located in the same project as the
- # pipeline with the contents of the output.
- # First, get output record.
- collections = self.arvrunner.api.collections().list(
- limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["manifest_text"]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collections["items"]:
- raise WorkflowException(
- "Job output '%s' cannot be found on API server" % (
- record["output"]))
-
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": record["output"],
- "manifest_text": collections["items"][0]["manifest_text"]
- }, ensure_unique_name=True).execute(
- num_retries=self.arvrunner.num_retries)
-
- self.builder.outdir = outdir
- self.builder.pathmapper.keepdir = keepdir
- outputs = self.collect_outputs("keep:" + record["output"])
- except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
- processStatus = "permanentFail"
- except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
- processStatus = "permanentFail"
-
- self.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
class RunnerJob(object):
@@ -574,10 +383,16 @@ class ArvadosCommandTool(CommandLineTool):
return ArvadosJob(self.arvrunner)
def makePathMapper(self, reffiles, **kwargs):
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
+ if self.crunch2:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "/keep/%s",
+ "/keep/%s/%s",
+ **kwargs)
+ else:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "$(task.keep)/%s",
+ "$(task.keep)/%s/%s",
+ **kwargs)
class ArvCwlRunner(object):
@@ -676,7 +491,10 @@ class ArvCwlRunner(object):
runnerjob.run()
return runnerjob.uuid
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+ if self.crunch2:
+ events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
+ else:
+ events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
self.debug = kwargs.get("debug")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 2179398..0cb885f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,7 +1,9 @@
import logging
import arvados.collection
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles
from .arvdocker import arv_docker_get_image
+from . import done
+from cwltool.errors import WorkflowException
logger = logging.getLogger('arvados.cwl-runner')
@@ -12,6 +14,9 @@ class ArvadosContainer(object):
self.arvrunner = runner
self.running = False
+ def update_pipeline_component(self, r):
+ pass
+
def run(self, dry_run=False, pull_image=True, **kwargs):
container_request = {
"command": self.command_line,
@@ -26,13 +31,15 @@ class ArvadosContainer(object):
mounts = {
"/var/spool/cwl": {
"kind": "tmp"
- },
- "/tmp": {
- "kind": "tmp"
}
}
- # TODO mount normal inputs...
+ for f in self.pathmapper.files():
+ _, p = self.pathmapper.mapper(f)
+ mounts[p] = {
+ "kind": "collection",
+ "portable_data_hash": p[6:]
+ }
if self.generatefiles:
vwd = arvados.collection.Collection()
@@ -53,7 +60,7 @@ class ArvadosContainer(object):
if self.environment:
container_request["environment"].update(self.environment)
- # TODO, not supported
+ # TODO, not supported by crunchv2 yet
#if self.stdin:
# container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
@@ -84,11 +91,11 @@ class ArvadosContainer(object):
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
- self.arvrunner.jobs[response["uuid"]] = self
+ self.arvrunner.jobs[response["container_uuid"]] = self
- logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+ logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])
- if response["state"] in ("Complete", "Cancelled"):
+ if response["state"] == "Final":
self.done(response)
except Exception as e:
logger.error("Got error %s" % str(e))
@@ -104,69 +111,9 @@ class ArvadosContainer(object):
try:
outputs = {}
if record["output"]:
- logc = arvados.collection.Collection(record["log"])
- log = logc.open(logc.keys()[0])
- tmpdir = None
- outdir = None
- keepdir = None
- for l in log:
- # Determine the tmpdir, outdir and keepdir paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
- # will different runs, and we need to know about the
- # final run that actually produced output.
-
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
- if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
-
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
- # check if collection already exists with same owner, name and content
- collection_exists = self.arvrunner.api.collections().list(
- filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ['portable_data_hash', '=', record["output"]],
- ["name", "=", colname]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collection_exists["items"]:
- # Create a collection located in the same project as the
- # pipeline with the contents of the output.
- # First, get output record.
- collections = self.arvrunner.api.collections().list(
- limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["manifest_text"]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collections["items"]:
- raise WorkflowException(
- "Job output '%s' cannot be found on API server" % (
- record["output"]))
-
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": record["output"],
- "manifest_text": collections["items"][0]["manifest_text"]
- }, ensure_unique_name=True).execute(
- num_retries=self.arvrunner.num_retries)
-
- self.builder.outdir = outdir
- self.builder.pathmapper.keepdir = keepdir
- outputs = self.collect_outputs("keep:" + record["output"])
+ outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
except Exception as e:
logger.exception("Got unknown exception while collecting job outputs:")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
new file mode 100644
index 0000000..88a8eeb
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -0,0 +1,166 @@
+import logging
+import re
+from . import done
+from .arvdocker import arv_docker_get_image
+from cwltool.process import get_feature
+from cwltool.errors import WorkflowException
+import arvados.collection
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+
+class ArvadosJob(object):
+ """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+ def __init__(self, runner):
+ self.arvrunner = runner
+ self.running = False
+
+ def run(self, dry_run=False, pull_image=True, **kwargs):
+ script_parameters = {
+ "command": self.command_line
+ }
+ runtime_constraints = {}
+
+ if self.generatefiles:
+ vwd = arvados.collection.Collection()
+ script_parameters["task.vwd"] = {}
+ for t in self.generatefiles:
+ if isinstance(self.generatefiles[t], dict):
+ src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+ vwd.copy(rest, t, source_collection=src)
+ else:
+ with vwd.open(t, "w") as f:
+ f.write(self.generatefiles[t])
+ vwd.save_new()
+ for t in self.generatefiles:
+ script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+ script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
+ if self.environment:
+ script_parameters["task.env"].update(self.environment)
+
+ if self.stdin:
+ script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+ if self.stdout:
+ script_parameters["task.stdout"] = self.stdout
+
+ (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ if docker_req and kwargs.get("use_container") is not False:
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+ else:
+ runtime_constraints["docker_image"] = "arvados/jobs"
+
+ resources = self.builder.resources
+ if resources is not None:
+ runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+ runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+ runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+ filters = [["repository", "=", "arvados"],
+ ["script", "=", "crunchrunner"],
+ ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+ if not self.arvrunner.ignore_docker_for_reuse:
+ filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+
+ try:
+ response = self.arvrunner.api.jobs().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "script": "crunchrunner",
+ "repository": "arvados",
+ "script_version": "master",
+ "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+ "script_parameters": {"tasks": [script_parameters]},
+ "runtime_constraints": runtime_constraints
+ },
+ filters=filters,
+ find_or_create=kwargs.get("enable_reuse", True)
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.arvrunner.jobs[response["uuid"]] = self
+
+ self.update_pipeline_component(response)
+
+ logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+ if response["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(response)
+ except Exception as e:
+ logger.error("Got error %s" % str(e))
+ self.output_callback({}, "permanentFail")
+
+ def update_pipeline_component(self, record):
+ if self.arvrunner.pipeline:
+ self.arvrunner.pipeline["components"][self.name] = {"job": record}
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+ body={
+ "components": self.arvrunner.pipeline["components"]
+ }).execute(num_retries=self.arvrunner.num_retries)
+ if self.arvrunner.uuid:
+ try:
+ job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+ if job:
+ components = job["components"]
+ components[self.name] = record["uuid"]
+ self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
+ body={
+ "components": components
+ }).execute(num_retries=self.arvrunner.num_retries)
+ except Exception as e:
+ logger.info("Error adding to components: %s", e)
+
+ def done(self, record):
+ try:
+ self.update_pipeline_component(record)
+ except:
+ pass
+
+ try:
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ outputs = {}
+ try:
+ if record["output"]:
+ logc = arvados.collection.Collection(record["log"])
+ log = logc.open(logc.keys()[0])
+ tmpdir = None
+ outdir = None
+ keepdir = None
+ for l in log:
+ # Determine the tmpdir, outdir and keepdir paths from
+ # the job run. Unfortunately, we can't take the first
+ # values we find (which are expected to be near the
+ # top) and stop scanning because if the node fails and
+ # the job restarts on a different node these values
+ # will different runs, and we need to know about the
+ # final run that actually produced output.
+
+ g = tmpdirre.match(l)
+ if g:
+ tmpdir = g.group(1)
+ g = outdirre.match(l)
+ if g:
+ outdir = g.group(1)
+ g = keepre.match(l)
+ if g:
+ keepdir = g.group(1)
+
+ outputs = done.done(self, record, tmpdir, outdir, keepdir)
+ except WorkflowException as e:
+ logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ processStatus = "permanentFail"
+ except Exception as e:
+ logger.exception("Got unknown exception while collecting job outputs:")
+ processStatus = "permanentFail"
+
+ self.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
new file mode 100644
index 0000000..8a6fc9d
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -0,0 +1,38 @@
+def done(self, record, tmpdir, outdir, keepdir):
+ colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+ # check if collection already exists with same owner, name and content
+ collection_exists = self.arvrunner.api.collections().list(
+ filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ['portable_data_hash', '=', record["output"]],
+ ["name", "=", colname]]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collection_exists["items"]:
+ # Create a collection located in the same project as the
+ # pipeline with the contents of the output.
+ # First, get output record.
+ collections = self.arvrunner.api.collections().list(
+ limit=1,
+ filters=[['portable_data_hash', '=', record["output"]]],
+ select=["manifest_text"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collections["items"]:
+ raise WorkflowException(
+ "Job output '%s' cannot be found on API server" % (
+ record["output"]))
+
+ # Create new collection in the parent project
+ # with the output contents.
+ self.arvrunner.api.collections().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": colname,
+ "portable_data_hash": record["output"],
+ "manifest_text": collections["items"][0]["manifest_text"]
+ }, ensure_unique_name=True).execute(
+ num_retries=self.arvrunner.num_retries)
+
+ self.builder.outdir = outdir
+ self.builder.pathmapper.keepdir = keepdir
+ return self.collect_outputs("keep:" + record["output"])
commit 74a8823c39d5b465504be2de85f22d49a45f548d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 8 11:45:00 2016 -0400
8442: Return PDH for Docker container. Working on setting up mount points.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5a1ff07..387bc4a 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,9 +40,6 @@ tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
-
-
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4ea63a0..2179398 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -23,7 +23,16 @@ class ArvadosContainer(object):
"state": "Committed"
}
runtime_constraints = {}
- mounts = {}
+ mounts = {
+ "/var/spool/cwl": {
+ "kind": "tmp"
+ },
+ "/tmp": {
+ "kind": "tmp"
+ }
+ }
+
+ # TODO mount normal inputs...
if self.generatefiles:
vwd = arvados.collection.Collection()
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 8f76bbf..253df99 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -26,6 +26,11 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
logger.info("Uploading Docker image %s", ":".join(args[1:]))
arvados.commands.keepdocker.main(args, stdout=sys.stderr)
- # XXX return PDH instead
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag)
+
+ #return dockerRequirement["dockerImageId"]
- return dockerRequirement["dockerImageId"]
+ pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+ return pdh
commit a137e05cae9fe0f1d8ea90e0e2fd16fc76167b63
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 8 11:15:58 2016 -0400
8442: Submit containers Work in progess.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0878998..5a1ff07 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -27,6 +27,8 @@ import threading
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
import urlparse
+from .arvcontainer import ArvadosContainer
+from .arvdocker import arv_docker_get_image
from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
from arvados.api import OrderedJsonModel
@@ -39,31 +41,6 @@ outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
- """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
-
- if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
- dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
- sp = dockerRequirement["dockerImageId"].split(":")
- image_name = sp[0]
- image_tag = sp[1] if len(sp) > 1 else None
-
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
- image_name=image_name,
- image_tag=image_tag)
-
- if not images:
- imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- args = ["--project-uuid="+project_uuid, image_name]
- if image_tag:
- args.append(image_tag)
- logger.info("Uploading Docker image %s", ":".join(args[1:]))
- arvados.commands.keepdocker.main(args, stdout=sys.stderr)
-
- # XXX return PDH instead
-
- return dockerRequirement["dockerImageId"]
class CollectionFsAccess(cwltool.process.StdFsAccess):
@@ -588,12 +565,13 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
+ def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
self.arvrunner = arvrunner
+ self.crunch2 = crunch2
def makeJobRunner(self):
- if kwargs.get("crunch2"):
+ if self.crunch2:
return ArvadosContainer(self.arvrunner)
else:
return ArvadosJob(self.arvrunner)
@@ -609,7 +587,7 @@ class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit crunch jobs, wait for them to
complete, and report output."""
- def __init__(self, api_client):
+ def __init__(self, api_client, crunch2):
self.api = api_client
self.jobs = {}
self.lock = threading.Lock()
@@ -618,10 +596,11 @@ class ArvCwlRunner(object):
self.uploaded = {}
self.num_retries = 4
self.uuid = None
+ self.crunch2 = crunch2
def arvMakeTool(self, toolpath_object, **kwargs):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, **kwargs)
+ return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
else:
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
@@ -709,7 +688,7 @@ class ArvCwlRunner(object):
kwargs["fs_access"] = self.fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
- if kwargs.get("crunch2"):
+ if self.crunch2:
kwargs["outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
else:
@@ -849,7 +828,7 @@ def main(args, stdout, stderr, api_client=None):
try:
if api_client is None:
api_client=arvados.api('v1', model=OrderedJsonModel())
- runner = ArvCwlRunner(api_client)
+ runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
except Exception as e:
logger.error(e)
return 1
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index fe0f7ca..4ea63a0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,3 +1,10 @@
+import logging
+import arvados.collection
+from cwltool.process import get_feature
+from .arvdocker import arv_docker_get_image
+
+logger = logging.getLogger('arvados.cwl-runner')
+
class ArvadosContainer(object):
"""Submit and manage a Crunch job for executing a CWL CommandLineTool."""
@@ -7,12 +14,13 @@ class ArvadosContainer(object):
def run(self, dry_run=False, pull_image=True, **kwargs):
container_request = {
- "command": self.command_line
+ "command": self.command_line,
"owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
- "output_path", "/var/spool/cwl",
- "cwd", "/var/spool/cwl",
- "priority": 1
+ "output_path": "/var/spool/cwl",
+ "cwd": "/var/spool/cwl",
+ "priority": 1,
+ "state": "Committed"
}
runtime_constraints = {}
mounts = {}
@@ -46,7 +54,7 @@ class ArvadosContainer(object):
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if not docker_req:
- docker_req = "arvados/jobs"
+ docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
new file mode 100644
index 0000000..8f76bbf
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -0,0 +1,31 @@
+import logging
+import cwltool.docker
+import arvados.commands.keepdocker
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+ """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+
+ if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+ dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+
+ sp = dockerRequirement["dockerImageId"].split(":")
+ image_name = sp[0]
+ image_tag = sp[1] if len(sp) > 1 else None
+
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag)
+
+ if not images:
+ imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
+ args = ["--project-uuid="+project_uuid, image_name]
+ if image_tag:
+ args.append(image_tag)
+ logger.info("Uploading Docker image %s", ":".join(args[1:]))
+ arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+
+ # XXX return PDH instead
+
+ return dockerRequirement["dockerImageId"]
commit f892414e96571ff5941d36e41582d255b9bb7b17
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 6 15:54:35 2016 -0400
8442: Add --crunch1/--crunch2 switch
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index bf7ccc0..0878998 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -824,6 +824,15 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
default=True, dest="wait")
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--crunch1", action="store_false",
+ default=False, dest="crunch2",
+ help="Use Crunch v1 Jobs API")
+
+ exgroup.add_argument("--crunch2", action="store_true",
+ default=False, dest="crunch2",
+ help="Use Crunch v2 Containers API")
+
parser.add_argument("workflow", type=str, nargs="?", default=None)
parser.add_argument("job_order", nargs=argparse.REMAINDER)
commit 533929a3e8629ad92aefde14fc63d16789d1cf7b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 6 15:24:50 2016 -0400
8442: CWL create crunch2 containers WIP
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 371dd4f..bf7ccc0 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -61,6 +61,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
logger.info("Uploading Docker image %s", ":".join(args[1:]))
arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+ # XXX return PDH instead
+
return dockerRequirement["dockerImageId"]
@@ -591,7 +593,10 @@ class ArvadosCommandTool(CommandLineTool):
self.arvrunner = arvrunner
def makeJobRunner(self):
- return ArvadosJob(self.arvrunner)
+ if kwargs.get("crunch2"):
+ return ArvadosContainer(self.arvrunner)
+ else:
+ return ArvadosJob(self.arvrunner)
def makePathMapper(self, reffiles, **kwargs):
return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
@@ -704,8 +709,12 @@ class ArvCwlRunner(object):
kwargs["fs_access"] = self.fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
+ if kwargs.get("crunch2"):
+ kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["tmpdir"] = "/tmp"
+ else:
+ kwargs["outdir"] = "$(task.outdir)"
+ kwargs["tmpdir"] = "$(task.tmpdir)"
if kwargs.get("conformance_test"):
return cwltool.main.single_job_executor(tool, job_order, **kwargs)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
new file mode 100644
index 0000000..fe0f7ca
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -0,0 +1,160 @@
+class ArvadosContainer(object):
+ """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+ def __init__(self, runner):
+ self.arvrunner = runner
+ self.running = False
+
+ def run(self, dry_run=False, pull_image=True, **kwargs):
+ container_request = {
+ "command": self.command_line
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": self.name,
+ "output_path", "/var/spool/cwl",
+ "cwd", "/var/spool/cwl",
+ "priority": 1
+ }
+ runtime_constraints = {}
+ mounts = {}
+
+ if self.generatefiles:
+ vwd = arvados.collection.Collection()
+ container_request["task.vwd"] = {}
+ for t in self.generatefiles:
+ if isinstance(self.generatefiles[t], dict):
+ src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+ vwd.copy(rest, t, source_collection=src)
+ else:
+ with vwd.open(t, "w") as f:
+ f.write(self.generatefiles[t])
+ vwd.save_new()
+ # TODO
+ # for t in self.generatefiles:
+ # container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+ container_request["environment"] = {"TMPDIR": "/tmp"}
+ if self.environment:
+ container_request["environment"].update(self.environment)
+
+ # TODO, not supported
+ #if self.stdin:
+ # container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+ if self.stdout:
+ mounts["stdout"] = {"kind": "file",
+ "path": self.stdout}
+
+ (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ if not docker_req:
+ docker_req = "arvados/jobs"
+
+ container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
+ docker_req,
+ pull_image,
+ self.arvrunner.project_uuid)
+
+ resources = self.builder.resources
+ if resources is not None:
+ runtime_constraints["vcpus"] = resources.get("cores", 1)
+ runtime_constraints["ram"] = resources.get("ram") * 2**20
+ #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+ container_request["mounts"] = mounts
+ container_request["runtime_constraints"] = runtime_constraints
+
+ try:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.arvrunner.jobs[response["uuid"]] = self
+
+ logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+ if response["state"] in ("Complete", "Cancelled"):
+ self.done(response)
+ except Exception as e:
+ logger.error("Got error %s" % str(e))
+ self.output_callback({}, "permanentFail")
+
+ def done(self, record):
+ try:
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ try:
+ outputs = {}
+ if record["output"]:
+ logc = arvados.collection.Collection(record["log"])
+ log = logc.open(logc.keys()[0])
+ tmpdir = None
+ outdir = None
+ keepdir = None
+ for l in log:
+ # Determine the tmpdir, outdir and keepdir paths from
+ # the job run. Unfortunately, we can't take the first
+ # values we find (which are expected to be near the
+ # top) and stop scanning because if the node fails and
+ # the job restarts on a different node these values
+ # will different runs, and we need to know about the
+ # final run that actually produced output.
+
+ g = tmpdirre.match(l)
+ if g:
+ tmpdir = g.group(1)
+ g = outdirre.match(l)
+ if g:
+ outdir = g.group(1)
+ g = keepre.match(l)
+ if g:
+ keepdir = g.group(1)
+
+ colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+ # check if collection already exists with same owner, name and content
+ collection_exists = self.arvrunner.api.collections().list(
+ filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ['portable_data_hash', '=', record["output"]],
+ ["name", "=", colname]]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collection_exists["items"]:
+ # Create a collection located in the same project as the
+ # pipeline with the contents of the output.
+ # First, get output record.
+ collections = self.arvrunner.api.collections().list(
+ limit=1,
+ filters=[['portable_data_hash', '=', record["output"]]],
+ select=["manifest_text"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collections["items"]:
+ raise WorkflowException(
+ "Job output '%s' cannot be found on API server" % (
+ record["output"]))
+
+ # Create new collection in the parent project
+ # with the output contents.
+ self.arvrunner.api.collections().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": colname,
+ "portable_data_hash": record["output"],
+ "manifest_text": collections["items"][0]["manifest_text"]
+ }, ensure_unique_name=True).execute(
+ num_retries=self.arvrunner.num_retries)
+
+ self.builder.outdir = outdir
+ self.builder.pathmapper.keepdir = keepdir
+ outputs = self.collect_outputs("keep:" + record["output"])
+ except WorkflowException as e:
+ logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ processStatus = "permanentFail"
+ except Exception as e:
+ logger.exception("Got unknown exception while collecting job outputs:")
+ processStatus = "permanentFail"
+
+ self.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
commit 2cd1c3ed705e639fb9e4ef067a32b278a6d3d4ee
Merge: 0ac69ce f32e1aa
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jun 14 16:24:55 2016 -0400
Merge branch '9388-websocket-every-notify' closes #9388
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list