[ARVADOS] updated: cf1b3bd169be19d3bfe57290a74e8293c1890f51
git at public.curoverse.com
git at public.curoverse.com
Thu Dec 24 15:59:06 EST 2015
Summary of changes:
services/api/lib/crunch_dispatch.rb | 83 ++++++++++++++++----
services/api/script/cancel_stale_jobs.rb | 44 -----------
services/api/script/fail-jobs.rb | 18 +++++
services/api/test/fixtures/files/proc_stat | 14 ++++
services/api/test/unit/fail_jobs_test.rb | 79 +++++++++++++++++++
services/keepproxy/keepproxy_test.go | 2 +-
.../crunchstat_summary/chartjs.js | 15 +++-
.../crunchstat_summary/chartjs.py | 33 +++++---
.../crunchstat_summary/command.py | 14 +++-
.../crunchstat_summary/summarizer.py | 89 ++++++++++++++++++----
10 files changed, 304 insertions(+), 87 deletions(-)
delete mode 100755 services/api/script/cancel_stale_jobs.rb
create mode 100755 services/api/script/fail-jobs.rb
create mode 100644 services/api/test/fixtures/files/proc_stat
create mode 100644 services/api/test/unit/fail_jobs_test.rb
discards 6e704b98eb77e0a35dcfef1c8302f9ac14b4d98e (commit)
discards e8344fa1f4b54edbfe181596511bcdddc1194991 (commit)
discards be854bfb8bab5849acb09fcb5acbeeac7300ca5c (commit)
discards a7e2a2a7c0915de92345acbc209f5d941bf22b0a (commit)
discards ef6352c883eb5ab027e8bf3e27477d7f9102c286 (commit)
via cf1b3bd169be19d3bfe57290a74e8293c1890f51 (commit)
via 0ef97bbd65b35e69037c49562f82da05aff4c45c (commit)
via 368240a36ddec56a9797e0a559fb01b91e629219 (commit)
via 6f5082f86c8143917305ac6bb7f9559d3e364597 (commit)
via 610272beac73af1b4880bfad9f1e23d86910dee8 (commit)
via f878293dfe6c4a20312ff2db658dabc27ed60a8f (commit)
via d51793b5657cbdb165fb10d520282134d8ce3f33 (commit)
via 378e6e0cd313541c395893e832e82a85856d5105 (commit)
via b3b9aeee4dba20bcddd8cb4ee2cdcd3c8a34eaec (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (6e704b98eb77e0a35dcfef1c8302f9ac14b4d98e)
           \
            N -- N -- N (cf1b3bd169be19d3bfe57290a74e8293c1890f51)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit cf1b3bd169be19d3bfe57290a74e8293c1890f51
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Dec 24 13:51:35 2015 -0500
7883: Add option to include stats from child jobs.
diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index 056d8df..624d67d 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -30,6 +30,9 @@ class ArgumentParser(argparse.ArgumentParser):
self.add_argument(
'--debug', action='store_true',
help='Write debug messages to stderr')
+ self.add_argument(
+ '--include-child-jobs', action='store_true',
+ help='Include stats from child jobs')
class Command(object):
@@ -41,18 +44,21 @@ class Command(object):
logger.setLevel(logging.INFO)
def run(self):
+ kwargs = {
+ 'include_child_jobs': self.args.include_child_jobs,
+ }
if self.args.pipeline_instance:
- self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance)
+ self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
elif self.args.job:
- self.summer = summarizer.JobSummarizer(self.args.job)
+ self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
fh = gzip.open(self.args.log_file)
else:
fh = open(self.args.log_file)
- self.summer = summarizer.Summarizer(fh)
+ self.summer = summarizer.Summarizer(fh, **kwargs)
else:
- self.summer = summarizer.Summarizer(sys.stdin)
+ self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
return self.summer.run()
def report(self):
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 6396b5d..b976003 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -28,15 +28,15 @@ class Task(object):
class Summarizer(object):
existing_constraints = {}
- def __init__(self, logdata, label=None):
+ def __init__(self, logdata, label=None, include_child_jobs=True):
self._logdata = logdata
+ logger.debug("%s: logdata %s", self.label, repr(logdata))
+
self.label = label
self.starttime = None
self.finishtime = None
- logger.debug("%s: logdata %s", self.label, repr(logdata))
+ self._include_child_jobs = include_child_jobs
- def run(self):
- logger.debug("%s: parsing log data", self.label)
# stats_max: {category: {stat: val}}
self.stats_max = collections.defaultdict(
functools.partial(collections.defaultdict,
@@ -44,19 +44,50 @@ class Summarizer(object):
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
+
+ self.seq_to_uuid = {}
self.tasks = collections.defaultdict(Task)
+
+ def run(self):
+ logger.debug("%s: parsing log data", self.label)
for line in self._logdata:
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+ if m:
+ seq = int(m.group('seq'))
+ uuid = m.group('task_uuid')
+ self.seq_to_uuid[seq] = uuid
+ logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+ continue
+
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
if m:
- task_id = m.group('seq')
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
elapsed = int(m.group('elapsed'))
self.task_stats[task_id]['time'] = {'elapsed': elapsed}
if elapsed > self.stats_max['time']['elapsed']:
self.stats_max['time']['elapsed'] = elapsed
continue
+
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+ if m:
+ uuid = m.group('uuid')
+ if not self._include_child_jobs:
+ logger.warning('%s: omitting %s (try --include-child-job)',
+ self.label, uuid)
+ continue
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = JobSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
+ continue
+
m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
+
if self.label is None:
self.label = m.group('job_uuid')
logger.debug('%s: using job uuid as label', self.label)
@@ -65,7 +96,7 @@ class Summarizer(object):
continue
elif m.group('category') == 'error':
continue
- task_id = m.group('seq')
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
task = self.tasks[task_id]
# Use the first and last crunchstat timestamps as
@@ -126,6 +157,8 @@ class Summarizer(object):
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
+ logger.debug('%s: done parsing', self.label)
+
self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
for task_id, task_stat in self.task_stats.iteritems():
@@ -135,6 +168,7 @@ class Summarizer(object):
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
self.job_tot[category][stat] += val
+ logger.debug('%s: done totals', self.label)
def long_label(self):
label = self.label
@@ -169,6 +203,9 @@ class Summarizer(object):
tot = self._format(self.job_tot[category].get(stat, '-'))
yield "\t".join([category, stat, str(val), max_rate, tot])
for args in (
+ ('Number of tasks: {}',
+ len(self.tasks),
+ None),
('Max CPU time spent by a single task: {}s',
self.stats_max['cpu']['user+sys'],
None),
@@ -249,7 +286,8 @@ class Summarizer(object):
class CollectionSummarizer(Summarizer):
- def __init__(self, collection_id):
+ def __init__(self, collection_id, **kwargs):
+ logger.debug('load collection %s', collection_id)
collection = arvados.collection.CollectionReader(collection_id)
filenames = [filename for filename in collection]
if len(filenames) != 1:
@@ -257,12 +295,12 @@ class CollectionSummarizer(Summarizer):
"collection {} has {} files; need exactly one".format(
collection_id, len(filenames)))
super(CollectionSummarizer, self).__init__(
- collection.open(filenames[0]))
+ collection.open(filenames[0]), **kwargs)
self.label = collection_id
class JobSummarizer(CollectionSummarizer):
- def __init__(self, job):
+ def __init__(self, job, **kwargs):
arv = arvados.api('v1')
if isinstance(job, str):
self.job = arv.jobs().get(uuid=job).execute()
@@ -274,12 +312,12 @@ class JobSummarizer(CollectionSummarizer):
raise ValueError(
"job {} has no log; live summary not implemented".format(
self.job['uuid']))
- super(JobSummarizer, self).__init__(self.job['log'])
+ super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
self.label = self.job['uuid']
class PipelineSummarizer():
- def __init__(self, pipeline_instance_uuid):
+ def __init__(self, pipeline_instance_uuid, **kwargs):
arv = arvados.api('v1', model=OrderedJsonModel())
instance = arv.pipeline_instances().get(
uuid=pipeline_instance_uuid).execute()
@@ -295,7 +333,7 @@ class PipelineSummarizer():
else:
logger.info(
"%s: logdata %s", cname, component['job']['log'])
- summarizer = JobSummarizer(component['job'])
+ summarizer = JobSummarizer(component['job'], **kwargs)
summarizer.label = cname
self.summarizers[cname] = summarizer
self.label = pipeline_instance_uuid
commit 0ef97bbd65b35e69037c49562f82da05aff4c45c
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Dec 23 14:09:37 2015 -0500
7883: Generate multiple sets of charts when data source is a pipeline instance.
diff --git a/tools/crunchstat-summary/crunchstat_summary/__init__.py b/tools/crunchstat-summary/crunchstat_summary/__init__.py
index e69de29..c10988d 100644
--- a/tools/crunchstat-summary/crunchstat_summary/__init__.py
+++ b/tools/crunchstat-summary/crunchstat_summary/__init__.py
@@ -0,0 +1,4 @@
+import logging
+
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.js b/tools/crunchstat-summary/crunchstat_summary/chartjs.js
index 3106e62..72ff7a4 100644
--- a/tools/crunchstat-summary/crunchstat_summary/chartjs.js
+++ b/tools/crunchstat-summary/crunchstat_summary/chartjs.js
@@ -1,11 +1,27 @@
window.onload = function() {
- var options = {};
- chartData.forEach(function(data, idx) {
- var div = document.createElement('div');
- div.setAttribute('id', 'chart-'+idx);
- div.setAttribute('style', 'width: 100%; height: 150px');
- document.body.appendChild(div);
- var chart = new CanvasJS.Chart('chart-'+idx, data);
- chart.render();
+ var charts = {};
+ sections.forEach(function(section, section_idx) {
+ var h1 = document.createElement('h1');
+ h1.appendChild(document.createTextNode(section.label));
+ document.body.appendChild(h1);
+ section.charts.forEach(function(data, chart_idx) {
+ // Skip chart if every series has zero data points
+ if (0 == data.data.reduce(function(len, series) {
+ return len + series.dataPoints.length;
+ }, 0)) {
+ return;
+ }
+ var id = 'chart-'+section_idx+'-'+chart_idx;
+ var div = document.createElement('div');
+ div.setAttribute('id', id);
+ div.setAttribute('style', 'width: 100%; height: 150px');
+ document.body.appendChild(div);
+ charts[id] = new CanvasJS.Chart(id, data);
+ charts[id].render();
+ });
});
-}
+
+ if (typeof window.debug === 'undefined')
+ window.debug = {};
+ window.debug.charts = charts;
+};
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.py b/tools/crunchstat-summary/crunchstat_summary/chartjs.py
index 85b49b8..b6ff279 100644
--- a/tools/crunchstat-summary/crunchstat_summary/chartjs.py
+++ b/tools/crunchstat-summary/crunchstat_summary/chartjs.py
@@ -3,13 +3,15 @@ from __future__ import print_function
import json
import pkg_resources
+from crunchstat_summary import logger
+
class ChartJS(object):
- JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.js'
+ JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.min.js'
- def __init__(self, label, tasks):
+ def __init__(self, label, summarizers):
self.label = label
- self.tasks = tasks
+ self.summarizers = summarizers
def html(self):
return '''<!doctype html><html><head>
@@ -20,31 +22,48 @@ class ChartJS(object):
'''.format(self.label, self.JSLIB, self.js())
def js(self):
- return 'var chartData = {};\n{}'.format(
- json.dumps(self.chartData()),
+ return 'var sections = {};\n{}'.format(
+ json.dumps(self.sections()),
pkg_resources.resource_string('crunchstat_summary', 'chartjs.js'))
- def chartData(self):
- maxpts = 0
- for task in self.tasks.itervalues():
- for series in task.series.itervalues():
- maxpts = max(maxpts, len(series))
+ def sections(self):
return [
{
- 'title': {
- 'text': '{}: {} {}'.format(self.label, stat[0], stat[1]),
+ 'label': s.long_label(),
+ 'charts': self.charts(s.label, s.tasks),
+ }
+ for s in self.summarizers]
+
+ def charts(self, label, tasks):
+ return [
+ {
+ 'axisY': {
+ 'minimum': 0,
},
'data': [
{
'type': 'line',
- 'dataPoints': [
- {'x': pt[0].total_seconds(), 'y': pt[1]}
- for pt in task.series[stat]]
+ 'markerType': 'none',
+ 'dataPoints': self._datapoints(
+ label=uuid, task=task, series=task.series[stat]),
}
- for label, task in self.tasks.iteritems()
+ for uuid, task in tasks.iteritems()
],
+ 'title': {
+ 'text': '{}: {} {}'.format(label, stat[0], stat[1]),
+ },
+ 'zoomEnabled': True,
}
for stat in (('cpu', 'user+sys__rate'),
+ ('mem', 'rss')),
('net:eth0', 'tx+rx__rate'),
- ('mem', 'rss'))
- ]
+ ('net:keep0', 'tx+rx__rate')]
+
+ def _datapoints(self, label, task, series):
+ points = [
+ {'x': pt[0].total_seconds(), 'y': pt[1]}
+ for pt in series]
+ if len(points) > 0:
+ points[-1]['markerType'] = 'cross'
+ points[-1]['markerSize'] = 12
+ return points
diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index ab95108..056d8df 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -1,8 +1,9 @@
import argparse
import gzip
+import logging
import sys
-from crunchstat_summary import summarizer
+from crunchstat_summary import logger, summarizer
class ArgumentParser(argparse.ArgumentParser):
@@ -23,11 +24,21 @@ class ArgumentParser(argparse.ArgumentParser):
fmt.add_argument(
'--format', type=str, choices=('html', 'text'), default='text',
help='Report format')
+ self.add_argument(
+ '--verbose', action='store_true',
+ help='Write progress messages to stderr')
+ self.add_argument(
+ '--debug', action='store_true',
+ help='Write debug messages to stderr')
class Command(object):
def __init__(self, args):
self.args = args
+ if args.debug:
+ logger.setLevel(logging.DEBUG)
+ elif args.verbose:
+ logger.setLevel(logging.INFO)
def run(self):
if self.args.pipeline_instance:
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index b630a0c..6396b5d 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -6,13 +6,12 @@ import crunchstat_summary.chartjs
import datetime
import functools
import itertools
-import logging
import math
import re
import sys
-logger = logging.getLogger(__name__)
-logger.addHandler(logging.NullHandler())
+from arvados.api import OrderedJsonModel
+from crunchstat_summary import logger
# Recommend memory constraints that are this multiple of an integral
# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
@@ -29,11 +28,15 @@ class Task(object):
class Summarizer(object):
existing_constraints = {}
- def __init__(self, logdata, label='job'):
+ def __init__(self, logdata, label=None):
self._logdata = logdata
self.label = label
+ self.starttime = None
+ self.finishtime = None
+ logger.debug("%s: logdata %s", self.label, repr(logdata))
def run(self):
+ logger.debug("%s: parsing log data", self.label)
# stats_max: {category: {stat: val}}
self.stats_max = collections.defaultdict(
functools.partial(collections.defaultdict,
@@ -51,20 +54,34 @@ class Summarizer(object):
if elapsed > self.stats_max['time']['elapsed']:
self.stats_max['time']['elapsed'] = elapsed
continue
- m = re.search(r'^(?P<timestamp>\S+) \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
+ if self.label is None:
+ self.label = m.group('job_uuid')
+ logger.debug('%s: using job uuid as label', self.label)
if m.group('category').endswith(':'):
# "notice:" etc.
continue
elif m.group('category') == 'error':
continue
task_id = m.group('seq')
+ task = self.tasks[task_id]
+
+ # Use the first and last crunchstat timestamps as
+ # approximations of starttime and finishtime.
timestamp = datetime.datetime.strptime(
m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
- task = self.tasks[task_id]
if not task.starttime:
task.starttime = timestamp
+ logger.debug('%s: task %s starttime %s',
+ self.label, task_id, timestamp)
+ task.finishtime = timestamp
+
+ if not self.starttime:
+ self.starttime = timestamp
+ self.finishtime = timestamp
+
this_interval_s = None
for group in ['current', 'interval']:
if not m.group(group):
@@ -119,13 +136,27 @@ class Summarizer(object):
continue
self.job_tot[category][stat] += val
+ def long_label(self):
+ label = self.label
+ if self.finishtime:
+ label += ' -- elapsed time '
+ s = (self.finishtime - self.starttime).total_seconds()
+ if s > 86400:
+ label += '{}d'.format(int(s/86400))
+ if s > 3600:
+ label += '{}h'.format(int(s/3600) % 24)
+ if s > 60:
+ label += '{}m'.format(int(s/60) % 60)
+ label += '{}s'.format(int(s) % 60)
+ return label
+
def text_report(self):
return "\n".join(itertools.chain(
self._text_report_gen(),
self._recommend_gen())) + "\n"
def html_report(self):
- return crunchstat_summary.chartjs.ChartJS(self.label, self.tasks).html()
+ return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
@@ -227,6 +258,7 @@ class CollectionSummarizer(Summarizer):
collection_id, len(filenames)))
super(CollectionSummarizer, self).__init__(
collection.open(filenames[0]))
+ self.label = collection_id
class JobSummarizer(CollectionSummarizer):
@@ -243,11 +275,12 @@ class JobSummarizer(CollectionSummarizer):
"job {} has no log; live summary not implemented".format(
self.job['uuid']))
super(JobSummarizer, self).__init__(self.job['log'])
+ self.label = self.job['uuid']
class PipelineSummarizer():
def __init__(self, pipeline_instance_uuid):
- arv = arvados.api('v1')
+ arv = arvados.api('v1', model=OrderedJsonModel())
instance = arv.pipeline_instances().get(
uuid=pipeline_instance_uuid).execute()
self.summarizers = collections.OrderedDict()
@@ -260,11 +293,12 @@ class PipelineSummarizer():
"%s: skipping job %s with no log available",
cname, component['job'].get('uuid'))
else:
- logger.debug(
- "%s: reading log from %s", cname, component['job']['log'])
+ logger.info(
+ "%s: logdata %s", cname, component['job']['log'])
summarizer = JobSummarizer(component['job'])
summarizer.label = cname
self.summarizers[cname] = summarizer
+ self.label = pipeline_instance_uuid
def run(self):
for summarizer in self.summarizers.itervalues():
@@ -278,3 +312,7 @@ class PipelineSummarizer():
txt += summarizer.text_report()
txt += '\n'
return txt
+
+ def html_report(self):
+ return crunchstat_summary.chartjs.ChartJS(
+ self.label, self.summarizers.itervalues()).html()
commit 368240a36ddec56a9797e0a559fb01b91e629219
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Dec 23 11:39:14 2015 -0500
7883: Add option (--format html) to generate canvasjs charts.
diff --git a/tools/crunchstat-summary/bin/crunchstat-summary b/tools/crunchstat-summary/bin/crunchstat-summary
index c32b50e..e16bd8e 100755
--- a/tools/crunchstat-summary/bin/crunchstat-summary
+++ b/tools/crunchstat-summary/bin/crunchstat-summary
@@ -10,6 +10,6 @@ import sys
logging.getLogger().addHandler(logging.StreamHandler())
args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.command.Command(args).summarizer()
-s.run()
-print(s.report(), end='')
+cmd = crunchstat_summary.command.Command(args)
+cmd.run()
+print(cmd.report(), end='')
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.js b/tools/crunchstat-summary/crunchstat_summary/chartjs.js
new file mode 100644
index 0000000..3106e62
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/chartjs.js
@@ -0,0 +1,11 @@
+window.onload = function() {
+ var options = {};
+ chartData.forEach(function(data, idx) {
+ var div = document.createElement('div');
+ div.setAttribute('id', 'chart-'+idx);
+ div.setAttribute('style', 'width: 100%; height: 150px');
+ document.body.appendChild(div);
+ var chart = new CanvasJS.Chart('chart-'+idx, data);
+ chart.render();
+ });
+}
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.py b/tools/crunchstat-summary/crunchstat_summary/chartjs.py
new file mode 100644
index 0000000..85b49b8
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/chartjs.py
@@ -0,0 +1,50 @@
+from __future__ import print_function
+
+import json
+import pkg_resources
+
+
+class ChartJS(object):
+ JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.js'
+
+ def __init__(self, label, tasks):
+ self.label = label
+ self.tasks = tasks
+
+ def html(self):
+ return '''<!doctype html><html><head>
+ <title>{} stats</title>
+ <script type="text/javascript" src="{}"></script>
+ <script type="text/javascript">{}</script>
+ </head><body></body></html>
+ '''.format(self.label, self.JSLIB, self.js())
+
+ def js(self):
+ return 'var chartData = {};\n{}'.format(
+ json.dumps(self.chartData()),
+ pkg_resources.resource_string('crunchstat_summary', 'chartjs.js'))
+
+ def chartData(self):
+ maxpts = 0
+ for task in self.tasks.itervalues():
+ for series in task.series.itervalues():
+ maxpts = max(maxpts, len(series))
+ return [
+ {
+ 'title': {
+ 'text': '{}: {} {}'.format(self.label, stat[0], stat[1]),
+ },
+ 'data': [
+ {
+ 'type': 'line',
+ 'dataPoints': [
+ {'x': pt[0].total_seconds(), 'y': pt[1]}
+ for pt in task.series[stat]]
+ }
+ for label, task in self.tasks.iteritems()
+ ],
+ }
+ for stat in (('cpu', 'user+sys__rate'),
+ ('net:eth0', 'tx+rx__rate'),
+ ('mem', 'rss'))
+ ]
diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index fc37190..ab95108 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -19,22 +19,33 @@ class ArgumentParser(argparse.ArgumentParser):
src.add_argument(
'--log-file', type=str,
help='Read log data from a regular file')
+ fmt = self.add_mutually_exclusive_group()
+ fmt.add_argument(
+ '--format', type=str, choices=('html', 'text'), default='text',
+ help='Report format')
class Command(object):
def __init__(self, args):
self.args = args
- def summarizer(self):
+ def run(self):
if self.args.pipeline_instance:
- return summarizer.PipelineSummarizer(self.args.pipeline_instance)
+ self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance)
elif self.args.job:
- return summarizer.JobSummarizer(self.args.job)
+ self.summer = summarizer.JobSummarizer(self.args.job)
elif self.args.log_file:
if self.args.log_file.endswith('.gz'):
fh = gzip.open(self.args.log_file)
else:
fh = open(self.args.log_file)
- return summarizer.Summarizer(fh)
+ self.summer = summarizer.Summarizer(fh)
else:
- return summarizer.Summarizer(sys.stdin)
+ self.summer = summarizer.Summarizer(sys.stdin)
+ return self.summer.run()
+
+ def report(self):
+ if self.args.format == 'html':
+ return self.summer.html_report()
+ elif self.args.format == 'text':
+ return self.summer.text_report()
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index f5d27d4..b630a0c 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -2,6 +2,8 @@ from __future__ import print_function
import arvados
import collections
+import crunchstat_summary.chartjs
+import datetime
import functools
import itertools
import logging
@@ -17,6 +19,13 @@ logger.addHandler(logging.NullHandler())
# that have amounts like 7.5 GiB according to the kernel.)
AVAILABLE_RAM_RATIO = 0.95
+
+class Task(object):
+ def __init__(self):
+ self.starttime = None
+ self.series = collections.defaultdict(list)
+
+
class Summarizer(object):
existing_constraints = {}
@@ -32,6 +41,7 @@ class Summarizer(object):
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
+ self.tasks = collections.defaultdict(Task)
for line in self._logdata:
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
if m:
@@ -41,7 +51,7 @@ class Summarizer(object):
if elapsed > self.stats_max['time']['elapsed']:
self.stats_max['time']['elapsed'] = elapsed
continue
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>\S+) \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
if m.group('category').endswith(':'):
@@ -50,6 +60,11 @@ class Summarizer(object):
elif m.group('category') == 'error':
continue
task_id = m.group('seq')
+ timestamp = datetime.datetime.strptime(
+ m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+ task = self.tasks[task_id]
+ if not task.starttime:
+ task.starttime = timestamp
this_interval_s = None
for group in ['current', 'interval']:
if not m.group(group):
@@ -84,7 +99,13 @@ class Summarizer(object):
else:
stat = stat + '__rate'
val = val / this_interval_s
+ if stat in ['user+sys__rate', 'tx+rx__rate']:
+ task.series[category, stat].append(
+ (timestamp - task.starttime, val))
else:
+ if stat in ['rss']:
+ task.series[category, stat].append(
+ (timestamp - task.starttime, val))
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
@@ -98,12 +119,15 @@ class Summarizer(object):
continue
self.job_tot[category][stat] += val
- def report(self):
+ def text_report(self):
return "\n".join(itertools.chain(
- self._report_gen(),
+ self._text_report_gen(),
self._recommend_gen())) + "\n"
- def _report_gen(self):
+ def html_report(self):
+ return crunchstat_summary.chartjs.ChartJS(self.label, self.tasks).html()
+
+ def _text_report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
for category, stat_max in sorted(self.stats_max.iteritems()):
for stat, val in sorted(stat_max.iteritems()):
@@ -246,11 +270,11 @@ class PipelineSummarizer():
for summarizer in self.summarizers.itervalues():
summarizer.run()
- def report(self):
+ def text_report(self):
txt = ''
for cname, summarizer in self.summarizers.iteritems():
txt += '### Summary for {} ({})\n'.format(
cname, summarizer.job['uuid'])
- txt += summarizer.report()
+ txt += summarizer.text_report()
txt += '\n'
return txt
diff --git a/tools/crunchstat-summary/tests/test_examples.py b/tools/crunchstat-summary/tests/test_examples.py
index a19d7ad..3abf344 100644
--- a/tools/crunchstat-summary/tests/test_examples.py
+++ b/tools/crunchstat-summary/tests/test_examples.py
@@ -1,7 +1,6 @@
import arvados
import collections
import crunchstat_summary.command
-import crunchstat_summary.summarizer
import difflib
import glob
import gzip
@@ -9,17 +8,17 @@ import mock
import os
import unittest
-
TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
+
class ReportDiff(unittest.TestCase):
- def diff_known_report(self, logfile, summarizer):
+ def diff_known_report(self, logfile, cmd):
expectfile = logfile+'.report'
expect = open(expectfile).readlines()
- self.diff_report(summarizer, expect, expectfile=expectfile)
+ self.diff_report(cmd, expect, expectfile=expectfile)
- def diff_report(self, summarizer, expect, expectfile=None):
- got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
+ def diff_report(self, cmd, expect, expectfile=None):
+ got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
expect, got, fromfile=expectfile, tofile="(generated)")))
@@ -30,9 +29,9 @@ class SummarizeFile(ReportDiff):
logfile = os.path.join(TESTS_DIR, fnm)
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--log-file', logfile])
- summarizer = crunchstat_summary.command.Command(args).summarizer()
- summarizer.run()
- self.diff_known_report(logfile, summarizer)
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(logfile, cmd)
class SummarizeJob(ReportDiff):
@@ -52,9 +51,9 @@ class SummarizeJob(ReportDiff):
mock_cr().open.return_value = gzip.open(self.logfile)
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_job_uuid])
- summarizer = crunchstat_summary.command.Command(args).summarizer()
- summarizer.run()
- self.diff_known_report(self.logfile, summarizer)
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(self.logfile, cmd)
mock_api().jobs().get.assert_called_with(uuid=self.fake_job_uuid)
mock_cr.assert_called_with(self.fake_log_id)
mock_cr().open.assert_called_with('fake-logfile.txt')
@@ -113,8 +112,8 @@ class SummarizePipeline(ReportDiff):
mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--pipeline-instance', self.fake_instance['uuid']])
- summarizer = crunchstat_summary.command.Command(args).summarizer()
- summarizer.run()
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
job_report = [
line for line in open(logfile+'.report').readlines()
@@ -126,7 +125,7 @@ class SummarizePipeline(ReportDiff):
job_report + ['\n'] +
['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
job_report)
- self.diff_report(summarizer, expect)
+ self.diff_report(cmd, expect)
mock_cr.assert_has_calls(
[
mock.call('fake-log-pdh-0'),
commit 6f5082f86c8143917305ac6bb7f9559d3e364597
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Dec 22 16:16:15 2015 -0500
7883: Aim 5% below GiB memory size boundaries.
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 49b67ff..f5d27d4 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -12,6 +12,11 @@ import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
+# Recommend memory constraints that are this multiple of an integral
+# number of GiB. (Actual nodes tend to be sold in sizes like 8 GiB
+# that have amounts like 7.5 GiB according to the kernel.)
+AVAILABLE_RAM_RATIO = 0.95
+
class Summarizer(object):
existing_constraints = {}
@@ -159,7 +164,7 @@ class Summarizer(object):
int(used_cores))
def _recommend_ram(self):
- """Recommend asking for 2048 MiB RAM if max rss was 1248 MiB"""
+ """Recommend asking for (2048*0.95) MiB RAM if max rss was 1248 MiB"""
used_ram = self.stats_max['mem']['rss']
if used_ram == float('-Inf'):
@@ -167,14 +172,16 @@ class Summarizer(object):
return
used_ram = math.ceil(float(used_ram) / (1<<20))
asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
- if asked_ram is None or math.ceil(used_ram/(1<<10)) < asked_ram/(1<<10):
+ if asked_ram is None or (
+ math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10)) <
+ (asked_ram/AVAILABLE_RAM_RATIO)/(1<<10)):
yield (
- '#!! {} never used more than {} MiB RAM -- '
+ '#!! {} max RSS was {} MiB -- '
'try runtime_constraints "min_ram_mb_per_node":{}'
).format(
self.label,
int(used_ram),
- int(math.ceil(used_ram/(1<<10))*(1<<10)))
+ int(math.ceil((used_ram/AVAILABLE_RAM_RATIO)/(1<<10))*(1<<10)*AVAILABLE_RAM_RATIO))
def _format(self, val):
"""Return a string representation of a stat.
commit 610272beac73af1b4880bfad9f1e23d86910dee8
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Dec 21 16:53:37 2015 -0500
7883: Recommend more economical values for runtime_constraints.
diff --git a/tools/crunchstat-summary/bin/crunchstat-summary b/tools/crunchstat-summary/bin/crunchstat-summary
index f42d30b..c32b50e 100755
--- a/tools/crunchstat-summary/bin/crunchstat-summary
+++ b/tools/crunchstat-summary/bin/crunchstat-summary
@@ -4,8 +4,11 @@ from __future__ import print_function
import crunchstat_summary.command
import crunchstat_summary.summarizer
+import logging
import sys
+logging.getLogger().addHandler(logging.StreamHandler())
+
args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
s = crunchstat_summary.command.Command(args).summarizer()
s.run()
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 1b7f950..49b67ff 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,13 +3,21 @@ from __future__ import print_function
import arvados
import collections
import functools
+import itertools
+import logging
+import math
import re
import sys
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
class Summarizer(object):
- def __init__(self, logdata):
+ existing_constraints = {}
+
+ def __init__(self, logdata, label='job'):
self._logdata = logdata
+ self.label = label
def run(self):
# stats_max: {category: {stat: val}}
@@ -64,9 +72,9 @@ class Summarizer(object):
this_interval_s = val
continue
elif not (this_interval_s > 0):
- print("BUG? interval stat given with duration {!r}".
- format(this_interval_s),
- file=sys.stderr)
+ logger.error(
+ "BUG? interval stat given with duration {!r}".
+ format(this_interval_s))
continue
else:
stat = stat + '__rate'
@@ -75,12 +83,7 @@ class Summarizer(object):
self.task_stats[task_id][category][stat] = val
if val > self.stats_max[category][stat]:
self.stats_max[category][stat] = val
-
- def report(self):
- return "\n".join(self._report_gen()) + "\n"
-
- def _report_gen(self):
- job_tot = collections.defaultdict(
+ self.job_tot = collections.defaultdict(
functools.partial(collections.defaultdict, int))
for task_id, task_stat in self.task_stats.iteritems():
for category, stat_last in task_stat.iteritems():
@@ -88,7 +91,14 @@ class Summarizer(object):
if stat in ['cpus', 'cache', 'swap', 'rss']:
# meaningless stats like 16 cpu cores x 5 tasks = 80
continue
- job_tot[category][stat] += val
+ self.job_tot[category][stat] += val
+
+ def report(self):
+ return "\n".join(itertools.chain(
+ self._report_gen(),
+ self._recommend_gen())) + "\n"
+
+ def _report_gen(self):
yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
for category, stat_max in sorted(self.stats_max.iteritems()):
for stat, val in sorted(stat_max.iteritems()):
@@ -96,7 +106,7 @@ class Summarizer(object):
continue
max_rate = self._format(stat_max.get(stat+'__rate', '-'))
val = self._format(val)
- tot = self._format(job_tot[category].get(stat, '-'))
+ tot = self._format(self.job_tot[category].get(stat, '-'))
yield "\t".join([category, stat, str(val), max_rate, tot])
for args in (
('Max CPU time spent by a single task: {}s',
@@ -106,7 +116,8 @@ class Summarizer(object):
self.stats_max['cpu']['user+sys__rate'],
lambda x: x * 100),
('Overall CPU usage: {}%',
- job_tot['cpu']['user+sys'] / job_tot['time']['elapsed'],
+ self.job_tot['cpu']['user+sys'] /
+ self.job_tot['time']['elapsed'],
lambda x: x * 100),
('Max memory used by a single task: {}GB',
self.stats_max['mem']['rss'],
@@ -124,6 +135,47 @@ class Summarizer(object):
val = transform(val)
yield "# "+format_string.format(self._format(val))
+ def _recommend_gen(self):
+ return itertools.chain(
+ self._recommend_cpu(),
+ self._recommend_ram())
+
+ def _recommend_cpu(self):
+ """Recommend asking for 4 cores if max CPU usage was 333%"""
+
+ cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
+ if cpu_max_rate == float('-Inf'):
+ logger.warning('%s: no CPU usage data', self.label)
+ return
+ used_cores = int(math.ceil(cpu_max_rate))
+ asked_cores = self.existing_constraints.get('min_cores_per_node')
+ if asked_cores is None or used_cores < asked_cores:
+ yield (
+ '#!! {} max CPU usage was {}% -- '
+ 'try runtime_constraints "min_cores_per_node":{}'
+ ).format(
+ self.label,
+ int(math.ceil(cpu_max_rate*100)),
+ int(used_cores))
+
+ def _recommend_ram(self):
+ """Recommend asking for 2048 MiB RAM if max rss was 1248 MiB"""
+
+ used_ram = self.stats_max['mem']['rss']
+ if used_ram == float('-Inf'):
+ logger.warning('%s: no memory usage data', self.label)
+ return
+ used_ram = math.ceil(float(used_ram) / (1<<20))
+ asked_ram = self.existing_constraints.get('min_ram_mb_per_node')
+ if asked_ram is None or math.ceil(used_ram/(1<<10)) < asked_ram/(1<<10):
+ yield (
+ '#!! {} never used more than {} MiB RAM -- '
+ 'try runtime_constraints "min_ram_mb_per_node":{}'
+ ).format(
+ self.label,
+ int(used_ram),
+ int(math.ceil(used_ram/(1<<10))*(1<<10)))
+
def _format(self, val):
"""Return a string representation of a stat.
@@ -133,6 +185,7 @@ class Summarizer(object):
else:
return '{}'.format(val)
+
class CollectionSummarizer(Summarizer):
def __init__(self, collection_id):
collection = arvados.collection.CollectionReader(collection_id)
@@ -141,7 +194,9 @@ class CollectionSummarizer(Summarizer):
raise ValueError(
"collection {} has {} files; need exactly one".format(
collection_id, len(filenames)))
- super(CollectionSummarizer, self).__init__(collection.open(filenames[0]))
+ super(CollectionSummarizer, self).__init__(
+ collection.open(filenames[0]))
+
class JobSummarizer(CollectionSummarizer):
def __init__(self, job):
@@ -150,12 +205,15 @@ class JobSummarizer(CollectionSummarizer):
self.job = arv.jobs().get(uuid=job).execute()
else:
self.job = job
+ self.label = self.job['uuid']
+ self.existing_constraints = self.job.get('runtime_constraints', {})
if not self.job['log']:
raise ValueError(
"job {} has no log; live summary not implemented".format(
self.job['uuid']))
super(JobSummarizer, self).__init__(self.job['log'])
+
class PipelineSummarizer():
def __init__(self, pipeline_instance_uuid):
arv = arvados.api('v1')
@@ -164,16 +222,17 @@ class PipelineSummarizer():
self.summarizers = collections.OrderedDict()
for cname, component in instance['components'].iteritems():
if 'job' not in component:
- print("{}: skipping component with no job assigned".format(
- cname), file=sys.stderr)
+ logger.warning(
+ "%s: skipping component with no job assigned", cname)
elif component['job'].get('log') is None:
- print("{}: skipping component with no log available".format(
- cname), file=sys.stderr)
+ logger.warning(
+ "%s: skipping job %s with no log available",
+ cname, component['job'].get('uuid'))
else:
- print("{}: reading log from {}".format(
- cname, component['job']['log']), file=sys.stderr)
- summarizer = CollectionSummarizer(component['job']['log'])
- summarizer.job_uuid = component['job']['uuid']
+ logger.debug(
+ "%s: reading log from %s", cname, component['job']['log'])
+ summarizer = JobSummarizer(component['job'])
+ summarizer.label = cname
self.summarizers[cname] = summarizer
def run(self):
@@ -184,7 +243,7 @@ class PipelineSummarizer():
txt = ''
for cname, summarizer in self.summarizers.iteritems():
txt += '### Summary for {} ({})\n'.format(
- cname, summarizer.job_uuid)
+ cname, summarizer.job['uuid'])
txt += summarizer.report()
txt += '\n'
return txt
diff --git a/tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report b/tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
index ef7beb1..c94cd24 100644
--- a/tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
+++ b/tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
@@ -28,3 +28,5 @@ time elapsed 80 - 80
# Max memory used by a single task: 0.35GB
# Max network traffic in a single task: 1.79GB
# Max network speed in a single interval: 42.58MB/s
+#!! job max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
+#!! job never used more than 334 MiB RAM -- try runtime_constraints "min_ram_mb_per_node":1024
diff --git a/tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report b/tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
index 38af3e7..e711824 100644
--- a/tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
+++ b/tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
@@ -15,3 +15,4 @@ time elapsed 2 - 4
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+#!! job never used more than 1 MiB RAM -- try runtime_constraints "min_ram_mb_per_node":1024
diff --git a/tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report b/tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
index 7e42d61..5772cb4 100644
--- a/tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
+++ b/tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
@@ -15,3 +15,4 @@ time elapsed 2 - 3
# Overall CPU usage: 0.00%
# Max memory used by a single task: 0.00GB
# Max network traffic in a single task: 0.00GB
+#!! job never used more than 1 MiB RAM -- try runtime_constraints "min_ram_mb_per_node":1024
diff --git a/tools/crunchstat-summary/tests/test_examples.py b/tools/crunchstat-summary/tests/test_examples.py
index 1bce693..a19d7ad 100644
--- a/tools/crunchstat-summary/tests/test_examples.py
+++ b/tools/crunchstat-summary/tests/test_examples.py
@@ -24,7 +24,7 @@ class ReportDiff(unittest.TestCase):
expect, got, fromfile=expectfile, tofile="(generated)")))
-class ExampleLogsTestCase(ReportDiff):
+class SummarizeFile(ReportDiff):
def test_example_files(self):
for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
logfile = os.path.join(TESTS_DIR, fnm)
@@ -35,22 +35,28 @@ class ExampleLogsTestCase(ReportDiff):
self.diff_known_report(logfile, summarizer)
-class LookupJobUUID(ReportDiff):
- fake_uuid = 'zzzzz-8i9sb-jq0ekny1xou3zoh'
+class SummarizeJob(ReportDiff):
+ fake_job_uuid = 'zzzzz-8i9sb-jjjjjjjjjjjjjjj'
+ fake_log_id = 'fake-log-collection-id'
+ fake_job = {
+ 'uuid': fake_job_uuid,
+ 'log': fake_log_id,
+ }
+ logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
@mock.patch('arvados.collection.CollectionReader')
@mock.patch('arvados.api')
- def test_job_uuid(self, mock_api, mock_cr):
- logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
- mock_api().jobs().get().execute.return_value = {'log': 'fake-uuid'}
+ def test_job_report(self, mock_api, mock_cr):
+ mock_api().jobs().get().execute.return_value = self.fake_job
mock_cr().__iter__.return_value = ['fake-logfile.txt']
- mock_cr().open.return_value = gzip.open(logfile)
+ mock_cr().open.return_value = gzip.open(self.logfile)
args = crunchstat_summary.command.ArgumentParser().parse_args(
- ['--job', self.fake_uuid])
+ ['--job', self.fake_job_uuid])
summarizer = crunchstat_summary.command.Command(args).summarizer()
summarizer.run()
- self.diff_known_report(logfile, summarizer)
- mock_api().jobs().get.assert_called_with(uuid=self.fake_uuid)
+ self.diff_known_report(self.logfile, summarizer)
+ mock_api().jobs().get.assert_called_with(uuid=self.fake_job_uuid)
+ mock_cr.assert_called_with(self.fake_log_id)
mock_cr().open.assert_called_with('fake-logfile.txt')
@@ -63,12 +69,20 @@ class SummarizePipeline(ReportDiff):
'job': {
'uuid': 'zzzzz-8i9sb-000000000000000',
'log': 'fake-log-pdh-0',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 1024,
+ 'min_cores_per_node': 1,
+ },
},
}],
['bar', {
'job': {
'uuid': 'zzzzz-8i9sb-000000000000001',
'log': 'fake-log-pdh-1',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 1024,
+ 'min_cores_per_node': 1,
+ },
},
}],
['no-job-assigned', {}],
@@ -81,6 +95,10 @@ class SummarizePipeline(ReportDiff):
'job': {
'uuid': 'zzzzz-8i9sb-000000000000002',
'log': 'fake-log-pdh-2',
+ 'runtime_constraints': {
+ 'min_ram_mb_per_node': 1024,
+ 'min_cores_per_node': 1,
+ },
},
}]]),
}
@@ -98,13 +116,16 @@ class SummarizePipeline(ReportDiff):
summarizer = crunchstat_summary.command.Command(args).summarizer()
summarizer.run()
+ job_report = [
+ line for line in open(logfile+'.report').readlines()
+ if not line.startswith('#!! ')]
expect = (
['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
- open(logfile+'.report').readlines() + ['\n'] +
+ job_report + ['\n'] +
['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
- open(logfile+'.report').readlines() + ['\n'] +
+ job_report + ['\n'] +
['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
- open(logfile+'.report').readlines())
+ job_report)
self.diff_report(summarizer, expect)
mock_cr.assert_has_calls(
[
commit f878293dfe6c4a20312ff2db658dabc27ed60a8f
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Dec 21 14:42:44 2015 -0500
7883: Add --pipeline-instance mode: generate a report for each finished component.
diff --git a/tools/crunchstat-summary/bin/crunchstat-summary b/tools/crunchstat-summary/bin/crunchstat-summary
index 662d783..f42d30b 100755
--- a/tools/crunchstat-summary/bin/crunchstat-summary
+++ b/tools/crunchstat-summary/bin/crunchstat-summary
@@ -7,6 +7,6 @@ import crunchstat_summary.summarizer
import sys
args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.summarizer.Summarizer(args)
+s = crunchstat_summary.command.Command(args).summarizer()
s.run()
print(s.report(), end='')
diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index 8186e5d..fc37190 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -1,4 +1,8 @@
import argparse
+import gzip
+import sys
+
+from crunchstat_summary import summarizer
class ArgumentParser(argparse.ArgumentParser):
@@ -10,5 +14,27 @@ class ArgumentParser(argparse.ArgumentParser):
'--job', type=str, metavar='UUID',
help='Look up the specified job and read its log data from Keep')
src.add_argument(
+ '--pipeline-instance', type=str, metavar='UUID',
+ help='Summarize each component of the given pipeline instance')
+ src.add_argument(
'--log-file', type=str,
help='Read log data from a regular file')
+
+
+class Command(object):
+ def __init__(self, args):
+ self.args = args
+
+ def summarizer(self):
+ if self.args.pipeline_instance:
+ return summarizer.PipelineSummarizer(self.args.pipeline_instance)
+ elif self.args.job:
+ return summarizer.JobSummarizer(self.args.job)
+ elif self.args.log_file:
+ if self.args.log_file.endswith('.gz'):
+ fh = gzip.open(self.args.log_file)
+ else:
+ fh = open(self.args.log_file)
+ return summarizer.Summarizer(fh)
+ else:
+ return summarizer.Summarizer(sys.stdin)
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ac0964b..1b7f950 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,14 +3,13 @@ from __future__ import print_function
import arvados
import collections
import functools
-import gzip
import re
import sys
class Summarizer(object):
- def __init__(self, args):
- self.args = args
+ def __init__(self, logdata):
+ self._logdata = logdata
def run(self):
# stats_max: {category: {stat: val}}
@@ -20,7 +19,7 @@ class Summarizer(object):
# task_stats: {task_id: {category: {stat: val}}}
self.task_stats = collections.defaultdict(
functools.partial(collections.defaultdict, dict))
- for line in self._logdata():
+ for line in self._logdata:
m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
if m:
task_id = m.group('seq')
@@ -35,6 +34,8 @@ class Summarizer(object):
if m.group('category').endswith(':'):
# "notice:" etc.
continue
+ elif m.group('category') == 'error':
+ continue
task_id = m.group('seq')
this_interval_s = None
for group in ['current', 'interval']:
@@ -44,10 +45,15 @@ class Summarizer(object):
words = m.group(group).split(' ')
stats = {}
for val, stat in zip(words[::2], words[1::2]):
- if '.' in val:
- stats[stat] = float(val)
- else:
- stats[stat] = int(val)
+ try:
+ if '.' in val:
+ stats[stat] = float(val)
+ else:
+ stats[stat] = int(val)
+ except ValueError as e:
+ raise ValueError(
+ 'Error parsing {} stat in "{}": {!r}'.format(
+ stat, line, e))
if 'user' in stats or 'sys' in stats:
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
if 'tx' in stats or 'rx' in stats:
@@ -127,25 +133,58 @@ class Summarizer(object):
else:
return '{}'.format(val)
- def _logdata(self):
- if self.args.log_file:
- if self.args.log_file.endswith('.gz'):
- return gzip.open(self.args.log_file)
- else:
- return open(self.args.log_file)
- elif self.args.job:
- arv = arvados.api('v1')
- job = arv.jobs().get(uuid=self.args.job).execute()
- if not job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.args.job))
- collection = arvados.collection.CollectionReader(job['log'])
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- job.log, len(filenames)))
- return collection.open(filenames[0])
+class CollectionSummarizer(Summarizer):
+ def __init__(self, collection_id):
+ collection = arvados.collection.CollectionReader(collection_id)
+ filenames = [filename for filename in collection]
+ if len(filenames) != 1:
+ raise ValueError(
+ "collection {} has {} files; need exactly one".format(
+ collection_id, len(filenames)))
+ super(CollectionSummarizer, self).__init__(collection.open(filenames[0]))
+
+class JobSummarizer(CollectionSummarizer):
+ def __init__(self, job):
+ arv = arvados.api('v1')
+ if isinstance(job, str):
+ self.job = arv.jobs().get(uuid=job).execute()
else:
- return sys.stdin
+ self.job = job
+ if not self.job['log']:
+ raise ValueError(
+ "job {} has no log; live summary not implemented".format(
+ self.job['uuid']))
+ super(JobSummarizer, self).__init__(self.job['log'])
+
+class PipelineSummarizer():
+ def __init__(self, pipeline_instance_uuid):
+ arv = arvados.api('v1')
+ instance = arv.pipeline_instances().get(
+ uuid=pipeline_instance_uuid).execute()
+ self.summarizers = collections.OrderedDict()
+ for cname, component in instance['components'].iteritems():
+ if 'job' not in component:
+ print("{}: skipping component with no job assigned".format(
+ cname), file=sys.stderr)
+ elif component['job'].get('log') is None:
+ print("{}: skipping component with no log available".format(
+ cname), file=sys.stderr)
+ else:
+ print("{}: reading log from {}".format(
+ cname, component['job']['log']), file=sys.stderr)
+ summarizer = CollectionSummarizer(component['job']['log'])
+ summarizer.job_uuid = component['job']['uuid']
+ self.summarizers[cname] = summarizer
+
+ def run(self):
+ for summarizer in self.summarizers.itervalues():
+ summarizer.run()
+
+ def report(self):
+ txt = ''
+ for cname, summarizer in self.summarizers.iteritems():
+ txt += '### Summary for {} ({})\n'.format(
+ cname, summarizer.job_uuid)
+ txt += summarizer.report()
+ txt += '\n'
+ return txt
diff --git a/tools/crunchstat-summary/tests/test_examples.py b/tools/crunchstat-summary/tests/test_examples.py
index dbc3843..1bce693 100644
--- a/tools/crunchstat-summary/tests/test_examples.py
+++ b/tools/crunchstat-summary/tests/test_examples.py
@@ -1,22 +1,115 @@
+import arvados
+import collections
import crunchstat_summary.command
import crunchstat_summary.summarizer
import difflib
import glob
+import gzip
+import mock
import os
import unittest
-class ExampleLogsTestCase(unittest.TestCase):
+TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
+
+class ReportDiff(unittest.TestCase):
+ def diff_known_report(self, logfile, summarizer):
+ expectfile = logfile+'.report'
+ expect = open(expectfile).readlines()
+ self.diff_report(summarizer, expect, expectfile=expectfile)
+
+ def diff_report(self, summarizer, expect, expectfile=None):
+ got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
+ self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
+ expect, got, fromfile=expectfile, tofile="(generated)")))
+
+
+class ExampleLogsTestCase(ReportDiff):
def test_example_files(self):
- dirname = os.path.dirname(os.path.abspath(__file__))
- for fnm in glob.glob(os.path.join(dirname, '*.txt.gz')):
- logfile = os.path.join(dirname, fnm)
+ for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+ logfile = os.path.join(TESTS_DIR, fnm)
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--log-file', logfile])
- summarizer = crunchstat_summary.summarizer.Summarizer(args)
+ summarizer = crunchstat_summary.command.Command(args).summarizer()
summarizer.run()
- got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
- expectfile = logfile+'.report'
- expect = open(expectfile).readlines()
- self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
- expect, got, fromfile=expectfile, tofile="(generated)")))
+ self.diff_known_report(logfile, summarizer)
+
+
+class LookupJobUUID(ReportDiff):
+ fake_uuid = 'zzzzz-8i9sb-jq0ekny1xou3zoh'
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_job_uuid(self, mock_api, mock_cr):
+ logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+ mock_api().jobs().get().execute.return_value = {'log': 'fake-uuid'}
+ mock_cr().__iter__.return_value = ['fake-logfile.txt']
+ mock_cr().open.return_value = gzip.open(logfile)
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--job', self.fake_uuid])
+ summarizer = crunchstat_summary.command.Command(args).summarizer()
+ summarizer.run()
+ self.diff_known_report(logfile, summarizer)
+ mock_api().jobs().get.assert_called_with(uuid=self.fake_uuid)
+ mock_cr().open.assert_called_with('fake-logfile.txt')
+
+
+class SummarizePipeline(ReportDiff):
+ fake_instance = {
+ 'uuid': 'zzzzz-d1hrv-i3e77t9z5y8j9cc',
+ 'owner_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+ 'components': collections.OrderedDict([
+ ['foo', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000000',
+ 'log': 'fake-log-pdh-0',
+ },
+ }],
+ ['bar', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000001',
+ 'log': 'fake-log-pdh-1',
+ },
+ }],
+ ['no-job-assigned', {}],
+ ['unfinished-job', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-xxxxxxxxxxxxxxx',
+ },
+ }],
+ ['baz', {
+ 'job': {
+ 'uuid': 'zzzzz-8i9sb-000000000000002',
+ 'log': 'fake-log-pdh-2',
+ },
+ }]]),
+ }
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_pipeline(self, mock_api, mock_cr):
+ logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+ mock_api().pipeline_instances().get().execute. \
+ return_value = self.fake_instance
+ mock_cr().__iter__.return_value = ['fake-logfile.txt']
+ mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--pipeline-instance', self.fake_instance['uuid']])
+ summarizer = crunchstat_summary.command.Command(args).summarizer()
+ summarizer.run()
+
+ expect = (
+ ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
+ open(logfile+'.report').readlines() + ['\n'] +
+ ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
+ open(logfile+'.report').readlines() + ['\n'] +
+ ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
+ open(logfile+'.report').readlines())
+ self.diff_report(summarizer, expect)
+ mock_cr.assert_has_calls(
+ [
+ mock.call('fake-log-pdh-0'),
+ mock.call('fake-log-pdh-1'),
+ mock.call('fake-log-pdh-2'),
+ ], any_order=True)
+ mock_cr().open.assert_called_with('fake-logfile.txt')
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list