[ARVADOS] updated: 8b133b596ef25d548e01820c25595803bb49d18e

Git user git at public.curoverse.com
Wed Mar 23 10:49:33 EDT 2016


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 70 +++++++++++++++++++----------------
 sdk/cwl/tests/test_job.py       | 81 ++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 117 insertions(+), 34 deletions(-)

       via  8b133b596ef25d548e01820c25595803bb49d18e (commit)
      from  ed646f12b58e930fe018dc4caf2121294c26e375 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 8b133b596ef25d548e01820c25595803bb49d18e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Mar 23 10:41:30 2016 -0400

    8766: Refactor logic to copy output collection and add tests.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index f587ad1..ab0e48d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -213,7 +213,15 @@ class ArvadosJob(object):
                     tmpdir = None
                     outdir = None
                     keepdir = None
-                    for l in log.readlines():
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will differ between runs, and we need to know about the
+                        # final run that actually produced output.
+
                         g = tmpdirre.match(l)
                         if g:
                             tmpdir = g.group(1)
@@ -224,41 +232,39 @@ class ArvadosJob(object):
                         if g:
                             keepdir = g.group(1)
 
-                        # It turns out if the job fails and restarts it can
-                        # come up on a different compute node, so we have to
-                        # read the log to the end to be sure instead of taking the
-                        # easy way out.
-                        #
-                        #if tmpdir and outdir and keepdir:
-                        #    break
+                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+                    # check if collection already exists with same owner, name and content
+                    collection_exists = self.arvrunner.api.collections().list(
+                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                 ['portable_data_hash', '=', record["output"]],
+                                 ["name", "=", colname]]
+                    ).execute(num_retries=self.arvrunner.num_retries)
 
-                        # Create a collection located in the same project as the pipeline with the contents of the output.
+                    if not collection_exists["items"]:
+                        # Create a collection located in the same project as the
+                        # pipeline with the contents of the output.
                         # First, get output record.
-                        collections = self.arvrunner.api.collections().list(limit=1,
-                                                                           filters=[['portable_data_hash', '=', record["output"]]],
-                                                                           select=["portable_data_hash", "manifest_text"]
+                        collections = self.arvrunner.api.collections().list(
+                            limit=1,
+                            filters=[['portable_data_hash', '=', record["output"]]],
+                            select=["manifest_text"]
                         ).execute(num_retries=self.arvrunner.num_retries)
 
-                        if collections["items"]:
-                            colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
-                            # check if there is a name collision.
-                            name_collision = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
-                                                                                            ["name", "=", name]]
-                            ).execute(num_retries=self.arvrunner.num_retries)
-
-                            if not name_collision["items"]:
-                                # Create new collection in the parent project
-                                # with the output contents.
-                                self.arvrunner.api.collections().create(body={
-                                    "owner_uuid": self.arvrunner.project_uuid,
-                                    "name": colname,
-                                    "portable_data_hash": collections[0]["portable_data_hash"],
-                                    "manifest_text": collections[0]["manifest_text"]
-                                }, ensure_unique_name=True).execute(num_retries=self.arvrunner.num_retries)
-
-                            # else: there is already a collection with the same name and the
-                            # same contents, so nothing to do.
+                        if not collections["items"]:
+                            raise WorkflowException(
+                                "Job output '%s' cannot be found on API server" % (
+                                    record["output"]))
+
+                        # Create new collection in the parent project
+                        # with the output contents.
+                        self.arvrunner.api.collections().create(body={
+                            "owner_uuid": self.arvrunner.project_uuid,
+                            "name": colname,
+                            "portable_data_hash": record["output"],
+                            "manifest_text": collections["items"][0]["manifest_text"]
+                        }, ensure_unique_name=True).execute(
+                            num_retries=self.arvrunner.num_retries)
 
                     self.builder.outdir = outdir
                     self.builder.pathmapper.keepdir = keepdir
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 56f3110..ae48238 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -26,7 +26,7 @@ class TestJob(unittest.TestCase):
                     'task.env': {'TMPDIR': '$(task.tmpdir)'},
                     'command': ['ls']
                 }],
-                'crunchrunner': '83db29f08544e1c319572a6bd971088a+140/crunchrunner'
+                'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner'
             },
             'script_version': 'master',
             'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
@@ -67,7 +67,7 @@ class TestJob(unittest.TestCase):
                     'task.env': {'TMPDIR': '$(task.tmpdir)'},
                     'command': ['ls']
                 }],
-                'crunchrunner': '83db29f08544e1c319572a6bd971088a+140/crunchrunner'
+                'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner'
             },
             'script_version': 'master',
             'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
@@ -79,3 +79,80 @@ class TestJob(unittest.TestCase):
                 'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize
             }
         }, find_or_create=True)
+
+    @mock.patch("arvados.collection.Collection")
+    def test_done(self, col):
+        api = mock.MagicMock()
+
+        runner = mock.MagicMock()
+        runner.api = api
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.num_retries = 0
+
+        col().open.return_value = []
+        api.collections().list().execute.side_effect = ({"items": []},
+                                                        {"items": [{"manifest_text": "XYZ"}]})
+
+        arvjob = arvados_cwl.ArvadosJob(runner)
+        arvjob.name = "testjob"
+        arvjob.builder = mock.MagicMock()
+        arvjob.output_callback = mock.MagicMock()
+        arvjob.collect_outputs = mock.MagicMock()
+
+        arvjob.done({
+            "state": "Complete",
+            "output": "99999999999999999999999999999993+99",
+            "log": "99999999999999999999999999999994+99",
+            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        })
+
+        api.collections().list.assert_has_calls([
+            mock.call(),
+            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+                          ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
+                          ['name', '=', 'Output 9999999 of testjob']]),
+            mock.call().execute(num_retries=0),
+            mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
+                 select=['manifest_text']),
+            mock.call().execute(num_retries=0)])
+
+        api.collections().create.assert_called_with(
+            ensure_unique_name=True,
+            body={'portable_data_hash': '99999999999999999999999999999993+99',
+                  'manifest_text': 'XYZ',
+                  'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                  'name': 'Output 9999999 of testjob'})
+
+    @mock.patch("arvados.collection.Collection")
+    def test_done_use_existing_collection(self, col):
+        api = mock.MagicMock()
+
+        runner = mock.MagicMock()
+        runner.api = api
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.num_retries = 0
+
+        col().open.return_value = []
+        api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
+
+        arvjob = arvados_cwl.ArvadosJob(runner)
+        arvjob.name = "testjob"
+        arvjob.builder = mock.MagicMock()
+        arvjob.output_callback = mock.MagicMock()
+        arvjob.collect_outputs = mock.MagicMock()
+
+        arvjob.done({
+            "state": "Complete",
+            "output": "99999999999999999999999999999993+99",
+            "log": "99999999999999999999999999999994+99",
+            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        })
+
+        api.collections().list.assert_has_calls([
+            mock.call(),
+            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+                               ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
+                               ['name', '=', 'Output 9999999 of testjob']]),
+            mock.call().execute(num_retries=0)])
+
+        self.assertFalse(api.collections().create.called)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list