[ARVADOS] created: a01dc8ec9b7ea0b5a7b4b3bdc46cd64470f8d438
Git user
git at public.curoverse.com
Mon Jan 23 17:03:54 EST 2017
at a01dc8ec9b7ea0b5a7b4b3bdc46cd64470f8d438 (commit)
commit a01dc8ec9b7ea0b5a7b4b3bdc46cd64470f8d438
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 23 17:03:30 2017 -0500
10895: Don't add uploaded files to pathmap if they are not referenced.
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index a6b3d15..705818f 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -85,8 +85,11 @@ class ArvPathMapper(PathMapper):
# type: (List[Any], unicode) -> None
uploadfiles = set()
- for k,v in self.arvrunner.get_uploaded().iteritems():
- self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+ already_uploaded = self.arvrunner.get_uploaded()
+ for k in referenced_files:
+ if k["location"] in already_uploaded:
+ v = already_uploaded[k["location"]]
+ self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
for srcobj in referenced_files:
self.visit(srcobj, uploadfiles)
commit aea835bc965d42e225c2641b0210c4b521f6dc4e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jan 23 16:21:24 2017 -0500
10895: Improve reuse for RunInSingleContainer jobs
* Consolidate and de-duplicate requirements and hints
* Use json.dumps() with sort_keys=True to avoid issues with round trip YAML
formatting and comments that are irrelevant to reuse semantics.
* Remember subworkflow packing for more efficient scattering.
* Logging distinguishes when jobs and containers are reused.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 235e9b8..55b8baf 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -129,10 +129,11 @@ class ArvadosContainer(object):
self.uuid = response["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
if response["state"] == "Final":
+ logger.info("%s reuse container %s", self.arvrunner.label(self), response["container_uuid"])
self.done(response)
+ else:
+ logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
except Exception as e:
logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
self.output_callback({}, "permanentFail")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 7b31802..87bacd0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -18,7 +18,7 @@ import ruamel.yaml as yaml
import arvados.collection
from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
from .pathmapper import InitialWorkDirPathMapper
from .perf import Perf
from . import done
@@ -141,11 +141,12 @@ class ArvadosJob(object):
self.update_pipeline_component(response)
- logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
if response["state"] in ("Complete", "Failed", "Cancelled"):
+ logger.info("%s reuse job %s", self.arvrunner.label(self), response["uuid"])
with Perf(metrics, "done %s" % self.name):
self.done(response)
+ else:
+ logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
except Exception as e:
logger.exception("%s error" % (self.arvrunner.label(self)))
self.output_callback({}, "permanentFail")
@@ -236,30 +237,6 @@ class ArvadosJob(object):
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
- def upload_workflow_collection(self, packed):
- collection = arvados.collection.Collection(api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
- with collection.open("workflow.cwl", "w") as f:
- f.write(yaml.round_trip_dump(packed))
-
- filters = [["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "like", self.name+"%"]]
- if self.arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
- exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
-
- if exists["items"]:
- logger.info("Using collection %s", exists["items"][0]["uuid"])
- else:
- collection.save_new(name=self.name,
- owner_uuid=self.arvrunner.project_uuid,
- ensure_unique_name=True,
- num_retries=self.arvrunner.num_retries)
- logger.info("Uploaded to %s", collection.manifest_locator())
-
- return collection.portable_data_hash()
-
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
@@ -272,7 +249,7 @@ class RunnerJob(Runner):
self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
else:
packed = packed_workflow(self.arvrunner, self.tool)
- wf_pdh = self.upload_workflow_collection(packed)
+ wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
adjustDirObjs(self.job_order, trim_listing)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index cd7e41a..4db1f4f 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import ruamel.yaml as yaml
-from .runner import upload_dependencies, trim_listing, packed_workflow
+from .runner import upload_dependencies, trim_listing, packed_workflow, upload_workflow_collection
from .arvtool import ArvadosCommandTool
from .perf import Perf
@@ -56,6 +56,13 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
call = arvRunner.api.workflows().create(body=body)
return call.execute(num_retries=arvRunner.num_retries)["uuid"]
+def dedup_reqs(reqs):
+ dedup = {}
+ for r in reversed(reqs):
+ if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
+ dedup[r["class"]] = r
+ return [dedup[r] for r in sorted(dedup.keys())]
+
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
@@ -63,6 +70,7 @@ class ArvadosWorkflow(Workflow):
super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
self.arvrunner = arvrunner
self.work_api = kwargs["work_api"]
+ self.wf_pdh = None
def job(self, joborder, output_callback, **kwargs):
kwargs["work_api"] = self.work_api
@@ -74,17 +82,6 @@ class ArvadosWorkflow(Workflow):
document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
with Perf(metrics, "subworkflow upload_deps"):
- workflowobj["requirements"] = self.requirements + workflowobj.get("requirements", [])
- workflowobj["hints"] = self.hints + workflowobj.get("hints", [])
- packed = pack(document_loader, workflowobj, uri, self.metadata)
-
- upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
- document_loader,
- packed,
- uri,
- False)
-
upload_dependencies(self.arvrunner,
os.path.basename(joborder.get("id", "#")),
document_loader,
@@ -92,6 +89,19 @@ class ArvadosWorkflow(Workflow):
joborder.get("id", "#"),
False)
+ if self.wf_pdh is None:
+ workflowobj["requirements"] = dedup_reqs(self.requirements)
+ workflowobj["hints"] = dedup_reqs(self.hints)
+
+ packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+ upload_dependencies(self.arvrunner,
+ kwargs.get("name", ""),
+ document_loader,
+ packed,
+ uri,
+ False)
+
with Perf(metrics, "subworkflow adjust"):
joborder_keepmount = copy.deepcopy(joborder)
@@ -111,8 +121,11 @@ class ArvadosWorkflow(Workflow):
adjustFileObjs(joborder_keepmount, keepmount)
adjustDirObjs(joborder_keepmount, keepmount)
- adjustFileObjs(packed, keepmount)
- adjustDirObjs(packed, keepmount)
+
+ if self.wf_pdh is None:
+ adjustFileObjs(packed, keepmount)
+ adjustDirObjs(packed, keepmount)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
wf_runner = cmap({
"class": "CommandLineTool",
@@ -125,10 +138,13 @@ class ArvadosWorkflow(Workflow):
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": yaml.round_trip_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ "entry": {
+ "class": "File",
+ "location": "keep:%s/workflow.cwl" % self.wf_pdh
+ }
}, {
"entryname": "cwl.input.yml",
- "entry": yaml.round_trip_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
}]
}],
"hints": workflowobj["hints"],
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d3e0a0e..ef01b7f 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -215,6 +215,31 @@ def arvados_jobs_image(arvrunner, img):
raise Exception("Docker image %s is not available\n%s" % (img, e) )
return img
+def upload_workflow_collection(arvrunner, name, packed):
+ collection = arvados.collection.Collection(api_client=arvrunner.api,
+ keep_client=arvrunner.keep_client,
+ num_retries=arvrunner.num_retries)
+ with collection.open("workflow.cwl", "w") as f:
+ f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
+
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", name+"%"]]
+ if arvrunner.project_uuid:
+ filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
+
+ if exists["items"]:
+ logger.info("Using collection %s", exists["items"][0]["uuid"])
+ else:
+ collection.save_new(name=name,
+ owner_uuid=arvrunner.project_uuid,
+ ensure_unique_name=True,
+ num_retries=arvrunner.num_retries)
+ logger.info("Uploaded to %s", collection.manifest_locator())
+
+ return collection.portable_data_hash()
+
+
class Runner(object):
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 8aafb4a..076514b 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -306,7 +306,10 @@ class TestWorkflow(unittest.TestCase):
find_or_create=True)
mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
- mockcollection().open().__enter__().write.assert_has_calls([mock.call('sleeptime: 5')])
+ mockcollection().open().__enter__().write.assert_has_calls([mock.call(
+'''{
+ "sleeptime": 5
+}''')])
def test_default_work_api(self):
arvados_cwl.add_arv_hints()
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index eaddca0..31d7c2b 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main'
+ '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main'
},
'repository': 'arvados',
'script_version': 'master',
@@ -146,7 +146,7 @@ def stubs(func):
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main',
+ 'cwl:tool': '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -1121,7 +1121,7 @@ class TestTemplateInputs(unittest.TestCase):
},
'script_parameters': {
'cwl:tool':
- '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl#main',
+ '00e281847a33e1c0df93161d70a6fc5d+60/workflow.cwl#main',
'optionalFloatInput': None,
'fileInput': {
'type': 'File',
diff --git a/sdk/cwl/tests/wf/scatter2_subwf.cwl b/sdk/cwl/tests/wf/scatter2_subwf.cwl
index daf18b1..df4d992 100644
--- a/sdk/cwl/tests/wf/scatter2_subwf.cwl
+++ b/sdk/cwl/tests/wf/scatter2_subwf.cwl
@@ -1,41 +1,77 @@
-cwlVersion: v1.0
-$graph:
-- class: Workflow
- id: '#main'
- inputs:
- - type: int
- id: '#main/sleeptime'
- outputs:
- - type: string
- outputSource: '#main/sleep1/out'
- id: '#main/out'
- steps:
- - in:
- - valueFrom: |
- ${
- return String(inputs.sleeptime) + "b";
+{
+ "$graph": [
+ {
+ "class": "Workflow",
+ "hints": [],
+ "id": "#main",
+ "inputs": [
+ {
+ "id": "#main/sleeptime",
+ "type": "int"
}
- id: '#main/sleep1/blurb'
- - source: '#main/sleeptime'
- id: '#main/sleep1/sleeptime'
- out: ['#main/sleep1/out']
- run:
- class: CommandLineTool
- inputs:
- - type: int
- inputBinding: {position: 1}
- id: '#main/sleep1/sleeptime'
- outputs:
- - type: string
- outputBinding:
- outputEval: out
- id: '#main/sleep1/out'
- baseCommand: sleep
- id: '#main/sleep1'
- requirements:
- - {class: InlineJavascriptRequirement}
- - {class: ScatterFeatureRequirement}
- - {class: StepInputExpressionRequirement}
- - {class: SubworkflowFeatureRequirement}
- hints:
- - class: http://arvados.org/cwl#RunInSingleContainer
\ No newline at end of file
+ ],
+ "outputs": [
+ {
+ "id": "#main/out",
+ "outputSource": "#main/sleep1/out",
+ "type": "string"
+ }
+ ],
+ "requirements": [
+ {
+ "class": "InlineJavascriptRequirement"
+ },
+ {
+ "class": "ScatterFeatureRequirement"
+ },
+ {
+ "class": "StepInputExpressionRequirement"
+ },
+ {
+ "class": "SubworkflowFeatureRequirement"
+ }
+ ],
+ "steps": [
+ {
+ "id": "#main/sleep1",
+ "in": [
+ {
+ "id": "#main/sleep1/blurb",
+ "valueFrom": "${\n return String(inputs.sleeptime) + \"b\";\n}\n"
+ },
+ {
+ "id": "#main/sleep1/sleeptime",
+ "source": "#main/sleeptime"
+ }
+ ],
+ "out": [
+ "#main/sleep1/out"
+ ],
+ "run": {
+ "baseCommand": "sleep",
+ "class": "CommandLineTool",
+ "inputs": [
+ {
+ "id": "#main/sleep1/sleeptime",
+ "inputBinding": {
+ "position": 1
+ },
+ "type": "int"
+ }
+ ],
+ "outputs": [
+ {
+ "id": "#main/sleep1/out",
+ "outputBinding": {
+ "outputEval": "out"
+ },
+ "type": "string"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ ],
+ "cwlVersion": "v1.0"
+}
\ No newline at end of file
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list