[ARVADOS] updated: cd42a47667cb6876acf7a557d1cde96fc214bf47

Git user git at public.curoverse.com
Tue Aug 15 18:20:29 EDT 2017


Summary of changes:
 lib/crunchstat/crunchstat.go                       |  10 +-
 .../crunchstat_summary/command.py                  |   4 +
 .../crunchstat_summary/summarizer.py               | 362 +++++++++++++--------
 ...r_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz | Bin 0 -> 622 bytes
 ...-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report |  23 ++
 tools/crunchstat-summary/tests/test_examples.py    |  30 ++
 6 files changed, 282 insertions(+), 147 deletions(-)
 create mode 100644 tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz
 create mode 100644 tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report

       via  cd42a47667cb6876acf7a557d1cde96fc214bf47 (commit)
       via  68fee3aff2bc0e189827956720de58c3e8668bdc (commit)
       via  6699c983ac7c8506d17a60323ba71478ea2ac20d (commit)
       via  0a4e2d2940214cf91d9333f7a49b73159ee34d3a (commit)
      from  cf0656348eb1e4c5858ca944da6e9448cd22a894 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit cd42a47667cb6876acf7a557d1cde96fc214bf47
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Aug 15 16:29:48 2017 -0400

    11309: Prefix errors with "notice:" or "warning:" to aid parsing.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>

diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index f4915c0..056ef0d 100644
--- a/lib/crunchstat/crunchstat.go
+++ b/lib/crunchstat/crunchstat.go
@@ -93,7 +93,7 @@ func (r *Reporter) Stop() {
 func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
 	content, err := ioutil.ReadAll(in)
 	if err != nil {
-		r.Logger.Print(err)
+		r.Logger.Printf("warning: %v", err)
 	}
 	return content, err
 }
@@ -169,7 +169,7 @@ func (r *Reporter) getContainerNetStats() (io.Reader, error) {
 		statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
 		stats, err := ioutil.ReadFile(statsFilename)
 		if err != nil {
-			r.Logger.Print(err)
+			r.Logger.Printf("notice: %v", err)
 			continue
 		}
 		return strings.NewReader(string(stats)), nil
@@ -416,7 +416,7 @@ func (r *Reporter) waitForCIDFile() bool {
 		select {
 		case <-ticker.C:
 		case <-r.done:
-			r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+			r.Logger.Printf("warning: CID never appeared in %+q: %v", r.CIDFile, err)
 			return false
 		}
 	}
@@ -439,9 +439,9 @@ func (r *Reporter) waitForCgroup() bool {
 		select {
 		case <-ticker.C:
 		case <-warningTimer:
-			r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+			r.Logger.Printf("warning: cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
 		case <-r.done:
-			r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+			r.Logger.Printf("warning: cgroup stats files never appeared for %v", r.CID)
 			return false
 		}
 	}

commit 68fee3aff2bc0e189827956720de58c3e8668bdc
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Aug 15 16:26:18 2017 -0400

    11309: Fix parsing and labels. Add --threads option.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>

diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index 046f01c..a70e4b2 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -34,6 +34,9 @@ class ArgumentParser(argparse.ArgumentParser):
             '--format', type=str, choices=('html', 'text'), default='text',
             help='Report format')
         self.add_argument(
+            '--threads', type=int, default=8,
+            help='Maximum worker threads to run')
+        self.add_argument(
             '--verbose', '-v', action='count', default=0,
             help='Log more information (once for progress, twice for debug)')
 
@@ -46,6 +49,7 @@ class Command(object):
     def run(self):
         kwargs = {
             'skip_child_jobs': self.args.skip_child_jobs,
+            'threads': self.args.threads,
         }
         if self.args.pipeline_instance:
             self.summer = summarizer.NewSummarizer(self.args.pipeline_instance, **kwargs)
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 60f1f70..e8a842d 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -41,9 +41,10 @@ class Task(object):
 
 
 class Summarizer(object):
-    def __init__(self, logdata, label=None, skip_child_jobs=False):
+    def __init__(self, logdata, label=None, skip_child_jobs=False, uuid=None, **kwargs):
         self._logdata = logdata
 
+        self.uuid = uuid
         self.label = label
         self.starttime = None
         self.finishtime = None
@@ -69,55 +70,55 @@ class Summarizer(object):
 
     def run(self):
         logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+        detected_crunch1 = False
         for line in self._logdata:
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
-            if m:
-                seq = int(m.group('seq'))
-                uuid = m.group('task_uuid')
-                self.seq_to_uuid[seq] = uuid
-                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
-                continue
+            if not detected_crunch1 and '-8i9sb-' in line:
+                detected_crunch1 = True
 
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
-            if m:
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
-                elapsed = int(m.group('elapsed'))
-                self.task_stats[task_id]['time'] = {'elapsed': elapsed}
-                if elapsed > self.stats_max['time']['elapsed']:
-                    self.stats_max['time']['elapsed'] = elapsed
-                continue
+            if detected_crunch1:
+                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
+                if m:
+                    seq = int(m.group('seq'))
+                    uuid = m.group('task_uuid')
+                    self.seq_to_uuid[seq] = uuid
+                    logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+                    continue
 
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
-            if m:
-                uuid = m.group('uuid')
-                if self._skip_child_jobs:
-                    logger.warning('%s: omitting stats from child job %s'
-                                   ' because --skip-child-jobs flag is on',
-                                   self.label, uuid)
+                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
+                if m:
+                    task_id = self.seq_to_uuid[int(m.group('seq'))]
+                    elapsed = int(m.group('elapsed'))
+                    self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+                    if elapsed > self.stats_max['time']['elapsed']:
+                        self.stats_max['time']['elapsed'] = elapsed
                     continue
-                logger.debug('%s: follow %s', self.label, uuid)
-                child_summarizer = ProcessSummarizer(uuid)
-                child_summarizer.stats_max = self.stats_max
-                child_summarizer.task_stats = self.task_stats
-                child_summarizer.tasks = self.tasks
-                child_summarizer.starttime = self.starttime
-                child_summarizer.run()
-                logger.debug('%s: done %s', self.label, uuid)
-                continue
 
-            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-            if m:
-                # crunch1 job
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
+                m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+                if m:
+                    uuid = m.group('uuid')
+                    if self._skip_child_jobs:
+                        logger.warning('%s: omitting stats from child job %s'
+                                       ' because --skip-child-jobs flag is on',
+                                       self.label, uuid)
+                        continue
+                    logger.debug('%s: follow %s', self.label, uuid)
+                    child_summarizer = ProcessSummarizer(uuid)
+                    child_summarizer.stats_max = self.stats_max
+                    child_summarizer.task_stats = self.task_stats
+                    child_summarizer.tasks = self.tasks
+                    child_summarizer.starttime = self.starttime
+                    child_summarizer.run()
+                    logger.debug('%s: done %s', self.label, uuid)
+                    continue
+
+                m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                if not m:
+                    continue
             else:
+                # crunch2
                 m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
-                if m:
-                    # crunch2 container (seq/task don't apply)
-                    task_id = 'container'
-                else:
-                    # not a crunchstat log
+                if not m:
                     continue
-            task = self.tasks[task_id]
 
             if self.label is None:
                 try:
@@ -129,11 +130,17 @@ class Summarizer(object):
                 continue
             elif m.group('category') in ('error', 'caught'):
                 continue
-            elif m.group('category') == 'read':
+            elif m.group('category') in ['read', 'open', 'cgroup', 'CID']:
                 # "stderr crunchstat: read /proc/1234/net/dev: ..."
-                # (crunchstat formatting fixed, but old logs still say this)
+                # (old logs are less careful with unprefixed error messages)
                 continue
 
+            if detected_crunch1:
+                task_id = self.seq_to_uuid[int(m.group('seq'))]
+            else:
+                task_id = 'container'
+            task = self.tasks[task_id]
+
             # Use the first and last crunchstat timestamps as
             # approximations of starttime and finishtime.
             timestamp = m.group('timestamp')
@@ -410,47 +417,47 @@ def NewSummarizer(process, **kwargs):
         process = None
         arv = arvados.api('v1', model=OrderedJsonModel())
 
-    if re.search('-dz642-', uuid):
+    if '-dz642-' in uuid:
         if process is None:
             process = arv.containers().get(uuid=uuid).execute()
-        return ContainerSummarizer(process, **kwargs)
-    elif re.search('-xvhdp-', uuid):
+        klass = ContainerTreeSummarizer
+    elif '-xvhdp-' in uuid:
         if process is None:
-            ctrReq = arv.container_requests().get(uuid=uuid).execute()
-            ctrUUID = ctrReq['container_uuid']
-            process = arv.containers().get(uuid=ctrUUID).execute()
-        return ContainerSummarizer(process, **kwargs)
-    elif re.search('-8i9sb-', uuid):
+            process = arv.container_requests().get(uuid=uuid).execute()
+        klass = ContainerTreeSummarizer
+    elif '-8i9sb-' in uuid:
         if process is None:
             process = arv.jobs().get(uuid=uuid).execute()
-        return JobSummarizer(process, **kwargs)
-    elif re.search('-d1hrv-', uuid):
+        klass = JobSummarizer
+    elif '-d1hrv-' in uuid:
         if process is None:
             process = arv.pipeline_instances().get(uuid=uuid).execute()
-        return PipelineSummarizer(process, **kwargs)
+        klass = PipelineSummarizer
+    elif '-4zz18-' in uuid:
+        return CollectionSummarizer(collection_id=uuid)
     else:
         raise ArgumentError("Unrecognized uuid %s", uuid)
+    return klass(process, uuid=uuid, **kwargs)
 
 
 class ProcessSummarizer(Summarizer):
     """Process is a job, pipeline, container, or container request."""
 
-    def __init__(self, process, **kwargs):
+    def __init__(self, process, label=None, **kwargs):
         rdr = None
         self.process = process
+        if label is None:
+            label = self.process.get('name', self.process['uuid'])
         if self.process.get('log'):
             try:
                 rdr = crunchstat_summary.reader.CollectionReader(self.process['log'])
             except arvados.errors.NotFoundError as e:
                 logger.warning("Trying event logs after failing to read "
                                "log collection %s: %s", self.process['log'], e)
-            else:
-                label = self.process['uuid']
         if rdr is None:
             rdr = crunchstat_summary.reader.LiveLogReader(self.process['uuid'])
-            label = self.process['uuid'] + ' (partial)'
-        super(ProcessSummarizer, self).__init__(rdr, **kwargs)
-        self.label = label
+            label = label + ' (partial)'
+        super(ProcessSummarizer, self).__init__(rdr, label=label, **kwargs)
         self.existing_constraints = self.process.get('runtime_constraints', {})
 
 
@@ -462,26 +469,23 @@ class ContainerSummarizer(ProcessSummarizer):
     pass
 
 
-class PipelineSummarizer(object):
-    def __init__(self, instance, **kwargs):
-        self.summarizers = collections.OrderedDict()
-        for cname, component in instance['components'].iteritems():
-            if 'job' not in component:
-                logger.warning(
-                    "%s: skipping component with no job assigned", cname)
-            else:
-                logger.info(
-                    "%s: job %s", cname, component['job']['uuid'])
-                summarizer = JobSummarizer(component['job'], **kwargs)
-                summarizer.label = '{} {}'.format(
-                    cname, component['job']['uuid'])
-                self.summarizers[cname] = summarizer
-        self.label = instance['uuid']
+class MultiSummarizer(object):
+    def __init__(self, children={}, label=None, threads=1, **kwargs):
+        self.throttle = threading.Semaphore(threads)
+        self.children = children
+        self.label = label
+
+    def run_and_release(self, target, *args, **kwargs):
+        try:
+            return target(*args, **kwargs)
+        finally:
+            self.throttle.release()
 
     def run(self):
         threads = []
-        for summarizer in self.summarizers.itervalues():
-            t = threading.Thread(target=summarizer.run)
+        for child in self.children.itervalues():
+            self.throttle.acquire()
+            t = threading.Thread(target=self.run_and_release, args=(child.run, ))
             t.daemon = True
             t.start()
             threads.append(t)
@@ -490,12 +494,70 @@ class PipelineSummarizer(object):
 
     def text_report(self):
         txt = ''
-        for cname, summarizer in self.summarizers.iteritems():
-            txt += '### Summary for {} ({})\n'.format(
-                cname, summarizer.process['uuid'])
-            txt += summarizer.text_report()
+        for cname, child in self.children.iteritems():
+            if len(self.children) > 1:
+                txt += '### Summary for {} ({})\n'.format(
+                    cname, child.process['uuid'])
+            txt += child.text_report()
             txt += '\n'
         return txt
 
     def html_report(self):
-        return WEBCHART_CLASS(self.label, self.summarizers.itervalues()).html()
+        return WEBCHART_CLASS(self.label, self.children.itervalues()).html()
+
+
+class PipelineSummarizer(MultiSummarizer):
+    def __init__(self, instance, **kwargs):
+        children = collections.OrderedDict()
+        for cname, component in instance['components'].iteritems():
+            if 'job' not in component:
+                logger.warning(
+                    "%s: skipping component with no job assigned", cname)
+            else:
+                logger.info(
+                    "%s: job %s", cname, component['job']['uuid'])
+                summarizer = JobSummarizer(component['job'], **kwargs)
+                summarizer.label = '{} {}'.format(
+                    cname, component['job']['uuid'])
+                children[cname] = summarizer
+        super(PipelineSummarizer, self).__init__(
+            children=children,
+            label=instance['uuid'],
+            **kwargs)
+
+
+class ContainerTreeSummarizer(MultiSummarizer):
+    def __init__(self, root, **kwargs):
+        arv = arvados.api('v1', model=OrderedJsonModel())
+
+        label = kwargs.pop('label', None) or root.get('name') or root['uuid']
+        root['name'] = label
+
+        children = collections.OrderedDict()
+        todo = collections.deque((root, ))
+        while len(todo) > 0:
+            current = todo.popleft()
+            label = current['name']
+            if current['uuid'].find('-xvhdp-') > 0:
+                current = arv.containers().get(uuid=current['container_uuid']).execute()
+            children[current['uuid']] = ContainerSummarizer(
+                current, label=label, **kwargs)
+            page_filters = []
+            while True:
+                items = arv.container_requests().index(
+                    order=['uuid asc'],
+                    filters=page_filters+[
+                        ['requesting_container_uuid', '=', current['uuid']]],
+                ).execute()['items']
+                if not items:
+                    break
+                page_filters = [['uuid', '>', items[-1]['uuid']]]
+                for cr in items:
+                    if cr['container_uuid']:
+                        logger.debug('%s: container req %s', current['uuid'], cr['uuid'])
+                        cr['name'] = label + ' / ' + (cr.get('name') or cr['uuid'])
+                        todo.append(cr)
+        super(ContainerTreeSummarizer, self).__init__(
+            children=children,
+            label=root['name'],
+            **kwargs)
diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz
new file mode 100644
index 0000000..8b069e7
Binary files /dev/null and b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz differ
diff --git a/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report
new file mode 100644
index 0000000..daabb32
--- /dev/null
+++ b/tools/crunchstat-summary/tests/container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz.report
@@ -0,0 +1,23 @@
+category	metric	task_max	task_max_rate	job_total
+cpu	cpus	20	-	-
+cpu	sys	0.82	0.08	0.82
+cpu	user	2.31	0.22	2.31
+cpu	user+sys	3.13	0.30	3.13
+mem	cache	23846912	-	-
+mem	pgmajfault	121	-	121
+mem	rss	65470464	-	-
+mem	swap	0	-	-
+net:eth0	rx	500762	951.15	500762
+net:eth0	tx	36242	226.61	36242
+net:eth0	tx+rx	537004	1177.76	537004
+# Number of tasks: 1
+# Max CPU time spent by a single task: 3.13s
+# Max CPU usage in a single interval: 29.89%
+# Overall CPU usage: 0%
+# Max memory used by a single task: 0.07GB
+# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! container max CPU usage was 30% -- try runtime_constraints "min_cores_per_node":1
+#!! container max RSS was 63 MiB -- try runtime_constraints "min_ram_mb_per_node":972
diff --git a/tools/crunchstat-summary/tests/test_examples.py b/tools/crunchstat-summary/tests/test_examples.py
index d060bec..6e8604c 100644
--- a/tools/crunchstat-summary/tests/test_examples.py
+++ b/tools/crunchstat-summary/tests/test_examples.py
@@ -59,6 +59,36 @@ class SummarizeEdgeCases(unittest.TestCase):
         s.run()
 
 
+class SummarizeContainer(ReportDiff):
+    fake_container = {
+        'uuid': '9tee4-dz642-mjfb0i5hzojp16a',
+        'log': '9tee4-4zz18-ihyzym9tcwjwg4r',
+    }
+    fake_request = {
+        'uuid': '9tee4-xvhdp-uper95jktm10d3w',
+        'name': 'container',
+        'container_uuid': fake_container['uuid'],
+    }
+    logfile = os.path.join(
+        TESTS_DIR, 'container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz')
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_container(self, mock_api, mock_cr):
+        mock_api().container_requests().index().execute.return_value = {'items':[]}
+        mock_api().container_requests().get().execute.return_value = self.fake_request
+        mock_api().containers().get().execute.return_value = self.fake_container
+        mock_cr().__iter__.return_value = [
+            'crunch-run.txt', 'stderr.txt', 'node-info.txt',
+            'container.json', 'crunchstat.txt']
+        mock_cr().open.return_value = gzip.open(self.logfile)
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--job', self.fake_request['uuid']])
+        cmd = crunchstat_summary.command.Command(args)
+        cmd.run()
+        self.diff_known_report(self.logfile, cmd)
+
+
 class SummarizeJob(ReportDiff):
     fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
     fake_log_id = 'fake-log-collection-id'

commit 6699c983ac7c8506d17a60323ba71478ea2ac20d
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Aug 15 11:21:44 2017 -0400

    11309: 9001: Fix catch-all-exceptions antipattern.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>

diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 5692ba6..60f1f70 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -119,89 +119,88 @@ class Summarizer(object):
                     continue
             task = self.tasks[task_id]
 
-            try:
-                if self.label is None:
+            if self.label is None:
+                try:
                     self.label = m.group('job_uuid')
-                    logger.debug('%s: using job uuid as label', self.label)
-                if m.group('category').endswith(':'):
-                    # "stderr crunchstat: notice: ..."
-                    continue
-                elif m.group('category') in ('error', 'caught'):
-                    continue
-                elif m.group('category') == 'read':
-                    # "stderr crunchstat: read /proc/1234/net/dev: ..."
-                    # (crunchstat formatting fixed, but old logs still say this)
-                    continue
+                except IndexError:
+                    self.label = 'container'
+            if m.group('category').endswith(':'):
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif m.group('category') in ('error', 'caught'):
+                continue
+            elif m.group('category') == 'read':
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (crunchstat formatting fixed, but old logs still say this)
+                continue
 
-                # Use the first and last crunchstat timestamps as
-                # approximations of starttime and finishtime.
-                timestamp = m.group('timestamp')
-                if timestamp[10:11] == '_':
-                    timestamp = datetime.datetime.strptime(
-                        timestamp, '%Y-%m-%d_%H:%M:%S')
-                elif timestamp[10:11] == 'T':
-                    timestamp = datetime.datetime.strptime(
-                        timestamp[:19], '%Y-%m-%dT%H:%M:%S')
-                else:
-                    raise ValueError("Cannot parse timestamp {!r}".format(
-                        timestamp))
-
-                if not task.starttime:
-                    task.starttime = timestamp
-                    logger.debug('%s: task %s starttime %s',
-                                 self.label, task_id, timestamp)
-                task.finishtime = timestamp
-
-                if not self.starttime:
-                    self.starttime = timestamp
-                self.finishtime = timestamp
-
-                this_interval_s = None
-                for group in ['current', 'interval']:
-                    if not m.group(group):
-                        continue
-                    category = m.group('category')
-                    words = m.group(group).split(' ')
-                    stats = {}
+            # Use the first and last crunchstat timestamps as
+            # approximations of starttime and finishtime.
+            timestamp = m.group('timestamp')
+            if timestamp[10:11] == '_':
+                timestamp = datetime.datetime.strptime(
+                    timestamp, '%Y-%m-%d_%H:%M:%S')
+            elif timestamp[10:11] == 'T':
+                timestamp = datetime.datetime.strptime(
+                    timestamp[:19], '%Y-%m-%dT%H:%M:%S')
+            else:
+                raise ValueError("Cannot parse timestamp {!r}".format(
+                    timestamp))
+
+            if not task.starttime:
+                task.starttime = timestamp
+                logger.debug('%s: task %s starttime %s',
+                             self.label, task_id, timestamp)
+            task.finishtime = timestamp
+
+            if not self.starttime:
+                self.starttime = timestamp
+            self.finishtime = timestamp
+
+            this_interval_s = None
+            for group in ['current', 'interval']:
+                if not m.group(group):
+                    continue
+                category = m.group('category')
+                words = m.group(group).split(' ')
+                stats = {}
+                try:
                     for val, stat in zip(words[::2], words[1::2]):
-                        try:
-                            if '.' in val:
-                                stats[stat] = float(val)
-                            else:
-                                stats[stat] = int(val)
-                        except ValueError as e:
-                            raise ValueError(
-                                'Error parsing {} stat: {!r}'.format(
-                                    stat, e))
-                    if 'user' in stats or 'sys' in stats:
-                        stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
-                    if 'tx' in stats or 'rx' in stats:
-                        stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
-                    for stat, val in stats.iteritems():
-                        if group == 'interval':
-                            if stat == 'seconds':
-                                this_interval_s = val
-                                continue
-                            elif not (this_interval_s > 0):
-                                logger.error(
-                                    "BUG? interval stat given with duration {!r}".
-                                    format(this_interval_s))
-                                continue
-                            else:
-                                stat = stat + '__rate'
-                                val = val / this_interval_s
-                                if stat in ['user+sys__rate', 'tx+rx__rate']:
-                                    task.series[category, stat].append(
-                                        (timestamp - self.starttime, val))
+                        if '.' in val:
+                            stats[stat] = float(val)
+                        else:
+                            stats[stat] = int(val)
+                except ValueError as e:
+                    logger.warning('Error parsing {} stat: {!r}'.format(
+                        stat, e))
+                    continue
+                if 'user' in stats or 'sys' in stats:
+                    stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
+                if 'tx' in stats or 'rx' in stats:
+                    stats['tx+rx'] = stats.get('tx', 0) + stats.get('rx', 0)
+                for stat, val in stats.iteritems():
+                    if group == 'interval':
+                        if stat == 'seconds':
+                            this_interval_s = val
+                            continue
+                        elif not (this_interval_s > 0):
+                            logger.error(
+                                "BUG? interval stat given with duration {!r}".
+                                format(this_interval_s))
+                            continue
                         else:
-                            if stat in ['rss']:
+                            stat = stat + '__rate'
+                            val = val / this_interval_s
+                            if stat in ['user+sys__rate', 'tx+rx__rate']:
                                 task.series[category, stat].append(
                                     (timestamp - self.starttime, val))
-                            self.task_stats[task_id][category][stat] = val
-                        if val > self.stats_max[category][stat]:
-                            self.stats_max[category][stat] = val
-            except Exception as e:
-                logger.info('Skipping malformed line: {}Error was: {}\n'.format(line, e))
+                    else:
+                        if stat in ['rss']:
+                            task.series[category, stat].append(
+                                (timestamp - self.starttime, val))
+                        self.task_stats[task_id][category][stat] = val
+                    if val > self.stats_max[category][stat]:
+                        self.stats_max[category][stat] = val
         logger.debug('%s: done parsing', self.label)
 
         self.job_tot = collections.defaultdict(

commit 0a4e2d2940214cf91d9333f7a49b73159ee34d3a
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Aug 14 16:40:55 2017 -0400

    11309: Accommodate crunch2 crunchstat log format.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>

diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index cfc01da..5692ba6 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -105,9 +105,19 @@ class Summarizer(object):
                 logger.debug('%s: done %s', self.label, uuid)
                 continue
 
-            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
-            if not m:
-                continue
+            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+            if m:
+                # crunch1 job
+                task_id = self.seq_to_uuid[int(m.group('seq'))]
+            else:
+                m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+                if m:
+                    # crunch2 container (seq/task don't apply)
+                    task_id = 'container'
+                else:
+                    # not a crunchstat log
+                    continue
+            task = self.tasks[task_id]
 
             try:
                 if self.label is None:
@@ -122,13 +132,20 @@ class Summarizer(object):
                     # "stderr crunchstat: read /proc/1234/net/dev: ..."
                     # (crunchstat formatting fixed, but old logs still say this)
                     continue
-                task_id = self.seq_to_uuid[int(m.group('seq'))]
-                task = self.tasks[task_id]
 
                 # Use the first and last crunchstat timestamps as
                 # approximations of starttime and finishtime.
-                timestamp = datetime.datetime.strptime(
-                    m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+                timestamp = m.group('timestamp')
+                if timestamp[10:11] == '_':
+                    timestamp = datetime.datetime.strptime(
+                        timestamp, '%Y-%m-%d_%H:%M:%S')
+                elif timestamp[10:11] == 'T':
+                    timestamp = datetime.datetime.strptime(
+                        timestamp[:19], '%Y-%m-%dT%H:%M:%S')
+                else:
+                    raise ValueError("Cannot parse timestamp {!r}".format(
+                        timestamp))
+
                 if not task.starttime:
                     task.starttime = timestamp
                     logger.debug('%s: task %s starttime %s',

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list