[ARVADOS] created: 7fd60cc870863647127a438a085685d415c37a46

Git user git at public.curoverse.com
Mon Jan 23 17:25:09 EST 2017


        at  7fd60cc870863647127a438a085685d415c37a46 (commit)


commit 7fd60cc870863647127a438a085685d415c37a46
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jan 23 17:03:30 2017 -0500

    10895: Don't add uploaded files to pathmap if they are not referenced.

diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index a6b3d15..63d36f5 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -85,8 +85,12 @@ class ArvPathMapper(PathMapper):
         # type: (List[Any], unicode) -> None
         uploadfiles = set()
 
-        for k,v in self.arvrunner.get_uploaded().iteritems():
-            self._pathmap[k] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+        already_uploaded = self.arvrunner.get_uploaded()
+        for k in referenced_files:
+            loc = k["location"]
+            if loc in already_uploaded:
+                v = already_uploaded[loc]
+                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
 
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)

commit aea835bc965d42e225c2641b0210c4b521f6dc4e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jan 23 16:21:24 2017 -0500

    10895: Improve reuse for RunInSingleContainer jobs
    
    * Consolidate and de-duplicate requirements and hints
    * Use json.dumps() with sort_keys=True to avoid issues with round trip YAML
      formatting and comments that are irrelevant to reuse semantics.
    * Remember subworkflow packing for more efficient scattering.
    * Logging distinguishes when jobs and containers are reused.

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 235e9b8..55b8baf 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -129,10 +129,11 @@ class ArvadosContainer(object):
             self.uuid = response["uuid"]
             self.arvrunner.processes[self.uuid] = self
 
-            logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
             if response["state"] == "Final":
+                logger.info("%s reuse container %s", self.arvrunner.label(self), response["container_uuid"])
                 self.done(response)
+            else:
+                logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
             logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
             self.output_callback({}, "permanentFail")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 7b31802..87bacd0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -18,7 +18,7 @@ import ruamel.yaml as yaml
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_listing, upload_workflow_collection
 from .pathmapper import InitialWorkDirPathMapper
 from .perf import Perf
 from . import done
@@ -141,11 +141,12 @@ class ArvadosJob(object):
 
             self.update_pipeline_component(response)
 
-            logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
-
             if response["state"] in ("Complete", "Failed", "Cancelled"):
+                logger.info("%s reuse job %s", self.arvrunner.label(self), response["uuid"])
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
+            else:
+                logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
         except Exception as e:
             logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
@@ -236,30 +237,6 @@ class ArvadosJob(object):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
-    def upload_workflow_collection(self, packed):
-        collection = arvados.collection.Collection(api_client=self.arvrunner.api,
-                                                   keep_client=self.arvrunner.keep_client,
-                                                   num_retries=self.arvrunner.num_retries)
-        with collection.open("workflow.cwl", "w") as f:
-            f.write(yaml.round_trip_dump(packed))
-
-        filters = [["portable_data_hash", "=", collection.portable_data_hash()],
-                   ["name", "like", self.name+"%"]]
-        if self.arvrunner.project_uuid:
-            filters.append(["owner_uuid", "=", self.arvrunner.project_uuid])
-        exists = self.arvrunner.api.collections().list(filters=filters).execute(num_retries=self.arvrunner.num_retries)
-
-        if exists["items"]:
-            logger.info("Using collection %s", exists["items"][0]["uuid"])
-        else:
-            collection.save_new(name=self.name,
-                                owner_uuid=self.arvrunner.project_uuid,
-                                ensure_unique_name=True,
-                                num_retries=self.arvrunner.num_retries)
-            logger.info("Uploaded to %s", collection.manifest_locator())
-
-        return collection.portable_data_hash()
-
     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
         """Create an Arvados job specification for this workflow.
 
@@ -272,7 +249,7 @@ class RunnerJob(Runner):
             self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
         else:
             packed = packed_workflow(self.arvrunner, self.tool)
-            wf_pdh = self.upload_workflow_collection(packed)
+            wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
         adjustDirObjs(self.job_order, trim_listing)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index cd7e41a..4db1f4f 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import ruamel.yaml as yaml
 
-from .runner import upload_dependencies, trim_listing, packed_workflow
+from .runner import upload_dependencies, trim_listing, packed_workflow, upload_workflow_collection
 from .arvtool import ArvadosCommandTool
 from .perf import Perf
 
@@ -56,6 +56,13 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         call = arvRunner.api.workflows().create(body=body)
     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
 
+def dedup_reqs(reqs):
+    dedup = {}
+    for r in reversed(reqs):
+        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
+            dedup[r["class"]] = r
+    return [dedup[r] for r in sorted(dedup.keys())]
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
@@ -63,6 +70,7 @@ class ArvadosWorkflow(Workflow):
         super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
         self.work_api = kwargs["work_api"]
+        self.wf_pdh = None
 
     def job(self, joborder, output_callback, **kwargs):
         kwargs["work_api"] = self.work_api
@@ -74,17 +82,6 @@ class ArvadosWorkflow(Workflow):
             document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
 
             with Perf(metrics, "subworkflow upload_deps"):
-                workflowobj["requirements"] = self.requirements + workflowobj.get("requirements", [])
-                workflowobj["hints"] = self.hints + workflowobj.get("hints", [])
-                packed = pack(document_loader, workflowobj, uri, self.metadata)
-
-                upload_dependencies(self.arvrunner,
-                                    kwargs.get("name", ""),
-                                    document_loader,
-                                    packed,
-                                    uri,
-                                    False)
-
                 upload_dependencies(self.arvrunner,
                                     os.path.basename(joborder.get("id", "#")),
                                     document_loader,
@@ -92,6 +89,19 @@ class ArvadosWorkflow(Workflow):
                                     joborder.get("id", "#"),
                                     False)
 
+                if self.wf_pdh is None:
+                    workflowobj["requirements"] = dedup_reqs(self.requirements)
+                    workflowobj["hints"] = dedup_reqs(self.hints)
+
+                    packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+                    upload_dependencies(self.arvrunner,
+                                        kwargs.get("name", ""),
+                                        document_loader,
+                                        packed,
+                                        uri,
+                                        False)
+
             with Perf(metrics, "subworkflow adjust"):
                 joborder_keepmount = copy.deepcopy(joborder)
 
@@ -111,8 +121,11 @@ class ArvadosWorkflow(Workflow):
 
                 adjustFileObjs(joborder_keepmount, keepmount)
                 adjustDirObjs(joborder_keepmount, keepmount)
-                adjustFileObjs(packed, keepmount)
-                adjustDirObjs(packed, keepmount)
+
+                if self.wf_pdh is None:
+                    adjustFileObjs(packed, keepmount)
+                    adjustDirObjs(packed, keepmount)
+                    self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
 
             wf_runner = cmap({
                 "class": "CommandLineTool",
@@ -125,10 +138,13 @@ class ArvadosWorkflow(Workflow):
                     "class": "InitialWorkDirRequirement",
                     "listing": [{
                             "entryname": "workflow.cwl",
-                            "entry": yaml.round_trip_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+                            "entry": {
+                                "class": "File",
+                                "location": "keep:%s/workflow.cwl" % self.wf_pdh
+                            }
                         }, {
                             "entryname": "cwl.input.yml",
-                            "entry": yaml.round_trip_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+                            "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                         }]
                 }],
                 "hints": workflowobj["hints"],
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d3e0a0e..ef01b7f 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -215,6 +215,31 @@ def arvados_jobs_image(arvrunner, img):
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
     return img
 
+def upload_workflow_collection(arvrunner, name, packed):
+    collection = arvados.collection.Collection(api_client=arvrunner.api,
+                                               keep_client=arvrunner.keep_client,
+                                               num_retries=arvrunner.num_retries)
+    with collection.open("workflow.cwl", "w") as f:
+        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
+
+    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
+               ["name", "like", name+"%"]]
+    if arvrunner.project_uuid:
+        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
+
+    if exists["items"]:
+        logger.info("Using collection %s", exists["items"][0]["uuid"])
+    else:
+        collection.save_new(name=name,
+                            owner_uuid=arvrunner.project_uuid,
+                            ensure_unique_name=True,
+                            num_retries=arvrunner.num_retries)
+        logger.info("Uploaded to %s", collection.manifest_locator())
+
+    return collection.portable_data_hash()
+
+
 class Runner(object):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 8aafb4a..076514b 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -306,7 +306,10 @@ class TestWorkflow(unittest.TestCase):
             find_or_create=True)
 
         mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
-        mockcollection().open().__enter__().write.assert_has_calls([mock.call('sleeptime: 5')])
+        mockcollection().open().__enter__().write.assert_has_calls([mock.call(
+'''{
+  "sleeptime": 5
+}''')])
 
     def test_default_work_api(self):
         arvados_cwl.add_arv_hints()
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index eaddca0..31d7c2b 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -124,7 +124,7 @@ def stubs(func):
                     'class': 'Directory'
                 },
                 'cwl:tool':
-                'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main'
+                '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main'
             },
             'repository': 'arvados',
             'script_version': 'master',
@@ -146,7 +146,7 @@ def stubs(func):
                               'listing': [
                                   {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
                               ]}},
-                        'cwl:tool': 'f57578d5cfda7f70fef00cbc4b621e6b+58/workflow.cwl#main',
+                        'cwl:tool': '4db32e8a15aa48ea084b2f38108f406d+60/workflow.cwl#main',
                         'arv:enable_reuse': True,
                         'arv:on_error': 'continue'
                     },
@@ -1121,7 +1121,7 @@ class TestTemplateInputs(unittest.TestCase):
                 },
                 'script_parameters': {
                     'cwl:tool':
-                    '5800682d508698dc9ce6d2fc618f21d8+58/workflow.cwl#main',
+                    '00e281847a33e1c0df93161d70a6fc5d+60/workflow.cwl#main',
                     'optionalFloatInput': None,
                     'fileInput': {
                         'type': 'File',
diff --git a/sdk/cwl/tests/wf/scatter2_subwf.cwl b/sdk/cwl/tests/wf/scatter2_subwf.cwl
index daf18b1..df4d992 100644
--- a/sdk/cwl/tests/wf/scatter2_subwf.cwl
+++ b/sdk/cwl/tests/wf/scatter2_subwf.cwl
@@ -1,41 +1,77 @@
-cwlVersion: v1.0
-$graph:
-- class: Workflow
-  id: '#main'
-  inputs:
-  - type: int
-    id: '#main/sleeptime'
-  outputs:
-  - type: string
-    outputSource: '#main/sleep1/out'
-    id: '#main/out'
-  steps:
-  - in:
-    - valueFrom: |
-        ${
-          return String(inputs.sleeptime) + "b";
+{
+  "$graph": [
+    {
+      "class": "Workflow",
+      "hints": [],
+      "id": "#main",
+      "inputs": [
+        {
+          "id": "#main/sleeptime",
+          "type": "int"
         }
-      id: '#main/sleep1/blurb'
-    - source: '#main/sleeptime'
-      id: '#main/sleep1/sleeptime'
-    out: ['#main/sleep1/out']
-    run:
-      class: CommandLineTool
-      inputs:
-      - type: int
-        inputBinding: {position: 1}
-        id: '#main/sleep1/sleeptime'
-      outputs:
-      - type: string
-        outputBinding:
-          outputEval: out
-        id: '#main/sleep1/out'
-      baseCommand: sleep
-    id: '#main/sleep1'
-  requirements:
-  - {class: InlineJavascriptRequirement}
-  - {class: ScatterFeatureRequirement}
-  - {class: StepInputExpressionRequirement}
-  - {class: SubworkflowFeatureRequirement}
-  hints:
-  - class: http://arvados.org/cwl#RunInSingleContainer
\ No newline at end of file
+      ],
+      "outputs": [
+        {
+          "id": "#main/out",
+          "outputSource": "#main/sleep1/out",
+          "type": "string"
+        }
+      ],
+      "requirements": [
+        {
+          "class": "InlineJavascriptRequirement"
+        },
+        {
+          "class": "ScatterFeatureRequirement"
+        },
+        {
+          "class": "StepInputExpressionRequirement"
+        },
+        {
+          "class": "SubworkflowFeatureRequirement"
+        }
+      ],
+      "steps": [
+        {
+          "id": "#main/sleep1",
+          "in": [
+            {
+              "id": "#main/sleep1/blurb",
+              "valueFrom": "${\n  return String(inputs.sleeptime) + \"b\";\n}\n"
+            },
+            {
+              "id": "#main/sleep1/sleeptime",
+              "source": "#main/sleeptime"
+            }
+          ],
+          "out": [
+            "#main/sleep1/out"
+          ],
+          "run": {
+            "baseCommand": "sleep",
+            "class": "CommandLineTool",
+            "inputs": [
+              {
+                "id": "#main/sleep1/sleeptime",
+                "inputBinding": {
+                  "position": 1
+                },
+                "type": "int"
+              }
+            ],
+            "outputs": [
+              {
+                "id": "#main/sleep1/out",
+                "outputBinding": {
+                  "outputEval": "out"
+                },
+                "type": "string"
+              }
+            ]
+          }
+        }
+      ]
+    }
+  ],
+  "cwlVersion": "v1.0"
+}
\ No newline at end of file

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list