[ARVADOS] updated: 0fb601b385db9c25d083cb7fd058cafe57bb76ac
Git user
git at public.curoverse.com
Mon Jun 13 16:24:09 EDT 2016
Summary of changes:
apps/workbench/Gemfile.lock | 3 -
sdk/cwl/arvados_cwl/__init__.py | 496 +++------------------
sdk/cwl/arvados_cwl/arvcontainer.py | 93 +++-
sdk/cwl/arvados_cwl/arvjob.py | 156 ++++++-
sdk/cwl/arvados_cwl/arvtool.py | 30 ++
sdk/cwl/arvados_cwl/fsaccess.py | 60 +++
sdk/cwl/arvados_cwl/pathmapper.py | 55 +++
sdk/cwl/arvados_cwl/runner.py | 98 ++++
sdk/cwl/tests/{test_job.py => test_container.py} | 113 +++--
sdk/cwl/tests/test_job.py | 4 +-
sdk/cwl/tests/test_submit.py | 82 +++-
sdk/go/dispatch/dispatch.go | 16 +-
services/api/Gemfile.lock | 3 -
.../crunch-dispatch-slurm/crunch-dispatch-slurm.go | 4 +-
14 files changed, 704 insertions(+), 509 deletions(-)
create mode 100644 sdk/cwl/arvados_cwl/arvtool.py
create mode 100644 sdk/cwl/arvados_cwl/fsaccess.py
create mode 100644 sdk/cwl/arvados_cwl/pathmapper.py
create mode 100644 sdk/cwl/arvados_cwl/runner.py
copy sdk/cwl/tests/{test_job.py => test_container.py} (64%)
via 0fb601b385db9c25d083cb7fd058cafe57bb76ac (commit)
via df973657a4c4acf59fd8adef5b7379f016973fe5 (commit)
via 65f2cd37495d29bfa1d4c234870e8c90d2d3dcfa (commit)
from 31547d143307cd7fe1e7ef91bcdaf450dca2a5e4 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0fb601b385db9c25d083cb7fd058cafe57bb76ac
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 16:22:46 2016 -0400
8442: Introduce a RuntimeConstraints struct instead of using a map
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index 54d596f..c0e4f14 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -31,13 +31,19 @@ type apiClientAuthorizationList struct {
Items []apiClientAuthorization `json:"items"`
}
+type RuntimeConstraints struct {
+ API *bool `json:"API"`
+ Ram int64 `json:"ram"`
+ Vcpus int64 `json:"vcpus"`
+}
+
// Represents an Arvados container record
type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
- RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
- LockedByUUID string `json:"locked_by_uuid"`
+ UUID string `json:"uuid"`
+ State string `json:"state"`
+ Priority int `json:"priority"`
+ RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of the containers from api
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index f718fbc..9b2cf0c 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -70,11 +70,11 @@ func doMain() error {
// sbatchCmd
func sbatchFunc(container dispatch.Container) *exec.Cmd {
- memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
+ memPerCPU := math.Ceil((float64(container.RuntimeConstraints.Ram)) / (float64(container.RuntimeConstraints.Vcpus * 1048576)))
return exec.Command("sbatch", "--share", "--parsable",
fmt.Sprintf("--job-name=%s", container.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
- fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
+ fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints.Vcpus)),
fmt.Sprintf("--priority=%d", container.Priority))
}
commit df973657a4c4acf59fd8adef5b7379f016973fe5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 16:17:45 2016 -0400
8442: Debugging container --submit with crunch2
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 136d5ae..ba26816 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -19,6 +19,7 @@ import arvados.events
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .arvtool import ArvadosCommandTool
+from .fsaccess import CollectionFsAccess
from cwltool.process import shortname, UnsupportedRequirement
from arvados.api import OrderedJsonModel
@@ -184,6 +185,9 @@ class ArvCwlRunner(object):
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ if runnerjob and self.crunch2:
+ self.api.container_requests().update(uuid=runnerjob.uuid,
+ body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.cond.release()
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index be11404..4c80896 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -152,7 +152,7 @@ class RunnerContainer(Runner):
workflowname = os.path.basename(self.tool.tool["id"])
workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+ workflowcollection = workflowcollection[workflowcollection.index('/')]
jobpath = "/var/lib/cwl/job/cwl.input.json"
container_image = arv_docker_get_image(self.arvrunner.api,
@@ -181,11 +181,16 @@ class RunnerContainer(Runner):
"stdout": {
"kind": "file",
"path": "/var/spool/cwl/cwl.output.json"
+ },
+ "/var/spool/cwl": {
+ "kind": "collection",
+ "writable": True
}
},
"runtime_constraints": {
"vcpus": 1,
- "ram": 1024*1024*256
+ "ram": 1024*1024*256,
+ "API": True
}
}
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index c911895..d48c93a 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -1,4 +1,5 @@
import fnmatch
+import cwltool.process
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
commit 65f2cd37495d29bfa1d4c234870e8c90d2d3dcfa
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 13 15:45:29 2016 -0400
8442: Adding --submit support with --crunch2. General refactoring into more/smaller files.
diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index fdcd375..8d3e222 100644
--- a/apps/workbench/Gemfile.lock
+++ b/apps/workbench/Gemfile.lock
@@ -309,6 +309,3 @@ DEPENDENCIES
therubyracer
uglifier (>= 1.0.3)
wiselinks
-
-BUNDLED WITH
- 1.12.1
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index af74808..136d5ae 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -3,397 +3,28 @@
# Implement cwl-runner interface for submitting and running jobs on Arvados.
import argparse
-import arvados
-import arvados.collection
-import arvados.commands.keepdocker
-import arvados.commands.run
-import arvados.events
-import arvados.util
-import copy
-import cwltool.docker
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
-from cwltool.errors import WorkflowException
-import cwltool.main
-import cwltool.workflow
-import fnmatch
-from functools import partial
-import json
import logging
import os
-import pkg_resources # part of setuptools
-import re
import sys
import threading
-from cwltool.load_tool import fetch_document
-from cwltool.builder import Builder
-import urlparse
-from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
-from .arvdocker import arv_docker_get_image
-
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
-from arvados.api import OrderedJsonModel
-
-logger = logging.getLogger('arvados.cwl-runner')
-logger.setLevel(logging.INFO)
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
- """Implement the cwltool FsAccess interface for Arvados Collections."""
-
- def __init__(self, basedir):
- super(CollectionFsAccess, self).__init__(basedir)
- self.collections = {}
-
- def get_collection(self, path):
- p = path.split("/")
- if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
- pdh = p[0][5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh)
- return (self.collections[pdh], "/".join(p[1:]))
- else:
- return (None, path)
-
- def _match(self, collection, patternsegments, parent):
- if not patternsegments:
- return []
-
- if not isinstance(collection, arvados.collection.RichCollectionBase):
- return []
-
- ret = []
- # iterate over the files and subcollections in 'collection'
- for filename in collection:
- if patternsegments[0] == '.':
- # Pattern contains something like "./foo" so just shift
- # past the "./"
- ret.extend(self._match(collection, patternsegments[1:], parent))
- elif fnmatch.fnmatch(filename, patternsegments[0]):
- cur = os.path.join(parent, filename)
- if len(patternsegments) == 1:
- ret.append(cur)
- else:
- ret.extend(self._match(collection[filename], patternsegments[1:], cur))
- return ret
-
- def glob(self, pattern):
- collection, rest = self.get_collection(pattern)
- patternsegments = rest.split("/")
- return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
- def open(self, fn, mode):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.open(rest, mode)
- else:
- return open(self._abs(fn), mode)
-
- def exists(self, fn):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.exists(rest)
- else:
- return os.path.exists(self._abs(fn))
-
-
-
-class RunnerJob(object):
- """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
-
- def __init__(self, runner, tool, job_order, enable_reuse):
- self.arvrunner = runner
- self.tool = tool
- self.job_order = job_order
- self.running = False
- self.enable_reuse = enable_reuse
-
- def update_pipeline_component(self, record):
- pass
-
- def upload_docker(self, tool):
- if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
- if docker_req:
- arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- self.upload_docker(s.embedded_tool)
-
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
- """Create an Arvados job specification for this workflow.
-
- The returned dict can be used to create a job (i.e., passed as
- the +body+ argument to jobs().create()), or as a component in
- a pipeline template or pipeline instance.
- """
- self.upload_docker(self.tool)
-
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
-
- self.name = os.path.basename(self.tool.tool["id"])
-
- def visitFiles(files, path):
- files.add(path)
- return path
-
- document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
- def loadref(b, u):
- return document_loader.fetch(urlparse.urljoin(b, u))
-
- sc = scandeps(uri, workflowobj,
- set(("$import", "run")),
- set(("$include", "$schemas", "path")),
- loadref)
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
-
- workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- "%s",
- "%s/%s",
- name=self.name,
- **kwargs)
-
- jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- "%s",
- "%s/%s",
- name=os.path.basename(self.job_order.get("id", "#")),
- **kwargs)
-
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
-
- if "id" in self.job_order:
- del self.job_order["id"]
-
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
- return {
- "script": "cwl-runner",
- "script_version": "master",
- "repository": "arvados",
- "script_parameters": self.job_order,
- "runtime_constraints": {
- "docker_image": "arvados/jobs"
- }
- }
-
- def run(self, *args, **kwargs):
- job_spec = self.arvados_job_spec(*args, **kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
- response = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.uuid = response["uuid"]
- self.arvrunner.jobs[self.uuid] = self
-
- logger.info("Submitted job %s", response["uuid"])
-
- if kwargs.get("submit"):
- self.pipeline = self.arvrunner.api.pipeline_instances().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": shortname(self.tool.tool["id"]),
- "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
- "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
-
- def done(self, record):
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
-
- outputs = None
- try:
- try:
- outc = arvados.collection.Collection(record["output"])
- with outc.open("cwl.output.json") as f:
- outputs = json.load(f)
- def keepify(path):
- if not path.startswith("keep:"):
- return "keep:%s/%s" % (record["output"], path)
- adjustFiles(outputs, keepify)
- except Exception as e:
- logger.error("While getting final output object: %s", e)
- self.arvrunner.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
-
-
-class RunnerTemplate(object):
- """An Arvados pipeline template that invokes a CWL workflow."""
-
- type_to_dataclass = {
- 'boolean': 'boolean',
- 'File': 'File',
- 'float': 'number',
- 'int': 'number',
- 'string': 'text',
- }
-
- def __init__(self, runner, tool, job_order, enable_reuse):
- self.runner = runner
- self.tool = tool
- self.job = RunnerJob(
- runner=runner,
- tool=tool,
- job_order=job_order,
- enable_reuse=enable_reuse)
-
- def pipeline_component_spec(self):
- """Return a component that Workbench and a-r-p-i will understand.
-
- Specifically, translate CWL input specs to Arvados pipeline
- format, like {"dataclass":"File","value":"xyz"}.
- """
- spec = self.job.arvados_job_spec()
-
- # Most of the component spec is exactly the same as the job
- # spec (script, script_version, etc.).
- # spec['script_parameters'] isn't right, though. A component
- # spec's script_parameters hash is a translation of
- # self.tool.tool['inputs'] with defaults/overrides taken from
- # the job order. So we move the job parameters out of the way
- # and build a new spec['script_parameters'].
- job_params = spec['script_parameters']
- spec['script_parameters'] = {}
-
- for param in self.tool.tool['inputs']:
- param = copy.deepcopy(param)
-
- # Data type and "required" flag...
- types = param['type']
- if not isinstance(types, list):
- types = [types]
- param['required'] = 'null' not in types
- non_null_types = set(types) - set(['null'])
- if len(non_null_types) == 1:
- the_type = [c for c in non_null_types][0]
- dataclass = self.type_to_dataclass.get(the_type)
- if dataclass:
- param['dataclass'] = dataclass
- # Note: If we didn't figure out a single appropriate
- # dataclass, we just left that attribute out. We leave
- # the "type" attribute there in any case, which might help
- # downstream.
-
- # Title and description...
- title = param.pop('label', '')
- descr = param.pop('description', '').rstrip('\n')
- if title:
- param['title'] = title
- if descr:
- param['description'] = descr
-
- # Fill in the value from the current job order, if any.
- param_id = shortname(param.pop('id'))
- value = job_params.get(param_id)
- if value is None:
- pass
- elif not isinstance(value, dict):
- param['value'] = value
- elif param.get('dataclass') == 'File' and value.get('path'):
- param['value'] = value['path']
-
- spec['script_parameters'][param_id] = param
- spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
- return spec
-
- def save(self):
- job_spec = self.pipeline_component_spec()
- response = self.runner.api.pipeline_templates().create(body={
- "components": {
- self.job.name: job_spec,
- },
- "name": self.job.name,
- "owner_uuid": self.runner.project_uuid,
- }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
- self.uuid = response["uuid"]
- logger.info("Created template %s", self.uuid)
-
-
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
- """Convert container-local paths to and from Keep collection ids."""
-
- def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, **kwargs):
- self._pathmap = arvrunner.get_uploaded()
- uploadfiles = set()
-
- pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
- for src in referenced_files:
- if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, collection_pattern % src[5:])
- if "#" in src:
- src = src[:src.index("#")]
- if src not in self._pathmap:
- ab = cwltool.pathmapper.abspath(src, input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
- if kwargs.get("conformance_test"):
- self._pathmap[src] = (src, ab)
- elif isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.add((src, ab, st))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = (ab, st.fn)
- else:
- raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-
- if uploadfiles:
- arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
- arvrunner.api,
- dry_run=kwargs.get("dry_run"),
- num_retries=3,
- fnPattern=file_pattern,
- name=name,
- project=arvrunner.project_uuid)
-
- for src, ab, st in uploadfiles:
- arvrunner.add_uploaded(src, (ab, st.fn))
- self._pathmap[src] = (ab, st.fn)
-
- self.keepdir = None
-
- def reversemap(self, target):
- if target.startswith("keep:"):
- return (target, target)
- elif self.keepdir and target.startswith(self.keepdir):
- return (target, "keep:" + target[len(self.keepdir)+1:])
- else:
- return super(ArvPathMapper, self).reversemap(target)
-
+import pkg_resources # part of setuptools
-class ArvadosCommandTool(CommandLineTool):
- """Wrap cwltool CommandLineTool to override selected methods."""
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
- def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
- super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
- self.arvrunner = arvrunner
- self.crunch2 = crunch2
+import arvados
+import arvados.events
- def makeJobRunner(self):
- if self.crunch2:
- return ArvadosContainer(self.arvrunner)
- else:
- return ArvadosJob(self.arvrunner)
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvtool import ArvadosCommandTool
- def makePathMapper(self, reffiles, **kwargs):
- if self.crunch2:
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "/keep/%s",
- "/keep/%s/%s",
- **kwargs)
- else:
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
+from cwltool.process import shortname, UnsupportedRequirement
+from arvados.api import OrderedJsonModel
+logger = logging.getLogger('arvados.cwl-runner')
+logger.setLevel(logging.INFO)
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit crunch jobs, wait for them to
@@ -475,7 +106,10 @@ class ArvCwlRunner(object):
return tmpl.uuid
if kwargs.get("submit"):
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+ if self.crunch2:
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+ else:
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
# Create pipeline for local run
@@ -510,56 +144,54 @@ class ArvCwlRunner(object):
kwargs["outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- if kwargs.get("conformance_test"):
- return cwltool.main.single_job_executor(tool, job_order, **kwargs)
+ if kwargs.get("submit"):
+ jobiter = iter((runnerjob,))
else:
- if kwargs.get("submit"):
- jobiter = iter((runnerjob,))
- else:
- if "cwl_runner_job" in kwargs:
- self.uuid = kwargs.get("cwl_runner_job").get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- docker_outdir="$(task.outdir)",
- **kwargs)
-
- try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
-
- for runnable in jobiter:
- if runnable:
- runnable.run(**kwargs)
- else:
- if self.jobs:
- self.cond.wait(1)
- else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
- break
-
- while self.jobs:
- self.cond.wait(1)
-
- events.close()
- except UnsupportedRequirement:
- raise
- except:
- if sys.exc_info()[0] is KeyboardInterrupt:
- logger.error("Interrupted, marking pipeline as failed")
+ if "cwl_runner_job" in kwargs:
+ self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ docker_outdir="$(task.outdir)",
+ **kwargs)
+
+ try:
+ self.cond.acquire()
+ # Will continue to hold the lock for the duration of this code
+ # except when in cond.wait(), at which point on_message can update
+ # job state and process output callbacks.
+
+ for runnable in jobiter:
+ if runnable:
+ runnable.run(**kwargs)
else:
- logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- finally:
- self.cond.release()
+ if self.jobs:
+ self.cond.wait(1)
+ else:
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ break
+
+ while self.jobs:
+ self.cond.wait(1)
+
+ events.close()
+ except UnsupportedRequirement:
+ raise
+ except:
+ if sys.exc_info()[0] is KeyboardInterrupt:
+ logger.error("Interrupted, marking pipeline as failed")
+ else:
+ logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ finally:
+ self.cond.release()
- if self.final_output is None:
- raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+ if self.final_output is None:
+ raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+ return self.final_output
- return self.final_output
def versionstring():
"""Print version string of key packages for provenance and debugging."""
@@ -572,6 +204,7 @@ def versionstring():
"arvados-python-client", arvpkg[0].version,
"cwltool", cwlpkg[0].version)
+
def arg_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
@@ -634,6 +267,7 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
return parser
+
def main(args, stdout, stderr, api_client=None):
parser = arg_parser()
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index b0e2c1f..be11404 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,10 +1,15 @@
import logging
+import json
+import os
+
+from cwltool.errors import WorkflowException
+from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
+
import arvados.collection
-from cwltool.process import get_feature, adjustFiles
+
from .arvdocker import arv_docker_get_image
from . import done
-from cwltool.errors import WorkflowException
-from cwltool.process import UnsupportedRequirement
+from .runner import Runner
logger = logging.getLogger('arvados.cwl-runner')
@@ -45,7 +50,7 @@ class ArvadosContainer(object):
if self.generatefiles:
raise UnsupportedRequirement("Generate files not supported")
- vwd = arvados.collection.Collection()
+ vwd = arvados.collection.Collection(api_client=self.arvrunner.api_client)
container_request["task.vwd"] = {}
for t in self.generatefiles:
if isinstance(self.generatefiles[t], dict):
@@ -124,3 +129,78 @@ class ArvadosContainer(object):
self.output_callback(outputs, processStatus)
finally:
del self.arvrunner.jobs[record["uuid"]]
+
+
+class RunnerContainer(Runner):
+ """Submit and manage a container that runs arvados-cwl-runner."""
+
+ def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ """Create an Arvados job specification for this workflow.
+
+ The returned dict can be used to create a job (i.e., passed as
+ the +body+ argument to jobs().create()), or as a component in
+ a pipeline template or pipeline instance.
+ """
+
+ workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+
+ with arvados.collection.Collection(api_client=self.arvrunner.api) as jobobj:
+ with jobobj.open("cwl.input.json", "w") as f:
+ json.dump(self.job_order, f, sort_keys=True, indent=4)
+ jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+ workflowname = os.path.basename(self.tool.tool["id"])
+ workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+ workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
+ workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+ jobpath = "/var/lib/cwl/job/cwl.input.json"
+
+ container_image = arv_docker_get_image(self.arvrunner.api,
+ {"dockerImageId": "arvados/jobs"},
+ pull_image,
+ self.arvrunner.project_uuid)
+
+ return {
+ "command": ["arvados-cwl-runner", "--local", "--crunch2", workflowpath, jobpath],
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": self.name,
+ "output_path": "/var/spool/cwl",
+ "cwd": "/var/spool/cwl",
+ "priority": 1,
+ "state": "Committed",
+ "container_image": container_image,
+ "mounts": {
+ workflowpath: {
+ "kind": "collection",
+ "portable_data_hash": "%s" % workflowcollection
+ },
+ jobpath: {
+ "kind": "collection",
+ "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
+ },
+ "stdout": {
+ "kind": "file",
+ "path": "/var/spool/cwl/cwl.output.json"
+ }
+ },
+ "runtime_constraints": {
+ "vcpus": 1,
+ "ram": 1024*1024*256
+ }
+ }
+
+ def run(self, *args, **kwargs):
+ job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+ response = self.arvrunner.api.container_requests().create(
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.uuid = response["uuid"]
+ self.arvrunner.jobs[response["container_uuid"]] = self
+
+ logger.info("Submitted container %s", response["uuid"])
+
+ if response["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(response)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 88a8eeb..397b6d5 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -1,11 +1,19 @@
import logging
import re
-from . import done
-from .arvdocker import arv_docker_get_image
-from cwltool.process import get_feature
+import copy
+
+from cwltool.process import get_feature, shortname
from cwltool.errors import WorkflowException
+from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.load_tool import fetch_document
+from cwltool.builder import Builder
+
import arvados.collection
+from .arvdocker import arv_docker_get_image
+from .runner import Runner
+from . import done
+
logger = logging.getLogger('arvados.cwl-runner')
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
@@ -164,3 +172,145 @@ class ArvadosJob(object):
self.output_callback(outputs, processStatus)
finally:
del self.arvrunner.jobs[record["uuid"]]
+
+
+class RunnerJob(Runner):
+ """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+
+ def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ """Create an Arvados job specification for this workflow.
+
+ The returned dict can be used to create a job (i.e., passed as
+ the +body+ argument to jobs().create()), or as a component in
+ a pipeline template or pipeline instance.
+ """
+
+ workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+
+ self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+ return {
+ "script": "cwl-runner",
+ "script_version": "master",
+ "repository": "arvados",
+ "script_parameters": self.job_order,
+ "runtime_constraints": {
+ "docker_image": "arvados/jobs"
+ }
+ }
+
+ def run(self, *args, **kwargs):
+ job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+
+ response = self.arvrunner.api.jobs().create(
+ body=job_spec,
+ find_or_create=self.enable_reuse
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.uuid = response["uuid"]
+ self.arvrunner.jobs[self.uuid] = self
+
+ logger.info("Submitted job %s", response["uuid"])
+
+ if kwargs.get("submit"):
+ self.pipeline = self.arvrunner.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": shortname(self.tool.tool["id"]),
+ "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
+ "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
+
+ if response["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(response)
+
+
+class RunnerTemplate(object):
+ """An Arvados pipeline template that invokes a CWL workflow."""
+
+ type_to_dataclass = {
+ 'boolean': 'boolean',
+ 'File': 'File',
+ 'float': 'number',
+ 'int': 'number',
+ 'string': 'text',
+ }
+
+ def __init__(self, runner, tool, job_order, enable_reuse):
+ self.runner = runner
+ self.tool = tool
+ self.job = RunnerJob(
+ runner=runner,
+ tool=tool,
+ job_order=job_order,
+ enable_reuse=enable_reuse)
+
+ def pipeline_component_spec(self):
+ """Return a component that Workbench and a-r-p-i will understand.
+
+ Specifically, translate CWL input specs to Arvados pipeline
+ format, like {"dataclass":"File","value":"xyz"}.
+ """
+ spec = self.job.arvados_job_spec()
+
+ # Most of the component spec is exactly the same as the job
+ # spec (script, script_version, etc.).
+ # spec['script_parameters'] isn't right, though. A component
+ # spec's script_parameters hash is a translation of
+ # self.tool.tool['inputs'] with defaults/overrides taken from
+ # the job order. So we move the job parameters out of the way
+ # and build a new spec['script_parameters'].
+ job_params = spec['script_parameters']
+ spec['script_parameters'] = {}
+
+ for param in self.tool.tool['inputs']:
+ param = copy.deepcopy(param)
+
+ # Data type and "required" flag...
+ types = param['type']
+ if not isinstance(types, list):
+ types = [types]
+ param['required'] = 'null' not in types
+ non_null_types = set(types) - set(['null'])
+ if len(non_null_types) == 1:
+ the_type = [c for c in non_null_types][0]
+ dataclass = self.type_to_dataclass.get(the_type)
+ if dataclass:
+ param['dataclass'] = dataclass
+ # Note: If we didn't figure out a single appropriate
+ # dataclass, we just left that attribute out. We leave
+ # the "type" attribute there in any case, which might help
+ # downstream.
+
+ # Title and description...
+ title = param.pop('label', '')
+ descr = param.pop('description', '').rstrip('\n')
+ if title:
+ param['title'] = title
+ if descr:
+ param['description'] = descr
+
+ # Fill in the value from the current job order, if any.
+ param_id = shortname(param.pop('id'))
+ value = job_params.get(param_id)
+ if value is None:
+ pass
+ elif not isinstance(value, dict):
+ param['value'] = value
+ elif param.get('dataclass') == 'File' and value.get('path'):
+ param['value'] = value['path']
+
+ spec['script_parameters'][param_id] = param
+ spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
+ return spec
+
+ def save(self):
+ job_spec = self.pipeline_component_spec()
+ response = self.runner.api.pipeline_templates().create(body={
+ "components": {
+ self.job.name: job_spec,
+ },
+ "name": self.job.name,
+ "owner_uuid": self.runner.project_uuid,
+ }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
+ self.uuid = response["uuid"]
+ logger.info("Created template %s", self.uuid)
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
new file mode 100644
index 0000000..a2c5c9e
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -0,0 +1,30 @@
+from cwltool.draft2tool import CommandLineTool
+from .arvjob import ArvadosJob
+from .arvcontainer import ArvadosContainer
+from .pathmapper import ArvPathMapper
+
+class ArvadosCommandTool(CommandLineTool):
+ """Wrap cwltool CommandLineTool to override selected methods."""
+
+ def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
+ super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+ self.arvrunner = arvrunner
+ self.crunch2 = crunch2
+
+ def makeJobRunner(self):
+ if self.crunch2:
+ return ArvadosContainer(self.arvrunner)
+ else:
+ return ArvadosJob(self.arvrunner)
+
+ def makePathMapper(self, reffiles, **kwargs):
+ if self.crunch2:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "/keep/%s",
+ "/keep/%s/%s",
+ **kwargs)
+ else:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "$(task.keep)/%s",
+ "$(task.keep)/%s/%s",
+ **kwargs)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
new file mode 100644
index 0000000..c911895
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -0,0 +1,59 @@
+import fnmatch
+
+class CollectionFsAccess(cwltool.process.StdFsAccess):
+ """Implement the cwltool FsAccess interface for Arvados Collections."""
+
+ def __init__(self, basedir):
+ super(CollectionFsAccess, self).__init__(basedir)
+ self.collections = {}
+
+ def get_collection(self, path):
+ p = path.split("/")
+ if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
+ pdh = p[0][5:]
+ if pdh not in self.collections:
+ self.collections[pdh] = arvados.collection.CollectionReader(pdh)
+ return (self.collections[pdh], "/".join(p[1:]))
+ else:
+ return (None, path)
+
+ def _match(self, collection, patternsegments, parent):
+ if not patternsegments:
+ return []
+
+ if not isinstance(collection, arvados.collection.RichCollectionBase):
+ return []
+
+ ret = []
+ # iterate over the files and subcollections in 'collection'
+ for filename in collection:
+ if patternsegments[0] == '.':
+ # Pattern contains something like "./foo" so just shift
+ # past the "./"
+ ret.extend(self._match(collection, patternsegments[1:], parent))
+ elif fnmatch.fnmatch(filename, patternsegments[0]):
+ cur = os.path.join(parent, filename)
+ if len(patternsegments) == 1:
+ ret.append(cur)
+ else:
+ ret.extend(self._match(collection[filename], patternsegments[1:], cur))
+ return ret
+
+ def glob(self, pattern):
+ collection, rest = self.get_collection(pattern)
+ patternsegments = rest.split("/")
+ return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
+
+ def open(self, fn, mode):
+ collection, rest = self.get_collection(fn)
+ if collection:
+ return collection.open(rest, mode)
+ else:
+ return open(self._abs(fn), mode)
+
+ def exists(self, fn):
+ collection, rest = self.get_collection(fn)
+ if collection:
+ return collection.exists(rest)
+ else:
+ return os.path.exists(self._abs(fn))
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
new file mode 100644
index 0000000..9538a91
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -0,0 +1,55 @@
+import re
+
+import arvados.commands.run
+import arvados.collection
+import cwltool.pathmapper
+
+class ArvPathMapper(cwltool.pathmapper.PathMapper):
+ """Convert container-local paths to and from Keep collection ids."""
+
+ def __init__(self, arvrunner, referenced_files, input_basedir,
+ collection_pattern, file_pattern, name=None, **kwargs):
+ self._pathmap = arvrunner.get_uploaded()
+ uploadfiles = set()
+
+ pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+
+ for src in referenced_files:
+ if isinstance(src, basestring) and pdh_path.match(src):
+ self._pathmap[src] = (src, collection_pattern % src[5:])
+ if "#" in src:
+ src = src[:src.index("#")]
+ if src not in self._pathmap:
+ ab = cwltool.pathmapper.abspath(src, input_basedir)
+ st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
+ if kwargs.get("conformance_test"):
+ self._pathmap[src] = (src, ab)
+ elif isinstance(st, arvados.commands.run.UploadFile):
+ uploadfiles.add((src, ab, st))
+ elif isinstance(st, arvados.commands.run.ArvFile):
+ self._pathmap[src] = (ab, st.fn)
+ else:
+ raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+
+ if uploadfiles:
+ arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
+ arvrunner.api,
+ dry_run=kwargs.get("dry_run"),
+ num_retries=3,
+ fnPattern=file_pattern,
+ name=name,
+ project=arvrunner.project_uuid)
+
+ for src, ab, st in uploadfiles:
+ arvrunner.add_uploaded(src, (ab, st.fn))
+ self._pathmap[src] = (ab, st.fn)
+
+ self.keepdir = None
+
+ def reversemap(self, target):
+ if target.startswith("keep:"):
+ return (target, target)
+ elif self.keepdir and target.startswith(self.keepdir):
+ return (target, "keep:" + target[len(self.keepdir)+1:])
+ else:
+ return super(ArvPathMapper, self).reversemap(target)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
new file mode 100644
index 0000000..0cc23ab
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -0,0 +1,98 @@
+import os
+import urlparse
+from functools import partial
+
+from cwltool.draft2tool import CommandLineTool
+import cwltool.workflow
+from cwltool.process import get_feature, scandeps, adjustFiles
+from cwltool.load_tool import fetch_document
+
+from .arvdocker import arv_docker_get_image
+from .pathmapper import ArvPathMapper
+
+class Runner(object):
+ def __init__(self, runner, tool, job_order, enable_reuse):
+ self.arvrunner = runner
+ self.tool = tool
+ self.job_order = job_order
+ self.running = False
+ self.enable_reuse = enable_reuse
+
+ def update_pipeline_component(self, record):
+ pass
+
+ def upload_docker(self, tool):
+ if isinstance(tool, CommandLineTool):
+ (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+ if docker_req:
+ arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+ elif isinstance(tool, cwltool.workflow.Workflow):
+ for s in tool.steps:
+ self.upload_docker(s.embedded_tool)
+
+
+ def arvados_job_spec(self, *args, **kwargs):
+ self.upload_docker(self.tool)
+
+ workflowfiles = set()
+ jobfiles = set()
+ workflowfiles.add(self.tool.tool["id"])
+
+ self.name = os.path.basename(self.tool.tool["id"])
+
+ def visitFiles(files, path):
+ files.add(path)
+ return path
+
+ document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+ def loadref(b, u):
+ return document_loader.fetch(urlparse.urljoin(b, u))
+
+ sc = scandeps(uri, workflowobj,
+ set(("$import", "run")),
+ set(("$include", "$schemas", "path")),
+ loadref)
+ adjustFiles(sc, partial(visitFiles, workflowfiles))
+ adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+
+ workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+ "%s",
+ "%s/%s",
+ name=self.name,
+ **kwargs)
+
+ jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+ "%s",
+ "%s/%s",
+ name=os.path.basename(self.job_order.get("id", "#")),
+ **kwargs)
+
+ adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+ if "id" in self.job_order:
+ del self.job_order["id"]
+
+ return workflowmapper
+
+
+ def done(self, record):
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ outputs = None
+ try:
+ try:
+ outc = arvados.collection.Collection(record["output"])
+ with outc.open("cwl.output.json") as f:
+ outputs = json.load(f)
+ def keepify(path):
+ if not path.startswith("keep:"):
+ return "keep:%s/%s" % (record["output"], path)
+ adjustFiles(outputs, keepify)
+ except Exception as e:
+ logger.error("While getting final output object: %s", e)
+ self.arvrunner.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_container.py
similarity index 64%
copy from sdk/cwl/tests/test_job.py
copy to sdk/cwl/tests/test_container.py
index dba65b0..0237e80 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_container.py
@@ -10,14 +10,20 @@ if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-class TestJob(unittest.TestCase):
+class TestContainer(unittest.TestCase):
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- def test_run(self):
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_run(self, keepdocker):
runner = mock.MagicMock()
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
tool = {
@@ -25,46 +31,44 @@ class TestJob(unittest.TestCase):
"outputs": [],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names, basedir="")
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run"):
j.run()
- runner.api.jobs().create.assert_called_with(
+ runner.api.container_requests().create.assert_called_with(
body={
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'TMPDIR': '$(task.tmpdir)'},
- 'command': ['ls']
- }],
+ 'environment': {
+ 'TMPDIR': '/tmp'
},
- 'script_version': 'master',
- 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
+ 'name': 'test_run',
'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 1,
- 'min_ram_mb_per_node': 1024,
- 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
- }
- },
- find_or_create=True,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']]
- )
+ 'vcpus': 1,
+ 'ram': 1073741824
+ }, 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'}
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': ['ls'],
+ 'cwd': '/var/spool/cwl'
+ })
# The test passes some fields in builder.resources
# For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- def test_resource_requirements(self):
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_resource_requirements(self, keepdocker):
runner = mock.MagicMock()
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("draft-3")
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
tool = {
"inputs": [],
"outputs": [],
@@ -76,36 +80,31 @@ class TestJob(unittest.TestCase):
}],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, True, avsc_names=avsc_names)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir=""):
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements"):
j.run()
- runner.api.jobs().create.assert_called_with(
+
+ runner.api.container_requests().create.assert_called_with(
body={
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'TMPDIR': '$(task.tmpdir)'},
- 'command': ['ls']
- }]
- },
- 'script_version': 'master',
- 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
+ 'environment': {
+ 'TMPDIR': '/tmp'
+ },
+ 'name': 'test_resource_requirements',
'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 3,
- 'min_ram_mb_per_node': 3000,
- 'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize
- }
- },
- find_or_create=True,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']])
+ 'vcpus': 3,
+ 'ram': 3145728000
+ }, 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'}
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': ['ls'],
+ 'cwd': '/var/spool/cwl'
+ })
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
@@ -121,7 +120,7 @@ class TestJob(unittest.TestCase):
api.collections().list().execute.side_effect = ({"items": []},
{"items": [{"manifest_text": "XYZ"}]})
- arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob = arvados_cwl.ArvadosContainer(runner)
arvjob.name = "testjob"
arvjob.builder = mock.MagicMock()
arvjob.output_callback = mock.MagicMock()
@@ -163,7 +162,7 @@ class TestJob(unittest.TestCase):
col().open.return_value = []
api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
- arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob = arvados_cwl.ArvadosContainer(runner)
arvjob.name = "testjob"
arvjob.builder = mock.MagicMock()
arvjob.output_callback = mock.MagicMock()
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index dba65b0..701afcb 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -25,7 +25,7 @@ class TestJob(unittest.TestCase):
"outputs": [],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names, basedir="")
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names, basedir="")
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir=""):
j.run()
@@ -76,7 +76,7 @@ class TestJob(unittest.TestCase):
}],
"baseCommand": "ls"
}
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, avsc_names=avsc_names)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, False, avsc_names=avsc_names)
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir=""):
j.run()
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 38741eb..48e6ed2 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -30,7 +30,7 @@ def stubs(func):
return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
stubs.KeepClient().put.side_effect = putstub
- stubs.keepdocker.return_value = True
+ stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
stubs.api = mock.MagicMock()
@@ -44,12 +44,28 @@ def stubs(func):
}, {
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"portable_data_hash": "99999999999999999999999999999992+99",
+ },
+ {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4",
+ "portable_data_hash": "99999999999999999999999999999994+99",
+ "manifest_text": ""
})
+ stubs.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
stubs.api.jobs().create().execute.return_value = {
"uuid": stubs.expect_job_uuid,
"state": "Queued",
}
+
+ stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
+ stubs.api.container_requests().create().execute.return_value = {
+ "uuid": stubs.expect_container_request_uuid,
+ "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+
stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
stubs.api.pipeline_templates().create().execute.return_value = {
"uuid": stubs.expect_pipeline_template_uuid,
@@ -70,6 +86,35 @@ def stubs(func):
'script_version': 'master',
'script': 'cwl-runner'
}
+
+ stubs.expect_container_spec = {
+ 'priority': 1,
+ 'mounts': {
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/var/lib/cwl/workflow/submit_wf.cwl': {
+ 'portable_data_hash': '999999999999999999999999991+99',
+ 'kind': 'collection'
+ },
+ '/var/lib/cwl/job/cwl.input.json': {
+ 'portable_data_hash': '102435082199e5229f99b01165b67096+60/cwl.input.json',
+ 'kind': 'collection'
+ }
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'command': ['arvados-cwl-runner', '--local', '--crunch2', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
+ 'name': 'submit_wf.cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'output_path': '/var/spool/cwl',
+ 'cwd': '/var/spool/cwl',
+ 'runtime_constraints': {
+ 'vcpus': 1,
+ 'ram': 268435456
+ }
+ }
return func(self, stubs, *args, **kwargs)
return wrapped
@@ -128,6 +173,41 @@ class TestSubmit(unittest.TestCase):
body=expect_body,
find_or_create=True)
+ @stubs
+ def test_submit_container(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--crunch2", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.api.collections().create.assert_has_calls([
+ mock.call(),
+ mock.call(body={
+ 'manifest_text':
+ './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
+ '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
+ 'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': 'submit_wf.cwl',
+ }, ensure_unique_name=True),
+ mock.call().execute(),
+ mock.call(body={
+ 'manifest_text':
+ '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': '#',
+ }, ensure_unique_name=True),
+ mock.call().execute()])
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["owner_uuid"] = stubs.fake_user_uuid
+ stubs.api.container_requests().create.assert_called_with(
+ body=expect_container)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
class TestCreateTemplate(unittest.TestCase):
@stubs
diff --git a/services/api/Gemfile.lock b/services/api/Gemfile.lock
index 7be4e0f..3715718 100644
--- a/services/api/Gemfile.lock
+++ b/services/api/Gemfile.lock
@@ -257,6 +257,3 @@ DEPENDENCIES
therubyracer
trollop
uglifier (>= 1.0.3)
-
-BUNDLED WITH
- 1.12.1
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list