[ARVADOS] updated: aa998f9f62321b44780923d60692beb6805b0ff5

Git user git at public.curoverse.com
Thu Feb 25 15:25:32 EST 2016


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 88 ++++++++++++++++++++++++-----------------
 sdk/cwl/setup.py                |  3 +-
 2 files changed, 54 insertions(+), 37 deletions(-)

       via  aa998f9f62321b44780923d60692beb6805b0ff5 (commit)
      from  5de2c81394973231568f1116b8c8da4645459ce0 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit aa998f9f62321b44780923d60692beb6805b0ff5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 25 15:25:20 2016 -0500

    8488: Fix output collection to accommodate reverse mapping fixes in cwltool.
    Catch errors and mark pipeline as terminated.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0d95ac4..6cfdd0b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -17,6 +17,7 @@ import fnmatch
 import logging
 import re
 import os
+import sys
 
 from cwltool.process import get_feature
 
@@ -149,7 +150,7 @@ class ArvadosJob(object):
             response = self.arvrunner.api.jobs().create(body={
                 "script": "crunchrunner",
                 "repository": "arvados",
-                "script_version": "master",
+                "script_version": "8488-cwl-crunchrunner-collection",
                 "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
                 "runtime_constraints": runtime_constraints
             }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
@@ -192,7 +193,8 @@ class ArvadosJob(object):
             try:
                 outputs = {}
                 if record["output"]:
-                    outputs = self.collect_outputs("keep:" + record["output"])
+                    self.builder.outdir = "keep:" + record["output"]
+                    outputs = self.collect_outputs(self.builder.outdir)
             except Exception as e:
                 logger.exception("Got exception while collecting job outputs:")
                 processStatus = "permanentFail"
@@ -239,7 +241,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def __init__(self, arvrunner, toolpath_object, **kwargs):
-        super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
+        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
 
     def makeJobRunner(self):
@@ -323,49 +325,63 @@ class ArvCwlRunner(object):
 
                 col.save_new("crunchrunner binary", ensure_unique_name=True)
 
-        self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
-                                                                   "components": {},
-                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
-
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = args.enable_reuse
 
+        kwargs["outdir"] = "$(task.outdir)"
+        kwargs["tmpdir"] = "$(task.tmpdir)"
+
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            jobiter = tool.job(job_order,
-                            input_basedir,
-                            self.output_callback,
-                            **kwargs)
-
-            for runnable in jobiter:
-                if runnable:
-                    with self.lock:
-                        runnable.run(**kwargs)
-                else:
-                    if self.jobs:
-                        try:
-                            self.cond.acquire()
-                            self.cond.wait()
-                        finally:
-                            self.cond.release()
-                    else:
-                        logger.error("Workflow cannot make any more progress.")
-                        break
-
-            while self.jobs:
-                try:
-                    self.cond.acquire()
-                    self.cond.wait()
-                finally:
-                    self.cond.release()
+            self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
+                                                                   "components": {},
+                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
-            events.close()
+            jobiter = tool.job(job_order,
+                               input_basedir,
+                               self.output_callback,
+                               **kwargs)
 
-            if self.final_output is None:
-                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+            try:
+                for runnable in jobiter:
+                    if runnable:
+                        with self.lock:
+                            runnable.run(**kwargs)
+                    else:
+                        if self.jobs:
+                            try:
+                                self.cond.acquire()
+                                self.cond.wait(1)
+                            except RuntimeError:
+                                pass
+                            finally:
+                                self.cond.release()
+                        else:
+                            logger.error("Workflow cannot make any more progress.")
+                            break
+
+                while self.jobs:
+                    try:
+                        self.cond.acquire()
+                        self.cond.wait(1)
+                    except RuntimeError:
+                        pass
+                    finally:
+                        self.cond.release()
+
+                events.close()
+
+                if self.final_output is None:
+                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+            except:
+                if sys.exc_info()[0] is not KeyboardInterrupt:
+                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
 
             return self.final_output
 
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 65ae16b..cf9619c 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,8 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool>=1.0.20160129152024',
+          #'cwltool>=1.0.20160225040942',
+          'cwltool',
           'arvados-python-client>=0.1.20160122132348'
       ],
       zip_safe=True,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list