[ARVADOS] created: 1.3.0-1453-gb93cb0b82
Git user
git at public.curoverse.com
Tue Aug 6 14:31:01 UTC 2019
at b93cb0b828989f9e2651ba436fb8a267224e4ca8 (commit)
commit b93cb0b828989f9e2651ba436fb8a267224e4ca8
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Aug 6 10:30:33 2019 -0400
15181: Remove jobs API support from arvados-cwl-runner
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 1f8edb70d..088609fd7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -4,7 +4,7 @@
# SPDX-License-Identifier: Apache-2.0
# Implement cwl-runner interface for submitting and running work on Arvados, using
-# either the Crunch jobs API or Crunch containers API.
+# the Crunch containers API.
from future.utils import viewitems
from builtins import str
@@ -39,7 +39,6 @@ from .executor import ArvCwlExecutor
# These aren't used directly in this file but
# other code expects to import them from here
from .arvcontainer import ArvadosContainer
-from .arvjob import ArvadosJob
from .arvtool import ArvadosCommandTool
from .fsaccess import CollectionFsAccess, CollectionCache, CollectionFetcher
from .util import get_current_container
@@ -102,27 +101,27 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
default=True, dest="enable_reuse",
help="Disable job or container reuse")
- parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+ parser.add_argument("--project-uuid", metavar="UUID", help="Project that will own the workflow containers, if not provided, will go to home project.")
parser.add_argument("--output-name", help="Name to use for collection that stores the final output.", default=None)
parser.add_argument("--output-tags", help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
parser.add_argument("--ignore-docker-for-reuse", action="store_true",
- help="Ignore Docker image version when deciding whether to reuse past jobs.",
+ help="Ignore Docker image version when deciding whether to reuse past containers.",
default=False)
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
default=True, dest="submit")
- exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+ exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits containers to Arvados).",
default=True, dest="submit")
exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
dest="create_workflow")
- exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
- exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
+ exgroup.add_argument("--create-workflow", action="store_true", help="Register an Arvados workflow that can be run from Workbench")
+ exgroup.add_argument("--update-workflow", metavar="UUID", help="Update an existing Arvados workflow with the given UUID.")
exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+ exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner, wait for completion.",
default=True, dest="wait")
- exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+ exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner and exit.",
default=True, dest="wait")
exgroup = parser.add_mutually_exclusive_group()
@@ -133,8 +132,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
parser.add_argument("--api",
default=None, dest="work_api",
- choices=("jobs", "containers"),
- help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
+ choices=("containers",),
+ help="Select work submission API. Only supports 'containers'")
parser.add_argument("--compute-checksum", action="store_true", default=False,
help="Compute checksum of contents while collecting outputs",
@@ -265,8 +264,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'
- elif arvargs.update_workflow.find('-p5p6p-') == 5:
- want_api = 'jobs'
else:
want_api = None
if want_api and arvargs.work_api and want_api != arvargs.work_api:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
deleted file mode 100644
index 11efc0c1c..000000000
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ /dev/null
@@ -1,495 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from past.builtins import basestring
-from builtins import object
-from future.utils import viewitems
-
-import logging
-import re
-import copy
-import json
-import time
-
-from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.errors import WorkflowException
-from cwltool.command_line_tool import revmap_file, CommandLineTool
-from cwltool.load_tool import fetch_document
-from cwltool.builder import Builder
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.job import JobBase
-
-from schema_salad.sourceline import SourceLine
-
-import arvados_cwl.util
-import ruamel.yaml as yaml
-
-import arvados.collection
-from arvados.errors import ApiError
-
-from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, upload_workflow_collection, trim_anonymous_location, remove_redundant_fields
-from .pathmapper import VwdPathMapper, trim_listing
-from .perf import Perf
-from . import done
-from ._version import __version__
-from .util import get_intermediate_collection_info
-
-logger = logging.getLogger('arvados.cwl-runner')
-metrics = logging.getLogger('arvados.cwl-runner.metrics')
-
-crunchrunner_re = re.compile(r"^.*crunchrunner: \$\(task\.(tmpdir|outdir|keep)\)=(.*)$")
-
-crunchrunner_git_commit = 'a3f2cb186e437bfce0031b024b2157b73ed2717d'
-
-class ArvadosJob(JobBase):
- """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-
- def __init__(self, runner,
- builder, # type: Builder
- joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
- make_path_mapper, # type: Callable[..., PathMapper]
- requirements, # type: List[Dict[Text, Text]]
- hints, # type: List[Dict[Text, Text]]
- name # type: Text
- ):
- super(ArvadosJob, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
- self.arvrunner = runner
- self.running = False
- self.uuid = None
-
- def run(self, runtimeContext):
- script_parameters = {
- "command": self.command_line
- }
- runtime_constraints = {}
-
- with Perf(metrics, "generatefiles %s" % self.name):
- if self.generatefiles["listing"]:
- vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- script_parameters["task.vwd"] = {}
- generatemapper = VwdPathMapper(self.generatefiles["listing"], "", "",
- separateDirs=False)
-
- with Perf(metrics, "createfiles %s" % self.name):
- for f, p in generatemapper.items():
- if p.type == "CreateFile":
- with vwd.open(p.target, "w") as n:
- n.write(p.resolved.encode("utf-8"))
-
- if vwd:
- with Perf(metrics, "generatefiles.save_new %s" % self.name):
- info = get_intermediate_collection_info(self.name, None, runtimeContext.intermediate_output_ttl)
- vwd.save_new(name=info["name"],
- owner_uuid=self.arvrunner.project_uuid,
- ensure_unique_name=True,
- trash_at=info["trash_at"],
- properties=info["properties"])
-
- for f, p in generatemapper.items():
- if p.type == "File":
- script_parameters["task.vwd"][p.target] = p.resolved
- if p.type == "CreateFile":
- script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
-
- script_parameters["task.env"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
- if self.environment:
- script_parameters["task.env"].update(self.environment)
-
- if self.stdin:
- script_parameters["task.stdin"] = self.stdin
-
- if self.stdout:
- script_parameters["task.stdout"] = self.stdout
-
- if self.stderr:
- script_parameters["task.stderr"] = self.stderr
-
- if self.successCodes:
- script_parameters["task.successCodes"] = self.successCodes
- if self.temporaryFailCodes:
- script_parameters["task.temporaryFailCodes"] = self.temporaryFailCodes
- if self.permanentFailCodes:
- script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
-
- with Perf(metrics, "arv_docker_get_image %s" % self.name):
- (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
- if docker_req and runtimeContext.use_container is not False:
- if docker_req.get("dockerOutputDirectory"):
- raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api,
- docker_req,
- runtimeContext.pull_image,
- self.arvrunner.project_uuid)
- else:
- runtime_constraints["docker_image"] = "arvados/jobs"
-
- resources = self.builder.resources
- if resources is not None:
- runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
- runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
- runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
- runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
- if runtime_req:
- if "keep_cache" in runtime_req:
- runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
- runtime_constraints["min_ram_mb_per_node"] += runtime_req["keep_cache"]
- if "outputDirType" in runtime_req:
- if runtime_req["outputDirType"] == "local_output_dir":
- script_parameters["task.keepTmpOutput"] = False
- elif runtime_req["outputDirType"] == "keep_output_dir":
- script_parameters["task.keepTmpOutput"] = True
-
- filters = [["repository", "=", "arvados"],
- ["script", "=", "crunchrunner"],
- ["script_version", "in git", crunchrunner_git_commit]]
- if not self.arvrunner.ignore_docker_for_reuse:
- filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-
- enable_reuse = runtimeContext.enable_reuse
- if enable_reuse:
- reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
- if reuse_req:
- enable_reuse = reuse_req["enableReuse"]
-
- self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
-
- try:
- with Perf(metrics, "create %s" % self.name):
- response = self.arvrunner.api.jobs().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "script": "crunchrunner",
- "repository": "arvados",
- "script_version": "master",
- "minimum_script_version": crunchrunner_git_commit,
- "script_parameters": {"tasks": [script_parameters]},
- "runtime_constraints": runtime_constraints
- },
- filters=filters,
- find_or_create=enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.uuid = response["uuid"]
- self.arvrunner.process_submitted(self)
-
- self.update_pipeline_component(response)
-
- if response["state"] == "Complete":
- logger.info("%s reused job %s", self.arvrunner.label(self), response["uuid"])
- # Give read permission to the desired project on reused jobs
- if response["owner_uuid"] != self.arvrunner.project_uuid:
- try:
- self.arvrunner.api.links().create(body={
- 'link_class': 'permission',
- 'name': 'can_read',
- 'tail_uuid': self.arvrunner.project_uuid,
- 'head_uuid': response["uuid"],
- }).execute(num_retries=self.arvrunner.num_retries)
- except ApiError as e:
- # The user might not have "manage" access on the job: log
- # a message and continue.
- logger.info("Creating read permission on job %s: %s",
- response["uuid"],
- e)
- else:
- logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
- except Exception:
- logger.exception("%s error" % (self.arvrunner.label(self)))
- self.output_callback({}, "permanentFail")
-
- def update_pipeline_component(self, record):
- with self.arvrunner.workflow_eval_lock:
- if self.arvrunner.pipeline:
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- with Perf(metrics, "update_pipeline_component %s" % self.name):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
- uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
- if self.arvrunner.uuid:
- try:
- job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
- if job:
- components = job["components"]
- components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(
- uuid=self.arvrunner.uuid,
- body={
- "components": components
- }).execute(num_retries=self.arvrunner.num_retries)
- except Exception:
- logger.exception("Error adding to components")
-
- def done(self, record):
- try:
- self.update_pipeline_component(record)
- except:
- pass
-
- try:
- if record["state"] == "Complete":
- processStatus = "success"
- # we don't have the real exit code so fake it.
- record["exit_code"] = 0
- else:
- processStatus = "permanentFail"
- record["exit_code"] = 1
-
- outputs = {}
- try:
- if record["output"]:
- with Perf(metrics, "inspect log %s" % self.name):
- logc = arvados.collection.CollectionReader(record["log"],
- api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- log = logc.open(list(logc.keys())[0])
- dirs = {
- "tmpdir": "/tmpdir",
- "outdir": "/outdir",
- "keep": "/keep"
- }
- for l in log:
- # Determine the tmpdir, outdir and keep paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
-# will differ between runs, and we need to know about the
- # final run that actually produced output.
- g = crunchrunner_re.match(l)
- if g:
- dirs[g.group(1)] = g.group(2)
-
- if processStatus == "permanentFail":
- done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
-
- with Perf(metrics, "output collection %s" % self.name):
- outputs = done.done(self, record, dirs["tmpdir"],
- dirs["outdir"], dirs["keep"])
- except WorkflowException as e:
- # Only include a stack trace if in debug mode.
- # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
- logger.error("%s unable to collect output from %s:\n%s",
- self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
- processStatus = "permanentFail"
- except Exception:
- logger.exception("Got unknown exception while collecting output for job %s:", self.name)
- processStatus = "permanentFail"
-
- # Note: Currently, on error output_callback is expecting an empty dict,
- # anything else will fail.
- if not isinstance(outputs, dict):
- logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
- outputs = {}
- processStatus = "permanentFail"
- finally:
- self.output_callback(outputs, processStatus)
-
-
-class RunnerJob(Runner):
- """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
-
- def arvados_job_spec(self, debug=False):
- """Create an Arvados job specification for this workflow.
-
- The returned dict can be used to create a job (i.e., passed as
- the +body+ argument to jobs().create()), or as a component in
- a pipeline template or pipeline instance.
- """
-
- if self.embedded_tool.tool["id"].startswith("keep:"):
- self.job_order["cwl:tool"] = self.embedded_tool.tool["id"][5:]
- else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
- wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
- self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
-
- adjustDirObjs(self.job_order, trim_listing)
- visit_class(self.job_order, ("File", "Directory"), trim_anonymous_location)
- visit_class(self.job_order, ("File", "Directory"), remove_redundant_fields)
-
- if self.output_name:
- self.job_order["arv:output_name"] = self.output_name
-
- if self.output_tags:
- self.job_order["arv:output_tags"] = self.output_tags
-
- self.job_order["arv:enable_reuse"] = self.enable_reuse
-
- if self.on_error:
- self.job_order["arv:on_error"] = self.on_error
-
- if debug:
- self.job_order["arv:debug"] = True
-
- return {
- "script": "cwl-runner",
- "script_version": "master",
- "minimum_script_version": "570509ab4d2ef93d870fd2b1f2eab178afb1bad9",
- "repository": "arvados",
- "script_parameters": self.job_order,
- "runtime_constraints": {
- "docker_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
- "min_ram_mb_per_node": self.submit_runner_ram
- }
- }
-
- def run(self, runtimeContext):
- job_spec = self.arvados_job_spec(runtimeContext.debug)
-
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
- job = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- for k,v in viewitems(job_spec["script_parameters"]):
- if v is False or v is None or isinstance(v, dict):
- job_spec["script_parameters"][k] = {"value": v}
-
- del job_spec["owner_uuid"]
- job_spec["job"] = job
-
- instance_spec = {
- "owner_uuid": self.arvrunner.project_uuid,
- "name": self.name,
- "components": {
- "cwl-runner": job_spec,
- },
- "state": "RunningOnServer",
- }
- if not self.enable_reuse:
- instance_spec["properties"] = {"run_options": {"enable_job_reuse": False}}
-
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
- body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
- logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
-
- if runtimeContext.wait is False:
- self.uuid = self.arvrunner.pipeline["uuid"]
- return
-
- self.uuid = job["uuid"]
- self.arvrunner.process_submitted(self)
-
-
-class RunnerTemplate(object):
- """An Arvados pipeline template that invokes a CWL workflow."""
-
- type_to_dataclass = {
- 'boolean': 'boolean',
- 'File': 'File',
- 'Directory': 'Collection',
- 'float': 'number',
- 'int': 'number',
- 'string': 'text',
- }
-
- def __init__(self, runner, tool, job_order, enable_reuse, uuid,
- submit_runner_ram=0, name=None, merged_map=None,
- loadingContext=None):
- self.runner = runner
- self.embedded_tool = tool
- self.job = RunnerJob(
- runner=runner,
- tool=tool,
- enable_reuse=enable_reuse,
- output_name=None,
- output_tags=None,
- submit_runner_ram=submit_runner_ram,
- name=name,
- merged_map=merged_map,
- loadingContext=loadingContext)
- self.job.job_order = job_order
- self.uuid = uuid
-
- def pipeline_component_spec(self):
- """Return a component that Workbench and a-r-p-i will understand.
-
- Specifically, translate CWL input specs to Arvados pipeline
- format, like {"dataclass":"File","value":"xyz"}.
- """
-
- spec = self.job.arvados_job_spec()
-
- # Most of the component spec is exactly the same as the job
- # spec (script, script_version, etc.).
- # spec['script_parameters'] isn't right, though. A component
- # spec's script_parameters hash is a translation of
- # self.tool.tool['inputs'] with defaults/overrides taken from
- # the job order. So we move the job parameters out of the way
- # and build a new spec['script_parameters'].
- job_params = spec['script_parameters']
- spec['script_parameters'] = {}
-
- for param in self.embedded_tool.tool['inputs']:
- param = copy.deepcopy(param)
-
- # Data type and "required" flag...
- types = param['type']
- if not isinstance(types, list):
- types = [types]
- param['required'] = 'null' not in types
- non_null_types = [t for t in types if t != "null"]
- if len(non_null_types) == 1:
- the_type = [c for c in non_null_types][0]
- dataclass = None
- if isinstance(the_type, basestring):
- dataclass = self.type_to_dataclass.get(the_type)
- if dataclass:
- param['dataclass'] = dataclass
- # Note: If we didn't figure out a single appropriate
- # dataclass, we just left that attribute out. We leave
- # the "type" attribute there in any case, which might help
- # downstream.
-
- # Title and description...
- title = param.pop('label', '')
- descr = param.pop('doc', '').rstrip('\n')
- if title:
- param['title'] = title
- if descr:
- param['description'] = descr
-
- # Fill in the value from the current job order, if any.
- param_id = shortname(param.pop('id'))
- value = job_params.get(param_id)
- if value is None:
- pass
- elif not isinstance(value, dict):
- param['value'] = value
- elif param.get('dataclass') in ('File', 'Collection') and value.get('location'):
- param['value'] = value['location'][5:]
-
- spec['script_parameters'][param_id] = param
- spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
- return spec
-
- def save(self):
- body = {
- "components": {
- self.job.name: self.pipeline_component_spec(),
- },
- "name": self.job.name,
- }
- if self.runner.project_uuid:
- body["owner_uuid"] = self.runner.project_uuid
- if self.uuid:
- self.runner.api.pipeline_templates().update(
- uuid=self.uuid, body=body).execute(
- num_retries=self.runner.num_retries)
- logger.info("Updated template %s", self.uuid)
- else:
- self.uuid = self.runner.api.pipeline_templates().create(
- body=body, ensure_unique_name=True).execute(
- num_retries=self.runner.num_retries)['uuid']
- logger.info("Created template %s", self.uuid)
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index 4fc02a016..704edaccb 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -3,7 +3,6 @@
# SPDX-License-Identifier: Apache-2.0
from cwltool.command_line_tool import CommandLineTool, ExpressionTool
-from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
from .runner import make_builder
@@ -48,8 +47,6 @@ class ArvadosCommandTool(CommandLineTool):
def make_job_runner(self, runtimeContext):
if runtimeContext.work_api == "containers":
return partial(ArvadosContainer, self.arvrunner, runtimeContext)
- elif runtimeContext.work_api == "jobs":
- return partial(ArvadosJob, self.arvrunner)
else:
raise Exception("Unsupported work_api %s", runtimeContext.work_api)
@@ -58,10 +55,6 @@ class ArvadosCommandTool(CommandLineTool):
return ArvPathMapper(self.arvrunner, reffiles+runtimeContext.extra_reffiles, runtimeContext.basedir,
"/keep/%s",
"/keep/%s/%s")
- elif runtimeContext.work_api == "jobs":
- return ArvPathMapper(self.arvrunner, reffiles, runtimeContext.basedir,
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s")
def job(self, joborder, output_callback, runtimeContext):
builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
@@ -75,11 +68,6 @@ class ArvadosCommandTool(CommandLineTool):
else:
runtimeContext.outdir = "/var/spool/cwl"
runtimeContext.docker_outdir = "/var/spool/cwl"
- elif runtimeContext.work_api == "jobs":
- runtimeContext.outdir = "$(task.outdir)"
- runtimeContext.docker_outdir = "$(task.outdir)"
- runtimeContext.tmpdir = "$(task.tmpdir)"
- runtimeContext.docker_tmpdir = "$(task.tmpdir)"
return super(ArvadosCommandTool, self).job(joborder, output_callback, runtimeContext)
class ArvadosExpressionTool(ExpressionTool):
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
deleted file mode 100644
index c886550d4..000000000
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ /dev/null
@@ -1,159 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-# Crunch script integration for running arvados-cwl-runner (importing
-# arvados_cwl module) inside a crunch job.
-#
-# This gets the job record, transforms the script parameters into a valid CWL
-# input object, then executes the CWL runner to run the underlying workflow or
-# tool. When the workflow completes, record the output object in an output
-# collection for this runner job.
-
-from past.builtins import basestring
-from future.utils import viewitems
-
-import arvados
-import arvados_cwl
-import arvados.collection
-import arvados.util
-import cwltool.main
-import logging
-import os
-import json
-import argparse
-import re
-import functools
-
-from arvados.api import OrderedJsonModel
-from cwltool.process import shortname
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
-from cwltool.load_tool import load_tool
-from cwltool.errors import WorkflowException
-from arvados_cwl.context import ArvRuntimeContext
-
-from .fsaccess import CollectionFetcher, CollectionFsAccess
-
-logger = logging.getLogger('arvados.cwl-runner')
-
-def run():
- # Timestamps are added by crunch-job, so don't print redundant timestamps.
- arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
-
- # Print package versions
- logger.info(arvados_cwl.versionstring())
-
- api = arvados.api("v1")
-
- arvados_cwl.add_arv_hints()
-
- runner = None
- try:
- job_order_object = arvados.current_job()['script_parameters']
- toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))
-
- pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
-
- def keeppath(v):
- if pdh_path.match(v):
- return "keep:%s" % v
- else:
- return v
-
- def keeppathObj(v):
- if "location" in v:
- v["location"] = keeppath(v["location"])
-
- for k,v in viewitems(job_order_object):
- if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
- job_order_object[k] = {
- "class": "File",
- "location": "keep:%s" % v
- }
-
- adjustFileObjs(job_order_object, keeppathObj)
- adjustDirObjs(job_order_object, keeppathObj)
- normalizeFilesDirs(job_order_object)
-
- output_name = None
- output_tags = None
- enable_reuse = True
- on_error = "continue"
- debug = False
-
- if "arv:output_name" in job_order_object:
- output_name = job_order_object["arv:output_name"]
- del job_order_object["arv:output_name"]
-
- if "arv:output_tags" in job_order_object:
- output_tags = job_order_object["arv:output_tags"]
- del job_order_object["arv:output_tags"]
-
- if "arv:enable_reuse" in job_order_object:
- enable_reuse = job_order_object["arv:enable_reuse"]
- del job_order_object["arv:enable_reuse"]
-
- if "arv:on_error" in job_order_object:
- on_error = job_order_object["arv:on_error"]
- del job_order_object["arv:on_error"]
-
- if "arv:debug" in job_order_object:
- debug = job_order_object["arv:debug"]
- del job_order_object["arv:debug"]
-
- arvargs = argparse.Namespace()
- arvargs.work_api = "jobs"
- arvargs.output_name = output_name
- arvargs.output_tags = output_tags
- arvargs.thread_count = 1
- arvargs.collection_cache_size = None
-
- runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
- api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
- arvargs=arvargs)
-
- make_fs_access = functools.partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
-
- t = load_tool(toolpath, runner.loadingContext)
-
- if debug:
- logger.setLevel(logging.DEBUG)
- logging.getLogger('arvados').setLevel(logging.DEBUG)
- logging.getLogger("cwltool").setLevel(logging.DEBUG)
-
- args = ArvRuntimeContext(vars(arvargs))
- args.project_uuid = arvados.current_job()["owner_uuid"]
- args.enable_reuse = enable_reuse
- args.on_error = on_error
- args.submit = False
- args.debug = debug
- args.quiet = False
- args.ignore_docker_for_reuse = False
- args.basedir = os.getcwd()
- args.name = None
- args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
- args.make_fs_access = make_fs_access
- args.trash_intermediate = False
- args.intermediate_output_ttl = 0
- args.priority = arvados_cwl.DEFAULT_PRIORITY
- args.do_validate = True
- args.disable_js_validation = False
- args.tmp_outdir_prefix = "tmp"
-
- runner.arv_executor(t, job_order_object, args, logger=logger)
- except Exception as e:
- if isinstance(e, WorkflowException):
- logging.info("Workflow error %s", e)
- else:
- logging.exception("Unhandled exception")
- if runner and runner.final_output_collection:
- outputCollection = runner.final_output_collection.portable_data_hash()
- else:
- outputCollection = None
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output': outputCollection,
- 'success': False,
- 'progress':1.0
- }).execute()
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 9a94095ae..97e6b736a 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -31,7 +31,6 @@ from arvados.errors import ApiError
import arvados_cwl.util
from .arvcontainer import RunnerContainer
-from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
@@ -91,8 +90,8 @@ class RuntimeStatusLoggingHandler(logging.Handler):
class ArvCwlExecutor(object):
- """Execute a CWL tool or workflow, submit work (using either jobs or
- containers API), wait for them to complete, and report output.
+ """Execute a CWL tool or workflow, submit work (using containers API),
+ wait for them to complete, and report output.
"""
@@ -154,7 +153,7 @@ class ArvCwlExecutor(object):
num_retries=self.num_retries)
self.work_api = None
- expected_api = ["containers", "jobs"]
+ expected_api = ["containers"]
for api in expected_api:
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
@@ -172,19 +171,11 @@ class ArvCwlExecutor(object):
raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
if self.work_api == "jobs":
- logger.warning("""
+ logger.error("""
*******************************
-Using the deprecated 'jobs' API.
-
-To get rid of this warning:
-
-Users: read about migrating at
-http://doc.arvados.org/user/cwl/cwl-style.html#migrate
-and use the option --api=containers
-
-Admins: configure the cluster to disable the 'jobs' API as described at:
-http://doc.arvados.org/install/install-api-server.html#disable_api_methods
+The 'jobs' API is no longer supported.
*******************************""")
+ exit(1)
self.loadingContext = ArvLoadingContext(vars(arvargs))
self.loadingContext.fetcher_constructor = self.fetcher_constructor
@@ -339,7 +330,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
return "[%s %s]" % (self.work_api[0:-1], obj.name)
def poll_states(self):
- """Poll status of jobs or containers listed in the processes dict.
+ """Poll status of containers listed in the processes dict.
Runs in a separate thread.
"""
@@ -360,8 +351,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
begin_poll = time.time()
if self.work_api == "containers":
table = self.poll_api.container_requests()
- elif self.work_api == "jobs":
- table = self.poll_api.jobs()
pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
@@ -522,13 +511,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
except Exception:
logger.exception("Setting container output")
return
- elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
- self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
- body={
- 'output': self.final_output_collection.portable_data_hash(),
- 'success': self.final_status == "success",
- 'progress':1.0
- }).execute(num_retries=self.num_retries)
def apply_reqs(self, job_order_object, tool):
if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
@@ -604,18 +586,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
existing_uuid = runtimeContext.update_workflow
if existing_uuid or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
- if self.work_api == "jobs":
- tmpl = RunnerTemplate(self, tool, job_order,
- runtimeContext.enable_reuse,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- loadingContext=loadingContext)
- tmpl.save()
- # cwltool.main will write our return value to stdout.
- return (tmpl.uuid, "success")
- elif self.work_api == "containers":
+ if self.work_api == "containers":
return (upload_workflow(self, tool, job_order,
self.project_uuid,
uuid=existing_uuid,
@@ -641,12 +612,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
runtimeContext.docker_outdir = "/var/spool/cwl"
runtimeContext.tmpdir = "/tmp"
runtimeContext.docker_tmpdir = "/tmp"
- elif self.work_api == "jobs":
- if runtimeContext.priority != DEFAULT_PRIORITY:
- raise Exception("--priority not implemented for jobs API.")
- runtimeContext.outdir = "$(task.outdir)"
- runtimeContext.docker_outdir = "$(task.outdir)"
- runtimeContext.tmpdir = "$(task.tmpdir)"
if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
@@ -686,24 +651,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
secret_store=self.secret_store,
collection_cache_size=runtimeContext.collection_cache_size,
collection_cache_is_default=self.should_estimate_cache_size)
- elif self.work_api == "jobs":
- tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
- self.output_name,
- self.output_tags,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- on_error=runtimeContext.on_error,
- submit_runner_image=runtimeContext.submit_runner_image,
- merged_map=merged_map)
- elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
- # Create pipeline for local run
- self.pipeline = self.api.pipeline_instances().create(
- body={
- "owner_uuid": self.project_uuid,
- "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
- logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runtimeContext.cwl_runner_job is not None:
self.uuid = runtimeContext.cwl_runner_job.get('uuid')
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
deleted file mode 100644
index f08e14f7c..000000000
--- a/sdk/cwl/tests/test_job.py
+++ /dev/null
@@ -1,554 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-from future import standard_library
-standard_library.install_aliases()
-from builtins import str
-from builtins import next
-
-import functools
-import json
-import logging
-import mock
-import os
-import unittest
-import copy
-import io
-import argparse
-
-import arvados
-import arvados_cwl
-import arvados_cwl.executor
-import cwltool.process
-from arvados.errors import ApiError
-from schema_salad.ref_resolver import Loader
-from schema_salad.sourceline import cmap
-from .mock_discovery import get_rootDesc
-from .matcher import JsonDiffMatcher, StripYAMLComments
-from .test_container import CollectionMock
-from arvados_cwl.arvdocker import arv_docker_clear_cache
-
-if not os.getenv('ARVADOS_DEBUG'):
- logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
- logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
-class TestJob(unittest.TestCase):
-
- def setUp(self):
- cwltool.process._names = set()
-
- def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
-
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- loadingContext = arvados_cwl.context.ArvLoadingContext(
- {"avsc_names": avsc_names,
- "basedir": "",
- "make_fs_access": make_fs_access,
- "loader": Loader({}),
- "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
- "makeTool": runner.arv_make_tool})
- runtimeContext = arvados_cwl.context.ArvRuntimeContext(
- {"work_api": "jobs",
- "basedir": "",
- "name": "test_run_job_"+str(enable_reuse),
- "make_fs_access": make_fs_access,
- "enable_reuse": enable_reuse,
- "priority": 500})
-
- return loadingContext, runtimeContext
-
- # The test passes no builder.resources
- # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_run(self, list_images_in_arv):
- for enable_reuse in (True, False):
- arv_docker_clear_cache()
- runner = mock.MagicMock()
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
- # Simulate reused job from another project so that we can check is a can_read
- # link is added.
- runner.api.jobs().create().execute.return_value = {
- 'state': 'Complete' if enable_reuse else 'Queued',
- 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
- 'output': None,
- }
-
- tool = cmap({
- "inputs": [],
- "outputs": [],
- "baseCommand": "ls",
- "arguments": [{"valueFrom": "$(runtime.outdir)"}],
- "id": "#",
- "class": "CommandLineTool"
- })
-
- loadingContext, runtimeContext = self.helper(runner, enable_reuse)
-
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
- arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
- j.run(runtimeContext)
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
- 'command': ['ls', '$(task.outdir)']
- }],
- },
- 'script_version': 'master',
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
- 'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 1,
- 'min_ram_mb_per_node': 1024,
- 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
- }
- }),
- find_or_create=enable_reuse,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']]
- )
- if enable_reuse:
- runner.api.links().create.assert_called_with(
- body=JsonDiffMatcher({
- 'link_class': 'permission',
- 'name': 'can_read',
- "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
- })
- )
- # Simulate an API excepction when trying to create a
- # sharing link on the job
- runner.api.links().create.side_effect = ApiError(
- mock.MagicMock(return_value={'status': 403}),
- bytes(b'Permission denied'))
- j.run(runtimeContext)
- else:
- assert not runner.api.links().create.called
-
- # The test passes some fields in builder.resources
- # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_resource_requirements(self, list_images_in_arv):
- runner = mock.MagicMock()
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
- arvados_cwl.add_arv_hints()
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
-
- tool = {
- "inputs": [],
- "outputs": [],
- "hints": [{
- "class": "ResourceRequirement",
- "coresMin": 3,
- "ramMin": 3000,
- "tmpdirMin": 4000
- }, {
- "class": "http://arvados.org/cwl#RuntimeConstraints",
- "keep_cache": 512,
- "outputDirType": "keep_output_dir"
- }, {
- "class": "http://arvados.org/cwl#APIRequirement",
- },
- {
- "class": "http://arvados.org/cwl#ReuseRequirement",
- "enableReuse": False
- }],
- "baseCommand": "ls",
- "id": "#",
- "class": "CommandLineTool"
- }
-
- loadingContext, runtimeContext = self.helper(runner)
-
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
- arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
- j.run(runtimeContext)
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
- 'task.keepTmpOutput': True,
- 'command': ['ls']
- }]
- },
- 'script_version': 'master',
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
- 'runtime_constraints': {
- 'docker_image': 'arvados/jobs',
- 'min_cores_per_node': 3,
- 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
- 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
- 'keep_cache_mb_per_task': 512
- }
- }),
- find_or_create=False,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']])
-
- @mock.patch("arvados.collection.CollectionReader")
- def test_done(self, reader):
- api = mock.MagicMock()
-
- runner = mock.MagicMock()
- runner.api = api
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.num_retries = 0
- runner.ignore_docker_for_reuse = False
-
- reader().keys.return_value = "log.txt"
- reader().open.return_value = io.StringIO(
- str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
- """))
- api.collections().list().execute.side_effect = ({"items": []},
- {"items": [{"manifest_text": "XYZ"}]},
- {"items": []},
- {"items": [{"manifest_text": "ABC"}]})
-
- arvjob = arvados_cwl.ArvadosJob(runner,
- mock.MagicMock(),
- {},
- None,
- [],
- [],
- "testjob")
- arvjob.output_callback = mock.MagicMock()
- arvjob.collect_outputs = mock.MagicMock()
- arvjob.collect_outputs.return_value = {"out": "stuff"}
-
- arvjob.done({
- "state": "Complete",
- "output": "99999999999999999999999999999993+99",
- "log": "99999999999999999999999999999994+99",
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- })
-
- api.collections().list.assert_has_calls([
- mock.call(),
- # Output collection check
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
- ['name', '=', 'Output 9999999 of testjob']]),
- mock.call().execute(num_retries=0),
- mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
- select=['manifest_text']),
- mock.call().execute(num_retries=0),
- # Log collection's turn
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
- ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
- mock.call().execute(num_retries=0),
- mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
- select=['manifest_text']),
- mock.call().execute(num_retries=0)])
-
- api.collections().create.assert_has_calls([
- mock.call(ensure_unique_name=True,
- body={'portable_data_hash': '99999999999999999999999999999993+99',
- 'manifest_text': 'XYZ',
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'name': 'Output 9999999 of testjob'}),
- mock.call().execute(num_retries=0),
- mock.call(ensure_unique_name=True,
- body={'portable_data_hash': '99999999999999999999999999999994+99',
- 'manifest_text': 'ABC',
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
- mock.call().execute(num_retries=0),
- ])
-
- arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
-
- @mock.patch("arvados.collection.CollectionReader")
- def test_done_use_existing_collection(self, reader):
- api = mock.MagicMock()
-
- runner = mock.MagicMock()
- runner.api = api
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.num_retries = 0
-
- reader().keys.return_value = "log.txt"
- reader().open.return_value = io.StringIO(
- str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
-2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
- """))
-
- api.collections().list().execute.side_effect = (
- {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
- {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
- )
-
- arvjob = arvados_cwl.ArvadosJob(runner,
- mock.MagicMock(),
- {},
- None,
- [],
- [],
- "testjob")
- arvjob.output_callback = mock.MagicMock()
- arvjob.collect_outputs = mock.MagicMock()
- arvjob.collect_outputs.return_value = {"out": "stuff"}
-
- arvjob.done({
- "state": "Complete",
- "output": "99999999999999999999999999999993+99",
- "log": "99999999999999999999999999999994+99",
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- })
-
- api.collections().list.assert_has_calls([
- mock.call(),
- # Output collection
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
- ['name', '=', 'Output 9999999 of testjob']]),
- mock.call().execute(num_retries=0),
- # Log collection
- mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
- ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
- ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
- mock.call().execute(num_retries=0)
- ])
-
- self.assertFalse(api.collections().create.called)
-
- arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
-
-
-class TestWorkflow(unittest.TestCase):
-
- def setUp(self):
- cwltool.process._names = set()
-
- def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
-
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
-
- loadingContext = arvados_cwl.context.ArvLoadingContext(
- {"avsc_names": avsc_names,
- "basedir": "",
- "make_fs_access": make_fs_access,
- "loader": document_loader,
- "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
- "construct_tool_object": runner.arv_make_tool})
- runtimeContext = arvados_cwl.context.ArvRuntimeContext(
- {"work_api": "jobs",
- "basedir": "",
- "name": "test_run_wf_"+str(enable_reuse),
- "make_fs_access": make_fs_access,
- "enable_reuse": enable_reuse,
- "priority": 500})
-
- return loadingContext, runtimeContext
-
- # The test passes no builder.resources
- # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch("arvados.collection.CollectionReader")
- @mock.patch("arvados.collection.Collection")
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
- arv_docker_clear_cache()
- arvados_cwl.add_arv_hints()
-
- api = mock.MagicMock()
- api._rootDesc = get_rootDesc()
-
- runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
- output_name=None,
- output_tags=None,
- thread_count=1,
- collection_cache_size=None))
- self.assertEqual(runner.work_api, 'jobs')
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_value = {"items": [{
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
- "portable_data_hash": "99999999999999999999999999999993+99"}]}
-
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
-
- loadingContext, runtimeContext = self.helper(runner)
- runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
- tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
- metadata["cwlVersion"] = tool["cwlVersion"]
-
- mockc = mock.MagicMock()
- mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
- mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
-
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
- arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), runtimeContext)
-
- next(it).run(runtimeContext)
- next(it).run(runtimeContext)
-
- with open("tests/wf/scatter2_subwf.cwl") as f:
- subwf = StripYAMLComments(f.read().rstrip())
-
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script_version': 'master',
- 'script': 'crunchrunner',
- 'script_parameters': {
- 'tasks': [{'task.env': {
- 'HOME': '$(task.outdir)',
- 'TMPDIR': '$(task.tmpdir)'},
- 'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
- },
- 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
- 'task.stdout': 'cwl.output.json'}]},
- 'runtime_constraints': {
- 'min_scratch_mb_per_node': 2048,
- 'min_cores_per_node': 1,
- 'docker_image': 'arvados/jobs',
- 'min_ram_mb_per_node': 1024
- },
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']],
- find_or_create=True)
-
- mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
- mockc.open().__enter__().write.assert_has_calls([mock.call(
-bytes(b'''{
- "fileblub": {
- "basename": "token.txt",
- "class": "File",
- "location": "/keep/99999999999999999999999999999999+118/token.txt",
- "size": 0
- },
- "sleeptime": 5
-}'''))])
-
- # The test passes no builder.resources
- # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
- @mock.patch("arvados.collection.CollectionReader")
- @mock.patch("arvados.collection.Collection")
- @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
- def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
- arv_docker_clear_cache()
- arvados_cwl.add_arv_hints()
-
- api = mock.MagicMock()
- api._rootDesc = get_rootDesc()
-
- runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
- output_name=None,
- output_tags=None,
- thread_count=1,
- collection_cache_size=None))
- self.assertEqual(runner.work_api, 'jobs')
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_value = {"items": [{
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
- "portable_data_hash": "99999999999999999999999999999993+99"}]}
-
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
-
- loadingContext, runtimeContext = self.helper(runner)
- loadingContext.do_update = True
- runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
- tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
-
- mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
-
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
- arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), runtimeContext)
-
- next(it).run(runtimeContext)
- next(it).run(runtimeContext)
-
- with open("tests/wf/echo-subwf.cwl") as f:
- subwf = StripYAMLComments(f.read())
-
- runner.api.jobs().create.assert_called_with(
- body=JsonDiffMatcher({
- 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
- 'repository': 'arvados',
- 'script_version': 'master',
- 'script': 'crunchrunner',
- 'script_parameters': {
- 'tasks': [{'task.env': {
- 'HOME': '$(task.outdir)',
- 'TMPDIR': '$(task.tmpdir)'},
- 'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
- },
- 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
- 'task.stdout': 'cwl.output.json'}]},
- 'runtime_constraints': {
- 'min_scratch_mb_per_node': 4096,
- 'min_cores_per_node': 3,
- 'docker_image': 'arvados/jobs',
- 'min_ram_mb_per_node': 1024
- },
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
- ['docker_image_locator', 'in docker', 'arvados/jobs']],
- find_or_create=True)
-
- def test_default_work_api(self):
- arvados_cwl.add_arv_hints()
-
- api = mock.MagicMock()
- api._rootDesc = copy.deepcopy(get_rootDesc())
- del api._rootDesc.get('resources')['jobs']['methods']['create']
- runner = arvados_cwl.executor.ArvCwlExecutor(api)
- self.assertEqual(runner.work_api, 'containers')
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 1dbd968ea..d215cba7f 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -340,73 +340,6 @@ class TestSubmit(unittest.TestCase):
def setUp(self):
cwltool.process._names = set()
- @mock.patch("arvados_cwl.arvdocker.arv_docker_get_image")
- @mock.patch("time.sleep")
- @stubs
- def test_submit(self, stubs, tm, arvdock):
- def get_image(api_client, dockerRequirement, pull_image, project_uuid):
- if dockerRequirement["dockerPull"] == 'arvados/jobs:'+arvados_cwl.__version__:
- return '999999999999999999999999999999d3+99'
- elif dockerRequirement["dockerPull"] == "debian:8":
- return '999999999999999999999999999999d4+99'
- arvdock.side_effect = get_image
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.collections().create.assert_has_calls([
- mock.call(body=JsonDiffMatcher({
- 'manifest_text':
- '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
- 'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
- }), ensure_unique_name=False),
- mock.call(body=JsonDiffMatcher({
- 'manifest_text':
- '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
- 'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
- }), ensure_unique_name=False),
- mock.call(body=JsonDiffMatcher({
- 'manifest_text':
- ". 68089141fbf7e020ac90a9d6a575bc8f+1312 0:1312:workflow.cwl\n",
- 'replication_desired': None,
- 'name': 'submit_wf.cwl',
- }), ensure_unique_name=True) ])
-
- arvdock.assert_has_calls([
- mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8"}, True, None),
- mock.call(stubs.api, {"class": "DockerRequirement", "dockerPull": "debian:8", 'http://arvados.org/cwl#dockerCollectionPDH': '999999999999999999999999999999d4+99'}, True, None),
- mock.call(stubs.api, {'dockerPull': 'arvados/jobs:'+arvados_cwl.__version__}, True, None)
- ])
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_no_reuse(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug", "--disable-reuse",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
- expect_pipeline["properties"] = {"run_options": {"enable_job_reuse": False}}
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
@stubs
def test_error_when_multiple_storage_classes_specified(self, stubs):
storage_classes = "foo,bar"
@@ -418,41 +351,6 @@ class TestSubmit(unittest.TestCase):
@mock.patch("time.sleep")
@stubs
- def test_submit_on_error(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug", "--on-error=stop",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:on_error"] = "stop"
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_runner_ram(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--submit-runner-ram=2048",
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["runtime_constraints"]["min_ram_mb_per_node"] = 2048
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @mock.patch("time.sleep")
- @stubs
def test_submit_invalid_runner_ram(self, stubs, tm):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--submit-runner-ram=-2048",
@@ -460,81 +358,6 @@ class TestSubmit(unittest.TestCase):
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 1)
- @mock.patch("time.sleep")
- @stubs
- def test_submit_output_name(self, stubs, tm):
- output_name = "test_output_name"
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--output-name", output_name,
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_pipeline_name(self, stubs, tm):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--name=hello job 123",
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 0)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["name"] = "hello job 123"
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_output_tags(self, stubs, tm):
- output_tags = "tag0,tag1,tag2"
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 0)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
-
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
-
- @mock.patch("time.sleep")
- @stubs
- def test_submit_with_project_uuid(self, stubs, tm):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--debug",
- "--project-uuid", project_uuid,
- "--api=jobs",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- sys.stdout, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 0)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["owner_uuid"] = project_uuid
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
@stubs
def test_submit_container(self, stubs):
@@ -878,28 +701,6 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @mock.patch("arvados.collection.CollectionReader")
- @mock.patch("time.sleep")
- @stubs
- def test_submit_jobs_keepref(self, stubs, tm, reader):
- with open("tests/wf/expect_arvworkflow.cwl") as f:
- reader().open().__enter__().read.return_value = f.read()
-
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug",
- "keep:99999999999999999999999999999994+99/expect_arvworkflow.cwl#main", "-x", "XxX"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["x"] = "XxX"
- del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["y"]
- del expect_pipeline["components"]["cwl-runner"]["script_parameters"]["z"]
- expect_pipeline["components"]["cwl-runner"]["script_parameters"]["cwl:tool"] = "99999999999999999999999999999994+99/expect_arvworkflow.cwl#main"
- expect_pipeline["name"] = "expect_arvworkflow.cwl#main"
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(exited, 0)
-
@mock.patch("time.sleep")
@stubs
def test_submit_arvworkflow(self, stubs, tm):
@@ -1116,22 +917,6 @@ class TestSubmit(unittest.TestCase):
self.assertEqual(exited, 0)
@stubs
- def test_submit_job_runner_image(self, stubs):
- exited = arvados_cwl.main(
- ["--submit", "--no-wait", "--api=jobs", "--debug", "--submit-runner-image=arvados/jobs:123",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
-
- stubs.expect_pipeline_instance["components"]["cwl-runner"]["runtime_constraints"]["docker_image"] = "999999999999999999999999999999d5+99"
-
- expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.api.pipeline_instances().create.assert_called_with(
- body=JsonDiffMatcher(expect_pipeline))
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @stubs
def test_submit_container_runner_image(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--submit-runner-image=arvados/jobs:123",
@@ -1543,123 +1328,6 @@ class TestSubmit(unittest.TestCase):
cwltool_logger.removeHandler(stderr_logger)
-class TestCreateTemplate(unittest.TestCase):
- existing_template_uuid = "zzzzz-p5p6p-validworkfloyml"
-
- def _adjust_script_params(self, expect_component):
- expect_component['script_parameters']['x'] = {
- 'dataclass': 'File',
- 'required': True,
- 'type': 'File',
- 'value': '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- }
- expect_component['script_parameters']['y'] = {
- 'dataclass': 'Collection',
- 'required': True,
- 'type': 'Directory',
- 'value': '99999999999999999999999999999998+99',
- }
- expect_component['script_parameters']['z'] = {
- 'dataclass': 'Collection',
- 'required': True,
- 'type': 'Directory',
- }
-
- @stubs
- def test_create(self, stubs):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--create-workflow", "--debug",
- "--api=jobs",
- "--project-uuid", project_uuid,
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_instances().create.refute_called()
- stubs.api.jobs().create.refute_called()
-
- expect_component = copy.deepcopy(stubs.expect_job_spec)
- self._adjust_script_params(expect_component)
- expect_template = {
- "components": {
- "submit_wf.cwl": expect_component,
- },
- "name": "submit_wf.cwl",
- "owner_uuid": project_uuid,
- }
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
-
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_template_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @stubs
- def test_create_name(self, stubs):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--create-workflow", "--debug",
- "--project-uuid", project_uuid,
- "--api=jobs",
- "--name", "testing 123",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_instances().create.refute_called()
- stubs.api.jobs().create.refute_called()
-
- expect_component = copy.deepcopy(stubs.expect_job_spec)
- self._adjust_script_params(expect_component)
- expect_template = {
- "components": {
- "testing 123": expect_component,
- },
- "name": "testing 123",
- "owner_uuid": project_uuid,
- }
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
-
- self.assertEqual(stubs.capture_stdout.getvalue(),
- stubs.expect_pipeline_template_uuid + '\n')
- self.assertEqual(exited, 0)
-
- @stubs
- def test_update_name(self, stubs):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- exited = arvados_cwl.main(
- ["--update-workflow", self.existing_template_uuid,
- "--debug",
- "--project-uuid", project_uuid,
- "--api=jobs",
- "--name", "testing 123",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_instances().create.refute_called()
- stubs.api.jobs().create.refute_called()
-
- expect_component = copy.deepcopy(stubs.expect_job_spec)
- self._adjust_script_params(expect_component)
- expect_template = {
- "components": {
- "testing 123": expect_component,
- },
- "name": "testing 123",
- "owner_uuid": project_uuid,
- }
- stubs.api.pipeline_templates().create.refute_called()
- stubs.api.pipeline_templates().update.assert_called_with(
- body=JsonDiffMatcher(expect_template), uuid=self.existing_template_uuid)
-
- self.assertEqual(stubs.capture_stdout.getvalue(),
- self.existing_template_uuid + '\n')
- self.assertEqual(exited, 0)
-
-
class TestCreateWorkflow(unittest.TestCase):
existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
expect_workflow = StripYAMLComments(
@@ -1724,26 +1392,6 @@ class TestCreateWorkflow(unittest.TestCase):
stubs.expect_workflow_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
- def test_incompatible_api(self, stubs):
- capture_stderr = StringIO()
- acr_logger = logging.getLogger('arvados.cwl-runner')
- stderr_logger = logging.StreamHandler(capture_stderr)
- acr_logger.addHandler(stderr_logger)
-
- try:
- exited = arvados_cwl.main(
- ["--update-workflow", self.existing_workflow_uuid,
- "--api=jobs",
- "--debug",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- sys.stderr, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 1)
- self.assertRegexpMatches(
- capture_stderr.getvalue(),
- "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
- finally:
- acr_logger.removeHandler(stderr_logger)
@stubs
def test_update(self, stubs):
@@ -1817,82 +1465,3 @@ class TestCreateWorkflow(unittest.TestCase):
self.assertEqual(stubs.capture_stdout.getvalue(),
stubs.expect_workflow_uuid + '\n')
self.assertEqual(exited, 0)
-
-class TestTemplateInputs(unittest.TestCase):
- expect_template = {
- "components": {
- "inputs_test.cwl": {
- 'runtime_constraints': {
- 'docker_image': '999999999999999999999999999999d3+99',
- 'min_ram_mb_per_node': 1024
- },
- 'script_parameters': {
- 'cwl:tool':
- 'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main',
- 'optionalFloatInput': None,
- 'fileInput': {
- 'type': 'File',
- 'dataclass': 'File',
- 'required': True,
- 'title': "It's a file; we expect to find some characters in it.",
- 'description': 'If there were anything further to say, it would be said here,\nor here.'
- },
- 'floatInput': {
- 'type': 'float',
- 'dataclass': 'number',
- 'required': True,
- 'title': 'Floats like a duck',
- 'default': 0.1,
- 'value': 0.1,
- },
- 'optionalFloatInput': {
- 'type': ['null', 'float'],
- 'dataclass': 'number',
- 'required': False,
- },
- 'boolInput': {
- 'type': 'boolean',
- 'dataclass': 'boolean',
- 'required': True,
- 'title': 'True or false?',
- },
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner',
- },
- },
- "name": "inputs_test.cwl",
- }
-
- @stubs
- def test_inputs_empty(self, stubs):
- exited = arvados_cwl.main(
- ["--debug", "--api=jobs", "--create-template",
- "tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(self.expect_template), ensure_unique_name=True)
-
- self.assertEqual(exited, 0)
-
- @stubs
- def test_inputs(self, stubs):
- exited = arvados_cwl.main(
- ["--api=jobs", "--create-template",
- "tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
- stubs.capture_stdout, sys.stderr, api_client=stubs.api)
-
- expect_template = copy.deepcopy(self.expect_template)
- params = expect_template[
- "components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '169f39d466a5438ac4a90e779bf750c7+53/blorp.txt'
- params["cwl:tool"] = 'a2de777156fb700f1363b1f2e370adca+60/workflow.cwl#main'
- params["floatInput"]["value"] = 1.234
- params["boolInput"]["value"] = True
-
- stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
- self.assertEqual(exited, 0)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list