[ARVADOS] created: 1.3.0-523-g545073697
Git user
git at public.curoverse.com
Wed Mar 13 14:59:41 EDT 2019
at 5450736979761397688dd6cf84a0df2ed01133cf (commit)
commit 5450736979761397688dd6cf84a0df2ed01133cf
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Wed Mar 13 14:57:53 2019 -0400
14322: Tweak documentation to mention uuid keep URI form
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/doc/user/cwl/cwl-runner.html.textile.liquid b/doc/user/cwl/cwl-runner.html.textile.liquid
index ad5d3bd83..fbce8e17b 100644
--- a/doc/user/cwl/cwl-runner.html.textile.liquid
+++ b/doc/user/cwl/cwl-runner.html.textile.liquid
@@ -69,7 +69,7 @@ arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107,
2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
{
"aligned_sam": {
- "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+ "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
"checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
"class": "File",
"size": 30738986
@@ -82,7 +82,7 @@ h3. Referencing files
When running a workflow on an Arvados cluster, the input files must be stored in Keep. There are several ways this can happen.
-A URI reference to Keep uses the @keep:@ scheme followed by the portable data hash, collection size, and path to the file inside the collection. For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@.
+A URI reference to Keep uses the @keep:@ scheme followed by either the portable data hash or UUID of the collection and then the location of the file inside the collection. For example, @keep:2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@ or @keep:zzzzz-4zz18-zzzzzzzzzzzzzzz/19.fasta.bwt@.
If you reference a file in "arv-mount":{{site.baseurl}}/user/tutorials/tutorial-keep-mount.html, such as @/home/example/keep/by_id/2463fa9efeb75e099685528b3b9071e0+438/19.fasta.bwt@, then @arvados-cwl-runner@ will automatically determine the appropriate Keep URI reference.
@@ -100,7 +100,7 @@ arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107,
2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
{
"aligned_sam": {
- "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+ "location": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
"checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
"class": "File",
"size": 30738986
commit 1afb79c10535c9e2f076323dbea81e8ef41e025f
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Wed Mar 13 14:57:20 2019 -0400
14322: CollectionFsAccess accepts UUIDs
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 3744b4a93..d5866e728 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -63,24 +63,24 @@ class CollectionCache(object):
del self.collections[pdh]
self.total -= v[1]
- def get(self, pdh):
+ def get(self, locator):
with self.lock:
- if pdh not in self.collections:
- m = pdh_size.match(pdh)
+ if locator not in self.collections:
+ m = pdh_size.match(locator)
if m:
self.cap_cache(int(m.group(2)) * 128)
- logger.debug("Creating collection reader for %s", pdh)
- cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
+ logger.debug("Creating collection reader for %s", locator)
+ cr = arvados.collection.CollectionReader(locator, api_client=self.api_client,
keep_client=self.keep_client,
num_retries=self.num_retries)
sz = len(cr.manifest_text()) * 128
- self.collections[pdh] = (cr, sz)
+ self.collections[locator] = (cr, sz)
self.total += sz
else:
- cr, sz = self.collections[pdh]
+ cr, sz = self.collections[locator]
# bump it to the back
- del self.collections[pdh]
- self.collections[pdh] = (cr, sz)
+ del self.collections[locator]
+ self.collections[locator] = (cr, sz)
return cr
@@ -94,9 +94,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
def get_collection(self, path):
sp = path.split("/", 1)
p = sp[0]
- if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
- pdh = p[5:]
- return (self.collection_cache.get(pdh), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
+ if p.startswith("keep:") and (arvados.util.keep_locator_pattern.match(p[5:]) or
+ arvados.util.collection_uuid_pattern.match(p[5:])):
+ locator = p[5:]
+ return (self.collection_cache.get(locator), urllib.parse.unquote(sp[1]) if len(sp) == 2 else None)
else:
return (None, path)
@@ -261,9 +262,11 @@ class CollectionFetcher(DefaultFetcher):
baseparts = basesp.path.split("/")
urlparts = urlsp.path.split("/") if urlsp.path else []
- pdh = baseparts.pop(0)
+ locator = baseparts.pop(0)
- if basesp.scheme == "keep" and not arvados.util.keep_locator_pattern.match(pdh):
+ if (basesp.scheme == "keep" and
+ (not arvados.util.keep_locator_pattern.match(pdh)) and
+ (not arvados.util.collection_uuid_pattern.match(pdh))):
raise IOError(errno.EINVAL, "Invalid Keep locator", base_url)
if urlsp.path.startswith("/"):
@@ -273,7 +276,7 @@ class CollectionFetcher(DefaultFetcher):
if baseparts and urlsp.path:
baseparts.pop()
- path = "/".join([pdh] + baseparts + urlparts)
+ path = "/".join([locator] + baseparts + urlparts)
return urllib.parse.urlunsplit((basesp.scheme, "", path, "", urlsp.fragment))
return super(CollectionFetcher, self).urljoin(base_url, url)
commit 3373fe7c00984e1368c1e3cfd3b7bb786dbbb619
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Mar 12 16:39:12 2019 -0400
14322: Tests for edge cases
Report unknown uuids and mismatches between current collection
PDH (from API server lookup) and location PDH.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 834ca195f..95711762c 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -293,7 +293,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
logger.exception("Error creating the Arvados CWL Executor")
return 1
- # Note that unless in debug mode, some stack traces related to user
+ # Note that unless in debug mode, some stack traces related to user
# workflow errors may be suppressed. See ArvadosJob.done().
if arvargs.debug:
logger.setLevel(logging.DEBUG)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 319e8a887..c35842616 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -87,7 +87,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
)
finally:
self.updatingRuntimeStatus = False
-
+
class ArvCwlExecutor(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
@@ -475,7 +475,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
with final.open("cwl.output.json", "w") as f:
res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
- f.write(res)
+ f.write(res)
final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 39620a55f..d30445ab3 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -31,6 +31,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
+import schema_salad.validate as validate
import arvados.collection
from .util import collectionUUID
@@ -90,6 +91,7 @@ def discover_secondary_files(inputs, job_order, discovered=None):
setSecondary(t, job_order[shortname(t["id"])], discovered)
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run,
@@ -139,8 +141,21 @@ def upload_dependencies(arvrunner, name, document_loader,
loadref, urljoin=document_loader.fetcher.urljoin)
sc = []
- uuids = []
- def dependencies_needing_transformation(obj):
+ uuids = {}
+
+ def collect_uuids(obj):
+ loc = obj.get("location", "")
+ sp = loc.split(":")
+ if sp[0] == "keep":
+ # Collect collection uuids that need to be resolved to
+ # portable data hashes
+ gp = collection_uuid_pattern.match(loc)
+ if gp:
+ uuids[gp.groups()[0]] = obj
+ if collectionUUID in obj:
+ uuids[obj[collectionUUID]] = obj
+
+ def collect_uploads(obj):
loc = obj.get("location", "")
sp = loc.split(":")
if len(sp) < 1:
@@ -149,19 +164,18 @@ def upload_dependencies(arvrunner, name, document_loader,
# Record local files than need to be uploaded,
# don't include file literals, keep references, etc.
sc.append(obj)
- elif sp[0] == "keep":
- # Collect collection uuids that need to be resolved to
- # portable data hashes
- gp = collection_uuid_pattern.match(loc)
- if gp:
- uuids.append(gp.groups()[0])
+ collect_uuids(obj)
- visit_class(sc_result, ("File", "Directory"), dependencies_needing_transformation)
+ visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+ visit_class(sc_result, ("File", "Directory"), collect_uploads)
+ # Resolve any collection uuids we found to portable data hashes
+ # and assign them to uuid_map
uuid_map = {}
- while uuids:
+ fetch_uuids = list(uuids.keys())
+ while fetch_uuids:
lookups = arvrunner.api.collections().list(
- filters=[["uuid", "in", uuids]],
+ filters=[["uuid", "in", fetch_uuids]],
count="none",
select=["uuid", "portable_data_hash"]).execute(
num_retries=arvrunner.num_retries)
@@ -172,7 +186,7 @@ def upload_dependencies(arvrunner, name, document_loader,
for l in lookups["items"]:
uuid_map[l["uuid"]] = l["portable_data_hash"]
- uuids = [u for u in uuids if u not in uuid_map]
+ fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
normalizeFilesDirs(sc)
@@ -227,14 +241,31 @@ def upload_dependencies(arvrunner, name, document_loader,
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
return
- if not uuid_map:
+
+ if not loc:
return
+
+ if collectionUUID in p:
+ uuid = p[collectionUUID]
+ if uuid not in uuid_map:
+ raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
+ gp = collection_pdh_pattern.match(loc)
+ if gp and uuid_map[uuid] != gp.groups()[0]:
+ # This file entry has both collectionUUID and a PDH
+ # location. If the PDH doesn't match the one returned
+ # the API server, raise an error.
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Expected collection uuid %s to be %s but API server reported %s" % (
+ uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
gp = collection_uuid_pattern.match(loc)
if not gp:
return
uuid = gp.groups()[0]
if uuid not in uuid_map:
- raise Exception("Cannot resolve uuid %s" % uuid)
+ raise SourceLine(p, "location", validate.ValidationException).makeError(
+ "Collection uuid %s not found" % uuid)
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
p[collectionUUID] = uuid
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 4218ec137..76b0f89f1 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -112,6 +112,11 @@ def stubs(func):
"portable_data_hash": "99999999999999999999999999999998+99",
"manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
},
+ "99999999999999999999999999999997+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999997+99",
+ "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
+ },
"99999999999999999999999999999994+99": {
"uuid": "",
"portable_data_hash": "99999999999999999999999999999994+99",
@@ -1451,7 +1456,7 @@ class TestSubmit(unittest.TestCase):
stubs.api.collections().list.assert_has_calls([
mock.call(count='none',
- filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
+ filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
select=['uuid', 'portable_data_hash'])])
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
@@ -1459,6 +1464,58 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
+ @stubs
+ def test_submit_mismatched_uuid_inputs(self, stubs):
+ def list_side_effect(**kwargs):
+ m = mock.MagicMock()
+ if "count" in kwargs:
+ m.execute.return_value = {"items": [
+ {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999997+99"}
+ ]}
+ else:
+ m.execute.return_value = {"items": []}
+ return m
+ stubs.api.collections().list.side_effect = list_side_effect
+
+ for infile in ("tests/submit_test_job_with_mismatched_uuids.json", "tests/submit_test_job_with_inconsistent_uuids.json"):
+ capture_stderr = io.StringIO()
+ cwltool_logger = logging.getLogger('cwltool')
+ stderr_logger = logging.StreamHandler(capture_stderr)
+ cwltool_logger.addHandler(stderr_logger)
+
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", infile],
+ stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ self.assertEqual(exited, 1)
+ self.assertRegexpMatches(
+ capture_stderr.getvalue(),
+ r"Expected collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz to be 99999999999999999999999999999998\+99 but API server reported 99999999999999999999999999999997\+99")
+ finally:
+ cwltool_logger.removeHandler(stderr_logger)
+
+ @stubs
+ def test_submit_unknown_uuid_inputs(self, stubs):
+ capture_stderr = io.StringIO()
+ cwltool_logger = logging.getLogger('cwltool')
+ stderr_logger = logging.StreamHandler(capture_stderr)
+ cwltool_logger.addHandler(stderr_logger)
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+ stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ try:
+ self.assertEqual(exited, 1)
+ self.assertRegexpMatches(
+ capture_stderr.getvalue(),
+ r"Collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz not found")
+ finally:
+ cwltool_logger.removeHandler(stderr_logger)
+
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
@@ -1648,17 +1705,19 @@ class TestCreateWorkflow(unittest.TestCase):
stderr_logger = logging.StreamHandler(capture_stderr)
acr_logger.addHandler(stderr_logger)
- exited = arvados_cwl.main(
- ["--update-workflow", self.existing_workflow_uuid,
- "--api=jobs",
- "--debug",
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- sys.stderr, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 1)
- self.assertRegexpMatches(
- capture_stderr.getvalue(),
- "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
- acr_logger.removeHandler(stderr_logger)
+ try:
+ exited = arvados_cwl.main(
+ ["--update-workflow", self.existing_workflow_uuid,
+ "--api=jobs",
+ "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ sys.stderr, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 1)
+ self.assertRegexpMatches(
+ capture_stderr.getvalue(),
+ "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+ finally:
+ acr_logger.removeHandler(stderr_logger)
@stubs
def test_update(self, stubs):
commit 79c03a206df0164a04fc4e61965e9ec5add4c075
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Mar 11 17:42:09 2019 -0400
14322: Propagate collection UUID to container mount
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 03b4e07c7..301e5317d 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -123,6 +123,8 @@ class ArvadosContainer(JobBase):
"kind": "collection",
"portable_data_hash": pdh
}
+ if pdh in self.pathmapper.pdh_to_uuid:
+ mounts[targetdir]["uuid"] = self.pathmapper.pdh_to_uuid[pdh]
if len(sp) == 2:
if tp == "Directory":
path = sp[1]
@@ -353,8 +355,8 @@ class ArvadosContainer(JobBase):
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
except WorkflowException as e:
- # Only include a stack trace if in debug mode.
- # A stack trace may obfuscate more useful output about the workflow.
+ # Only include a stack trace if in debug mode.
+ # A stack trace may obfuscate more useful output about the workflow.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index e0445febd..38135899d 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -58,6 +58,7 @@ class ArvPathMapper(PathMapper):
self.name = name
self.referenced_files = [r["location"] for r in referenced_files]
self.single_collection = single_collection
+ self.pdh_to_uuid = {}
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
def visit(self, srcobj, uploadfiles):
@@ -67,6 +68,8 @@ class ArvPathMapper(PathMapper):
if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+ if arvados_cwl.util.collectionUUID in srcobj:
+ self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
debug = logger.isEnabledFor(logging.DEBUG)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 794078bfe..39620a55f 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -33,7 +33,7 @@ from cwltool.builder import substitute
from cwltool.pack import pack
import arvados.collection
-import arvados.util
+from .util import collectionUUID
import ruamel.yaml as yaml
import arvados_cwl.arvdocker
@@ -236,8 +236,7 @@ def upload_dependencies(arvrunner, name, document_loader,
if uuid not in uuid_map:
raise Exception("Cannot resolve uuid %s" % uuid)
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
- p["http://arvados.org/cwl#collectionUUID"] = uuid
-
+ p[collectionUUID] = uuid
visit_class(workflowobj, ("File", "Directory"), setloc)
visit_class(discovered, ("File", "Directory"), setloc)
diff --git a/sdk/cwl/arvados_cwl/util.py b/sdk/cwl/arvados_cwl/util.py
index 776fc6bc2..85ae65ecf 100644
--- a/sdk/cwl/arvados_cwl/util.py
+++ b/sdk/cwl/arvados_cwl/util.py
@@ -5,6 +5,8 @@
import datetime
from arvados.errors import ApiError
+collectionUUID = "http://arvados.org/cwl#collectionUUID"
+
def get_intermediate_collection_info(workflow_step_name, current_container, intermediate_output_ttl):
if workflow_step_name:
name = "Intermediate collection for step %s" % (workflow_step_name)
@@ -30,5 +32,5 @@ def get_current_container(api, num_retries=0, logger=None):
if logger:
logger.info("Getting current container: %s", e)
raise e
-
+
return current_container
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index 1a57da392..07d962bf9 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -80,7 +80,7 @@ class TestContainer(unittest.TestCase):
return loadingContext, runtimeContext
- # Helper function to set up the ArvCwlExecutor to use the containers api
+ # Helper function to set up the ArvCwlExecutor to use the containers api
# and test that the RuntimeStatusLoggingHandler is set up correctly
def setup_and_test_container_executor_and_logging(self, gcc_mock) :
api = mock.MagicMock()
@@ -96,7 +96,7 @@ class TestContainer(unittest.TestCase):
handlerClasses = [h.__class__ for h in root_logger.handlers]
self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
return runner
-
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@@ -527,9 +527,9 @@ class TestContainer(unittest.TestCase):
# get_current_container is invoked when we call runtime_status_update
# so try and log again!
gcc_mock.side_effect = lambda *args: root_logger.error("Second Error")
- try:
+ try:
root_logger.error("First Error")
- except RuntimeError:
+ except RuntimeError:
self.fail("RuntimeStatusLoggingHandler should not be called recursively")
@mock.patch("arvados_cwl.ArvCwlExecutor.runtime_status_update")
@@ -538,7 +538,7 @@ class TestContainer(unittest.TestCase):
@mock.patch("arvados.collection.Collection")
def test_child_failure(self, col, reader, gcc_mock, rts_mock):
runner = self.setup_and_test_container_executor_and_logging(gcc_mock)
-
+
gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
self.assertTrue(gcc_mock.called)
@@ -630,6 +630,7 @@ class TestContainer(unittest.TestCase):
"p1": {
"class": "Directory",
"location": "keep:99999999999999999999999999999994+44",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
"listing": [
{
"class": "File",
@@ -660,7 +661,8 @@ class TestContainer(unittest.TestCase):
'mounts': {
"/keep/99999999999999999999999999999994+44": {
"kind": "collection",
- "portable_data_hash": "99999999999999999999999999999994+44"
+ "portable_data_hash": "99999999999999999999999999999994+44",
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz"
},
'/tmp': {'kind': 'tmp',
"capacity": 1073741824 },
commit 8f121cd455b2a7c979fd3b5f9f5d97e80b371e6c
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Mon Mar 11 17:02:14 2019 -0400
14322: Convert uuids on input to portable data hashes
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index e515ac2ce..794078bfe 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -8,6 +8,7 @@ from future.utils import viewvalues, viewitems
import os
import sys
+import re
import urllib.parse
from functools import partial
import logging
@@ -32,6 +33,7 @@ from cwltool.builder import substitute
from cwltool.pack import pack
import arvados.collection
+import arvados.util
import ruamel.yaml as yaml
import arvados_cwl.arvdocker
@@ -87,6 +89,7 @@ def discover_secondary_files(inputs, job_order, discovered=None):
if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
setSecondary(t, job_order[shortname(t["id"])], discovered)
+collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run,
@@ -136,14 +139,40 @@ def upload_dependencies(arvrunner, name, document_loader,
loadref, urljoin=document_loader.fetcher.urljoin)
sc = []
- def only_real(obj):
- # Only interested in local files than need to be uploaded,
- # don't include file literals, keep references, etc.
- sp = obj.get("location", "").split(":")
- if len(sp) > 1 and sp[0] in ("file", "http", "https"):
+ uuids = []
+ def dependencies_needing_transformation(obj):
+ loc = obj.get("location", "")
+ sp = loc.split(":")
+ if len(sp) < 1:
+ return
+ if sp[0] in ("file", "http", "https"):
+ # Record local files than need to be uploaded,
+ # don't include file literals, keep references, etc.
sc.append(obj)
+ elif sp[0] == "keep":
+ # Collect collection uuids that need to be resolved to
+ # portable data hashes
+ gp = collection_uuid_pattern.match(loc)
+ if gp:
+ uuids.append(gp.groups()[0])
- visit_class(sc_result, ("File", "Directory"), only_real)
+ visit_class(sc_result, ("File", "Directory"), dependencies_needing_transformation)
+
+ uuid_map = {}
+ while uuids:
+ lookups = arvrunner.api.collections().list(
+ filters=[["uuid", "in", uuids]],
+ count="none",
+ select=["uuid", "portable_data_hash"]).execute(
+ num_retries=arvrunner.num_retries)
+
+ if not lookups["items"]:
+ break
+
+ for l in lookups["items"]:
+ uuid_map[l["uuid"]] = l["portable_data_hash"]
+
+ uuids = [u for u in uuids if u not in uuid_map]
normalizeFilesDirs(sc)
@@ -194,8 +223,21 @@ def upload_dependencies(arvrunner, name, document_loader,
single_collection=True)
def setloc(p):
- if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+ loc = p.get("location")
+ if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ return
+ if not uuid_map:
+ return
+ gp = collection_uuid_pattern.match(loc)
+ if not gp:
+ return
+ uuid = gp.groups()[0]
+ if uuid not in uuid_map:
+ raise Exception("Cannot resolve uuid %s" % uuid)
+ p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
+ p["http://arvados.org/cwl#collectionUUID"] = uuid
+
visit_class(workflowobj, ("File", "Directory"), setloc)
visit_class(discovered, ("File", "Directory"), setloc)
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 39117d86e..4218ec137 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -1425,6 +1425,40 @@ class TestSubmit(unittest.TestCase):
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
self.assertEqual(exited, 1)
+ @stubs
+ def test_submit_uuid_inputs(self, stubs):
+ def list_side_effect(**kwargs):
+ m = mock.MagicMock()
+ if "count" in kwargs:
+ m.execute.return_value = {"items": [
+ {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999998+99"}
+ ]}
+ else:
+ m.execute.return_value = {"items": []}
+ return m
+ stubs.api.collections().list.side_effect = list_side_effect
+
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+ stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['basename'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+ expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['y']['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+ expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['z']['listing'][0]['http://arvados.org/cwl#collectionUUID'] = 'zzzzz-4zz18-zzzzzzzzzzzzzzz'
+ del expect_container['mounts']['/var/lib/cwl/cwl.input.json']['content']['z']['listing'][0]['size']
+
+ stubs.api.collections().list.assert_has_calls([
+ mock.call(count='none',
+ filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
+ select=['uuid', 'portable_data_hash'])])
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(stubs.capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+ self.assertEqual(exited, 0)
+
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list