[ARVADOS] updated: 1.2.0-12-g01693774d
Git user
git at public.curoverse.com
Fri Sep 7 16:57:54 EDT 2018
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 123 +++++++++++++++++++++++++-----------
sdk/cwl/arvados_cwl/arvcontainer.py | 5 +-
sdk/cwl/tests/test_container.py | 6 +-
sdk/cwl/tests/test_submit.py | 4 ++
4 files changed, 97 insertions(+), 41 deletions(-)
via 01693774dffd5ad5dd5313f24a5933f44b0e069d (commit)
via 77001923c90a319a1923f56e624c1048ede41542 (commit)
from 4a466ccc5f447e7284c2d479ea3ee427876896e0 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 01693774dffd5ad5dd5313f24a5933f44b0e069d
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Fri Sep 7 17:57:21 2018 -0300
13773: Intercept logging calls to update runtime_status on runner containers.
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index b73b768fb..e1bbcc2a7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -38,7 +38,7 @@ import arvados.commands._util as arv_cmd
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
@@ -46,6 +46,7 @@ from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
+from .util import get_current_container
from ._version import __version__
from cwltool.pack import pack
@@ -65,6 +66,38 @@ arvados.log_handler.setFormatter(logging.Formatter(
DEFAULT_PRIORITY = 500
+class RuntimeStatusLoggingHandler(logging.Handler):
+ """
+ Intercepts logging calls and reports them as runtime statuses on runner
+ containers.
+ """
+ def __init__(self, runtime_status_update_func):
+ super(RuntimeStatusLoggingHandler, self).__init__()
+ self.runtime_status_update = runtime_status_update_func
+
+ def emit(self, record):
+ kind = None
+ if record.levelno == logging.ERROR:
+ kind = 'error'
+ elif record.levelno == logging.WARNING:
+ kind = 'warning'
+ elif record.levelno == logging.INFO:
+ kind = 'activity'
+ if kind is not None:
+ log_msg = record.getMessage()
+ if '\n' in log_msg:
+ # If the logged message is multi-line, include it as a detail
+ self.runtime_status_update(
+ kind,
+ "%s from %s (please see details)" % (kind, record.name),
+ log_msg
+ )
+ else:
+ self.runtime_status_update(
+ kind,
+ "%s: %s" % (record.name, record.getMessage())
+ )
+
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
@@ -155,6 +188,12 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
self.loadingContext.construct_tool_object = self.arv_make_tool
+ # Add a custom logging handler to the root logger for runtime status reporting
+ # if running inside a container
+ if get_current_container(self.api, self.num_retries, logger):
+ root_logger = logging.getLogger('')
+ handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
+ root_logger.addHandler(handler)
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
@@ -203,12 +242,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
activity statuses.
"""
with self.workflow_eval_lock:
- try:
- current = self.api.containers().current().execute(num_retries=self.num_retries)
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
+ current = get_current_container(self.api, self.num_retries, logger)
+ if current is None:
return
runtime_status = current.get('runtime_status', {})
# In case of status being an error, only report the first one.
@@ -221,10 +256,14 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
# Further errors are only mentioned as a count.
else:
# Get anything before an optional 'and N more' string.
- error_msg = re.match(
- r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
- more_failures = re.match(
- r'.*\(and (\d+) more\)', runtime_status.get('error'))
+ try:
+ error_msg = re.match(
+ r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+ more_failures = re.match(
+ r'.*\(and (\d+) more\)', runtime_status.get('error'))
+ except TypeError:
+ # Ignore tests stubbing errors
+ return
if more_failures:
failure_qty = int(more_failures.groups()[0])
runtime_status.update({
@@ -438,12 +477,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
def set_crunch_output(self):
if self.work_api == "containers":
- try:
- current = self.api.containers().current().execute(num_retries=self.num_retries)
- except ApiError as e:
- # Status code 404 just means we're not running in a container.
- if e.resp.status != 404:
- logger.info("Getting current container: %s", e)
+ current = get_current_container(self.api, self.num_retries, logger)
+ if current is None:
return
try:
self.api.containers().update(uuid=current['uuid'],
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 8875b7d95..f718a86b3 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -48,6 +48,7 @@ def stubs(func):
stubs.keep_client = keep_client2
stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
+ stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
stubs.api = mock.MagicMock()
stubs.api._rootDesc = get_rootDesc()
@@ -56,6 +57,9 @@ def stubs(func):
"uuid": stubs.fake_user_uuid,
}
stubs.api.collections().list().execute.return_value = {"items": []}
+ stubs.api.containers().current().execute.return_value = {
+ "uuid": stubs.fake_container_uuid,
+ }
class CollectionExecute(object):
def __init__(self, exe):
commit 77001923c90a319a1923f56e624c1048ede41542
Author: Lucas Di Pentima <ldipentima at veritasgenetics.com>
Date: Fri Sep 7 14:04:02 2018 -0300
13773: Update runtime_status_error() method to report warning & activity status
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7f56eac8c..b73b768fb 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -194,15 +194,14 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
self.task_queue.add(partial(j.done, record))
del self.processes[uuid]
- def runtime_status_error(self, child_label, child_uuid, error_log):
+ def runtime_status_update(self, kind, message, detail=None):
"""
- Called from a failing child container. Records the first child error
- on this runner's runtime_status field.
- On subsequent errors, updates the 'error' key to show how many additional
- failures happened.
+ Updates the runtime_status field on the runner container.
+ Called from a failing child container: records the first child error
+ or updates the error count on subsequent error statuses.
+ Also called from other parts that need to report errors, warnings or just
+ activity statuses.
"""
- error_msg = "%s %s failed" % (child_label, child_uuid)
- logger.info(error_msg)
with self.workflow_eval_lock:
try:
current = self.api.containers().current().execute(num_retries=self.num_retries)
@@ -212,34 +211,49 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
logger.info("Getting current container: %s", e)
return
runtime_status = current.get('runtime_status', {})
- # Save first fatal error
- if not runtime_status.get('error'):
- runtime_status.update({
- 'error': error_msg,
- 'errorDetail': error_log or "No error logs available"
- })
- # Further errors are only mentioned as a count
- else:
- error_msg = re.match(
- r'^(.*failed)\s*\(?', runtime_status.get('error')).groups()[0]
- more_failures = re.match(
- r'.*\(.*(\d+) more\)', runtime_status.get('error'))
- if more_failures:
- failure_qty = int(more_failures.groups()[0])
+ # In case of status being an error, only report the first one.
+ if kind == 'error':
+ if not runtime_status.get('error'):
runtime_status.update({
- 'error': "%s (and %d more)" % (error_msg, failure_qty+1)
+ 'error': message,
+ 'errorDetail': detail or "No error logs available"
})
+ # Further errors are only mentioned as a count.
else:
+ # Get anything before an optional 'and N more' string.
+ error_msg = re.match(
+ r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
+ more_failures = re.match(
+ r'.*\(and (\d+) more\)', runtime_status.get('error'))
+ if more_failures:
+ failure_qty = int(more_failures.groups()[0])
+ runtime_status.update({
+ 'error': "%s (and %d more)" % (error_msg, failure_qty+1)
+ })
+ else:
+ runtime_status.update({
+ 'error': "%s (and 1 more)" % error_msg
+ })
+ elif kind in ['warning', 'activity']:
+ # Record the last warning/activity status without regard to
+ # previous occurrences.
+ runtime_status.update({
+ kind: message
+ })
+ if detail is not None:
runtime_status.update({
- 'error': "%s (and 1 more)" % error_msg
+ kind+"Detail": detail
})
+ else:
+ # Ignore any other status kind
+ return
try:
self.api.containers().update(uuid=current['uuid'],
body={
'runtime_status': runtime_status,
}).execute(num_retries=self.num_retries)
except Exception as e:
- logger.error("Updating runtime_status: %s", e)
+ logger.info("Couldn't update runtime_status: %s", e)
def wrapped_callback(self, cb, obj, st):
with self.workflow_eval_lock:
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index d49b65002..670f54352 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -324,7 +324,10 @@ class ArvadosContainer(JobBase):
error_log = done.logtail(
logc, logger.error,
"%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
- self.arvrunner.runtime_status_error(label, record["uuid"], error_log)
+ self.arvrunner.runtime_status_update(
+ "error",
+ "%s %s failed" % (label, record["uuid"]),
+ error_log)
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
diff --git a/sdk/cwl/tests/test_container.py b/sdk/cwl/tests/test_container.py
index fcc9d1550..9bac3184f 100644
--- a/sdk/cwl/tests/test_container.py
+++ b/sdk/cwl/tests/test_container.py
@@ -542,9 +542,9 @@ class TestContainer(unittest.TestCase):
"modified_at": "2017-05-26T12:01:22Z"
})
- runner.runtime_status_error.assert_called_with(
- '[container testjob]',
- 'zzzzz-xvhdp-zzzzzzzzzzzzzzz',
+ runner.runtime_status_update.assert_called_with(
+ 'error',
+ '[container testjob] zzzzz-xvhdp-zzzzzzzzzzzzzzz failed',
'some error detail'
)
arvjob.output_callback.assert_called_with({"out": "stuff"}, "permanentFail")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list