[ARVADOS] updated: 0effb9301167df51d071e52562b3acfe7513498b

git at public.curoverse.com git at public.curoverse.com
Tue Jun 30 13:52:08 EDT 2015


Summary of changes:
 sdk/python/arvados/commands/cwl_runner.py | 58 +++++++++++++++++++++----------
 1 file changed, 39 insertions(+), 19 deletions(-)

       via  0effb9301167df51d071e52562b3acfe7513498b (commit)
      from  9d209cb34089febeaadeab572a1b4c8d9d485741 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 0effb9301167df51d071e52562b3acfe7513498b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jun 30 13:52:33 2015 -0400

    6264: Passes 29/35 conformance tests.  Some debugging and feature addition remains.

diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index 19fca27..b03f5ad 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -19,6 +19,9 @@ from cwltool.process import get_feature
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
+del cwltool.draft2tool.supportedProcessRequirements[cwltool.draft2tool.supportedProcessRequirements.index("EnvVarRequirement")]
+del cwltool.draft2tool.supportedProcessRequirements[cwltool.draft2tool.supportedProcessRequirements.index("CreateFileRequirement")]
+
 def arv_docker_get_image(api_client, dockerRequirement, pull_image):
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -64,22 +67,23 @@ class CollectionFsAccess(object):
         return ret
 
     def glob(self, pattern):
-        collection, rest = self.get_path(pattern)
+        collection, rest = self.get_collection(pattern)
         patternsegments = rest.split("/")
         return self._match(collection, patternsegments, collection.manifest_locator())
 
     def open(self, fn, mode):
-        collection, rest = self.get_path(fn)
-        return c.open(rest, mode)
+        collection, rest = self.get_collection(fn)
+        return collection.open(rest, mode)
 
     def exists(self, fn):
-        collection, rest = self.get_path(fn)
-        return c.exists(rest)
+        collection, rest = self.get_collection(fn)
+        return collection.exists(rest)
 
 
 class ArvadosJob(object):
     def __init__(self, runner):
         self.arvrunner = runner
+        self.running = False
 
     def run(self, dry_run=False, pull_image=True, **kwargs):
         script_parameters = {
@@ -88,7 +92,7 @@ class ArvadosJob(object):
         runtime_constraints = {}
 
         if self.stdin:
-            script_parameters["task.stdin"] = self.stdin
+            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
 
         if self.stdout:
             script_parameters["task.stdout"] = self.stdout
@@ -106,18 +110,27 @@ class ArvadosJob(object):
             "runtime_constraints": runtime_constraints
         }).execute()
 
+        logger.info("Submitted job %s", response["uuid"])
+
         self.arvrunner.jobs[response["uuid"]] = self
 
     def done(self, record):
-        outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
-
-        if record["state"] == "Complete":
-            processStatus = "success"
-        else:
-            processStatus = "permanentFail"
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
 
-        self.output_callback(outputs, processStatus)
+            try:
+                outputs = {}
+                outputs = self.collect_outputs(record["output"])
+            except Exception as e:
+                logger.warn(str(e))
+                processStatus = "permanentFail"
 
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
 
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
     def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
@@ -147,6 +160,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
             self._pathmap[src] = (ab, st.fn)
 
 
+
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def __init__(self, arvrunner, toolpath_object, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
@@ -173,26 +187,32 @@ class ArvCwlRunner(object):
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
-    def output_callback(out, processStatus):
+    def output_callback(self, out, processStatus):
         if processStatus == "success":
-            _logger.info("Overall job status is %s", processStatus)
+            logger.info("Overall job status is %s", processStatus)
         else:
-            _logger.warn("Overall job status is %s", processStatus)
+            logger.warn("Overall job status is %s", processStatus)
         self.final_output = out
 
     def on_message(self, event):
         if "object_uuid" in event:
                 if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-                    if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                    if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+                        logger.info("Job %s is Running", event["object_uuid"])
+                        self.jobs[event["object_uuid"]].running = True
+                    elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                        logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
                         try:
                             self.cond.acquire()
-                            self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
+                            self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
                             self.cond.notify()
                         finally:
                             self.cond.release()
 
     def arvExecutor(self, t, job_order, input_basedir, **kwargs):
-        events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
+        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+
+        kwargs["fs_access"] = CollectionFsAccess()
 
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(t, job_order, input_basedir, **kwargs)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list