[ARVADOS] created: ae1f1ea0a3130f7cd05402a7924550a8247d88f8
Git user
git at public.curoverse.com
Fri Jan 6 16:41:08 EST 2017
at ae1f1ea0a3130f7cd05402a7924550a8247d88f8 (commit)
commit ae1f1ea0a3130f7cd05402a7924550a8247d88f8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 6 16:40:51 2017 -0500
10812: Fixing up tests.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ff72e14..72eed37 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -227,7 +227,7 @@ class RunnerContainer(Runner):
"kind": "json",
"json": packed
}
- if packed["id"].startswith("arvwf:"):
+ if packed.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = packed["id"][6:]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index eee4ef6..2fccb57 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -240,7 +240,7 @@ class RunnerJob(Runner):
with collection.open("workflow.cwl", "w") as f:
f.write(yaml.round_trip_dump(packed))
- exists = api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ exists = self.arvrunner.api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 01853e6..cd7e41a 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -36,8 +36,8 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
if not name:
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
- upload_dependencies(arvRunner, name, document_loader,
- packed, uri, False)
+ upload_dependencies(arvRunner, name, tool.doc_loader,
+ packed, tool.tool["id"], False)
# TODO nowhere for submit_runner_ram to go.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d8630d4..00e5132 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -18,6 +18,7 @@ from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
+from cwltool.pack import pack
import arvados.collection
import ruamel.yaml as yaml
@@ -127,22 +128,22 @@ def packed_workflow(arvrunner, tool):
document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
packed = pack(document_loader, workflowobj, uri, tool.metadata)
- if tool.tool["id"].startswith("file://"):
- path = os.path.dirname(tool.tool["id"][7:])
- try:
- githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H', 'path']).strip()
- except OSError, subprocess.CalledProcessError:
- pass
- else:
- packed["http://schema.org/version"] = githash
+ # if tool.tool["id"].startswith("file://"):
+ # path = os.path.dirname(tool.tool["id"][7:])
+ # try:
+ # githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
+ # except (OSError, subprocess.CalledProcessError):
+ # pass
+ # else:
+ # packed["http://schema.org/version"] = githash
mapper = {}
- def upload_tool_deps(tool):
+ def upload_tool_deps(deptool):
workflowmapper = upload_dependencies(arvrunner,
- shortname(tool.tool["id"]),
- tool.doc_loader,
- tool.tool,
- tool.tool["id"],
+ shortname(deptool["id"]),
+ document_loader,
+ deptool,
+ deptool["id"],
False)
for k,v in workflowmapper.items():
mapper[k] = v.resolved
@@ -155,6 +156,8 @@ def packed_workflow(arvrunner, tool):
adjustFileObjs(packed, setloc)
adjustDirObjs(packed, setloc)
+ #print yaml.round_trip_dump(packed)
+
return packed
def upload_instance(arvrunner, name, tool, job_order):
@@ -231,7 +234,7 @@ class Runner(object):
packed = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
adjustDirObjs(self.job_order, trim_listing)
- return workflowmapper
+ return packed
def done(self, record):
try:
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 0a01fb4..7b2a0ec 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -17,6 +17,8 @@ import arvados.keep
from .matcher import JsonDiffMatcher
from .mock_discovery import get_rootDesc
+import ruamel.yaml as yaml
+
_rootDesc = None
def stubs(func):
@@ -122,7 +124,7 @@ def stubs(func):
'class': 'Directory'
},
'cwl:tool':
- '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl'
},
'repository': 'arvados',
'script_version': 'master',
@@ -144,7 +146,7 @@ def stubs(func):
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+ 'cwl:tool': 'ef2c299cb4e5a565a46d94887eafdc62+58/workflow.cwl',
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
@@ -167,6 +169,9 @@ def stubs(func):
stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+ with open("tests/wf/submit_wf_packed.cwl") as f:
+ expect_packed_workflow = yaml.round_trip_load(f)
+
stubs.expect_container_spec = {
'priority': 1,
'mounts': {
@@ -174,9 +179,9 @@ def stubs(func):
'writable': True,
'kind': 'collection'
},
- '/var/lib/cwl/workflow': {
- 'portable_data_hash': '99999999999999999999999999999991+99',
- 'kind': 'collection'
+ '/var/lib/cwl/workflow.json': {
+ 'json': expect_packed_workflow,
+ 'kind': 'json'
},
'stdout': {
'path': '/var/spool/cwl/cwl.output.json',
@@ -198,7 +203,7 @@ def stubs(func):
'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
'--enable-reuse', '--on-error=continue',
- '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'output_path': '/var/spool/cwl',
@@ -747,7 +752,7 @@ class TestSubmit(unittest.TestCase):
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
- body=expect_container)
+ body=JsonDiffMatcher(expect_container))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
diff --git a/sdk/cwl/tests/wf/submit_wf_packed.cwl b/sdk/cwl/tests/wf/submit_wf_packed.cwl
new file mode 100644
index 0000000..a1ada94
--- /dev/null
+++ b/sdk/cwl/tests/wf/submit_wf_packed.cwl
@@ -0,0 +1,33 @@
+cwlVersion: v1.0
+$graph:
+- class: CommandLineTool
+ requirements:
+ - class: DockerRequirement
+ dockerPull: debian:8
+ inputs:
+ - id: '#submit_tool.cwl/x'
+ type: File
+ default:
+ class: File
+ location: keep:99999999999999999999999999999992+99/blub.txt
+ inputBinding:
+ position: 1
+ outputs: []
+ baseCommand: cat
+ id: '#submit_tool.cwl'
+- class: Workflow
+ inputs:
+ - id: '#main/x'
+ type: File
+ - id: '#main/y'
+ type: Directory
+ - id: '#main/z'
+ type: Directory
+ outputs: []
+ steps:
+ - id: '#main/step1'
+ in:
+ - {id: '#main/step1/x', source: '#main/x'}
+ out: []
+ run: '#submit_tool.cwl'
+ id: '#main'
commit 93fde4b1632b046fe60884c603c55a654771630f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 6 15:34:44 2017 -0500
10812: Use packed workflows for all run modes.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 987b0d6..ff72e14 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -190,7 +190,7 @@ class RunnerContainer(Runner):
the +body+ argument to container_requests().create().
"""
- workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
container_req = {
"owner_uuid": self.arvrunner.project_uuid,
@@ -222,27 +222,13 @@ class RunnerContainer(Runner):
"properties": {}
}
- workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
- if workflowcollection.startswith("keep:"):
- workflowcollection = workflowcollection[5:workflowcollection.index('/')]
- workflowname = os.path.basename(self.tool.tool["id"])
- workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
- container_req["mounts"]["/var/lib/cwl/workflow"] = {
- "kind": "collection",
- "portable_data_hash": "%s" % workflowcollection
- }
- elif workflowcollection.startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- wfuuid = workflowcollection[6:workflowcollection.index("#")]
- wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
- wfobj = yaml.safe_load(wfrecord["definition"])
- if container_req["name"].startswith("arvwf:"):
- container_req["name"] = wfrecord["name"]
- container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
- "kind": "json",
- "json": wfobj
- }
- container_req["properties"]["template_uuid"] = wfuuid
+ workflowpath = "/var/lib/cwl/workflow.json#main"
+ container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+ "kind": "json",
+ "json": packed
+ }
+ if packed["id"].startswith("arvwf:"):
+ container_req["properties"]["template_uuid"] = packed["id"][6:]
command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
if self.output_name:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 11ef653..eee4ef6 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -10,6 +10,8 @@ from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
+import ruamel.yaml as yaml
+
import arvados.collection
from .arvdocker import arv_docker_get_image
@@ -231,6 +233,28 @@ class ArvadosJob(object):
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+ def upload_workflow_collection(self, packed):
+ collection = arvados.collection.Collection(api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ with collection.open("workflow.cwl", "w") as f:
+ f.write(yaml.round_trip_dump(packed))
+
+ exists = api.collections().list(filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ["portable_data_hash", "=", collection.portable_data_hash()],
+ ["name", "like", self.name+"%"]]).execute(num_retries=self.arvrunner.num_retries)
+
+ if exists["items"]:
+ logger.info("Using collection %s", exists["items"][0]["uuid"])
+ else:
+ collection.save_new(name=self.name,
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ num_retries=self.arvrunner.num_retries)
+ logger.info("Uploaded to %s", collection.manifest_locator())
+
+ return collection.portable_data_hash()
+
def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
"""Create an Arvados job specification for this workflow.
@@ -239,9 +263,10 @@ class RunnerJob(Runner):
a pipeline template or pipeline instance.
"""
- workflowmapper = super(RunnerJob, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ packed = self.upload_instance(dry_run=dry_run, pull_image=pull_image, **kwargs)
+ wf_pdh = self.upload_workflow_collection(packed)
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+ self.job_order["cwl:tool"] = "%s/workflow.cwl" % wf_pdh
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 9e70a6e..01853e6 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import ruamel.yaml as yaml
-from .runner import upload_docker, upload_dependencies, trim_listing
+from .runner import upload_dependencies, trim_listing, packed_workflow
from .arvtool import ArvadosCommandTool
from .perf import Perf
@@ -22,11 +22,8 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
submit_runner_ram=0, name=None):
- upload_docker(arvRunner, tool)
- document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
-
- packed = pack(document_loader, workflowobj, uri, tool.metadata)
+ packed = packed_workflow(arvRunner, tool)
adjustDirObjs(job_order, trim_listing)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 1c3625e..d8630d4 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -4,6 +4,8 @@ from functools import partial
import logging
import json
import re
+import subprocess
+
from cStringIO import StringIO
from schema_salad.sourceline import SourceLine
@@ -118,43 +120,70 @@ def upload_docker(arvrunner, tool):
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
-
-def upload_instance(arvrunner, name, tool, job_order):
- upload_docker(arvrunner, tool)
- for t in tool.tool["inputs"]:
- def setSecondary(fileobj):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+def packed_workflow(arvrunner, tool):
+ tool.visit(partial(upload_docker, arvrunner))
- if isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(e)
+ document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
+ packed = pack(document_loader, workflowobj, uri, tool.metadata)
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(job_order[shortname(t["id"])])
+ if tool.tool["id"].startswith("file://"):
+ path = os.path.dirname(tool.tool["id"][7:])
+ try:
+ githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H', 'path']).strip()
+ except OSError, subprocess.CalledProcessError:
+ pass
+ else:
+ packed["http://schema.org/version"] = githash
+ mapper = {}
+ def upload_tool_deps(tool):
workflowmapper = upload_dependencies(arvrunner,
- name,
+ shortname(tool.tool["id"]),
tool.doc_loader,
tool.tool,
tool.tool["id"],
- True)
- jobmapper = upload_dependencies(arvrunner,
- os.path.basename(job_order.get("id", "#")),
- tool.doc_loader,
- job_order,
- job_order.get("id", "#"),
- False)
+ False)
+ for k,v in workflowmapper.items():
+ mapper[k] = v.resolved
- if "id" in job_order:
- del job_order["id"]
+ tool.visit(upload_tool_deps)
- return workflowmapper
+ def setloc(p):
+ if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
+ p["location"] = mapper[p["location"]]
+ adjustFileObjs(packed, setloc)
+ adjustDirObjs(packed, setloc)
+
+ return packed
+
+def upload_instance(arvrunner, name, tool, job_order):
+ packed = packed_workflow(arvrunner, tool)
+
+ for t in tool.tool["inputs"]:
+ def setSecondary(fileobj):
+ if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+ if "secondaryFiles" not in fileobj:
+ fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+ if isinstance(fileobj, list):
+ for e in fileobj:
+ setSecondary(e)
+
+ if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+ setSecondary(job_order[shortname(t["id"])])
+
+ jobmapper = upload_dependencies(arvrunner,
+ os.path.basename(job_order.get("id", "#")),
+ tool.doc_loader,
+ job_order,
+ job_order.get("id", "#"),
+ False)
+
+ if "id" in job_order:
+ del job_order["id"]
+
+ return packed
def arvados_jobs_image(arvrunner):
img = "arvados/jobs:"+__version__
@@ -191,7 +220,7 @@ class Runner(object):
def update_pipeline_component(self, record):
pass
- def arvados_job_spec(self, *args, **kwargs):
+ def upload_instance(self, **kwargs):
if self.name is None:
self.name = self.tool.tool.get("label") or os.path.basename(self.tool.tool["id"])
@@ -200,7 +229,7 @@ class Runner(object):
if "job_order" in self.job_order:
del self.job_order["job_order"]
- workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+ packed = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
adjustDirObjs(self.job_order, trim_listing)
return workflowmapper
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 8403327..20cb635 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -173,7 +173,7 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPatter
exists = api.collections().list(filters=[["owner_uuid", "=", project],
["portable_data_hash", "=", collection.portable_data_hash()],
- ["name", "=", name]]).execute(num_retries=num_retries)
+ ["name", "like", name+"%"]]).execute(num_retries=num_retries)
if exists["items"]:
item = exists["items"][0]
logger.info("Using collection %s", item["uuid"])
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list