[ARVADOS] created: ae1f1ea0a3130f7cd05402a7924550a8247d88f8

Git user git at public.curoverse.com
Fri Jan 6 16:41:08 EST 2017


        at  ae1f1ea0a3130f7cd05402a7924550a8247d88f8 (commit)


commit ae1f1ea0a3130f7cd05402a7924550a8247d88f8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jan 6 16:40:51 2017 -0500

    10812: Fixing up tests.

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ff72e14..72eed37 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -227,7 +227,7 @@ class RunnerContainer(Runner):
             "kind": "json",
             "json": packed
         }
-        if packed["id"].startswith("arvwf:"):
+        if packed.get("id", "").startswith("arvwf:"):
             container_req["properties"]["template_uuid"] = packed["id"][6:]
 
         command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index eee4ef6..2fccb57 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -240,7 +240,7 @@ class RunnerJob(Runner):
         with collection.open("workflow.cwl", "w") as f:
             f.write(yaml.round_trip_dump(packed))
 
-        exists = api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+        exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                                  ["portable_data_hash", "=", collection.portable_data_hash()],
                                                  ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
 
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 01853e6..cd7e41a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -36,8 +36,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
     if not name:
         name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
 
-    upload_dependencies(arvRunner, name, document_loader,
-                        packed, uri, False)
+    upload_dependencies(arvRunner, name, tool.doc_loader,
+                        packed, tool.tool["id"], False)
 
     # TODO nowhere for submit_runner_ram to go.
 
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d8630d4..00e5132 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -18,6 +18,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.utils import aslist
 from cwltool.builder import substitute
+from cwltool.pack import pack
 
 import arvados.collection
 import ruamel.yaml as yaml
@@ -127,22 +128,22 @@ def packed_workflow(arvrunner, tool):
     document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
     packed = pack(document_loader, workflowobj, uri, tool.metadata)
 
-    if tool.tool["id"].startswith("file://"):
-        path = os.path.dirname(tool.tool["id"][7:])
-        try:
-            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H', 'path']).strip()
-        except OSError, subprocess.CalledProcessError:
-            pass
-        else:
-            packed["http://schema.org/version"] = githash
+    # if tool.tool["id"].startswith("file://"):
+    #     path = os.path.dirname(tool.tool["id"][7:])
+    #     try:
+    #         githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+    #     except (OSError, subprocess.CalledProcessError):
+    #         pass
+    #     else:
+    #         packed["http://schema.org/version"] = githash
 
     mapper = {}
-    def upload_tool_deps(tool):
+    def upload_tool_deps(deptool):
         workflowmapper = upload_dependencies(arvrunner,
-                                             shortname(tool.tool["id"]),
-                                             tool.doc_loader,
-                                             tool.tool,
-                                             tool.tool["id"],
+                                             shortname(deptool["id"]),
+                                             document_loader,
+                                             deptool,
+                                             deptool["id"],
                                              False)
         for k,v in workflowmapper.items():
             mapper[k] = v.resolved
@@ -155,6 +156,8 @@ def packed_workflow(arvrunner, tool):
     adjustFileObjs(packed, setloc)
     adjustDirObjs(packed, setloc)
 
+    #print yaml.round_trip_dump(packed)
+
     return packed
 
 def upload_instance(arvrunner, name, tool, job_order):
@@ -231,7 +234,7 @@ class Runner(object):
 
         packed = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
         adjustDirObjs(self.job_order, trim_listing)
-        return workflowmapper
+        return packed
 
     def done(self, record):
         try:
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 0a01fb4..7b2a0ec 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -17,6 +17,8 @@ import arvados.keep
 from .matcher import JsonDiffMatcher
 from .mock_discovery import get_rootDesc
 
+import ruamel.yaml as yaml
+
 _rootDesc = None
 
 def stubs(func):
@@ -122,7 +124,7 @@ def stubs(func):
                     'class': 'Directory'
                 },
                 'cwl:tool':
-                '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+                'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
             },
             'repository': 'arvados',
             'script_version': 'master',
@@ -144,7 +146,7 @@ def stubs(func):
                               'listing': [
                                   {'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
                               ]}},
-                        'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+                        'cwl:tool': 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl',
                         'arv:enable_reuse': True,
                         'arv:on_error': 'continue'
                     },
@@ -167,6 +169,9 @@ def stubs(func):
         stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
         stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
 
+        with open("tests/wf/submit_wf_packed.cwl") as f:
+            expect_packed_workflow = yaml.round_trip_load(f)
+
         stubs.expect_container_spec = {
             'priority': 1,
             'mounts': {
@@ -174,9 +179,9 @@ def stubs(func):
                     'writable': True,
                     'kind': 'collection'
                 },
-                '/var/lib/cwl/workflow': {
-                    'portable_data_hash': '99999999999999999999999999999991+99',
-                    'kind': 'collection'
+                '/var/lib/cwl/workflow.json': {
+                    'json': expect_packed_workflow,
+                    'kind': 'json'
                 },
                 'stdout': {
                     'path': '/var/spool/cwl/cwl.output.json',
@@ -198,7 +203,7 @@ def stubs(func):
             'owner_uuid': None,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                         '--enable-reuse', '--on-error=continue',
-                        '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'output_path': '/var/spool/cwl',
@@ -747,7 +752,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
-            body=expect_container)
+            body=JsonDiffMatcher(expect_container))
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
diff --git a/sdk/cwl/tests/wf/submit_wf_packed.cwl b/sdk/cwl/tests/wf/submit_wf_packed.cwl
new file mode 100644
index 0000000..a1ada94
--- /dev/null
+++ b/sdk/cwl/tests/wf/submit_wf_packed.cwl
@@ -0,0 +1,33 @@
+cwlVersion: v1.0
+$graph:
+- class: CommandLineTool
+  requirements:
+  - class: DockerRequirement
+    dockerPull: debian:8
+  inputs:
+  - id: '#submit_tool.cwl/x'
+    type: File
+    default:
+      class: File
+      location: keep:99999999999999999999999999999992+99/blub.txt
+    inputBinding:
+      position: 1
+  outputs: []
+  baseCommand: cat
+  id: '#submit_tool.cwl'
+- class: Workflow
+  inputs:
+  - id: '#main/x'
+    type: File
+  - id: '#main/y'
+    type: Directory
+  - id: '#main/z'
+    type: Directory
+  outputs: []
+  steps:
+  - id: '#main/step1'
+    in:
+    - {id: '#main/step1/x', source: '#main/x'}
+    out: []
+    run: '#submit_tool.cwl'
+  id: '#main'

commit 93fde4b1632b046fe60884c603c55a654771630f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jan 6 15:34:44 2017 -0500

    10812: Use packed workflows for all run modes.

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 987b0d6..ff72e14 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -190,7 +190,7 @@ class RunnerContainer(Runner):
         the +body+ argument to container_requests().create().
         """
 
-        workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
 
         container_req = {
             "owner_uuid": self.arvrunner.project_uuid,
@@ -222,27 +222,13 @@ class RunnerContainer(Runner):
             "properties": {}
         }
 
-        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        if workflowcollection.startswith("keep:"):
-            workflowcollection = workflowcollection[5:workflowcollection.index('/')]
-            workflowname = os.path.basename(self.tool.tool["id"])
-            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
-            container_req["mounts"]["/var/lib/cwl/workflow"] = {
-                "kind": "collection",
-                "portable_data_hash": "%s" % workflowcollection
-                }
-        elif workflowcollection.startswith("arvwf:"):
-            workflowpath = "/var/lib/cwl/workflow.json#main"
-            wfuuid = workflowcollection[6:workflowcollection.index("#")]
-            wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
-            wfobj = yaml.safe_load(wfrecord["definition"])
-            if container_req["name"].startswith("arvwf:"):
-                container_req["name"] = wfrecord["name"]
-            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
-                "kind": "json",
-                "json": wfobj
-            }
-            container_req["properties"]["template_uuid"] = wfuuid
+        workflowpath = "/var/lib/cwl/workflow.json#main"
+        container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+            "kind": "json",
+            "json": packed
+        }
+        if packed["id"].startswith("arvwf:"):
+            container_req["properties"]["template_uuid"] = packed["id"][6:]
 
         command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
         if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 11ef653..eee4ef6 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -10,6 +10,8 @@ from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 
+import ruamel.yaml as yaml
+
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
@@ -231,6 +233,28 @@ class ArvadosJob(object):
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
 
+    def upload_workflow_collection(self, packed):
+        collection = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                   keep_client=self.arvrunner.keep_client,
+                                                   num_retries=self.arvrunner.num_retries)
+        with collection.open("workflow.cwl", "w") as f:
+            f.write(yaml.round_trip_dump(packed))
+
+        exists = api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                                 ["portable_data_hash", "=", collection.portable_data_hash()],
+                                                 ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+
+        if exists["items"]:
+            logger.info("Using collection %s", exists["items"][0]["uuid"])
+        else:
+            collection.save_new(name=self.name,
+                                owner_uuid=self.arvrunner.project_uuid,
+                                ensure_unique_name=True,
+                                num_retries=self.arvrunner.num_retries)
+            logger.info("Uploaded to %s", collection.manifest_locator())
+
+        return collection.portable_data_hash()
+
     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
         """Create an Arvados job specification for this workflow.
 
@@ -239,9 +263,10 @@ class RunnerJob(Runner):
         a pipeline template or pipeline instance.
         """
 
-        workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
+        wf_pdh = self.upload_workflow_collection(packed)
 
-        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+        self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
 
         if self.output_name:
             self.job_order["arv:output_name"] = self.output_name
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 9e70a6e..01853e6 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import ruamel.yaml as yaml
 
-from .runner import upload_docker, upload_dependencies, trim_listing
+from .runner import upload_dependencies, trim_listing, packed_workflow
 from .arvtool import ArvadosCommandTool
 from .perf import Perf
 
@@ -22,11 +22,8 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
                     submit_runner_ram=0, name=None):
-    upload_docker(arvRunner, tool)
 
-    document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
-
-    packed = pack(document_loader, workflowobj, uri, tool.metadata)
+    packed = packed_workflow(arvRunner, tool)
 
     adjustDirObjs(job_order, trim_listing)
 
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 1c3625e..d8630d4 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -4,6 +4,8 @@ from functools import partial
 import logging
 import json
 import re
+import subprocess
+
 from cStringIO import StringIO
 
 from schema_salad.sourceline import SourceLine
@@ -118,43 +120,70 @@ def upload_docker(arvrunner, tool):
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
             arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
-    elif isinstance(tool, cwltool.workflow.Workflow):
-        for s in tool.steps:
-            upload_docker(arvrunner, s.embedded_tool)
-
-def upload_instance(arvrunner, name, tool, job_order):
-        upload_docker(arvrunner, tool)
 
-        for t in tool.tool["inputs"]:
-            def setSecondary(fileobj):
-                if isinstance(fileobj, dict) and fileobj.get("class") == "File":
-                    if "secondaryFiles" not in fileobj:
-                        fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+def packed_workflow(arvrunner, tool):
+    tool.visit(partial(upload_docker, arvrunner))
 
-                if isinstance(fileobj, list):
-                    for e in fileobj:
-                        setSecondary(e)
+    document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
+    packed = pack(document_loader, workflowobj, uri, tool.metadata)
 
-            if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
-                setSecondary(job_order[shortname(t["id"])])
+    if tool.tool["id"].startswith("file://"):
+        path = os.path.dirname(tool.tool["id"][7:])
+        try:
+            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H', 'path']).strip()
+        except OSError, subprocess.CalledProcessError:
+            pass
+        else:
+            packed["http://schema.org/version"] = githash
 
+    mapper = {}
+    def upload_tool_deps(tool):
         workflowmapper = upload_dependencies(arvrunner,
-                                             name,
+                                             shortname(tool.tool["id"]),
                                              tool.doc_loader,
                                              tool.tool,
                                              tool.tool["id"],
-                                             True)
-        jobmapper = upload_dependencies(arvrunner,
-                                        os.path.basename(job_order.get("id", "#")),
-                                        tool.doc_loader,
-                                        job_order,
-                                        job_order.get("id", "#"),
-                                        False)
+                                             False)
+        for k,v in workflowmapper.items():
+            mapper[k] = v.resolved
 
-        if "id" in job_order:
-            del job_order["id"]
+    tool.visit(upload_tool_deps)
 
-        return workflowmapper
+    def setloc(p):
+        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+            p["location"] = mapper[p["location"]]
+    adjustFileObjs(packed, setloc)
+    adjustDirObjs(packed, setloc)
+
+    return packed
+
+def upload_instance(arvrunner, name, tool, job_order):
+    packed = packed_workflow(arvrunner, tool)
+
+    for t in tool.tool["inputs"]:
+        def setSecondary(fileobj):
+            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+                if "secondaryFiles" not in fileobj:
+                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+            if isinstance(fileobj, list):
+                for e in fileobj:
+                    setSecondary(e)
+
+        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+            setSecondary(job_order[shortname(t["id"])])
+
+    jobmapper = upload_dependencies(arvrunner,
+                                    os.path.basename(job_order.get("id", "#")),
+                                    tool.doc_loader,
+                                    job_order,
+                                    job_order.get("id", "#"),
+                                    False)
+
+    if "id" in job_order:
+        del job_order["id"]
+
+    return packed
 
 def arvados_jobs_image(arvrunner):
     img = "arvados/jobs:"+__version__
@@ -191,7 +220,7 @@ class Runner(object):
     def update_pipeline_component(self, record):
         pass
 
-    def arvados_job_spec(self, *args, **kwargs):
+    def upload_instance(self, **kwargs):
         if self.name is None:
             self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
 
@@ -200,7 +229,7 @@ class Runner(object):
         if "job_order" in self.job_order:
             del self.job_order["job_order"]
 
-        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+        packed = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
         adjustDirObjs(self.job_order, trim_listing)
         return workflowmapper
 
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 8403327..20cb635 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -173,7 +173,7 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPatter
 
         exists = api.collections().list(filters=[["owner_uuid", "=", project],
                                                  ["portable_data_hash", "=", collection.portable_data_hash()],
-                                                 ["name", "=", name]]).execute(num_retries=num_retries)
+                                                 ["name", "like", name+"%"]]).execute(num_retries=num_retries)
         if exists["items"]:
             item = exists["items"][0]
             logger.info("Using collection %s", item["uuid"])

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list