[ARVADOS] created: 1.1.4-420-g3bf756a35
Git user
git at public.curoverse.com
Fri Jun 15 14:44:25 EDT 2018
at 3bf756a35827ed8f7e0c18b5496031cdfeb8157e (commit)
commit 3bf756a35827ed8f7e0c18b5496031cdfeb8157e
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Jun 15 10:59:24 2018 -0400
13627: Fix crunch_script
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index e3bce68e6..70c2173db 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -106,11 +106,14 @@ class ArvadosJob(JobBase):
with Perf(metrics, "arv_docker_get_image %s" % self.name):
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
- if docker_req and runtimeContextuse_container is not False:
+ if docker_req and runtimeContext.use_container is not False:
if docker_req.get("dockerOutputDirectory"):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api,
+ docker_req,
+ runtimeContext.pull_image,
+ self.arvrunner.project_uuid)
else:
runtime_constraints["docker_image"] = "arvados/jobs"
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index 742c671c7..5b1806b35 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -21,7 +21,7 @@ class ArvadosCommandTool(CommandLineTool):
elif runtimeContext.work_api == "jobs":
return partial(ArvadosJob, self.arvrunner)
- def make_path_mapper(self, reffiles, stagedir, runtimeContext):
+ def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs):
if runtimeContext.work_api == "containers":
return ArvPathMapper(self.arvrunner, reffiles+runtimeContext.extra_reffiles, runtimeContext.basedir,
"/keep/%s",
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index 94e9c8fc8..9f0c91f11 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -27,7 +27,7 @@ from cwltool.process import shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
from cwltool.load_tool import load_tool
from cwltool.errors import WorkflowException
-from cwltool.context import RuntimeContext
+from arvados_cwl.context import ArvRuntimeContext
from .fsaccess import CollectionFetcher, CollectionFsAccess
@@ -98,25 +98,27 @@ def run():
debug = job_order_object["arv:debug"]
del job_order_object["arv:debug"]
+ arvargs = argparse.Namespace()
+ arvargs.work_api = "jobs"
+ arvargs.output_name = output_name
+ arvargs.output_tags = output_tags
+ arvargs.thread_count = 1
+
runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
- output_name=output_name, output_tags=output_tags)
+ arvargs=arvargs)
make_fs_access = functools.partial(CollectionFsAccess,
collection_cache=runner.collection_cache)
- t = load_tool(toolpath, runner.arv_make_tool,
- fetcher_constructor=functools.partial(CollectionFetcher,
- api_client=runner.api,
- fs_access=make_fs_access(""),
- num_retries=runner.num_retries))
+ t = load_tool(toolpath, runner.loadingContext)
if debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
logging.getLogger("cwltool").setLevel(logging.DEBUG)
- args = RuntimeContext()
+ args = ArvRuntimeContext(vars(arvargs))
args.project_uuid = arvados.current_job()["owner_uuid"]
args.enable_reuse = enable_reuse
args.on_error = on_error
@@ -135,7 +137,7 @@ def run():
args.disable_js_validation = False
args.tmp_outdir_prefix = "tmp"
- runner.arv_executor(t, job_order_object, args)
+ runner.arv_executor(t, job_order_object, args, logger=logger)
except Exception as e:
if isinstance(e, WorkflowException):
logging.info("Workflow error %s", e)
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 13d1d581b..55548130d 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -33,7 +33,7 @@ setup(name='arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180615142437',
+ 'cwltool==1.0.20180615183820',
'schema-salad==2.7.20180501211602',
'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
commit d7ca0bbb652dcd2662fb00640d74caae805c6b4d
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Jun 15 10:49:45 2018 -0400
13627: Add missing file.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
new file mode 100644
index 000000000..7eeda8bf6
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -0,0 +1,27 @@
+from cwltool.context import LoadingContext, RuntimeContext
+
+class ArvLoadingContext(LoadingContext):
+ def __init__(self, kwargs=None):
+ super(ArvLoadingContext, self).__init__(kwargs)
+
+class ArvRuntimeContext(RuntimeContext):
+ def __init__(self, kwargs=None):
+ self.work_api = None
+ self.extra_reffiles = []
+ self.priority = 500
+ self.enable_reuse = True
+ self.runnerjob = ""
+ self.submit_request_uuid = None
+ self.project_uuid = None
+ self.trash_intermediate = False
+ self.intermediate_output_ttl = 0
+ self.update_workflow = ""
+ self.create_workflow = False
+ self.submit_runner_ram = 0
+ self.ignore_docker_for_reuse = False
+ self.submit = True
+ self.submit_runner_image = None
+ self.wait = True
+ self.cwl_runner_job = None
+
+ super(ArvRuntimeContext, self).__init__(kwargs)
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 45464a9ea..13d1d581b 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -33,7 +33,7 @@ setup(name='arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180615135742',
+ 'cwltool==1.0.20180615142437',
'schema-salad==2.7.20180501211602',
'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
commit bdb64a15af28e33cd5a56d88121325766ea9d941
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Jun 15 10:40:43 2018 -0400
13627: Improve error reporting.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 48df66296..274c66891 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -147,7 +147,7 @@ class ArvCwlRunner(object):
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
return ArvadosWorkflow(self, toolpath_object, loadingContext)
else:
- return cwltool.workflow.defaultMakeTool(toolpath_object, loadingContext)
+ return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
def output_callback(self, out, processStatus):
with self.workflow_eval_lock:
@@ -157,7 +157,7 @@ class ArvCwlRunner(object):
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Complete"}).execute(num_retries=self.num_retries)
else:
- logger.warn("Overall process status is %s", processStatus)
+ logger.error("Overall process status is %s", processStatus)
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 049c0c4b5..86031f731 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -309,7 +309,7 @@ class ArvadosContainer(JobBase):
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 0ab065d78..e3bce68e6 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -252,7 +252,7 @@ class ArvadosJob(JobBase):
dirs[g.group(1)] = g.group(2)
if processStatus == "permanentFail":
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
with Perf(metrics, "output collection %s" % self.name):
outputs = done.done(self, record, dirs["tmpdir"],
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index e9254c013..25efade2a 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -55,10 +55,10 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
-def logtail(logcollection, logger, header, maxlen=25):
+def logtail(logcollection, logfunc, header, maxlen=25):
if len(logcollection) == 0:
- logger.info(header)
- logger.info(" ** log is empty **")
+ logfunc(header)
+ logfunc(" ** log is empty **")
return
containersapi = ("crunch-run.txt" in logcollection)
@@ -95,5 +95,5 @@ def logtail(logcollection, logger, header, maxlen=25):
loglines = mergelogs.values()[0]
logtxt = "\n ".join(l.strip() for l in loglines)
- logger.info(header)
- logger.info("\n %s", logtxt)
+ logfunc(header)
+ logfunc("\n %s", logtxt)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 6cdda6140..60e0bf99c 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -414,7 +414,7 @@ class Runner(object):
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
self.final_output = record["output"]
outc = arvados.collection.CollectionReader(self.final_output,
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index c28860375..45464a9ea 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -33,7 +33,7 @@ setup(name='arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180614214548',
+ 'cwltool==1.0.20180615135742',
'schema-salad==2.7.20180501211602',
'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
commit 2e9553d023c57e1354bbd54ee85b8fb5ff875baf
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Jun 15 10:11:01 2018 -0400
13627: Fix loadingContext used for reloading document.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 29608844e..48df66296 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -71,9 +71,19 @@ class ArvCwlRunner(object):
"""
- def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, num_retries=4,
+ def __init__(self, api_client,
+ arvargs=None,
+ keep_client=None,
+ num_retries=4,
thread_count=4):
+
+ if arvargs is None:
+ arvargs = argparse.Namespace()
+ arvargs.work_api = None
+ arvargs.output_name = None
+ arvargs.output_tags = None
+ arvargs.thread_count = 1
+
self.api = api_client
self.processes = {}
self.workflow_eval_lock = threading.Condition(threading.RLock())
@@ -85,14 +95,15 @@ class ArvCwlRunner(object):
self.poll_api = None
self.pipeline = None
self.final_output_collection = None
- self.output_name = output_name
- self.output_tags = output_tags
+ self.output_name = arvargs.output_name
+ self.output_tags = arvargs.output_tags
self.project_uuid = None
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.thread_count = thread_count
+ self.thread_count = arvargs.thread_count
self.poll_interval = 12
+ self.loadingContext = None
if keep_client is not None:
self.keep_client = keep_client
@@ -112,18 +123,24 @@ class ArvCwlRunner(object):
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
if ('httpMethod' in methods['create'] and
- (work_api == api or work_api is None)):
+ (arvargs.work_api == api or arvargs.work_api is None)):
self.work_api = api
break
except KeyError:
pass
if not self.work_api:
- if work_api is None:
+ if arvargs.work_api is None:
raise Exception("No supported APIs")
else:
raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+ self.loadingContext = ArvLoadingContext(vars(arvargs))
+ self.loadingContext.fetcher_constructor = self.fetcher_constructor
+ self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
+ self.loadingContext.construct_tool_object = self.arv_make_tool
+
+
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, loadingContext)
@@ -405,12 +422,11 @@ class ArvCwlRunner(object):
# Reload tool object which may have been updated by
# upload_workflow_deps
# Don't validate this time because it will just print redundant errors.
- loadingContext = ArvLoadingContext({
- "construct_tool_object": self.arv_make_tool,
- "loader": tool.doc_loader,
- "avsc_names": tool.doc_schema,
- "metadata": tool.metadata,
- "do_validate": False})
+ loadingContext = self.loadingContext.copy()
+ loadingContext.loader = tool.doc_loader
+ loadingContext.avsc_names = tool.doc_schema
+ loadingContext.metadata = tool.metadata
+ loadingContext.do_validate = False
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
loadingContext)
@@ -777,6 +793,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
job_order_object = None
arvargs = parser.parse_args(args)
+ arvargs.use_container = True
+ arvargs.relax_path_checks = True
+ arvargs.print_supported_versions = False
+
if install_sig_handlers:
arv_cmd.install_signal_handlers()
@@ -804,10 +824,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
keep_client = api_client.keep
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
- num_retries=4, output_name=arvargs.output_name,
- output_tags=arvargs.output_tags,
- thread_count=arvargs.thread_count)
+ runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
except Exception as e:
logger.error(e)
return 1
@@ -836,15 +853,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
if not hasattr(arvargs, key):
setattr(arvargs, key, val)
- arvargs.use_container = True
- arvargs.relax_path_checks = True
- arvargs.print_supported_versions = False
-
- loadingContext = ArvLoadingContext(vars(arvargs))
- loadingContext.fetcher_constructor = runner.fetcher_constructor
- loadingContext.resolver = partial(collectionResolver, api_client, num_retries=runner.num_retries)
- loadingContext.construct_tool_object = runner.arv_make_tool
-
runtimeContext = ArvRuntimeContext(vars(arvargs))
runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=runner.collection_cache)
@@ -857,5 +865,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
job_order_object=job_order_object,
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
- loadingContext=loadingContext,
+ loadingContext=runner.loadingContext,
runtimeContext=runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 940ae92e7..0ab065d78 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -278,7 +278,7 @@ class ArvadosJob(JobBase):
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
- def arvados_job_spec(self, runtimeContext):
+ def arvados_job_spec(self, debug=False):
"""Create an Arvados job specification for this workflow.
The returned dict can be used to create a job (i.e., passed as
@@ -308,7 +308,7 @@ class RunnerJob(Runner):
if self.on_error:
self.job_order["arv:on_error"] = self.on_error
- if runtimeContext.debug:
+ if debug:
self.job_order["arv:debug"] = True
return {
@@ -324,7 +324,7 @@ class RunnerJob(Runner):
}
def run(self, runtimeContext):
- job_spec = self.arvados_job_spec(runtimeContext)
+ job_spec = self.arvados_job_spec(runtimeContext.debug)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
commit a52a27cd01fed6525a45539728b979467b1b5f13
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Jun 14 22:38:57 2018 -0400
13627: a-c-r migration to cwltool loading/runtimeContext API
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5bacb6c03..29608844e 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -45,12 +45,14 @@ from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver,
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
+from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
+
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
@@ -370,7 +372,7 @@ class ArvCwlRunner(object):
'progress':1.0
}).execute(num_retries=self.num_retries)
- def arv_executor(self, tool, job_order, runtimeContext):
+ def arv_executor(self, tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
tool.visit(self.check_features)
@@ -403,12 +405,15 @@ class ArvCwlRunner(object):
# Reload tool object which may have been updated by
# upload_workflow_deps
# Don't validate this time because it will just print redundant errors.
+ loadingContext = ArvLoadingContext({
+ "construct_tool_object": self.arv_make_tool,
+ "loader": tool.doc_loader,
+ "avsc_names": tool.doc_schema,
+ "metadata": tool.metadata,
+ "do_validate": False})
+
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- construct_tool_object=self.arv_make_tool,
- loader=tool.doc_loader,
- avsc_names=tool.doc_schema,
- metadata=tool.metadata,
- do_validate=False)
+ loadingContext)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
@@ -827,7 +832,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
- for key, val in six.iteritems(cwltool.argparser.get_default_args()):
+ for key, val in cwltool.argparser.get_default_args().items():
if not hasattr(arvargs, key):
setattr(arvargs, key, val)
@@ -835,12 +840,12 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
arvargs.relax_path_checks = True
arvargs.print_supported_versions = False
- loadingContext = LoadingContext(vars(arvargs))
+ loadingContext = ArvLoadingContext(vars(arvargs))
loadingContext.fetcher_constructor = runner.fetcher_constructor
loadingContext.resolver = partial(collectionResolver, api_client, num_retries=runner.num_retries)
loadingContext.construct_tool_object = runner.arv_make_tool
- runtimeContext = RuntimeContext(vars(arvargs))
+ runtimeContext = ArvRuntimeContext(vars(arvargs))
runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=runner.collection_cache)
@@ -851,4 +856,6 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
versionfunc=versionstring,
job_order_object=job_order_object,
logger_handler=arvados.log_handler,
- custom_schema_callback=add_arv_hints)
+ custom_schema_callback=add_arv_hints,
+ loadingContext=loadingContext,
+ runtimeContext=runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index d820f8f6b..049c0c4b5 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -17,6 +17,7 @@ from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
+from cwltool.job import JobBase
import arvados.collection
@@ -30,10 +31,18 @@ from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
-class ArvadosContainer(object):
+class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
- def __init__(self, runner):
+ def __init__(self, runner,
+ builder, # type: Builder
+ joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+ make_path_mapper, # type: Callable[..., PathMapper]
+ requirements, # type: List[Dict[Text, Text]]
+ hints, # type: List[Dict[Text, Text]]
+ name # type: Text
+ ):
+ super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
self.running = False
self.uuid = None
@@ -196,7 +205,7 @@ class ArvadosContainer(object):
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
- pull_image,
+ runtimeContext.pull_image,
self.arvrunner.project_uuid)
api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 4973c8a8c..940ae92e7 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -14,6 +14,7 @@ from cwltool.command_line_tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.job import JobBase
from schema_salad.sourceline import SourceLine
@@ -36,10 +37,18 @@ crunchrunner_re = re.compile(r"^.*crunchrunner: \$\(task\.(tmpdir|outdir|keep)\)
crunchrunner_git_commit = 'a3f2cb186e437bfce0031b024b2157b73ed2717d'
-class ArvadosJob(object):
+class ArvadosJob(JobBase):
"""Submit and manage a Crunch job for executing a CWL CommandLineTool."""
- def __init__(self, runner):
+ def __init__(self, runner,
+ builder, # type: Builder
+ joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
+ make_path_mapper, # type: Callable[..., PathMapper]
+ requirements, # type: List[Dict[Text, Text]]
+ hints, # type: List[Dict[Text, Text]]
+ name # type: Text
+ ):
+ super(ArvadosJob, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
self.running = False
self.uuid = None
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index f4caf7f4b..742c671c7 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -6,28 +6,27 @@ from cwltool.command_line_tool import CommandLineTool
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
+from functools import partial
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, runtimeContext):
- super(ArvadosCommandTool, self).__init__(toolpath_object, runtimeContext)
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosCommandTool, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
- self.work_api = runtimeContext.work_api
- def makeJobRunner(self, runtimeContext):
- if self.work_api == "containers":
- return ArvadosContainer(self.arvrunner)
- elif self.work_api == "jobs":
- return ArvadosJob(self.arvrunner)
+ def make_job_runner(self, runtimeContext):
+ if runtimeContext.work_api == "containers":
+ return partial(ArvadosContainer, self.arvrunner)
+ elif runtimeContext.work_api == "jobs":
+ return partial(ArvadosJob, self.arvrunner)
- def makePathMapper(self, reffiles, stagedir, runtimeContext):
- # type: (List[Any], unicode, **Any) -> PathMapper
- if self.work_api == "containers":
+ def make_path_mapper(self, reffiles, stagedir, runtimeContext):
+ if runtimeContext.work_api == "containers":
return ArvPathMapper(self.arvrunner, reffiles+runtimeContext.extra_reffiles, runtimeContext.basedir,
"/keep/%s",
"/keep/%s/%s")
- elif self.work_api == "jobs":
+ elif runtimeContext.work_api == "jobs":
return ArvPathMapper(self.arvrunner, reffiles, runtimeContext.basedir,
"$(task.keep)/%s",
"$(task.keep)/%s/%s")
@@ -43,7 +42,7 @@ class ArvadosCommandTool(CommandLineTool):
runtimeContext = runtimeContext.copy()
- if self.work_api == "containers":
+ if runtimeContext.work_api == "containers":
dockerReq, is_req = self.get_requirement("DockerRequirement")
if dockerReq and dockerReq.get("dockerOutputDirectory"):
runtimeContext.outdir = dockerReq.get("dockerOutputDirectory")
@@ -51,7 +50,7 @@ class ArvadosCommandTool(CommandLineTool):
else:
runtimeContext.outdir = "/var/spool/cwl"
runtimeContext.docker_outdir = "/var/spool/cwl"
- elif self.work_api == "jobs":
+ elif runtimeContext.work_api == "jobs":
runtimeContext.outdir = "$(task.outdir)"
runtimeContext.docker_outdir = "$(task.outdir)"
runtimeContext.tmpdir = "$(task.tmpdir)"
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 94b8568db..5939b0413 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -143,11 +143,10 @@ class ArvadosWorkflow(Workflow):
packed = pack(document_loader, workflowobj, uri, self.metadata)
- builder = Builder()
- builder.job = joborder
- builder.requirements = workflowobj["requirements"]
- builder.hints = workflowobj["hints"]
- builder.resources = {}
+ builder = Builder(joborder,
+ requirements=workflowobj["requirements"],
+ hints=workflowobj["hints"],
+ resources={})
def visit(item):
for t in ("hints", "requirements"):
@@ -259,14 +258,12 @@ class ArvadosWorkflow(Workflow):
"outputs": self.tool["outputs"],
"stdout": "cwl.output.json",
"requirements": self.requirements+job_res_reqs+[
+ {"class": "InlineJavascriptRequirement"},
{
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": {
- "class": "File",
- "location": "keep:%s/workflow.cwl" % self.wf_pdh
- }
+ "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
}, {
"entryname": "cwl.input.yml",
"entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index 2295e934a..af6f7721f 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0
import arvados_cwl
+import arvados_cwl.context
from arvados_cwl.arvdocker import arv_docker_clear_cache
import logging
import mock
@@ -23,6 +24,28 @@ if not os.getenv('ARVADOS_DEBUG'):
class TestContainer(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": Loader({}),
+ "metadata": {"cwlVersion": "v1.0"}})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "containers",
+ "basedir": "",
+ "name": "test_run_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "tmpdir": "/tmp",
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@ -40,8 +63,6 @@ class TestContainer(unittest.TestCase):
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
tool = cmap({
"inputs": [],
"outputs": [],
@@ -50,15 +71,14 @@ class TestContainer(unittest.TestCase):
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=enable_reuse, priority=500)
+
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
@@ -102,8 +122,6 @@ class TestContainer(unittest.TestCase):
runner.intermediate_output_ttl = 3600
runner.secret_store = cwltool.secrets.SecretStore()
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
@@ -136,15 +154,14 @@ class TestContainer(unittest.TestCase):
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
- avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_resource_requirements"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=True, priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
call_args, call_kwargs = runner.api.container_requests().create.call_args
@@ -200,8 +217,6 @@ class TestContainer(unittest.TestCase):
runner.intermediate_output_ttl = 0
runner.secret_store = cwltool.secrets.SecretStore()
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
@@ -248,15 +263,14 @@ class TestContainer(unittest.TestCase):
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
- avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_initial_work_dir"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
call_args, call_kwargs = runner.api.container_requests().create.call_args
@@ -350,15 +364,14 @@ class TestContainer(unittest.TestCase):
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_run_redirect"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
@@ -421,9 +434,13 @@ class TestContainer(unittest.TestCase):
col().open.return_value = []
- arvjob = arvados_cwl.ArvadosContainer(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosContainer(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.successCodes = [0]
@@ -476,11 +493,11 @@ class TestContainer(unittest.TestCase):
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_run_mounts"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
job_order = {
"p1": {
@@ -498,9 +515,8 @@ class TestContainer(unittest.TestCase):
]
}
}
- for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_run_mounts",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
@@ -584,19 +600,18 @@ class TestContainer(unittest.TestCase):
]
}
]})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_secrets"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
job_order = {"pw": "blorp"}
runner.secret_store.store(["pw"], job_order)
- for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_secrets",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=True, priority=500)
+ for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 30930dd49..1c45b384f 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -26,6 +26,28 @@ if not os.getenv('ARVADOS_DEBUG'):
class TestJob(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": Loader({}),
+ "metadata": {"cwlVersion": "v1.0"},
+ "makeTool": runner.arv_make_tool})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "jobs",
+ "basedir": "",
+ "name": "test_run_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
@@ -35,7 +57,6 @@ class TestJob(unittest.TestCase):
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
@@ -56,14 +77,13 @@ class TestJob(unittest.TestCase):
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
- j.run(enable_reuse=enable_reuse)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
@@ -105,7 +125,7 @@ class TestJob(unittest.TestCase):
runner.api.links().create.side_effect = ApiError(
mock.MagicMock(return_value={'status': 403}),
'Permission denied')
- j.run(enable_reuse=enable_reuse)
+ j.run(runtimeContext)
else:
assert not runner.api.links().create.called
@@ -122,9 +142,6 @@ class TestJob(unittest.TestCase):
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
-
tool = {
"inputs": [],
"outputs": [],
@@ -148,14 +165,13 @@ class TestJob(unittest.TestCase):
"id": "#",
"class": "CommandLineTool"
}
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
- j.run(enable_reuse=True)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
@@ -205,9 +221,13 @@ class TestJob(unittest.TestCase):
{"items": []},
{"items": [{"manifest_text": "ABC"}]})
- arvjob = arvados_cwl.ArvadosJob(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosJob(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.collect_outputs.return_value = {"out": "stuff"}
@@ -275,9 +295,13 @@ class TestJob(unittest.TestCase):
{"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
)
- arvjob = arvados_cwl.ArvadosJob(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosJob(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.collect_outputs.return_value = {"out": "stuff"}
@@ -309,6 +333,34 @@ class TestJob(unittest.TestCase):
class TestWorkflow(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+
+ document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+ document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+ document_loader.fetch_text = document_loader.fetcher.fetch_text
+ document_loader.check_exists = document_loader.fetcher.check_exists
+
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": document_loader,
+ "metadata": {"cwlVersion": "v1.0"},
+ "construct_tool_object": runner.arv_make_tool})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "jobs",
+ "basedir": "",
+ "name": "test_run_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.collection.CollectionReader")
@@ -330,27 +382,20 @@ class TestWorkflow(unittest.TestCase):
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/scatter2_subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
@@ -416,27 +461,19 @@ class TestWorkflow(unittest.TestCase):
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/echo-subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
commit 17be5d7235255cfdc290b0caf039529d57caf9c9
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Jun 14 17:56:13 2018 -0400
API fixup
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5b29ae517..5bacb6c03 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,6 +28,7 @@ import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
+import cwltool.argparser
import arvados
import arvados.config
@@ -121,16 +122,13 @@ class ArvCwlRunner(object):
else:
raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
- def arv_make_tool(self, toolpath_object, **kwargs):
- kwargs["work_api"] = self.work_api
- kwargs["fetcher_constructor"] = self.fetcher_constructor
- kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
+ def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, **kwargs)
+ return ArvadosCommandTool(self, toolpath_object, loadingContext)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
- return ArvadosWorkflow(self, toolpath_object, **kwargs)
+ return ArvadosWorkflow(self, toolpath_object, loadingContext)
else:
- return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
+ return cwltool.workflow.defaultMakeTool(toolpath_object, loadingContext)
def output_callback(self, out, processStatus):
with self.workflow_eval_lock:
@@ -149,8 +147,8 @@ class ArvCwlRunner(object):
self.workflow_eval_lock.notifyAll()
- def start_run(self, runnable, kwargs):
- self.task_queue.add(partial(runnable.run, **kwargs))
+ def start_run(self, runnable, runtimeContext):
+ self.task_queue.add(partial(runnable.run, runtimeContext))
def process_submitted(self, container):
with self.workflow_eval_lock:
@@ -372,33 +370,31 @@ class ArvCwlRunner(object):
'progress':1.0
}).execute(num_retries=self.num_retries)
- def arv_executor(self, tool, job_order, **kwargs):
- self.debug = kwargs.get("debug")
+ def arv_executor(self, tool, job_order, runtimeContext):
+ self.debug = runtimeContext.debug
tool.visit(self.check_features)
- self.project_uuid = kwargs.get("project_uuid")
+ self.project_uuid = runtimeContext.project_uuid
self.pipeline = None
- make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
- collection_cache=self.collection_cache)
- self.fs_access = make_fs_access(kwargs["basedir"])
- self.secret_store = kwargs.get("secret_store")
+ self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+ self.secret_store = runtimeContext.secret_store
- self.trash_intermediate = kwargs["trash_intermediate"]
+ self.trash_intermediate = runtimeContext.trash_intermediate
if self.trash_intermediate and self.work_api != "containers":
raise Exception("--trash-intermediate is only supported with --api=containers.")
- self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
+ self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
if self.intermediate_output_ttl and self.work_api != "containers":
raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
if self.intermediate_output_ttl < 0:
raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
- if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+ if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
- if not kwargs.get("name"):
- kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+ if not runtimeContext.name:
+ runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
@@ -408,25 +404,25 @@ class ArvCwlRunner(object):
# upload_workflow_deps
# Don't validate this time because it will just print redundant errors.
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- makeTool=self.arv_make_tool,
+ construct_tool_object=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
metadata=tool.metadata,
do_validate=False)
# Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
tool, job_order)
- existing_uuid = kwargs.get("update_workflow")
- if existing_uuid or kwargs.get("create_workflow"):
+ existing_uuid = runtimeContext.update_workflow
+ if existing_uuid or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "jobs":
tmpl = RunnerTemplate(self, tool, job_order,
- kwargs.get("enable_reuse"),
+ runtimeContext.enable_reuse,
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"],
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
merged_map=merged_map)
tmpl.save()
# cwltool.main will write our return value to stdout.
@@ -435,81 +431,79 @@ class ArvCwlRunner(object):
return (upload_workflow(self, tool, job_order,
self.project_uuid,
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"],
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
merged_map=merged_map),
"success")
- self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
- self.eval_timeout = kwargs.get("eval_timeout")
+ self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
+ self.eval_timeout = runtimeContext.eval_timeout
- kwargs["make_fs_access"] = make_fs_access
- kwargs["enable_reuse"] = kwargs.get("enable_reuse")
- kwargs["use_container"] = True
- kwargs["tmpdir_prefix"] = "tmp"
- kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.use_container = True
+ runtimeContext.tmpdir_prefix = "tmp"
if self.work_api == "containers":
if self.ignore_docker_for_reuse:
raise Exception("--ignore-docker-for-reuse not supported with containers API.")
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["docker_outdir"] = "/var/spool/cwl"
- kwargs["tmpdir"] = "/tmp"
- kwargs["docker_tmpdir"] = "/tmp"
+ runtimeContext.outdir = "/var/spool/cwl"
+ runtimeContext.docker_outdir = "/var/spool/cwl"
+ runtimeContext.tmpdir = "/tmp"
+ runtimeContext.docker_tmpdir = "/tmp"
elif self.work_api == "jobs":
- if kwargs["priority"] != DEFAULT_PRIORITY:
+ if runtimeContext.priority != DEFAULT_PRIORITY:
raise Exception("--priority not implemented for jobs API.")
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["docker_outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
+ runtimeContext.outdir = "$(task.outdir)"
+ runtimeContext.docker_outdir = "$(task.outdir)"
+ runtimeContext.tmpdir = "$(task.tmpdir)"
- if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+ if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
runnerjob = None
- if kwargs.get("submit"):
+ if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
- kwargs["runnerjob"] = tool.tool["id"]
+ if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
+ runtimeContext.runnerjob = tool.tool["id"]
runnerjob = tool.job(job_order,
self.output_callback,
- **kwargs).next()
+ runtimeContext).next()
else:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+ runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"),
- on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"),
- intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
+ intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
merged_map=merged_map,
- priority=kwargs.get("priority"),
+ priority=runtimeContext.priority,
secret_store=self.secret_store)
elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+ runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
self.output_name,
self.output_tags,
- submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs.get("name"),
- on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"),
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ on_error=runtimeContext.on_error,
+ submit_runner_image=runtimeContext.submit_runner_image,
merged_map=merged_map)
- elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+ elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
"owner_uuid": self.project_uuid,
- "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
+ "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
"components": {},
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if runnerjob and not kwargs.get("wait"):
- submitargs = kwargs.copy()
- submitargs['submit'] = False
- runnerjob.run(**submitargs)
+ if runnerjob and not runtimeContext.wait:
+ submitargs = runtimeContext.copy()
+ submitargs.submit = False
+ runnerjob.run(submitargs)
return (runnerjob.uuid, "success")
self.poll_api = arvados.api('v1')
@@ -521,11 +515,11 @@ class ArvCwlRunner(object):
if runnerjob:
jobiter = iter((runnerjob,))
else:
- if "cwl_runner_job" in kwargs:
- self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+ if runtimeContext.cwl_runner_job is not None:
+ self.uuid = runtimeContext.cwl_runner_job.get('uuid')
jobiter = tool.job(job_order,
self.output_callback,
- **kwargs)
+ runtimeContext)
try:
self.workflow_eval_lock.acquire()
@@ -547,7 +541,7 @@ class ArvCwlRunner(object):
if runnable:
with Perf(metrics, "run"):
- self.start_run(runnable, kwargs)
+ self.start_run(runnable, runtimeContext)
else:
if (self.task_queue.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
@@ -588,7 +582,7 @@ class ArvCwlRunner(object):
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if kwargs.get("submit") and isinstance(runnerjob, Runner):
+ if runtimeContext.submit and isinstance(runnerjob, Runner):
logger.info("Final output collection %s", runnerjob.final_output)
else:
if self.output_name is None:
@@ -598,7 +592,7 @@ class ArvCwlRunner(object):
self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
self.set_crunch_output()
- if kwargs.get("compute_checksum"):
+ if runtimeContext.compute_checksum:
adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
@@ -833,26 +827,28 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
- arvargs.conformance_test = None
+ for key, val in six.iteritems(cwltool.argparser.get_default_args()):
+ if not hasattr(arvargs, key):
+ setattr(arvargs, key, val)
+
arvargs.use_container = True
arvargs.relax_path_checks = True
arvargs.print_supported_versions = False
- make_fs_access = partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
+ loadingContext = LoadingContext(vars(arvargs))
+ loadingContext.fetcher_constructor = runner.fetcher_constructor
+ loadingContext.resolver = partial(collectionResolver, api_client, num_retries=runner.num_retries)
+ loadingContext.construct_tool_object = runner.arv_make_tool
+
+ runtimeContext = RuntimeContext(vars(arvargs))
+ runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
executor=runner.arv_executor,
- makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=make_fs_access,
- fetcher_constructor=partial(CollectionFetcher,
- api_client=api_client,
- fs_access=make_fs_access(""),
- num_retries=runner.num_retries),
- resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 0bec69264..d820f8f6b 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -14,7 +14,7 @@ import uuid
import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.process import UnsupportedRequirement, shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
@@ -41,7 +41,7 @@ class ArvadosContainer(object):
def update_pipeline_component(self, r):
pass
- def run(self, dry_run=False, pull_image=True, **kwargs):
+ def run(self, runtimeContext):
# ArvadosCommandTool subclasses from cwltool.CommandLineTool,
# which calls makeJobRunner() to get a new ArvadosContainer
# object. The fields that define execution such as
@@ -54,7 +54,7 @@ class ArvadosContainer(object):
"name": self.name,
"output_path": self.outdir,
"cwd": self.outdir,
- "priority": kwargs.get("priority"),
+ "priority": runtimeContext.priority,
"state": "Committed",
"properties": {},
}
@@ -190,7 +190,7 @@ class ArvadosContainer(object):
mounts["stdout"] = {"kind": "file",
"path": "%s/%s" % (self.outdir, self.stdout)}
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if not docker_req:
docker_req = {"dockerImageId": "arvados/jobs"}
@@ -199,11 +199,11 @@ class ArvadosContainer(object):
pull_image,
self.arvrunner.project_uuid)
- api_req, _ = get_feature(self, "http://arvados.org/cwl#APIRequirement")
+ api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
runtime_constraints["API"] = True
- runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+ runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"] * 2**20
@@ -217,11 +217,11 @@ class ArvadosContainer(object):
"writable": True
}
- partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
+ partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
if partition_req:
scheduling_parameters["partitions"] = aslist(partition_req["partition"])
- intermediate_output_req, _ = get_feature(self, "http://arvados.org/cwl#IntermediateOutput")
+ intermediate_output_req, _ = self.get_requirement("http://arvados.org/cwl#IntermediateOutput")
if intermediate_output_req:
self.output_ttl = intermediate_output_req["outputTTL"]
else:
@@ -236,15 +236,15 @@ class ArvadosContainer(object):
container_request["runtime_constraints"] = runtime_constraints
container_request["scheduling_parameters"] = scheduling_parameters
- enable_reuse = kwargs.get("enable_reuse", True)
+ enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
- reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
container_request["use_existing"] = enable_reuse
- if kwargs.get("runnerjob", "").startswith("arvwf:"):
- wfuuid = kwargs["runnerjob"][6:kwargs["runnerjob"].index("#")]
+ if runtimeContext.runnerjob.startswith("arvwf:"):
+ wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
if container_request["name"] == "main":
container_request["name"] = wfrecord["name"]
@@ -253,9 +253,9 @@ class ArvadosContainer(object):
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- if kwargs.get("submit_request_uuid"):
+ if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
- uuid=kwargs["submit_request_uuid"],
+ uuid=runtimeContext.submit_request_uuid,
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
else:
@@ -329,7 +329,7 @@ class ArvadosContainer(object):
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ def arvados_job_spec(self, runtimeContext):
"""Create an Arvados container request for this workflow.
The returned dict can be used to create a container passed as
@@ -424,7 +424,7 @@ class RunnerContainer(Runner):
if self.output_tags:
command.append("--output-tags=" + self.output_tags)
- if kwargs.get("debug"):
+ if runtimeContext.debug:
command.append("--debug")
if self.on_error:
@@ -446,15 +446,15 @@ class RunnerContainer(Runner):
return container_req
- def run(self, **kwargs):
- kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(**kwargs)
+ def run(self, runtimeContext):
+ runtimeContext.keepprefix = "keep:"
+ job_spec = self.arvados_job_spec(runtimeContext)
if self.arvrunner.project_uuid:
job_spec["owner_uuid"] = self.arvrunner.project_uuid
- if kwargs.get("submit_request_uuid"):
+ if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
- uuid=kwargs["submit_request_uuid"],
+ uuid=runtimeContext.submit_request_uuid,
body=job_spec
).execute(num_retries=self.arvrunner.num_retries)
else:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 04256c68f..4973c8a8c 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -8,7 +8,7 @@ import copy
import json
import time
-from cwltool.process import get_feature, shortname, UnsupportedRequirement
+from cwltool.process import shortname, UnsupportedRequirement
from cwltool.errors import WorkflowException
from cwltool.command_line_tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
@@ -44,7 +44,7 @@ class ArvadosJob(object):
self.running = False
self.uuid = None
- def run(self, dry_run=False, pull_image=True, **kwargs):
+ def run(self, runtimeContext):
script_parameters = {
"command": self.command_line
}
@@ -96,8 +96,8 @@ class ArvadosJob(object):
script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
with Perf(metrics, "arv_docker_get_image %s" % self.name):
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
- if docker_req and kwargs.get("use_container") is not False:
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+ if docker_req and runtimeContext.use_container is not False:
if docker_req.get("dockerOutputDirectory"):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
@@ -111,7 +111,7 @@ class ArvadosJob(object):
runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
- runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
+ runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
if "keep_cache" in runtime_req:
runtime_constraints["keep_cache_mb_per_task"] = runtime_req["keep_cache"]
@@ -128,9 +128,9 @@ class ArvadosJob(object):
if not self.arvrunner.ignore_docker_for_reuse:
filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
- enable_reuse = kwargs.get("enable_reuse", True)
+ enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
- reuse_req, _ = get_feature(self, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
@@ -269,7 +269,7 @@ class ArvadosJob(object):
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
+ def arvados_job_spec(self, runtimeContext):
"""Create an Arvados job specification for this workflow.
The returned dict can be used to create a job (i.e., passed as
@@ -299,7 +299,7 @@ class RunnerJob(Runner):
if self.on_error:
self.job_order["arv:on_error"] = self.on_error
- if kwargs.get("debug"):
+ if runtimeContext.debug:
self.job_order["arv:debug"] = True
return {
@@ -314,8 +314,8 @@ class RunnerJob(Runner):
}
}
- def run(self, **kwargs):
- job_spec = self.arvados_job_spec(**kwargs)
+ def run(self, runtimeContext):
+ job_spec = self.arvados_job_spec(runtimeContext)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
@@ -346,7 +346,7 @@ class RunnerJob(Runner):
body=instance_spec).execute(num_retries=self.arvrunner.num_retries)
logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
- if kwargs.get("wait") is False:
+ if runtimeContext.wait is False:
self.uuid = self.arvrunner.pipeline["uuid"]
return
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index fea6adfac..f4caf7f4b 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -10,50 +10,50 @@ from .pathmapper import ArvPathMapper
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+ def __init__(self, arvrunner, toolpath_object, runtimeContext):
+ super(ArvadosCommandTool, self).__init__(toolpath_object, runtimeContext)
self.arvrunner = arvrunner
- self.work_api = kwargs["work_api"]
+ self.work_api = runtimeContext.work_api
- def makeJobRunner(self, **kwargs):
+ def makeJobRunner(self, runtimeContext):
if self.work_api == "containers":
return ArvadosContainer(self.arvrunner)
elif self.work_api == "jobs":
return ArvadosJob(self.arvrunner)
- def makePathMapper(self, reffiles, stagedir, **kwargs):
+ def makePathMapper(self, reffiles, stagedir, runtimeContext):
# type: (List[Any], unicode, **Any) -> PathMapper
if self.work_api == "containers":
- return ArvPathMapper(self.arvrunner, reffiles+kwargs.get("extra_reffiles", []), kwargs["basedir"],
+ return ArvPathMapper(self.arvrunner, reffiles+runtimeContext.extra_reffiles, runtimeContext.basedir,
"/keep/%s",
- "/keep/%s/%s",
- **kwargs)
+ "/keep/%s/%s")
elif self.work_api == "jobs":
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ return ArvPathMapper(self.arvrunner, reffiles, runtimeContext.basedir,
"$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
+ "$(task.keep)/%s/%s")
- def job(self, joborder, output_callback, **kwargs):
+ def job(self, joborder, output_callback, runtimeContext):
# Workaround for #13365
- builderargs = kwargs.copy()
- builderargs["toplevel"] = True
- builderargs["tmp_outdir_prefix"] = ""
- builder = self._init_job(joborder, **builderargs)
+ builderargs = runtimeContext.copy()
+ builderargs.toplevel = True
+ builderargs.tmp_outdir_prefix = ""
+ builder = self._init_job(joborder, builderargs)
joborder = builder.job
+ runtimeContext = runtimeContext.copy()
+
if self.work_api == "containers":
dockerReq, is_req = self.get_requirement("DockerRequirement")
if dockerReq and dockerReq.get("dockerOutputDirectory"):
- kwargs["outdir"] = dockerReq.get("dockerOutputDirectory")
- kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory")
+ runtimeContext.outdir = dockerReq.get("dockerOutputDirectory")
+ runtimeContext.docker_outdir = dockerReq.get("dockerOutputDirectory")
else:
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["docker_outdir"] = "/var/spool/cwl"
+ runtimeContext.outdir = "/var/spool/cwl"
+ runtimeContext.docker_outdir = "/var/spool/cwl"
elif self.work_api == "jobs":
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["docker_outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
- kwargs["docker_tmpdir"] = "$(task.tmpdir)"
- return super(ArvadosCommandTool, self).job(joborder, output_callback, **kwargs)
+ runtimeContext.outdir = "$(task.outdir)"
+ runtimeContext.docker_outdir = "$(task.outdir)"
+ runtimeContext.tmpdir = "$(task.tmpdir)"
+ runtimeContext.docker_tmpdir = "$(task.tmpdir)"
+ return super(ArvadosCommandTool, self).job(joborder, output_callback, runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index f675fb10e..94b8568db 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -15,6 +15,7 @@ from cwltool.process import shortname
from cwltool.workflow import Workflow, WorkflowException
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import Builder
+from cwltool.context import LoadingContext
import ruamel.yaml as yaml
@@ -109,17 +110,16 @@ def get_overall_res_req(res_reqs):
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
- self.work_api = kwargs["work_api"]
self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
self.wf_reffiles = []
+ self.loadingContext = loadingContext
- def job(self, joborder, output_callback, **kwargs):
- kwargs["work_api"] = self.work_api
+ def job(self, joborder, output_callback, runtimeContext):
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if req:
with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
@@ -176,7 +176,7 @@ class ArvadosWorkflow(Workflow):
self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
+ runtimeContext.name,
document_loader,
packed,
uri,
@@ -213,15 +213,16 @@ class ArvadosWorkflow(Workflow):
reffiles = []
visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
- mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, kwargs["basedir"],
- "/keep/%s",
- "/keep/%s/%s",
- **kwargs)
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+ "/keep/%s",
+ "/keep/%s/%s")
# For containers API, we need to make sure any extra
# referenced files (ie referenced by the workflow but
# not in the inputs) are included in the mounts.
- kwargs["extra_reffiles"] = copy.deepcopy(self.wf_reffiles)
+ if self.wf_reffiles:
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
def keepmount(obj):
remove_redundant_fields(obj)
@@ -275,9 +276,6 @@ class ArvadosWorkflow(Workflow):
"arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
"id": "#"
})
- kwargs["loader"] = self.doc_loader
- kwargs["avsc_names"] = self.doc_schema
- kwargs["metadata"] = self.metadata
- return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
+ return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
else:
- return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
+ return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index 5024e95f7..94e9c8fc8 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -27,6 +27,7 @@ from cwltool.process import shortname
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
from cwltool.load_tool import load_tool
from cwltool.errors import WorkflowException
+from cwltool.context import RuntimeContext
from .fsaccess import CollectionFetcher, CollectionFsAccess
@@ -115,7 +116,7 @@ def run():
logging.getLogger('arvados').setLevel(logging.DEBUG)
logging.getLogger("cwltool").setLevel(logging.DEBUG)
- args = argparse.Namespace()
+ args = RuntimeContext()
args.project_uuid = arvados.current_job()["owner_uuid"]
args.enable_reuse = enable_reuse
args.on_error = on_error
@@ -134,7 +135,7 @@ def run():
args.disable_js_validation = False
args.tmp_outdir_prefix = "tmp"
- runner.arv_executor(t, job_order_object, **vars(args))
+ runner.arv_executor(t, job_order_object, args)
except Exception as e:
if isinstance(e, WorkflowException):
logging.info("Workflow error %s", e)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 27e48f1f4..05a358e0d 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -42,7 +42,7 @@ class ArvPathMapper(PathMapper):
pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, single_collection=False, **kwargs):
+ collection_pattern, file_pattern, name=None, single_collection=False):
self.arvrunner = arvrunner
self.input_basedir = input_basedir
self.collection_pattern = collection_pattern
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index cf91f69f8..6cdda6140 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -16,7 +16,7 @@ from schema_salad.sourceline import SourceLine, cmap
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
@@ -201,7 +201,7 @@ def upload_docker(arvrunner, tool):
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+ (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
# TODO: can be supported by containers API, but not jobs API.
@@ -362,7 +362,7 @@ class Runner(object):
if enable_reuse:
# If reuse is permitted by command line arguments but
# disabled by the workflow itself, disable it.
- reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 4c31d3b44..c28860375 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -33,7 +33,7 @@ setup(name='arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180524215209',
+ 'cwltool==1.0.20180614214548',
'schema-salad==2.7.20180501211602',
'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list