[ARVADOS] updated: cd42a47667cb6876acf7a557d1cde96fc214bf47
Git user
git at public.curoverse.com
Tue Aug 15 18:20:29 EDT 2017
Summary of changes:
lib/crunchstat/crunchstat.go | 10 +-
.../crunchstat_summary/command.py | 4 +
.../crunchstat_summary/summarizer.py | 362 +++++++++++++--------
...r_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz | Bin 0 -> 622 bytes
...-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report | 23 ++
tools/crunchstat-summary/tests/test_examples.py | 30 ++
6 files changed, 282 insertions(+), 147 deletions(-)
create mode 100644 tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz
create mode 100644 tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report
via cd42a47667cb6876acf7a557d1cde96fc214bf47 (commit)
via 68fee3aff2bc0e189827956720de58c3e8668bdc (commit)
via 6699c983ac7c8506d17a60323ba71478ea2ac20d (commit)
via 0a4e2d2940214cf91d9333f7a49b73159ee34d3a (commit)
from cf0656348eb1e4c5858ca944da6e9448cd22a894 (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.
commit cd42a47667cb6876acf7a557d1cde96fc214bf47
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 15 16:29:48 2017 -0400
11309: Prefix errors with "notice:" or "warning:" to aid parsing.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index f4915c0..056ef0d 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -93,7 +93,7 @@ func (r *Reporter) Stop() {
func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
content, err := ioutil.ReadAll(in)
if err != nil {
- r.Logger.Print(err)
+ r.Logger.Printf("warning: %v", err)
}
return content, err
}
@@ -169,7 +169,7 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) {
statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
stats, err := ioutil.ReadFile(statsFilename)
if err != nil {
- r.Logger.Print(err)
+ r.Logger.Printf("notice: %v", err)
continue
}
return strings.NewReader(string(stats)), nil
@@ -416,7 +416,7 @@ func (r *Reporter) waitForCIDFile() bool {
select {
case <-ticker.C:
case <-r.done:
- r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+ r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
return false
}
}
@@ -439,9 +439,9 @@ func (r *Reporter) waitForCgroup() bool {
select {
case <-ticker.C:
case <-warningTimer:
- r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+ r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
case <-r.done:
- r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+ r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
return false
}
}
commit 68fee3aff2bc0e189827956720de58c3e8668bdc
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 15 16:26:18 2017 -0400
11309: Fix parsing and labels. Add --threads option.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index 046f01c..a70e4b2 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -34,6 +34,9 @@ class ArgumentParser(argparse.ArgumentParser):
'--format', type=str, choices=('html', 'text'), default='text',
help='Report format')
self.add_argument(
+ '--threads', type=int, default=8,
+ help='Maximum worker threads to run')
+ self.add_argument(
'--verbose', '-v', action='count', default=0,
help='Log more information (once for progress, twice for debug)')
@@ -46,6 +49,7 @@ class Command(object):
def run(self):
kwargs = {
'skip_child_jobs': self.args.skip_child_jobs,
+ 'threads': self.args.threads,
}
if self.args.pipeline_instance:
self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 60f1f70..e8a842d 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -41,9 +41,10 @@ class Task(object):
class Summarizer(object):
- def __init__(self, logdata, label=None, skip_child_jobs=False):
+ def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
self._logdata = logdata
+ self.uuid = uuid
self.label = label
self.starttime = None
self.finishtime = None
@@ -69,55 +70,55 @@ class Summarizer(object):
def run(self):
logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+ detected_crunch1 = False
for line in self._logdata:
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
- if m:
- seq = int(m.group('seq'))
- uuid = m.group('task_uuid')
- self.seq_to_uuid[seq] = uuid
- logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
- continue
+ if not detected_crunch1 and '-8i9sb-' in line:
+ detected_crunch1 = True
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
- if m:
- task_id = self.seq_to_uuid[int(m.group('seq'))]
- elapsed = int(m.group('elapsed'))
- self.task_stats[task_id]['time'] = {'elapsed': elapsed}
- if elapsed > self.stats_max['time']['elapsed']:
- self.stats_max['time']['elapsed'] = elapsed
- continue
+ if detected_crunch1:
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+ if m:
+ seq = int(m.group('seq'))
+ uuid = m.group('task_uuid')
+ self.seq_to_uuid[seq] = uuid
+ logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+ continue
- m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
- if m:
- uuid = m.group('uuid')
- if self._skip_child_jobs:
- logger.warning('%s: omitting stats from child job %s'
- ' because --skip-child-jobs flag is on',
- self.label, uuid)
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
+ if m:
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ elapsed = int(m.group('elapsed'))
+ self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+ if elapsed > self.stats_max['time']['elapsed']:
+ self.stats_max['time']['elapsed'] = elapsed
continue
- logger.debug('%s: follow %s', self.label, uuid)
- child_summarizer = ProcessSummarizer(uuid)
- child_summarizer.stats_max = self.stats_max
- child_summarizer.task_stats = self.task_stats
- child_summarizer.tasks = self.tasks
- child_summarizer.starttime = self.starttime
- child_summarizer.run()
- logger.debug('%s: done %s', self.label, uuid)
- continue
- m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
- if m:
- # crunch1 job
- task_id = self.seq_to_uuid[int(m.group('seq'))]
+ m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+ if m:
+ uuid = m.group('uuid')
+ if self._skip_child_jobs:
+ logger.warning('%s: omitting stats from child job %s'
+ ' because --skip-child-jobs flag is on',
+ self.label, uuid)
+ continue
+ logger.debug('%s: follow %s', self.label, uuid)
+ child_summarizer = ProcessSummarizer(uuid)
+ child_summarizer.stats_max = self.stats_max
+ child_summarizer.task_stats = self.task_stats
+ child_summarizer.tasks = self.tasks
+ child_summarizer.starttime = self.starttime
+ child_summarizer.run()
+ logger.debug('%s: done %s', self.label, uuid)
+ continue
+
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ if not m:
+ continue
else:
+ # crunch2
m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
- if m:
- # crunch2 container (seq/task don't apply)
- task_id = 'container'
- else:
- # not a crunchstat log
+ if not m:
continue
- task = self.tasks[task_id]
if self.label is None:
try:
@@ -129,11 +130,17 @@ class Summarizer(object):
continue
elif m.group('category') in ('error', 'caught'):
continue
- elif m.group('category') == 'read':
+ elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
# "stderr crunchstat: read /proc/1234/net/dev: ..."
- # (crunchstat formatting fixed, but old logs still say this)
+ # (old logs are less careful with unprefixed error messages)
continue
+ if detected_crunch1:
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ else:
+ task_id = 'container'
+ task = self.tasks[task_id]
+
# Use the first and last crunchstat timestamps as
# approximations of starttime and finishtime.
timestamp = m.group('timestamp')
@@ -410,47 +417,47 @@ def NewSummarizer(process, **kwargs):
process = None
arv = arvados.api('v1', model=OrderedJsonModel())
- if re.search('-dz642-', uuid):
+ if '-dz642-' in uuid:
if process is None:
process = arv.containers().get(uuid=uuid).execute()
- return ContainerSummarizer(process, **kwargs)
- elif re.search('-xvhdp-', uuid):
+ klass = ContainerTreeSummarizer
+ elif '-xvhdp-' in uuid:
if process is None:
- ctrReq = arv.container_requests().get(uuid=uuid).execute()
- ctrUUID = ctrReq['container_uuid']
- process = arv.containers().get(uuid=ctrUUID).execute()
- return ContainerSummarizer(process, **kwargs)
- elif re.search('-8i9sb-', uuid):
+ process = arv.container_requests().get(uuid=uuid).execute()
+ klass = ContainerTreeSummarizer
+ elif '-8i9sb-' in uuid:
if process is None:
process = arv.jobs().get(uuid=uuid).execute()
- return JobSummarizer(process, **kwargs)
- elif re.search('-d1hrv-', uuid):
+ klass = JobSummarizer
+ elif '-d1hrv-' in uuid:
if process is None:
process = arv.pipeline_instances().get(uuid=uuid).execute()
- return PipelineSummarizer(process, **kwargs)
+ klass = PipelineSummarizer
+ elif '-4zz18-' in uuid:
+ return CollectionSummarizer(collection_id=uuid)
else:
raise ArgumentError("Unrecognized uuid %s", uuid)
+ return klass(process, uuid=uuid, **kwargs)
class ProcessSummarizer(Summarizer):
"""Process is a job, pipeline, container, or container request."""
- def __init__(self, process, **kwargs):
+ def __init__(self, process, label=None, **kwargs):
rdr = None
self.process = process
+ if label is None:
+ label = self.process.get('name', self.process['uuid'])
if self.process.get('log'):
try:
rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
except arvados.errors.NotFoundError as e:
logger.warning("Trying event logs after failing to read "
"log collection %s: %s", self.process['log'], e)
- else:
- label = self.process['uuid']
if rdr is None:
rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
- label = self.process['uuid'] + ' (partial)'
- super(ProcessSummarizer, self).__init__(rdr, **kwargs)
- self.label = label
+ label = label + ' (partial)'
+ super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
self.existing_constraints = self.process.get('runtime_constraints', {})
@@ -462,26 +469,23 @@ class ContainerSummarizer(ProcessSummarizer):
pass
-class PipelineSummarizer(object):
- def __init__(self, instance, **kwargs):
- self.summarizers = collections.OrderedDict()
- for cname, component in instance['components'].iteritems():
- if 'job' not in component:
- logger.warning(
- "%s: skipping component with no job assigned", cname)
- else:
- logger.info(
- "%s: job %s", cname, component['job']['uuid'])
- summarizer = JobSummarizer(component['job'], **kwargs)
- summarizer.label = '{} {}'.format(
- cname, component['job']['uuid'])
- self.summarizers[cname] = summarizer
- self.label = instance['uuid']
+class MultiSummarizer(object):
+ def __init__(self, children={}, label=None, threads=1, **kwargs):
+ self.throttle = threading.Semaphore(threads)
+ self.children = children
+ self.label = label
+
+ def run_and_release(self, target, *args, **kwargs):
+ try:
+ return target(*args, **kwargs)
+ finally:
+ self.throttle.release()
def run(self):
threads = []
- for summarizer in self.summarizers.itervalues():
- t = threading.Thread(target=summarizer.run)
+ for child in self.children.itervalues():
+ self.throttle.acquire()
+ t = threading.Thread(target=self.run_and_release, args=(child.run, ))
t.daemon = True
t.start()
threads.append(t)
@@ -490,12 +494,70 @@ class PipelineSummarizer(object):
def text_report(self):
txt = ''
- for cname, summarizer in self.summarizers.iteritems():
- txt += '### Summary for {} ({})\n'.format(
- cname, summarizer.process['uuid'])
- txt += summarizer.text_report()
+ for cname, child in self.children.iteritems():
+ if len(self.children) > 1:
+ txt += '### Summary for {} ({})\n'.format(
+ cname, child.process['uuid'])
+ txt += child.text_report()
txt += '\n'
return txt
def html_report(self):
- return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()
+ return WEBCHART_CLASS(self.label, self.children.itervalues()).html()
+
+
+class PipelineSummarizer(MultiSummarizer):
+ def __init__(self, instance, **kwargs):
+ children = collections.OrderedDict()
+ for cname, component in instance['components'].iteritems():
+ if 'job' not in component:
+ logger.warning(
+ "%s: skipping component with no job assigned", cname)
+ else:
+ logger.info(
+ "%s: job %s", cname, component['job']['uuid'])
+ summarizer = JobSummarizer(component['job'], **kwargs)
+ summarizer.label = '{} {}'.format(
+ cname, component['job']['uuid'])
+ children[cname] = summarizer
+ super(PipelineSummarizer, self).__init__(
+ children=children,
+ label=instance['uuid'],
+ **kwargs)
+
+
+class ContainerTreeSummarizer(MultiSummarizer):
+ def __init__(self, root, **kwargs):
+ arv = arvados.api('v1', model=OrderedJsonModel())
+
+ label = kwargs.pop('label', None) or root.get('name') or root['uuid']
+ root['name'] = label
+
+ children = collections.OrderedDict()
+ todo = collections.deque((root, ))
+ while len(todo) > 0:
+ current = todo.popleft()
+ label = current['name']
+ if current['uuid'].find('-xvhdp-') > 0:
+ current = arv.containers().get(uuid=current['container_uuid']).execute()
+ children[current['uuid']] = ContainerSummarizer(
+ current, label=label, **kwargs)
+ page_filters = []
+ while True:
+ items = arv.container_requests().index(
+ order=['uuid asc'],
+ filters=page_filters+[
+ ['requesting_container_uuid', '=', current['uuid']]],
+ ).execute()['items']
+ if not items:
+ break
+ page_filters = [['uuid', '>', items[-1]['uuid']]]
+ for cr in items:
+ if cr['container_uuid']:
+ logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
+ cr['name'] = label + ' / ' + (cr.get('name') or cr['uuid'])
+ todo.append(cr)
+ super(ContainerTreeSummarizer, self).__init__(
+ children=children,
+ label=root['name'],
+ **kwargs)
diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz
new file mode 100644
index 0000000..8b069e7
Binary files /dev/null and b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz differ
diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report
new file mode 100644
index 0000000..daabb32
--- /dev/null
+++ b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report
@@ -0,0 +1,23 @@
+category metric task_max task_max_rate job_total
+cpu cpus 20 - -
+cpu sys 0.82 0.08 0.82
+cpu user 2.31 0.22 2.31
+cpu user+sys 3.13 0.30 3.13
+mem cache 23846912 - -
+mem pgmajfault 121 - 121
+mem rss 65470464 - -
+mem swap 0 - -
+net:eth0 rx 500762 951.15 500762
+net:eth0 tx 36242 226.61 36242
+net:eth0 tx+rx 537004 1177.76 537004
+# Number of tasks: 1
+# Max CPU time spent by a single task: 3.13s
+# Max CPU usage in a single interval: 29.89%
+# Overall CPU usage: 0%
+# Max memory used by a single task: 0.07GB
+# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! container max CPU usage was 30% -- try runtime_constraints "min_cores_per_node":1
+#!! container max RSS was 63 MiB -- try runtime_constraints "min_ram_mb_per_node":972
diff --git a/tools/crunchstat-summary/tests/test_examples.py b/tools/crunchstat-summary/tests/test_examples.py
index d060bec..6e8604c 100644
--- a/tools/crunchstat-summary/tests/test_examples.py
+++ b/tools/crunchstat-summary/tests/test_examples.py
@@ -59,6 +59,36 @@ class SummarizeEdgeCases(unittest.TestCase):
s.run()
+class SummarizeContainer(ReportDiff):
+ fake_container = {
+ 'uuid': '9tee4-dz642-mjfb0i5hzojp16a',
+ 'log': '9tee4-4zz18-ihyzym9tcwjwg4r',
+ }
+ fake_request = {
+ 'uuid': '9tee4-xvhdp-uper95jktm10d3w',
+ 'name': 'container',
+ 'container_uuid': fake_container['uuid'],
+ }
+ logfile = os.path.join(
+ TESTS_DIR, 'container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz')
+
+ @mock.patch('arvados.collection.CollectionReader')
+ @mock.patch('arvados.api')
+ def test_container(self, mock_api, mock_cr):
+ mock_api().container_requests().index().execute.return_value = {'items':[]}
+ mock_api().container_requests().get().execute.return_value = self.fake_request
+ mock_api().containers().get().execute.return_value = self.fake_container
+ mock_cr().__iter__.return_value = [
+ 'crunch-run.txt', 'stderr.txt', 'node-info.txt',
+ 'container.json', 'crunchstat.txt']
+ mock_cr().open.return_value = gzip.open(self.logfile)
+ args = crunchstat_summary.command.ArgumentParser().parse_args(
+ ['--job', self.fake_request['uuid']])
+ cmd = crunchstat_summary.command.Command(args)
+ cmd.run()
+ self.diff_known_report(self.logfile, cmd)
+
+
class SummarizeJob(ReportDiff):
fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
fake_log_id = 'fake-log-collection-id'
commit 6699c983ac7c8506d17a60323ba71478ea2ac20d
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 15 11:21:44 2017 -0400
11309: 9001: Fix catch-all-exceptions antipattern.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 5692ba6..60f1f70 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -119,89 +119,88 @@ class Summarizer(object):
continue
task = self.tasks[task_id]
- try:
- if self.label is None:
+ if self.label is None:
+ try:
self.label = m.group('job_uuid')
- logger.debug('%s: using job uuid as label', self.label)
- if m.group('category').endswith(':'):
- # "stderr crunchstat: notice: ..."
- continue
- elif m.group('category') in ('error', 'caught'):
- continue
- elif m.group('category') == 'read':
- # "stderr crunchstat: read /proc/1234/net/dev: ..."
- # (crunchstat formatting fixed, but old logs still say this)
- continue
+ except IndexError:
+ self.label = 'container'
+ if m.group('category').endswith(':'):
+ # "stderr crunchstat: notice: ..."
+ continue
+ elif m.group('category') in ('error', 'caught'):
+ continue
+ elif m.group('category') == 'read':
+ # "stderr crunchstat: read /proc/1234/net/dev: ..."
+ # (crunchstat formatting fixed, but old logs still say this)
+ continue
- # Use the first and last crunchstat timestamps as
- # approximations of starttime and finishtime.
- timestamp = m.group('timestamp')
- if timestamp[10:11] == '_':
- timestamp = datetime.datetime.strptime(
- timestamp, '%Y-%m-%d_%H:%M:%S')
- elif timestamp[10:11] == 'T':
- timestamp = datetime.datetime.strptime(
- timestamp[:19], '%Y-%m-%dT%H:%M:%S')
- else:
- raise ValueError("Cannot parse timestamp {!r}".format(
- timestamp))
-
- if not task.starttime:
- task.starttime = timestamp
- logger.debug('%s: task %s starttime %s',
- self.label, task_id, timestamp)
- task.finishtime = timestamp
-
- if not self.starttime:
- self.starttime = timestamp
- self.finishtime = timestamp
-
- this_interval_s = None
- for group in ['current', 'interval']:
- if not m.group(group):
- continue
- category = m.group('category')
- words = m.group(group).split(' ')
- stats = {}
+ # Use the first and last crunchstat timestamps as
+ # approximations of starttime and finishtime.
+ timestamp = m.group('timestamp')
+ if timestamp[10:11] == '_':
+ timestamp = datetime.datetime.strptime(
+ timestamp, '%Y-%m-%d_%H:%M:%S')
+ elif timestamp[10:11] == 'T':
+ timestamp = datetime.datetime.strptime(
+ timestamp[:19], '%Y-%m-%dT%H:%M:%S')
+ else:
+ raise ValueError("Cannot parse timestamp {!r}".format(
+ timestamp))
+
+ if not task.starttime:
+ task.starttime = timestamp
+ logger.debug('%s: task %s starttime %s',
+ self.label, task_id, timestamp)
+ task.finishtime = timestamp
+
+ if not self.starttime:
+ self.starttime = timestamp
+ self.finishtime = timestamp
+
+ this_interval_s = None
+ for group in ['current', 'interval']:
+ if not m.group(group):
+ continue
+ category = m.group('category')
+ words = m.group(group).split(' ')
+ stats = {}
+ try:
for val, stat in zip(words[::2], words[1::2]):
- try:
- if '.' in val:
- stats[stat] = float(val)
- else:
- stats[stat] = int(val)
- except ValueError as e:
- raise ValueError(
- 'Error parsing {} stat: {!r}'.format(
- stat, e))
- if 'user' in stats or 'sys' in stats:
- stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
- if 'tx' in stats or 'rx' in stats:
- stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
- for stat, val in stats.iteritems():
- if group == 'interval':
- if stat == 'seconds':
- this_interval_s = val
- continue
- elif not (this_interval_s > 0):
- logger.error(
- "BUG? interval stat given with duration {!r}".
- format(this_interval_s))
- continue
- else:
- stat = stat + '__rate'
- val = val / this_interval_s
- if stat in ['user+sys__rate', 'tx+rx__rate']:
- task.series[category, stat].append(
- (timestamp - self.starttime, val))
+ if '.' in val:
+ stats[stat] = float(val)
+ else:
+ stats[stat] = int(val)
+ except ValueError as e:
+ logger.warning('Error parsing {} stat: {!r}'.format(
+ stat, e))
+ continue
+ if 'user' in stats or 'sys' in stats:
+ stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
+ if 'tx' in stats or 'rx' in stats:
+ stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
+ for stat, val in stats.iteritems():
+ if group == 'interval':
+ if stat == 'seconds':
+ this_interval_s = val
+ continue
+ elif not (this_interval_s > 0):
+ logger.error(
+ "BUG? interval stat given with duration {!r}".
+ format(this_interval_s))
+ continue
else:
- if stat in ['rss']:
+ stat = stat + '__rate'
+ val = val / this_interval_s
+ if stat in ['user+sys__rate', 'tx+rx__rate']:
task.series[category, stat].append(
(timestamp - self.starttime, val))
- self.task_stats[task_id][category][stat] = val
- if val > self.stats_max[category][stat]:
- self.stats_max[category][stat] = val
- except Exception as e:
- logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
+ else:
+ if stat in ['rss']:
+ task.series[category, stat].append(
+ (timestamp - self.starttime, val))
+ self.task_stats[task_id][category][stat] = val
+ if val > self.stats_max[category][stat]:
+ self.stats_max[category][stat] = val
logger.debug('%s: done parsing', self.label)
self.job_tot = collections.defaultdict(
commit 0a4e2d2940214cf91d9333f7a49b73159ee34d3a
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Aug 14 16:40:55 2017 -0400
11309: Accommodate crunch2 crunchstat log format.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index cfc01da..5692ba6 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -105,9 +105,19 @@ class Summarizer(object):
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
- if not m:
- continue
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ if m:
+ # crunch1 job
+ task_id = self.seq_to_uuid[int(m.group('seq'))]
+ else:
+ m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ if m:
+ # crunch2 container (seq/task don't apply)
+ task_id = 'container'
+ else:
+ # not a crunchstat log
+ continue
+ task = self.tasks[task_id]
try:
if self.label is None:
@@ -122,13 +132,20 @@ class Summarizer(object):
# "stderr crunchstat: read /proc/1234/net/dev: ..."
# (crunchstat formatting fixed, but old logs still say this)
continue
- task_id = self.seq_to_uuid[int(m.group('seq'))]
- task = self.tasks[task_id]
# Use the first and last crunchstat timestamps as
# approximations of starttime and finishtime.
- timestamp = datetime.datetime.strptime(
- m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+ timestamp = m.group('timestamp')
+ if timestamp[10:11] == '_':
+ timestamp = datetime.datetime.strptime(
+ timestamp, '%Y-%m-%d_%H:%M:%S')
+ elif timestamp[10:11] == 'T':
+ timestamp = datetime.datetime.strptime(
+ timestamp[:19], '%Y-%m-%dT%H:%M:%S')
+ else:
+ raise ValueError("Cannot parse timestamp {!r}".format(
+ timestamp))
+
if not task.starttime:
task.starttime = timestamp
logger.debug('%s: task %s starttime %s',
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list