[ARVADOS] updated: 0effb9301167df51d071e52562b3acfe7513498b
git at public.curoverse.com
git at public.curoverse.com
Tue Jun 30 13:52:08 EDT 2015
Summary of changes:
sdk/python/arvados/commands/cwl_runner.py | 58 +++++++++++++++++++++----------
1 file changed, 39 insertions(+), 19 deletions(-)
via 0effb9301167df51d071e52562b3acfe7513498b (commit)
from 9d209cb34089febeaadeab572a1b4c8d9d485741 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0effb9301167df51d071e52562b3acfe7513498b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jun 30 13:52:33 2015 -0400
6264: Passes 29/35 conformance tests. Some debugging and feature addition remains.
diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index 19fca27..b03f5ad 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -19,6 +19,9 @@ from cwltool.process import get_feature
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
+del cwltool.draft2tool.supportedProcessRequirements[cwltool.draft2tool.supportedProcessRequirements.index("EnvVarRequirement")]
+del cwltool.draft2tool.supportedProcessRequirements[cwltool.draft2tool.supportedProcessRequirements.index("CreateFileRequirement")]
+
def arv_docker_get_image(api_client, dockerRequirement, pull_image):
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -64,22 +67,23 @@ class CollectionFsAccess(object):
return ret
def glob(self, pattern):
- collection, rest = self.get_path(pattern)
+ collection, rest = self.get_collection(pattern)
patternsegments = rest.split("/")
return self._match(collection, patternsegments, collection.manifest_locator())
def open(self, fn, mode):
- collection, rest = self.get_path(fn)
- return c.open(rest, mode)
+ collection, rest = self.get_collection(fn)
+ return collection.open(rest, mode)
def exists(self, fn):
- collection, rest = self.get_path(fn)
- return c.exists(rest)
+ collection, rest = self.get_collection(fn)
+ return collection.exists(rest)
class ArvadosJob(object):
def __init__(self, runner):
self.arvrunner = runner
+ self.running = False
def run(self, dry_run=False, pull_image=True, **kwargs):
script_parameters = {
@@ -88,7 +92,7 @@ class ArvadosJob(object):
runtime_constraints = {}
if self.stdin:
- script_parameters["task.stdin"] = self.stdin
+ script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
if self.stdout:
script_parameters["task.stdout"] = self.stdout
@@ -106,18 +110,27 @@ class ArvadosJob(object):
"runtime_constraints": runtime_constraints
}).execute()
+ logger.info("Submitted job %s", response["uuid"])
+
self.arvrunner.jobs[response["uuid"]] = self
def done(self, record):
- outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
-
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
+ try:
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
- self.output_callback(outputs, processStatus)
+ try:
+ outputs = {}
+ outputs = self.collect_outputs(record["output"])
+ except Exception as e:
+ logger.warn(str(e))
+ processStatus = "permanentFail"
+ self.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
class ArvPathMapper(cwltool.pathmapper.PathMapper):
def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
@@ -147,6 +160,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
self._pathmap[src] = (ab, st.fn)
+
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
def __init__(self, arvrunner, toolpath_object, **kwargs):
super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
@@ -173,26 +187,32 @@ class ArvCwlRunner(object):
else:
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
- def output_callback(out, processStatus):
+ def output_callback(self, out, processStatus):
if processStatus == "success":
- _logger.info("Overall job status is %s", processStatus)
+ logger.info("Overall job status is %s", processStatus)
else:
- _logger.warn("Overall job status is %s", processStatus)
+ logger.warn("Overall job status is %s", processStatus)
self.final_output = out
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.jobs and event["event_type"] == "update":
- if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+ logger.info("Job %s is Running", event["object_uuid"])
+ self.jobs[event["object_uuid"]].running = True
+ elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
try:
self.cond.acquire()
- self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
+ self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
self.cond.notify()
finally:
self.cond.release()
def arvExecutor(self, t, job_order, input_basedir, **kwargs):
- events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
+ events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+
+ kwargs["fs_access"] = CollectionFsAccess()
if kwargs.get("conformance_test"):
return cwltool.main.single_job_executor(t, job_order, input_basedir, **kwargs)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list