[ARVADOS] created: 598ebc54f5d611ca57e281b359d6c26b297562b5

Git user git at public.curoverse.com
Wed Dec 28 10:55:55 EST 2016


        at  598ebc54f5d611ca57e281b359d6c26b297562b5 (commit)


commit 598ebc54f5d611ca57e281b359d6c26b297562b5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Dec 28 10:55:46 2016 -0500

    10497: Don't prefix every line of log from job.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 6d8bf82..6f32088 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -130,7 +130,7 @@ class ArvCwlRunner(object):
                     uuid = event["object_uuid"]
                     with self.lock:
                         j = self.processes[uuid]
-                        logger.info("%s %s is Running", self.arvrunner.label(j), uuid)
+                        logger.info("%s %s is Running", self.label(j), uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 5f96ac6..a0c2e59 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -162,7 +162,7 @@ class ArvadosContainer(object):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger, self.arvrunner.label(self) + " ")
+                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
 
             outputs = {}
             if container["output"]:
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index f656673..780d6bd 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -204,7 +204,7 @@ class ArvadosJob(object):
                                 dirs[g.group(1)] = g.group(2)
 
                     if processStatus == "permanentFail":
-                        done.logtail(logc, logger, self.arvrunner.label(self) + " ")
+                        done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
 
                     with Perf(metrics, "output collection %s" % self.name):
                         outputs = done.done(self, record, dirs["tmpdir"],
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index 4dd83d2..59b48e3 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -43,7 +43,7 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
     self.builder.pathmapper.keepdir = keepdir
     return self.collect_outputs("keep:" + record["output"])
 
-def logtail(logcollection, logger, prefix, maxlen=25):
+def logtail(logcollection, logger, header, maxlen=25):
     logtail = deque([], maxlen*len(logcollection))
     for log in logcollection.keys():
         with logcollection.open(log) as f:
@@ -51,5 +51,5 @@ def logtail(logcollection, logger, prefix, maxlen=25):
                 logtail.append(l)
     if len(logcollection) > 1:
         logtail = sorted(logtail)[-maxlen:]
-    for l in logtail:
-        logger.info("%s%s" % (prefix, l.rstrip()))
+    logtxt = "\n  ".join(l.strip() for l in logtail)
+    logger.info("%s\n  %s", header, logtxt)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index cf96c51..eaa69c4 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -225,7 +225,7 @@ class Runner(object):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger, self.arvrunner.label(self) + " ")
+                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self))
 
             self.final_output = record["output"]
             outc = arvados.collection.CollectionReader(self.final_output,

commit 9e87fdd399d058acc26e8a17c08fef150fa441ed
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Dec 28 10:54:37 2016 -0500

    10497: Fix crunch script to construct toolpath before it is turned into a File
    object.

diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index f856108..570f686 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -39,6 +39,7 @@ def run():
     runner = None
     try:
         job_order_object = arvados.current_job()['script_parameters']
+        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))
 
         pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
 
@@ -81,7 +82,6 @@ def run():
         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                           output_name=output_name, output_tags=output_tags)
 
-        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))
         t = load_tool(toolpath, runner.arv_make_tool)
 
         args = argparse.Namespace()

commit 9219290d93407b00023828328bbdb82788f9a78b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Dec 28 10:31:49 2016 -0500

    10497: Add --no-log-timestamps to eliminate redundant timestamps when running in Arvados job or container.
    
    Conflicts:
    	sdk/cwl/setup.py

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 6d2afc6..6d8bf82 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -559,6 +559,12 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
+                        default=True, dest="log_timestamps")
+    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
+                        default=True, dest="log_timestamps")
+
     parser.add_argument("--api", type=str,
                         default=None, dest="work_api",
                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
@@ -643,6 +649,13 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
         metrics.setLevel(logging.DEBUG)
         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
 
+    if arvargs.log_timestamps:
+        arvados.log_handler.setFormatter(logging.Formatter(
+            '%(asctime)s %(name)s %(levelname)s: %(message)s',
+            '%Y-%m-%d %H:%M:%S'))
+    else:
+        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
+
     arvargs.conformance_test = None
     arvargs.use_container = True
     arvargs.relax_path_checks = True
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 609d8c2..5f96ac6 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -245,7 +245,7 @@ class RunnerContainer(Runner):
             }
             container_req["properties"]["template_uuid"] = wfuuid
 
-        command = ["arvados-cwl-runner", "--local", "--api=containers"]
+        command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
         if self.output_name:
             command.append("--output-name=" + self.output_name)
 
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index 9804572..f856108 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -26,6 +26,9 @@ from cwltool.errors import WorkflowException
 logger = logging.getLogger('arvados.cwl-runner')
 
 def run():
+    # Timestamps are added by crunch-job, so don't print redundant timestamps.
+    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
+
     # Print package versions
     logger.info(arvados_cwl.versionstring())
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index a39972b..cdeb6df 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -195,7 +195,7 @@ def stubs(func):
             },
             'state': 'Committed',
             'owner_uuid': None,
-            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'output_path': '/var/spool/cwl',
@@ -446,7 +446,8 @@ class TestSubmit(unittest.TestCase):
         except:
             logging.exception("")
 
-        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                                  '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -468,7 +469,9 @@ class TestSubmit(unittest.TestCase):
         except:
             logging.exception("")
 
-        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-name="+output_name, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                                  "--output-name="+output_name, '--enable-reuse',
+                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -490,7 +493,9 @@ class TestSubmit(unittest.TestCase):
         except:
             logging.exception("")
 
-        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-tags="+output_tags, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
+        stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                                  "--output-tags="+output_tags, '--enable-reuse',
+                                                  '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         stubs.api.container_requests().create.assert_called_with(
@@ -571,7 +576,8 @@ class TestSubmit(unittest.TestCase):
             'output_path': '/var/spool/cwl',
             'name': 'expect_arvworkflow.cwl#main',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
-            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                        '--enable-reuse', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,
@@ -658,7 +664,8 @@ class TestSubmit(unittest.TestCase):
             'output_path': '/var/spool/cwl',
             'name': 'a test workflow',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
-            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
+            'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                        '--enable-reuse', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,

commit b22ca9590233df0ab0e8f41ed53ff27481c139e6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 19 16:26:29 2016 -0500

    10497: Print last 25 lines of log when job fails.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index d4af0f9..6d2afc6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -46,6 +46,9 @@ logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 logger.setLevel(logging.INFO)
 
+arvados.log_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(name)s %(levelname)s: %(message)s',
+        '%Y-%m-%d %H:%M:%S'))
 
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -127,7 +130,7 @@ class ArvCwlRunner(object):
                     uuid = event["object_uuid"]
                     with self.lock:
                         j = self.processes[uuid]
-                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        logger.info("%s %s is Running", self.arvrunner.label(j), uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
@@ -135,14 +138,16 @@ class ArvCwlRunner(object):
                     try:
                         self.cond.acquire()
                         j = self.processes[uuid]
-                        txt = self.work_api[0].upper() + self.work_api[1:-1]
-                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                         with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def label(self, obj):
+        return "[%s %s]" % (self.work_api[0:-1], obj.name)
+
     def poll_states(self):
         """Poll status of jobs or containers listed in the processes dict.
 
@@ -656,4 +661,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                              fetcher_constructor=partial(CollectionFetcher,
                                                          api_client=api_client,
                                                          keep_client=keep_client),
-                             resolver=partial(collectionResolver, api_client))
+                             resolver=partial(collectionResolver, api_client),
+                             logger_handler=arvados.log_handler)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index dbbd83d..609d8c2 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -129,12 +129,12 @@ class ArvadosContainer(object):
             self.uuid = response["uuid"]
             self.arvrunner.processes[self.uuid] = self
 
-            logger.info("Container request %s (%s) state is %s", self.name, response["uuid"], response["state"])
+            logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
 
             if response["state"] == "Final":
                 self.done(response)
         except Exception as e:
-            logger.error("Got error %s" % str(e))
+            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
             self.output_callback({}, "permanentFail")
 
     def done(self, record):
@@ -157,17 +157,28 @@ class ArvadosContainer(object):
             else:
                 processStatus = "permanentFail"
 
-            outputs = {}
+            if processStatus == "permanentFail":
+                logc = arvados.collection.CollectionReader(container["log"],
+                                                           api_client=self.arvrunner.api,
+                                                           keep_client=self.arvrunner.keep_client,
+                                                           num_retries=self.arvrunner.num_retries)
+                done.logtail(logc, logger, self.arvrunner.label(self) + " ")
 
+            outputs = {}
             if container["output"]:
-                try:
-                    outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
-                except Exception as e:
-                    logger.error("Got error %s" % str(e))
-                    self.output_callback({}, "permanentFail")
+                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+        except WorkflowException as e:
+            logger.error("%s unable to collect output from %s:\n%s",
+                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
+            processStatus = "permanentFail"
+        except Exception as e:
+            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
+            self.output_callback({}, "permanentFail")
+        else:
             self.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]
 
 
 class RunnerContainer(Runner):
@@ -265,7 +276,7 @@ class RunnerContainer(Runner):
         self.uuid = response["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted container %s", response["uuid"])
+        logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
 
         if response["state"] == "Final":
             self.done(response)
@@ -276,10 +287,10 @@ class RunnerContainer(Runner):
                 uuid=record["container_uuid"]
             ).execute(num_retries=self.arvrunner.num_retries)
         except Exception as e:
-            logger.exception("While getting runner container: %s", e)
+            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
             self.arvrunner.output_callback({}, "permanentFail")
-            del self.arvrunner.processes[record["uuid"]]
         else:
             super(RunnerContainer, self).done(container)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index d6055d3..f656673 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -136,13 +136,13 @@ class ArvadosJob(object):
 
             self.update_pipeline_component(response)
 
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+            logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
 
             if response["state"] in ("Complete", "Failed", "Cancelled"):
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
         except Exception as e:
-            logger.exception("Job %s error" % (self.name))
+            logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
@@ -203,11 +203,15 @@ class ArvadosJob(object):
                             if g:
                                 dirs[g.group(1)] = g.group(2)
 
+                    if processStatus == "permanentFail":
+                        done.logtail(logc, logger, self.arvrunner.label(self) + " ")
+
                     with Perf(metrics, "output collection %s" % self.name):
                         outputs = done.done(self, record, dirs["tmpdir"],
                                             dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
-                logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("%s unable to collect output from %s:\n%s",
+                             self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
                 logger.exception("Got unknown exception while collecting output for job %s:", self.name)
@@ -222,8 +226,8 @@ class ArvadosJob(object):
 
             self.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
-
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index c755565..4dd83d2 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -1,4 +1,5 @@
 from cwltool.errors import WorkflowException
+from collections import deque
 
 def done(self, record, tmpdir, outdir, keepdir):
     colname = "Output %s of %s" % (record["output"][0:7], self.name)
@@ -22,8 +23,8 @@ def done(self, record, tmpdir, outdir, keepdir):
 
         if not collections["items"]:
             raise WorkflowException(
-                "Job output '%s' cannot be found on API server" % (
-                    record["output"]))
+                "[job %s] output '%s' cannot be found on API server" % (
+                    self.name, record["output"]))
 
         # Create new collection in the parent project
         # with the output contents.
@@ -41,3 +42,14 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
     self.builder.outdir = outdir
     self.builder.pathmapper.keepdir = keepdir
     return self.collect_outputs("keep:" + record["output"])
+
+def logtail(logcollection, logger, prefix, maxlen=25):
+    logtail = deque([], maxlen*len(logcollection))
+    for log in logcollection.keys():
+        with logcollection.open(log) as f:
+            for l in f:
+                logtail.append(l)
+    if len(logcollection) > 1:
+        logtail = sorted(logtail)[-maxlen:]
+    for l in logtail:
+        logger.info("%s%s" % (prefix, l.rstrip()))
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index e902140..cf96c51 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -23,6 +23,7 @@ import ruamel.yaml as yaml
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
 from ._version import __version__
+from . import done
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -203,39 +204,48 @@ class Runner(object):
         return workflowmapper
 
     def done(self, record):
-        if record["state"] == "Complete":
-            if record.get("exit_code") is not None:
-                if record["exit_code"] == 33:
-                    processStatus = "UnsupportedRequirement"
-                elif record["exit_code"] == 0:
-                    processStatus = "success"
+        try:
+            if record["state"] == "Complete":
+                if record.get("exit_code") is not None:
+                    if record["exit_code"] == 33:
+                        processStatus = "UnsupportedRequirement"
+                    elif record["exit_code"] == 0:
+                        processStatus = "success"
+                    else:
+                        processStatus = "permanentFail"
                 else:
-                    processStatus = "permanentFail"
+                    processStatus = "success"
             else:
-                processStatus = "success"
-        else:
-            processStatus = "permanentFail"
+                processStatus = "permanentFail"
 
-        outputs = {}
-        try:
-            try:
-                self.final_output = record["output"]
-                outc = arvados.collection.CollectionReader(self.final_output,
+            outputs = {}
+
+            if processStatus == "permanentFail":
+                logc = arvados.collection.CollectionReader(record["log"],
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                if "cwl.output.json" in outc:
-                    with outc.open("cwl.output.json") as f:
-                        if f.size() > 0:
-                            outputs = json.load(f)
-                def keepify(fileobj):
-                    path = fileobj["location"]
-                    if not path.startswith("keep:"):
-                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
-                adjustFileObjs(outputs, keepify)
-                adjustDirObjs(outputs, keepify)
-            except Exception as e:
-                logger.exception("While getting final output object: %s", e)
+                done.logtail(logc, logger, self.arvrunner.label(self) + " ")
+
+            self.final_output = record["output"]
+            outc = arvados.collection.CollectionReader(self.final_output,
+                                                       api_client=self.arvrunner.api,
+                                                       keep_client=self.arvrunner.keep_client,
+                                                       num_retries=self.arvrunner.num_retries)
+            if "cwl.output.json" in outc:
+                with outc.open("cwl.output.json") as f:
+                    if f.size() > 0:
+                        outputs = json.load(f)
+            def keepify(fileobj):
+                path = fileobj["location"]
+                if not path.startswith("keep:"):
+                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+            adjustFileObjs(outputs, keepify)
+            adjustDirObjs(outputs, keepify)
+        except Exception as e:
+            logger.exception("[%s] While getting final output object: %s", self.name, e)
+            self.arvrunner.output_callback({}, "permanentFail")
+        else:
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
             if record["uuid"] in self.arvrunner.processes:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list