[ARVADOS] created: f60100cfb2a42ca650d86b421185f8e43843da57

Git user git at public.curoverse.com
Tue Dec 20 11:07:55 EST 2016


        at  f60100cfb2a42ca650d86b421185f8e43843da57 (commit)


commit f60100cfb2a42ca650d86b421185f8e43843da57
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Dec 20 11:07:37 2016 -0500

    10497: Make log formatting more uniform.  Lower last line count to 25.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 85156db..6de355b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -46,6 +46,9 @@ logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 logger.setLevel(logging.INFO)
 
+arvados.log_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(name)s %(levelname)s: %(message)s',
+        '%Y-%m-%d %H:%M:%S'))
 
 class ArvCwlRunner(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -127,7 +130,7 @@ class ArvCwlRunner(object):
                     uuid = event["object_uuid"]
                     with self.lock:
                         j = self.processes[uuid]
-                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        logger.info("%s %s is Running", self.arvrunner.label(j), uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
@@ -135,14 +138,16 @@ class ArvCwlRunner(object):
                     try:
                         self.cond.acquire()
                         j = self.processes[uuid]
-                        txt = self.work_api[0].upper() + self.work_api[1:-1]
-                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                         with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def label(self, obj):
+        return "[%s %s]" % (self.work_api[0:-1], obj.name)
+
     def poll_states(self):
         """Poll status of jobs or containers listed in the processes dict.
 
@@ -656,4 +661,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                              fetcher_constructor=partial(CollectionFetcher,
                                                          api_client=api_client,
                                                          keep_client=keep_client),
-                             resolver=partial(collectionResolver, api_client))
+                             resolver=partial(collectionResolver, api_client),
+                             logger_handler=arvados.log_handler)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 8c984aa..25169ba 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -131,12 +131,12 @@ class ArvadosContainer(object):
             self.uuid = response["uuid"]
             self.arvrunner.processes[self.uuid] = self
 
-            logger.info("Container request %s (%s) state is %s", self.name, response["uuid"], response["state"])
+            logger.info("%s %s state is %s", self.arvrunner.label(self), response["uuid"], response["state"])
 
             if response["state"] == "Final":
                 self.done(response)
         except Exception as e:
-            logger.error("Got error %s" % str(e))
+            logger.error("%s got error %s" % (self.arvrunner.label(self), str(e)))
             self.output_callback({}, "permanentFail")
 
     def done(self, record):
@@ -164,13 +164,17 @@ class ArvadosContainer(object):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger, "[%s] " % self.name)
+                done.logtail(logc, logger, self.arvrunner.label(self) + " ")
 
             outputs = {}
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+        except WorkflowException as e:
+            logger.error("%s unable to collect output from %s:\n%s",
+                         self.arvrunner.label(self), container["output"], e, exc_info=(e if self.arvrunner.debug else False))
+            processStatus = "permanentFail"
         except Exception as e:
-            logger.exception("[%s] While getting output object: %s", self.name, e)
+            logger.exception("%s while getting output object: %s", self.arvrunner.label(self), e)
             self.output_callback({}, "permanentFail")
         else:
             self.output_callback(outputs, processStatus)
@@ -274,7 +278,7 @@ class RunnerContainer(Runner):
         self.uuid = response["uuid"]
         self.arvrunner.processes[self.uuid] = self
 
-        logger.info("Submitted container %s", response["uuid"])
+        logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
 
         if response["state"] == "Final":
             self.done(response)
@@ -285,7 +289,7 @@ class RunnerContainer(Runner):
                 uuid=record["container_uuid"]
             ).execute(num_retries=self.arvrunner.num_retries)
         except Exception as e:
-            logger.exception("While getting runner container: %s", e)
+            logger.exception("%s while getting runner container: %s", self.arvrunner.label(self), e)
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             super(RunnerContainer, self).done(container)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 59fbdc0..f656673 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -136,13 +136,13 @@ class ArvadosJob(object):
 
             self.update_pipeline_component(response)
 
-            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+            logger.info("%s %s is %s", self.arvrunner.label(self), response["uuid"], response["state"])
 
             if response["state"] in ("Complete", "Failed", "Cancelled"):
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
         except Exception as e:
-            logger.exception("Job %s error" % (self.name))
+            logger.exception("%s error" % (self.arvrunner.label(self)))
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
@@ -204,13 +204,14 @@ class ArvadosJob(object):
                                 dirs[g.group(1)] = g.group(2)
 
                     if processStatus == "permanentFail":
-                        done.logtail(logc, logger, "[%s] " % self.name)
+                        done.logtail(logc, logger, self.arvrunner.label(self) + " ")
 
                     with Perf(metrics, "output collection %s" % self.name):
                         outputs = done.done(self, record, dirs["tmpdir"],
                                             dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
-                logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("%s unable to collect output from %s:\n%s",
+                             self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
                 logger.exception("Got unknown exception while collecting output for job %s:", self.name)
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index a7728cd..4dd83d2 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -23,8 +23,8 @@ def done(self, record, tmpdir, outdir, keepdir):
 
         if not collections["items"]:
             raise WorkflowException(
-                "Job output '%s' cannot be found on API server" % (
-                    record["output"]))
+                "[job %s] output '%s' cannot be found on API server" % (
+                    self.name, record["output"]))
 
         # Create new collection in the parent project
         # with the output contents.
@@ -43,13 +43,13 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
     self.builder.pathmapper.keepdir = keepdir
     return self.collect_outputs("keep:" + record["output"])
 
-def logtail(logcollection, logger, prefix, maxlen=40):
-    logtail = deque([], maxlen)
+def logtail(logcollection, logger, prefix, maxlen=25):
+    logtail = deque([], maxlen*len(logcollection))
     for log in logcollection.keys():
         with logcollection.open(log) as f:
             for l in f:
                 logtail.append(l)
     if len(logcollection) > 1:
-        logtail = sorted(logtail)
+        logtail = sorted(logtail)[-maxlen:]
     for l in logtail:
-        logger.error("%s%s" % (prefix, l))
+        logger.info("%s%s" % (prefix, l.rstrip()))
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 85098b5..cf96c51 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -225,7 +225,7 @@ class Runner(object):
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                done.logtail(logc, logger, "[%s] " % self.name)
+                done.logtail(logc, logger, self.arvrunner.label(self) + " ")
 
             self.final_output = record["output"]
             outc = arvados.collection.CollectionReader(self.final_output,

commit 135d8378048a2b542203e656e2fec041926f196d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 19 16:26:29 2016 -0500

    10497: Print last 40 lines of log when job fails.

diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 25a8ffa..8c984aa 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -159,17 +159,24 @@ class ArvadosContainer(object):
             else:
                 processStatus = "permanentFail"
 
-            outputs = {}
+            if processStatus == "permanentFail":
+                logc = arvados.collection.CollectionReader(container["log"],
+                                                           api_client=self.arvrunner.api,
+                                                           keep_client=self.arvrunner.keep_client,
+                                                           num_retries=self.arvrunner.num_retries)
+                done.logtail(logc, logger, "[%s] " % self.name)
 
+            outputs = {}
             if container["output"]:
-                try:
-                    outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
-                except Exception as e:
-                    logger.error("Got error %s" % str(e))
-                    self.output_callback({}, "permanentFail")
+                outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+        except Exception as e:
+            logger.exception("[%s] While getting output object: %s", self.name, e)
+            self.output_callback({}, "permanentFail")
+        else:
             self.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]
 
 
 class RunnerContainer(Runner):
@@ -280,8 +287,8 @@ class RunnerContainer(Runner):
         except Exception as e:
             logger.exception("While getting runner container: %s", e)
             self.arvrunner.output_callback({}, "permanentFail")
-            del self.arvrunner.processes[record["uuid"]]
         else:
             super(RunnerContainer, self).done(container)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index d6055d3..59fbdc0 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -203,6 +203,9 @@ class ArvadosJob(object):
                             if g:
                                 dirs[g.group(1)] = g.group(2)
 
+                    if processStatus == "permanentFail":
+                        done.logtail(logc, logger, "[%s] " % self.name)
+
                     with Perf(metrics, "output collection %s" % self.name):
                         outputs = done.done(self, record, dirs["tmpdir"],
                                             dirs["outdir"], dirs["keep"])
@@ -222,8 +225,8 @@ class ArvadosJob(object):
 
             self.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.processes[record["uuid"]]
-
+            if record["uuid"] in self.arvrunner.processes:
+                del self.arvrunner.processes[record["uuid"]]
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index c755565..a7728cd 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -1,4 +1,5 @@
 from cwltool.errors import WorkflowException
+from collections import deque
 
 def done(self, record, tmpdir, outdir, keepdir):
     colname = "Output %s of %s" % (record["output"][0:7], self.name)
@@ -41,3 +42,14 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
     self.builder.outdir = outdir
     self.builder.pathmapper.keepdir = keepdir
     return self.collect_outputs("keep:" + record["output"])
+
+def logtail(logcollection, logger, prefix, maxlen=40):
+    logtail = deque([], maxlen)
+    for log in logcollection.keys():
+        with logcollection.open(log) as f:
+            for l in f:
+                logtail.append(l)
+    if len(logcollection) > 1:
+        logtail = sorted(logtail)
+    for l in logtail:
+        logger.error("%s%s" % (prefix, l))
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index e902140..85098b5 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -23,6 +23,7 @@ import ruamel.yaml as yaml
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
 from ._version import __version__
+from . import done
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -203,39 +204,48 @@ class Runner(object):
         return workflowmapper
 
     def done(self, record):
-        if record["state"] == "Complete":
-            if record.get("exit_code") is not None:
-                if record["exit_code"] == 33:
-                    processStatus = "UnsupportedRequirement"
-                elif record["exit_code"] == 0:
-                    processStatus = "success"
+        try:
+            if record["state"] == "Complete":
+                if record.get("exit_code") is not None:
+                    if record["exit_code"] == 33:
+                        processStatus = "UnsupportedRequirement"
+                    elif record["exit_code"] == 0:
+                        processStatus = "success"
+                    else:
+                        processStatus = "permanentFail"
                 else:
-                    processStatus = "permanentFail"
+                    processStatus = "success"
             else:
-                processStatus = "success"
-        else:
-            processStatus = "permanentFail"
+                processStatus = "permanentFail"
 
-        outputs = {}
-        try:
-            try:
-                self.final_output = record["output"]
-                outc = arvados.collection.CollectionReader(self.final_output,
+            outputs = {}
+
+            if processStatus == "permanentFail":
+                logc = arvados.collection.CollectionReader(record["log"],
                                                            api_client=self.arvrunner.api,
                                                            keep_client=self.arvrunner.keep_client,
                                                            num_retries=self.arvrunner.num_retries)
-                if "cwl.output.json" in outc:
-                    with outc.open("cwl.output.json") as f:
-                        if f.size() > 0:
-                            outputs = json.load(f)
-                def keepify(fileobj):
-                    path = fileobj["location"]
-                    if not path.startswith("keep:"):
-                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
-                adjustFileObjs(outputs, keepify)
-                adjustDirObjs(outputs, keepify)
-            except Exception as e:
-                logger.exception("While getting final output object: %s", e)
+                done.logtail(logc, logger, "[%s] " % self.name)
+
+            self.final_output = record["output"]
+            outc = arvados.collection.CollectionReader(self.final_output,
+                                                       api_client=self.arvrunner.api,
+                                                       keep_client=self.arvrunner.keep_client,
+                                                       num_retries=self.arvrunner.num_retries)
+            if "cwl.output.json" in outc:
+                with outc.open("cwl.output.json") as f:
+                    if f.size() > 0:
+                        outputs = json.load(f)
+            def keepify(fileobj):
+                path = fileobj["location"]
+                if not path.startswith("keep:"):
+                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+            adjustFileObjs(outputs, keepify)
+            adjustDirObjs(outputs, keepify)
+        except Exception as e:
+            logger.exception("[%s] While getting final output object: %s", self.name, e)
+            self.arvrunner.output_callback({}, "permanentFail")
+        else:
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
             if record["uuid"] in self.arvrunner.processes:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list