[ARVADOS] updated: 1.3.0-487-g3e9cb5654

Git user git at public.curoverse.com
Tue Mar 12 16:41:52 EDT 2019


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py |  2 +-
 sdk/cwl/arvados_cwl/executor.py |  4 +-
 sdk/cwl/arvados_cwl/runner.py   | 59 ++++++++++++++++++++++-------
 sdk/cwl/tests/test_submit.py    | 83 +++++++++++++++++++++++++++++++++++------
 4 files changed, 119 insertions(+), 29 deletions(-)

       via  3e9cb56544f3acecf6aa2bf967263600abf0c584 (commit)
      from  cea06ef31f890c5026d2ee6db4a70e8f4f723a55 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 3e9cb56544f3acecf6aa2bf967263600abf0c584
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Mar 12 16:39:12 2019 -0400

    14322: Tests for edge cases
    
    Report unknown uuids and mismatches between current collection
    PDH (from API server lookup) and location PDH.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 834ca195f..95711762c 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -293,7 +293,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         logger.exception("Error creating the Arvados CWL Executor")
         return 1
 
-    # Note that unless in debug mode, some stack traces related to user 
+    # Note that unless in debug mode, some stack traces related to user
     # workflow errors may be suppressed. See ArvadosJob.done().
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 319e8a887..c35842616 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -87,7 +87,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
                     )
             finally:
                 self.updatingRuntimeStatus = False
-            
+
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -475,7 +475,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         with final.open("cwl.output.json", "w") as f:
             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
-            f.write(res)           
+            f.write(res)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
 
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 39620a55f..d30445ab3 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -31,6 +31,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
+import schema_salad.validate as validate
 
 import arvados.collection
 from .util import collectionUUID
@@ -90,6 +91,7 @@ def discover_secondary_files(inputs, job_order, discovered=None):
             setSecondary(t, job_order[shortname(t["id"])], discovered)
 
 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
@@ -139,8 +141,21 @@ def upload_dependencies(arvrunner, name, document_loader,
                   loadref, urljoin=document_loader.fetcher.urljoin)
 
     sc = []
-    uuids = []
-    def dependencies_needing_transformation(obj):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
         loc = obj.get("location", "")
         sp = loc.split(":")
         if len(sp) < 1:
@@ -149,19 +164,18 @@ def upload_dependencies(arvrunner, name, document_loader,
             # Record local files than need to be uploaded,
             # don't include file literals, keep references, etc.
             sc.append(obj)
-        elif sp[0] == "keep":
-            # Collect collection uuids that need to be resolved to
-            # portable data hashes
-            gp = collection_uuid_pattern.match(loc)
-            if gp:
-                uuids.append(gp.groups()[0])
+        collect_uuids(obj)
 
-    visit_class(sc_result, ("File", "Directory"), dependencies_needing_transformation)
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
     uuid_map = {}
-    while uuids:
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
         lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", uuids]],
+            filters=[["uuid", "in", fetch_uuids]],
             count="none",
             select=["uuid", "portable_data_hash"]).execute(
                 num_retries=arvrunner.num_retries)
@@ -172,7 +186,7 @@ def upload_dependencies(arvrunner, name, document_loader,
         for l in lookups["items"]:
             uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        uuids = [u for u in uuids if u not in uuid_map]
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
@@ -227,14 +241,31 @@ def upload_dependencies(arvrunner, name, document_loader,
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
             return
-        if not uuid_map:
+
+        if not loc:
             return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned
+                # by the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to be %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
         gp = collection_uuid_pattern.match(loc)
         if not gp:
             return
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
-            raise Exception("Cannot resolve uuid %s" % uuid)
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 4218ec137..76b0f89f1 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -112,6 +112,11 @@ def stubs(func):
                 "portable_data_hash": "99999999999999999999999999999998+99",
                 "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
             },
+            "99999999999999999999999999999997+99": {
+                "uuid": "",
+                "portable_data_hash": "99999999999999999999999999999997+99",
+                "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
+            },
             "99999999999999999999999999999994+99": {
                 "uuid": "",
                 "portable_data_hash": "99999999999999999999999999999994+99",
@@ -1451,7 +1456,7 @@ class TestSubmit(unittest.TestCase):
 
         stubs.api.collections().list.assert_has_calls([
             mock.call(count='none',
-                      filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
+                      filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
                       select=['uuid', 'portable_data_hash'])])
         stubs.api.container_requests().create.assert_called_with(
             body=JsonDiffMatcher(expect_container))
@@ -1459,6 +1464,58 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
         self.assertEqual(exited, 0)
 
+    @stubs
+    def test_submit_mismatched_uuid_inputs(self, stubs):
+        def list_side_effect(**kwargs):
+            m = mock.MagicMock()
+            if "count" in kwargs:
+                m.execute.return_value = {"items": [
+                    {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999997+99"}
+                ]}
+            else:
+                m.execute.return_value = {"items": []}
+            return m
+        stubs.api.collections().list.side_effect = list_side_effect
+
+        for infile in ("tests/submit_test_job_with_mismatched_uuids.json", "tests/submit_test_job_with_inconsistent_uuids.json"):
+            capture_stderr = io.StringIO()
+            cwltool_logger = logging.getLogger('cwltool')
+            stderr_logger = logging.StreamHandler(capture_stderr)
+            cwltool_logger.addHandler(stderr_logger)
+
+            try:
+                exited = arvados_cwl.main(
+                    ["--submit", "--no-wait", "--api=containers", "--debug",
+                        "tests/wf/submit_wf.cwl", infile],
+                    stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+                self.assertEqual(exited, 1)
+                self.assertRegexpMatches(
+                    capture_stderr.getvalue(),
+                    r"Expected collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz to be 99999999999999999999999999999998\+99 but API server reported 99999999999999999999999999999997\+99")
+            finally:
+                cwltool_logger.removeHandler(stderr_logger)
+
+    @stubs
+    def test_submit_unknown_uuid_inputs(self, stubs):
+        capture_stderr = io.StringIO()
+        cwltool_logger = logging.getLogger('cwltool')
+        stderr_logger = logging.StreamHandler(capture_stderr)
+        cwltool_logger.addHandler(stderr_logger)
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+                "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+            stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        try:
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                r"Collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz not found")
+        finally:
+            cwltool_logger.removeHandler(stderr_logger)
+
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
@@ -1648,17 +1705,19 @@ class TestCreateWorkflow(unittest.TestCase):
         stderr_logger = logging.StreamHandler(capture_stderr)
         acr_logger.addHandler(stderr_logger)
 
-        exited = arvados_cwl.main(
-            ["--update-workflow", self.existing_workflow_uuid,
-             "--api=jobs",
-             "--debug",
-             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            sys.stderr, sys.stderr, api_client=stubs.api)
-        self.assertEqual(exited, 1)
-        self.assertRegexpMatches(
-            capture_stderr.getvalue(),
-            "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
-        acr_logger.removeHandler(stderr_logger)
+        try:
+            exited = arvados_cwl.main(
+                ["--update-workflow", self.existing_workflow_uuid,
+                 "--api=jobs",
+                 "--debug",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                sys.stderr, sys.stderr, api_client=stubs.api)
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+        finally:
+            acr_logger.removeHandler(stderr_logger)
 
     @stubs
     def test_update(self, stubs):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list