[ARVADOS] updated: 1.1.2-53-g2702b79
Git user
git at public.curoverse.com
Tue Jan 16 07:27:46 EST 2018
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 27 ++++++++++--------------
sdk/cwl/arvados_cwl/arvcontainer.py | 3 ++-
sdk/cwl/arvados_cwl/arvjob.py | 7 ++++---
sdk/cwl/arvados_cwl/arvworkflow.py | 4 ++--
sdk/cwl/arvados_cwl/fsaccess.py | 7 +------
sdk/cwl/arvados_cwl/runner.py | 42 ++++++++++++++++++++++++++++++-------
sdk/cwl/setup.py | 2 +-
7 files changed, 56 insertions(+), 36 deletions(-)
via 2702b79d8981e562aa9848f41d96bd0a37a278c6 (commit)
from f44a15adce692614ecb816dbe2d0205704d9a4ab (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 2702b79d8981e562aa9848f41d96bd0a37a278c6
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Jan 15 22:22:39 2018 -0500
12934: Rewrite file paths in pack instead of using "overrides"
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 6aa4c8f..e82fd9f 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -110,8 +110,7 @@ class ArvCwlRunner(object):
kwargs["fetcher_constructor"] = partial(CollectionFetcher,
api_client=self.api,
fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
- num_retries=self.num_retries,
- overrides=kwargs.get("override_tools"))
+ num_retries=self.num_retries)
kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
@@ -366,8 +365,7 @@ class ArvCwlRunner(object):
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- override_tools = {}
- upload_workflow_deps(self, tool, override_tools)
+ merged_map = upload_workflow_deps(self, tool)
# Reload tool object which may have been updated by
# upload_workflow_deps
@@ -375,14 +373,7 @@ class ArvCwlRunner(object):
makeTool=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
- metadata=tool.metadata,
- override_tools=override_tools)
-
- tool.doc_loader.fetcher_constructor = partial(CollectionFetcher,
- api_client=self.api,
- fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
- num_retries=self.num_retries,
- overrides=override_tools)
+ metadata=tool.metadata)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % kwargs["name"],
@@ -396,7 +387,8 @@ class ArvCwlRunner(object):
kwargs.get("enable_reuse"),
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"])
+ name=kwargs["name"],
+ merged_map=merged_map)
tmpl.save()
# cwltool.main will write our return value to stdout.
return (tmpl.uuid, "success")
@@ -405,7 +397,8 @@ class ArvCwlRunner(object):
self.project_uuid,
uuid=existing_uuid,
submit_runner_ram=kwargs.get("submit_runner_ram"),
- name=kwargs["name"]),
+ name=kwargs["name"],
+ merged_map=merged_map),
"success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
@@ -443,7 +436,8 @@ class ArvCwlRunner(object):
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
submit_runner_image=kwargs.get("submit_runner_image"),
- intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
+ intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+ merged_map=merged_map)
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
self.output_name,
@@ -451,7 +445,8 @@ class ArvCwlRunner(object):
submit_runner_ram=kwargs.get("submit_runner_ram"),
name=kwargs.get("name"),
on_error=kwargs.get("on_error"),
- submit_runner_image=kwargs.get("submit_runner_image"))
+ submit_runner_image=kwargs.get("submit_runner_image"),
+ merged_map=merged_map)
elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index e8e2a51..014e1b9 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -330,7 +330,7 @@ class RunnerContainer(Runner):
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.tool)
+ packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
@@ -339,6 +339,7 @@ class RunnerContainer(Runner):
if self.tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
command.append("--output-name=" + self.output_name)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 25f64ea..2731b26 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -280,7 +280,7 @@ class RunnerJob(Runner):
if self.tool.tool["id"].startswith("keep:"):
self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
else:
- packed = packed_workflow(self.arvrunner, self.tool)
+ packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
@@ -370,7 +370,7 @@ class RunnerTemplate(object):
}
def __init__(self, runner, tool, job_order, enable_reuse, uuid,
- submit_runner_ram=0, name=None):
+ submit_runner_ram=0, name=None, merged_map=None):
self.runner = runner
self.tool = tool
self.job = RunnerJob(
@@ -381,7 +381,8 @@ class RunnerTemplate(object):
output_name=None,
output_tags=None,
submit_runner_ram=submit_runner_ram,
- name=name)
+ name=name,
+ merged_map=merged_map)
self.uuid = uuid
def pipeline_component_spec(self):
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 4acdc32..f0f9c77 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -27,9 +27,9 @@ logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
- submit_runner_ram=0, name=None):
+ submit_runner_ram=0, name=None, merged_map=None):
- packed = packed_workflow(arvRunner, tool)
+ packed = packed_workflow(arvRunner, tool, merged_map)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 47749ee..69f918e 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -179,16 +179,13 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
- def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4, overrides=None):
+ def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
super(CollectionFetcher, self).__init__(cache, session)
self.api_client = api_client
self.fsaccess = fs_access
self.num_retries = num_retries
- self.overrides = overrides if overrides else {}
def fetch_text(self, url):
- if url in self.overrides:
- return self.overrides[url]
if url.startswith("keep:"):
with self.fsaccess.open(url, "r") as f:
return f.read()
@@ -199,8 +196,6 @@ class CollectionFetcher(DefaultFetcher):
return super(CollectionFetcher, self).fetch_text(url)
def check_exists(self, url):
- if url in self.overrides:
- return True
try:
if url.startswith("http://arvados.org/cwl"):
return True
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 28c35ad..2ca63cf 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -172,13 +172,32 @@ def upload_docker(arvrunner, tool):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
-def packed_workflow(arvrunner, tool):
+def packed_workflow(arvrunner, tool, merged_map):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
- return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
- tool.tool["id"], tool.metadata)
+ rewrites = {}
+ packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
+ tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+
+ rewrite_to_orig = {}
+ for k,v in rewrites.items():
+ rewrite_to_orig[v] = k
+
+ def visit(v, cur_id):
+ if isinstance(v, dict):
+ if v.get("class") in ("CommandLineTool", "Workflow"):
+ cur_id = rewrite_to_orig.get(v["id"], v["id"])
+ if "location" in v and not v["location"].startswith("keep:"):
+ v["location"] = merged_map[cur_id][v["location"]]
+ for l in v:
+ visit(v[l], cur_id)
+ if isinstance(v, list):
+ for l in v:
+ visit(l, cur_id)
+ visit(packed, None)
+ return packed
def tag_git_version(packed):
if tool.tool["id"].startswith("file://"):
@@ -229,16 +248,18 @@ def upload_job_order(arvrunner, name, tool, job_order):
return job_order
-def upload_workflow_deps(arvrunner, tool, override_tools):
+def upload_workflow_deps(arvrunner, tool):
# Ensure that Docker images needed by this workflow are available
upload_docker(arvrunner, tool)
document_loader = tool.doc_loader
+ merged_map = {}
+
def upload_tool_deps(deptool):
if "id" in deptool:
- upload_dependencies(arvrunner,
+ pm = upload_dependencies(arvrunner,
"%s dependencies" % (shortname(deptool["id"])),
document_loader,
deptool,
@@ -246,10 +267,15 @@ def upload_workflow_deps(arvrunner, tool, override_tools):
False,
include_primary=False)
document_loader.idx[deptool["id"]] = deptool
- override_tools[deptool["id"]] = yaml.round_trip_dump(deptool)
+ toolmap = {}
+ for k,v in pm.items():
+ toolmap[k] = v.resolved
+ merged_map[deptool["id"]] = toolmap
tool.visit(upload_tool_deps)
+ return merged_map
+
def arvados_jobs_image(arvrunner, img):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
@@ -291,7 +317,7 @@ class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
- intermediate_output_ttl=0):
+ intermediate_output_ttl=0, merged_map=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
@@ -320,6 +346,8 @@ class Runner(object):
if self.submit_runner_ram <= 0:
raise Exception("Value of --submit-runner-ram must be greater than zero")
+ self.merged_map = merged_map or {}
+
def update_pipeline_component(self, record):
pass
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index de47098..9645a33 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -41,7 +41,7 @@ setup(name='arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180111185617',
+ 'cwltool==1.0.20180116032016',
'schema-salad==2.6.20171201034858',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list