[ARVADOS] created: 20d3523229805ee800ec11bf4ab6e41c4e18eea6
Git user
git at public.curoverse.com
Wed Nov 16 15:38:39 EST 2016
at 20d3523229805ee800ec11bf4ab6e41c4e18eea6 (commit)
commit 20d3523229805ee800ec11bf4ab6e41c4e18eea6
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Nov 16 15:06:02 2016 -0500
10529: Improve handling of errors raised when collecting outputs. If the state
polling thread crashes, terminate the executor.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index b3d47dd..8e3f01e 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -139,34 +139,43 @@ class ArvCwlRunner(object):
Runs in a separate thread.
"""
- while True:
- self.stop_polling.wait(15)
- if self.stop_polling.is_set():
- break
- with self.lock:
- keys = self.processes.keys()
- if not keys:
- continue
+ try:
+ while True:
+ self.stop_polling.wait(15)
+ if self.stop_polling.is_set():
+ break
+ with self.lock:
+ keys = self.processes.keys()
+ if not keys:
+ continue
- if self.work_api == "containers":
- table = self.poll_api.containers()
- elif self.work_api == "jobs":
- table = self.poll_api.jobs()
+ if self.work_api == "containers":
+ table = self.poll_api.containers()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
- try:
- proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warn("Error checking states on API server: %s", e)
- continue
-
- for p in proc_states["items"]:
- self.on_message({
- "object_uuid": p["uuid"],
- "event_type": "update",
- "properties": {
- "new_attributes": p
- }
- })
+ try:
+ proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+ except:
+ logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+ self.cond.acquire()
+ self.processes.clear()
+ self.cond.notify()
+ self.cond.release()
+ finally:
+ self.stop_polling.set()
def get_uploaded(self):
return self.uploaded.copy()
@@ -374,6 +383,10 @@ class ArvCwlRunner(object):
loopperf.__enter__()
for runnable in jobiter:
loopperf.__exit__()
+
+ if self.stop_polling.is_set():
+ break
+
if runnable:
with Perf(metrics, "run"):
runnable.run(**kwargs)
@@ -395,7 +408,7 @@ class ArvCwlRunner(object):
if sys.exc_info()[0] is KeyboardInterrupt:
logger.error("Interrupted, marking pipeline as failed")
else:
- logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index e7cd617..1fda412 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -151,10 +151,17 @@ class ArvadosContainer(object):
if record["output"]:
outputs = done.done(self, record, "/tmp", self.outdir, "/keep")
except WorkflowException as e:
- logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ logger.error("Error while collecting output for container %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
+ logger.exception("Got unknown exception while collecting output for container %s:", self.name)
+ processStatus = "permanentFail"
+
+ # Note: Currently, on error output_callback is expecting an empty dict,
+ # anything else will fail.
+ if not isinstance(outputs, dict):
+ logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+ outputs = {}
processStatus = "permanentFail"
self.output_callback(outputs, processStatus)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 4db23b9..b706d9f 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -139,7 +139,7 @@ class ArvadosJob(object):
with Perf(metrics, "done %s" % self.name):
self.done(response)
except Exception as e:
- logger.error("Got error %s" % str(e))
+ logger.exception("Job %s error" % (self.name))
self.output_callback({}, "permanentFail")
def update_pipeline_component(self, record):
@@ -204,13 +204,18 @@ class ArvadosJob(object):
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
- outputs = None
except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
+ logger.exception("Got unknown exception while collecting output for job %s:", self.name)
+ processStatus = "permanentFail"
+
+ # Note: Currently, on error output_callback is expecting an empty dict,
+ # anything else will fail.
+ if not isinstance(outputs, dict):
+ logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+ outputs = {}
processStatus = "permanentFail"
- outputs = None
self.output_callback(outputs, processStatus)
finally:
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 5cc447e..3bbcb8b 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -195,7 +195,7 @@ class Runner(object):
else:
processStatus = "permanentFail"
- outputs = None
+ outputs = {}
try:
try:
self.final_output = record["output"]
@@ -212,7 +212,7 @@ class Runner(object):
adjustFileObjs(outputs, keepify)
adjustDirObjs(outputs, keepify)
except Exception as e:
- logger.error("While getting final output object: %s", e)
+ logger.exception("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
del self.arvrunner.processes[record["uuid"]]
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list