[ARVADOS] created: 20d3523229805ee800ec11bf4ab6e41c4e18eea6

Git user git at public.curoverse.com
Wed Nov 16 15:38:39 EST 2016


        at  20d3523229805ee800ec11bf4ab6e41c4e18eea6 (commit)


commit 20d3523229805ee800ec11bf4ab6e41c4e18eea6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Nov 16 15:06:02 2016 -0500

    10529: Improve handling of errors raised when collecting outputs.  If the
    state polling thread crashes, terminate the executor.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index b3d47dd..8e3f01e 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -139,34 +139,43 @@ class ArvCwlRunner(object):
         Runs in a separate thread.
         """
 
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
+        try:
+            while True:
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
 
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
+                if self.work_api == "containers":
+                    table = self.poll_api.containers()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
 
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
+            self.stop_polling.set()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -374,6 +383,10 @@ class ArvCwlRunner(object):
             loopperf.__enter__()
             for runnable in jobiter:
                 loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
                 if runnable:
                     with Perf(metrics, "run"):
                         runnable.run(**kwargs)
@@ -395,7 +408,7 @@ class ArvCwlRunner(object):
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
             else:
-                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index e7cd617..1fda412 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -151,10 +151,17 @@ class ArvadosContainer(object):
                 if record["output"]:
                     outputs = done.done(self, record, "/tmp", self.outdir, "/keep")
             except WorkflowException as e:
-                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("Error while collecting output for container %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
+                logger.exception("Got unknown exception while collecting output for container %s:", self.name)
+                processStatus = "permanentFail"
+
+            # Note: Currently, on error output_callback is expecting an empty dict,
+            # anything else will fail.
+            if not isinstance(outputs, dict):
+                logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+                outputs = {}
                 processStatus = "permanentFail"
 
             self.output_callback(outputs, processStatus)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 4db23b9..b706d9f 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -139,7 +139,7 @@ class ArvadosJob(object):
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
         except Exception as e:
-            logger.error("Got error %s" % str(e))
+            logger.exception("Job %s error" % (self.name))
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
@@ -204,13 +204,18 @@ class ArvadosJob(object):
                         outputs = done.done(self, record, dirs["tmpdir"],
                                             dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
-                outputs = None
             except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
+                logger.exception("Got unknown exception while collecting output for job %s:", self.name)
+                processStatus = "permanentFail"
+
+            # Note: Currently, on error output_callback is expecting an empty dict,
+            # anything else will fail.
+            if not isinstance(outputs, dict):
+                logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+                outputs = {}
                 processStatus = "permanentFail"
-                outputs = None
 
             self.output_callback(outputs, processStatus)
         finally:
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 5cc447e..3bbcb8b 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -195,7 +195,7 @@ class Runner(object):
         else:
             processStatus = "permanentFail"
 
-        outputs = None
+        outputs = {}
         try:
             try:
                 self.final_output = record["output"]
@@ -212,7 +212,7 @@ class Runner(object):
                 adjustFileObjs(outputs, keepify)
                 adjustDirObjs(outputs, keepify)
             except Exception as e:
-                logger.error("While getting final output object: %s", e)
+                logger.exception("While getting final output object: %s", e)
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
             del self.arvrunner.processes[record["uuid"]]

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list