[ARVADOS] updated: d6c21313094521040c9b013172c5ebe136341c03
git at public.curoverse.com
Fri Jan 29 10:22:23 EST 2016
Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 46 +++++++++++++++++++++++++++++++++++------
 sdk/cwl/setup.py                |  4 ++--
2 files changed, 42 insertions(+), 8 deletions(-)
       via  d6c21313094521040c9b013172c5ebe136341c03 (commit)
      from  1ca3b77ff42e1bfa0076a27bba1daa0406655082 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d6c21313094521040c9b013172c5ebe136341c03
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 29 10:22:16 2016 -0500
8319: Fixes to run bcbio-nextgen CWL workflow. Now creates pipeline instance
so that progress is visible on workbench.
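
For context, the core of the change boils down to the following usage pattern of
the Arvados Python client (an illustrative sketch, not code from the commit; the
pipeline name, component key, and job record below are made up):

    import arvados

    api = arvados.api('v1')

    # Register a pipeline instance up front so the run shows up on Workbench.
    pipeline = api.pipeline_instances().create(body={
        "name": "bcbio-nextgen example",  # the commit derives this from shortname(tool.tool["id"])
        "components": {},
        "state": "RunningOnClient"
    }).execute(num_retries=4)

    # As each job is submitted or changes state, mirror its record into the
    # pipeline's components so Workbench can show per-step progress.
    pipeline["components"]["step1"] = {"job": {"uuid": "...", "state": "Running"}}
    pipeline = api.pipeline_instances().update(
        uuid=pipeline["uuid"],
        body={"components": pipeline["components"]}
    ).execute(num_retries=4)

    # When the workflow finishes, record the overall outcome.
    api.pipeline_instances().update(uuid=pipeline["uuid"],
                                    body={"state": "Complete"}).execute(num_retries=4)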
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index cf18894..3baa9a6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -8,6 +8,7 @@ import arvados.commands.run
 import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
+from cwltool.process import shortname
 import threading
 import cwltool.docker
 import fnmatch
@@ -144,11 +145,17 @@ class ArvadosJob(object):
                 "script_version": "master",
                 "script_parameters": {"tasks": [script_parameters]},
                 "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute()
+            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
             self.arvrunner.jobs[response["uuid"]] = self
-            logger.info("Job %s is %s", response["uuid"], response["state"])
+            self.arvrunner.pipeline["components"][self.name] = {"job": response}
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+                body={
+                    "components": self.arvrunner.pipeline["components"]
+                }).execute(num_retries=self.arvrunner.num_retries)
+
+            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
             if response["state"] in ("Complete", "Failed", "Cancelled"):
                 self.done(response)
@@ -156,9 +163,20 @@ class ArvadosJob(object):
             logger.error("Got error %s" % str(e))
             self.output_callback({}, "permanentFail")
+    def update_pipeline_component(self, record):
+        self.arvrunner.pipeline["components"][self.name] = {"job": record}
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+            body={
+                "components": self.arvrunner.pipeline["components"]
+            }).execute(num_retries=self.arvrunner.num_retries)
     def done(self, record):
         try:
+            self.update_pipeline_component(record)
+        except:
+            pass
+
+        try:
             if record["state"] == "Complete":
                 processStatus = "success"
             else:
@@ -232,6 +250,7 @@ class ArvCwlRunner(object):
         self.cond = threading.Condition(self.lock)
         self.final_output = None
         self.uploaded = {}
+        self.num_retries = 4
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
@@ -242,22 +261,33 @@ class ArvCwlRunner(object):
     def output_callback(self, out, processStatus):
         if processStatus == "success":
             logger.info("Overall job status is %s", processStatus)
+            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                body={"state": "Complete"}).execute(num_retries=self.num_retries)
+
         else:
             logger.warn("Overall job status is %s", processStatus)
+            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                body={"state": "Failed"}).execute(num_retries=self.num_retries)
         self.final_output = out
+
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                 if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
-                    logger.info("Job %s is Running", event["object_uuid"])
+                    uuid = event["object_uuid"]
                     with self.lock:
-                        self.jobs[event["object_uuid"]].running = True
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                    logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
+                    uuid = event["object_uuid"]
                     try:
                         self.cond.acquire()
-                        self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
@@ -271,6 +301,10 @@ class ArvCwlRunner(object):
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
+            "components": {},
+            "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
         self.fs_access = CollectionFsAccess(input_basedir)
         kwargs["fs_access"] = self.fs_access
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index bcf6b96..65ae16b 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,8 +30,8 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool>=1.0.20151026181844',
-          'arvados-python-client>=0.1.20151023214338'
+          'cwltool>=1.0.20160129152024',
+          'arvados-python-client>=0.1.20160122132348'
       ],
       zip_safe=True,
       cmdclass={'egg_info': tagger},
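
The setup.py hunk only raises the minimum dependency versions; for reference, an
equivalent manual upgrade would be (illustrative command, not part of the commit):

    pip install 'cwltool>=1.0.20160129152024' 'arvados-python-client>=0.1.20160122132348'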
-----------------------------------------------------------------------
hooks/post-receive
--