[ARVADOS] updated: 8b133b596ef25d548e01820c25595803bb49d18e
Git user
git at public.curoverse.com
Wed Mar 23 10:49:33 EDT 2016
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 70 +++++++++++++++++++----------------
sdk/cwl/tests/test_job.py | 81 ++++++++++++++++++++++++++++++++++++++++-
2 files changed, 117 insertions(+), 34 deletions(-)
via 8b133b596ef25d548e01820c25595803bb49d18e (commit)
from ed646f12b58e930fe018dc4caf2121294c26e375 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 8b133b596ef25d548e01820c25595803bb49d18e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Mar 23 10:41:30 2016 -0400
8766: Refactor logic to copy output collection and add tests.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index f587ad1..ab0e48d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -213,7 +213,15 @@ class ArvadosJob(object):
tmpdir = None
outdir = None
keepdir = None
- for l in log.readlines():
+ for l in log:
+ # Determine the tmpdir, outdir and keepdir paths from
+ # the job run. Unfortunately, we can't take the first
+ # values we find (which are expected to be near the
+ # top) and stop scanning because if the node fails and
+ # the job restarts on a different node, these values
+ # will be different between runs, and we need to know
+ # about the final run that actually produced output.
+
g = tmpdirre.match(l)
if g:
tmpdir = g.group(1)
@@ -224,41 +232,39 @@ class ArvadosJob(object):
if g:
keepdir = g.group(1)
- # It turns out if the job fails and restarts it can
- # come up on a different compute node, so we have to
- # read the log to the end to be sure instead of taking the
- # easy way out.
- #
- #if tmpdir and outdir and keepdir:
- # break
+ colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+ # check if collection already exists with same owner, name and content
+ collection_exists = self.arvrunner.api.collections().list(
+ filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ['portable_data_hash', '=', record["output"]],
+ ["name", "=", colname]]
+ ).execute(num_retries=self.arvrunner.num_retries)
- # Create a collection located in the same project as the pipeline with the contents of the output.
+ if not collection_exists["items"]:
+ # Create a collection located in the same project as the
+ # pipeline with the contents of the output.
# First, get output record.
- collections = self.arvrunner.api.collections().list(limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["portable_data_hash", "manifest_text"]
+ collections = self.arvrunner.api.collections().list(
+ limit=1,
+ filters=[['portable_data_hash', '=', record["output"]]],
+ select=["manifest_text"]
).execute(num_retries=self.arvrunner.num_retries)
- if collections["items"]:
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
- # check if there is a name collision.
- name_collision = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ["name", "=", name]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not name_collision["items"]:
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": collections[0]["portable_data_hash"],
- "manifest_text": collections[0]["manifest_text"]
- }, ensure_unique_name=True).execute(num_retries=self.arvrunner.num_retries)
-
- # else: there is already a collection with the same name and the
- # same contents, so nothing to do.
+ if not collections["items"]:
+ raise WorkflowException(
+ "Job output '%s' cannot be found on API server" % (
+ record["output"]))
+
+ # Create new collection in the parent project
+ # with the output contents.
+ self.arvrunner.api.collections().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": colname,
+ "portable_data_hash": record["output"],
+ "manifest_text": collections["items"][0]["manifest_text"]
+ }, ensure_unique_name=True).execute(
+ num_retries=self.arvrunner.num_retries)
self.builder.outdir = outdir
self.builder.pathmapper.keepdir = keepdir
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 56f3110..ae48238 100644
--- a/sdk/cwl/tests/test_job.py
+++ b/sdk/cwl/tests/test_job.py
@@ -26,7 +26,7 @@ class TestJob(unittest.TestCase):
'task.env': {'TMPDIR': '$(task.tmpdir)'},
'command': ['ls']
}],
- 'crunchrunner': '83db29f08544e1c319572a6bd971088a+140/crunchrunner'
+ 'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner'
},
'script_version': 'master',
'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
@@ -67,7 +67,7 @@ class TestJob(unittest.TestCase):
'task.env': {'TMPDIR': '$(task.tmpdir)'},
'command': ['ls']
}],
- 'crunchrunner': '83db29f08544e1c319572a6bd971088a+140/crunchrunner'
+ 'crunchrunner': arvados_cwl.crunchrunner_pdh + '/crunchrunner'
},
'script_version': 'master',
'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
@@ -79,3 +79,80 @@ class TestJob(unittest.TestCase):
'min_scratch_mb_per_node': 5024 # tmpdirSize + outdirSize
}
}, find_or_create=True)
+
+ @mock.patch("arvados.collection.Collection")
+ def test_done(self, col):
+ api = mock.MagicMock()
+
+ runner = mock.MagicMock()
+ runner.api = api
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.num_retries = 0
+
+ col().open.return_value = []
+ api.collections().list().execute.side_effect = ({"items": []},
+ {"items": [{"manifest_text": "XYZ"}]})
+
+ arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob.name = "testjob"
+ arvjob.builder = mock.MagicMock()
+ arvjob.output_callback = mock.MagicMock()
+ arvjob.collect_outputs = mock.MagicMock()
+
+ arvjob.done({
+ "state": "Complete",
+ "output": "99999999999999999999999999999993+99",
+ "log": "99999999999999999999999999999994+99",
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ })
+
+ api.collections().list.assert_has_calls([
+ mock.call(),
+ mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+ ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
+ ['name', '=', 'Output 9999999 of testjob']]),
+ mock.call().execute(num_retries=0),
+ mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
+ select=['manifest_text']),
+ mock.call().execute(num_retries=0)])
+
+ api.collections().create.assert_called_with(
+ ensure_unique_name=True,
+ body={'portable_data_hash': '99999999999999999999999999999993+99',
+ 'manifest_text': 'XYZ',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'name': 'Output 9999999 of testjob'})
+
+ @mock.patch("arvados.collection.Collection")
+ def test_done_use_existing_collection(self, col):
+ api = mock.MagicMock()
+
+ runner = mock.MagicMock()
+ runner.api = api
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.num_retries = 0
+
+ col().open.return_value = []
+ api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
+
+ arvjob = arvados_cwl.ArvadosJob(runner)
+ arvjob.name = "testjob"
+ arvjob.builder = mock.MagicMock()
+ arvjob.output_callback = mock.MagicMock()
+ arvjob.collect_outputs = mock.MagicMock()
+
+ arvjob.done({
+ "state": "Complete",
+ "output": "99999999999999999999999999999993+99",
+ "log": "99999999999999999999999999999994+99",
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ })
+
+ api.collections().list.assert_has_calls([
+ mock.call(),
+ mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
+ ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
+ ['name', '=', 'Output 9999999 of testjob']]),
+ mock.call().execute(num_retries=0)])
+
+ self.assertFalse(api.collections().create.called)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list