[ARVADOS] updated: a262ca9978c9faef9f0363f94c4166a635d81f5f
Git user
git at public.curoverse.com
Wed Jun 15 14:51:21 EDT 2016
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 47 +++++++++++++++++++++++------------------
sdk/cwl/arvados_cwl/runner.py | 9 +++++++-
2 files changed, 35 insertions(+), 21 deletions(-)
via a262ca9978c9faef9f0363f94c4166a635d81f5f (commit)
from d58bfc39208835e745f34d3c1f05fcc09f7db99f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a262ca9978c9faef9f0363f94c4166a635d81f5f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 15 14:51:14 2016 -0400
8442: Submit CommandLineTool containers directly without an intermediate
workflow runner. Handle unsupported feature gracefully.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 050b7b9..e14eb4b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -37,6 +37,7 @@ class ArvCwlRunner(object):
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
self.final_output = None
+ self.final_status = None
self.uploaded = {}
self.num_retries = 4
self.uuid = None
@@ -106,9 +107,29 @@ class ArvCwlRunner(object):
# cwltool.main will write our return value to stdout.
return tmpl.uuid
+ self.debug = kwargs.get("debug")
+ self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+ self.fs_access = CollectionFsAccess(kwargs["basedir"])
+
+ kwargs["fs_access"] = self.fs_access
+ kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+
+ if self.crunch2:
+ kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["tmpdir"] = "/tmp"
+ else:
+ kwargs["outdir"] = "$(task.outdir)"
+ kwargs["tmpdir"] = "$(task.tmpdir)"
+
+ runnerjob = None
if kwargs.get("submit"):
if self.crunch2:
- runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+ if tool.tool["class"] == "CommandLineTool":
+ runnerjob = tool.job(job_order,
+ self.output_callback,
+ **kwargs).next()
+ else:
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
else:
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
@@ -122,30 +143,16 @@ class ArvCwlRunner(object):
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
- if kwargs.get("submit") and not kwargs.get("wait"):
- runnerjob.run()
- return runnerjob.uuid
+ if runnerjob and not kwargs.get("wait"):
+ runnerjob.run()
+ return runnerjob.uuid
if self.crunch2:
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
else:
events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
- self.debug = kwargs.get("debug")
- self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
- self.fs_access = CollectionFsAccess(kwargs["basedir"])
-
- kwargs["fs_access"] = self.fs_access
- kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-
- if self.crunch2:
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["tmpdir"] = "/tmp"
- else:
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
-
- if kwargs.get("submit"):
+ if runnerjob:
jobiter = iter((runnerjob,))
else:
if "cwl_runner_job" in kwargs:
@@ -185,7 +192,7 @@ class ArvCwlRunner(object):
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
- if runnerjob and self.crunch2:
+ if runnerjob and runnerjob.uuid and self.crunch2:
self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 002c0ca..639426a 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -23,6 +23,7 @@ class Runner(object):
self.job_order = job_order
self.running = False
self.enable_reuse = enable_reuse
+ self.uuid = None
def update_pipeline_component(self, record):
pass
@@ -51,8 +52,14 @@ class Runner(object):
return path
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+ loaded = set()
def loadref(b, u):
- return document_loader.fetch(urlparse.urljoin(b, u))
+ joined = urlparse.urljoin(b, u)
+ if joined not in loaded:
+ loaded.add(joined)
+ return document_loader.fetch(urlparse.urljoin(b, u))
+ else:
+ return {}
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list