[ARVADOS] created: f60100cfb2a42ca650d86b421185f8e43843da57
Git user
git at public.curoverse.com
Tue Dec 20 11:07:55 EST 2016
at f60100cfb2a42ca650d86b421185f8e43843da57 (commit)
commit f60100cfb2a42ca650d86b421185f8e43843da57
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Dec 20 11:07:37 2016 -0500
10497: Make log formatting more uniform. Lower last line count to 25.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 85156db..6de355b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -46,6 +46,9 @@ logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
+arvados.log_handler.setFormatter(logging.Formatter(
+ '%(asctime)s %(name)s %(levelname)s: %(message)s',
+ '%Y-%m-%d %H:%M:%S'))
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
@@ -127,7 +130,7 @@ class ArvCwlRunner(object):
uuid = event["object_uuid"]
with self.lock:
j = self.processes[uuid]
- logger.info("Job %s (%s) is Running", j.name, uuid)
+ logger.info("%s %s is Running", self.arvrunner.label(j), uuid)
j.running = True
j.update_pipeline_component(event["properties"]["new_attributes"])
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
@@ -135,14 +138,16 @@ class ArvCwlRunner(object):
try:
self.cond.acquire()
j = self.processes[uuid]
- txt = self.work_api[0].upper() + self.work_api[1:-1]
- logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
+ logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
with Perf(metrics, "done %s" % j.name):
j.done(event["properties"]["new_attributes"])
self.cond.notify()
finally:
self.cond.release()
+ def label(self, obj):
+ return "[%s %s]" % (self.work_api[0:-1], obj.name)
+
def poll_states(self):
"""Poll status of jobs or containers listed in the processes dict.
@@ -656,4 +661,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
fetcher_constructor=partial(CollectionFetcher,
api_client=api_client,
keep_client=keep_client),
- resolver=partial(collectionResolver, api_client))
+ resolver=partial(collectionResolver, api_client),
+ logger_handler=arvados.log_handler)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 8c984aa..25169ba 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -131,12 +131,12 @@ class ArvadosContainer(object):
self.uuid = response["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("Container request %s (%s) state is %s", self.name, response["uuid"], response["state"])
+ logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
if response["state"] == "Final":
self.done(response)
except Exception as e:
- logger.error("Got error %s" % str(e))
+ logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
self.output_callback({}, "permanentFail")
def done(self, record):
@@ -164,13 +164,17 @@ class ArvadosContainer(object):
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "[%s] " % self.name)
+ done.logtail(logc, logger, self.arvrunner.label(self) + " ")
outputs = {}
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+ except WorkflowException as e:
+ logger.error("%s unable to collect output from %s:\n%s",
+ self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
+ processStatus = "permanentFail"
except Exception as e:
- logger.exception("[%s] While getting output object: %s", self.name, e)
+ logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
self.output_callback({}, "permanentFail")
else:
self.output_callback(outputs, processStatus)
@@ -274,7 +278,7 @@ class RunnerContainer(Runner):
self.uuid = response["uuid"]
self.arvrunner.processes[self.uuid] = self
- logger.info("Submitted container %s", response["uuid"])
+ logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
if response["state"] == "Final":
self.done(response)
@@ -285,7 +289,7 @@ class RunnerContainer(Runner):
uuid=record["container_uuid"]
).execute(num_retries=self.arvrunner.num_retries)
except Exception as e:
- logger.exception("While getting runner container: %s", e)
+ logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
self.arvrunner.output_callback({}, "permanentFail")
else:
super(RunnerContainer, self).done(container)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 59fbdc0..f656673 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -136,13 +136,13 @@ class ArvadosJob(object):
self.update_pipeline_component(response)
- logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+ logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
if response["state"] in ("Complete", "Failed", "Cancelled"):
with Perf(metrics, "done %s" % self.name):
self.done(response)
except Exception as e:
- logger.exception("Job %s error" % (self.name))
+ logger.exception("%s error" % (self.arvrunner.label(self)))
self.output_callback({}, "permanentFail")
def update_pipeline_component(self, record):
@@ -204,13 +204,14 @@ class ArvadosJob(object):
dirs[g.group(1)] = g.group(2)
if processStatus == "permanentFail":
- done.logtail(logc, logger, "[%s] " % self.name)
+ done.logtail(logc, logger, self.arvrunner.label(self) + " ")
with Perf(metrics, "output collection %s" % self.name):
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
- logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
+ logger.error("%s unable to collect output from %s:\n%s",
+ self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
except Exception as e:
logger.exception("Got unknown exception while collecting output for job %s:", self.name)
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index a7728cd..4dd83d2 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -23,8 +23,8 @@ def done(self, record, tmpdir, outdir, keepdir):
if not collections["items"]:
raise WorkflowException(
- "Job output '%s' cannot be found on API server" % (
- record["output"]))
+ "[job %s] output '%s' cannot be found on API server" % (
+ self.name, record["output"]))
# Create new collection in the parent project
# with the output contents.
@@ -43,13 +43,13 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
self.builder.pathmapper.keepdir = keepdir
return self.collect_outputs("keep:" + record["output"])
-def logtail(logcollection, logger, prefix, maxlen=40):
- logtail = deque([], maxlen)
+def logtail(logcollection, logger, prefix, maxlen=25):
+ logtail = deque([], maxlen*len(logcollection))
for log in logcollection.keys():
with logcollection.open(log) as f:
for l in f:
logtail.append(l)
if len(logcollection) > 1:
- logtail = sorted(logtail)
+ logtail = sorted(logtail)[-maxlen:]
for l in logtail:
- logger.error("%s%s" % (prefix, l))
+ logger.info("%s%s" % (prefix, l.rstrip()))
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 85098b5..cf96c51 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -225,7 +225,7 @@ class Runner(object):
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "[%s] " % self.name)
+ done.logtail(logc, logger, self.arvrunner.label(self) + " ")
self.final_output = record["output"]
outc = arvados.collection.CollectionReader(self.final_output,
commit 135d8378048a2b542203e656e2fec041926f196d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 19 16:26:29 2016 -0500
10497: Print last 40 lines of log when job fails.
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 25a8ffa..8c984aa 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -159,17 +159,24 @@ class ArvadosContainer(object):
else:
processStatus = "permanentFail"
- outputs = {}
+ if processStatus == "permanentFail":
+ logc = arvados.collection.CollectionReader(container["log"],
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ done.logtail(logc, logger, "[%s] " % self.name)
+ outputs = {}
if container["output"]:
- try:
- outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
- except Exception as e:
- logger.error("Got error %s" % str(e))
- self.output_callback({}, "permanentFail")
+ outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+ except Exception as e:
+ logger.exception("[%s] While getting output object: %s", self.name, e)
+ self.output_callback({}, "permanentFail")
+ else:
self.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.processes[record["uuid"]]
+ if record["uuid"] in self.arvrunner.processes:
+ del self.arvrunner.processes[record["uuid"]]
class RunnerContainer(Runner):
@@ -280,8 +287,8 @@ class RunnerContainer(Runner):
except Exception as e:
logger.exception("While getting runner container: %s", e)
self.arvrunner.output_callback({}, "permanentFail")
- del self.arvrunner.processes[record["uuid"]]
else:
super(RunnerContainer, self).done(container)
finally:
- del self.arvrunner.processes[record["uuid"]]
+ if record["uuid"] in self.arvrunner.processes:
+ del self.arvrunner.processes[record["uuid"]]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index d6055d3..59fbdc0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -203,6 +203,9 @@ class ArvadosJob(object):
if g:
dirs[g.group(1)] = g.group(2)
+ if processStatus == "permanentFail":
+ done.logtail(logc, logger, "[%s] " % self.name)
+
with Perf(metrics, "output collection %s" % self.name):
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
@@ -222,8 +225,8 @@ class ArvadosJob(object):
self.output_callback(outputs, processStatus)
finally:
- del self.arvrunner.processes[record["uuid"]]
-
+ if record["uuid"] in self.arvrunner.processes:
+ del self.arvrunner.processes[record["uuid"]]
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index c755565..a7728cd 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -1,4 +1,5 @@
from cwltool.errors import WorkflowException
+from collections import deque
def done(self, record, tmpdir, outdir, keepdir):
colname = "Output %s of %s" % (record["output"][0:7], self.name)
@@ -41,3 +42,14 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
self.builder.outdir = outdir
self.builder.pathmapper.keepdir = keepdir
return self.collect_outputs("keep:" + record["output"])
+
+def logtail(logcollection, logger, prefix, maxlen=40):
+ logtail = deque([], maxlen)
+ for log in logcollection.keys():
+ with logcollection.open(log) as f:
+ for l in f:
+ logtail.append(l)
+ if len(logcollection) > 1:
+ logtail = sorted(logtail)
+ for l in logtail:
+ logger.error("%s%s" % (prefix, l))
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index e902140..85098b5 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -23,6 +23,7 @@ import ruamel.yaml as yaml
from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
+from . import done
logger = logging.getLogger('arvados.cwl-runner')
@@ -203,39 +204,48 @@ class Runner(object):
return workflowmapper
def done(self, record):
- if record["state"] == "Complete":
- if record.get("exit_code") is not None:
- if record["exit_code"] == 33:
- processStatus = "UnsupportedRequirement"
- elif record["exit_code"] == 0:
- processStatus = "success"
+ try:
+ if record["state"] == "Complete":
+ if record.get("exit_code") is not None:
+ if record["exit_code"] == 33:
+ processStatus = "UnsupportedRequirement"
+ elif record["exit_code"] == 0:
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
else:
- processStatus = "permanentFail"
+ processStatus = "success"
else:
- processStatus = "success"
- else:
- processStatus = "permanentFail"
+ processStatus = "permanentFail"
- outputs = {}
- try:
- try:
- self.final_output = record["output"]
- outc = arvados.collection.CollectionReader(self.final_output,
+ outputs = {}
+
+ if processStatus == "permanentFail":
+ logc = arvados.collection.CollectionReader(record["log"],
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- if "cwl.output.json" in outc:
- with outc.open("cwl.output.json") as f:
- if f.size() > 0:
- outputs = json.load(f)
- def keepify(fileobj):
- path = fileobj["location"]
- if not path.startswith("keep:"):
- fileobj["location"] = "keep:%s/%s" % (record["output"], path)
- adjustFileObjs(outputs, keepify)
- adjustDirObjs(outputs, keepify)
- except Exception as e:
- logger.exception("While getting final output object: %s", e)
+ done.logtail(logc, logger, "[%s] " % self.name)
+
+ self.final_output = record["output"]
+ outc = arvados.collection.CollectionReader(self.final_output,
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+ if "cwl.output.json" in outc:
+ with outc.open("cwl.output.json") as f:
+ if f.size() > 0:
+ outputs = json.load(f)
+ def keepify(fileobj):
+ path = fileobj["location"]
+ if not path.startswith("keep:"):
+ fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+ adjustFileObjs(outputs, keepify)
+ adjustDirObjs(outputs, keepify)
+ except Exception as e:
+ logger.exception("[%s] While getting final output object: %s", self.name, e)
+ self.arvrunner.output_callback({}, "permanentFail")
+ else:
self.arvrunner.output_callback(outputs, processStatus)
finally:
if record["uuid"] in self.arvrunner.processes:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list