[ARVADOS] updated: 886865621dc1abb5d5666dddc9e43866397e1f5d

git at public.curoverse.com git at public.curoverse.com
Sun Feb 7 20:21:33 EST 2016


Summary of changes:
 .../crunchstat_summary/reader.py                   | 81 ++++++++++++++++++++++
 .../crunchstat_summary/summarizer.py               | 61 ++--------------
 2 files changed, 85 insertions(+), 57 deletions(-)
 create mode 100644 tools/crunchstat-summary/crunchstat_summary/reader.py

  discards  3d625ef22b42affa956f48948351bc60dd4298ac (commit)
       via  886865621dc1abb5d5666dddc9e43866397e1f5d (commit)
       via  584da9d4af5ce5928373c7166f320882649c1a6b (commit)
       via  b3cd2a91250de92f38dff98e21fcc5b4b2e2ffa7 (commit)

This update added new revisions after undoing existing revisions; that
is, the old revision is not a strict subset of the new revision.  This
situation occurs when you push a change with --force, producing a
repository history like this:

 * -- * -- B -- O -- O -- O (3d625ef22b42affa956f48948351bc60dd4298ac)
            \
             N -- N -- N (886865621dc1abb5d5666dddc9e43866397e1f5d)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so this message reports only the revisions on
the N branch, starting from the common base, B.

The revisions listed above that are new to this repository have not
appeared in any other notification email, so they are listed in full
below.


commit 886865621dc1abb5d5666dddc9e43866397e1f5d
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 20:19:45 2016 -0500

    8341: Move reader classes to reader.py.

diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644
index 0000000..52d7e80
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -0,0 +1,81 @@
+from __future__ import print_function
+
+import arvados
+import collections
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+    def __init__(self, collection_id):
+        logger.debug('load collection %s', collection_id)
+        collection = arvados.collection.CollectionReader(collection_id)
+        filenames = [filename for filename in collection]
+        if len(filenames) != 1:
+            raise ValueError(
+                "collection {} has {} files; need exactly one".format(
+                    collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self._filters = [
+            ['object_uuid', '=', job_uuid],
+            ['event_type', '=', 'stderr']]
+        self._buffer = collections.deque()
+        self._got = 0
+        self._label = job_uuid
+        self._last_id = 0
+        self._start_getting_next_page()
+
+    def _start_getting_next_page(self):
+        self._thread = threading.Thread(target=self._get_next_page)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _get_next_page(self):
+        page = arvados.api().logs().index(
+            limit=1000,
+            order=['id asc'],
+            filters=self._filters + [['id','>',str(self._last_id)]],
+        ).execute()
+        self._got += len(page['items'])
+        logger.debug(
+            '%s: received %d of %d log events',
+            self._label, self._got,
+            self._got + page['items_available'] - len(page['items']))
+        self._page = page
+
+    def _buffer_page(self):
+        """Wait for current worker, copy results to _buffer, start next worker.
+
+        Return True if anything was added to the buffer."""
+        if self._thread is None:
+            return False
+        self._thread.join()
+        self._thread = None
+        page = self._page
+        if len(page['items']) == 0:
+            return False
+        if len(page['items']) < page['items_available']:
+            self._start_getting_next_page()
+        for i in page['items']:
+            for line in i['properties']['text'].split('\n'):
+                self._buffer.append(line)
+            self._last_id = i['id']
+        return True
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if len(self._buffer) == 0:
+            if not self._buffer_page():
+                raise StopIteration
+        return self._buffer.popleft() + '\n'
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ad96e52..48bec6a 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,13 +3,13 @@ from __future__ import print_function
 import arvados
 import collections
 import crunchstat_summary.chartjs
+import crunchstat_summary.reader
 import datetime
 import functools
 import itertools
 import math
 import re
 import sys
-import threading
 
 from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
@@ -328,84 +328,10 @@ class Summarizer(object):
             return '{}'.format(val)
 
 
-class CollectionReader(object):
-    def __init__(self, collection_id):
-        logger.debug('load collection %s', collection_id)
-        collection = arvados.collection.CollectionReader(collection_id)
-        filenames = [filename for filename in collection]
-        if len(filenames) != 1:
-            raise ValueError(
-                "collection {} has {} files; need exactly one".format(
-                    collection_id, len(filenames)))
-        self._reader = collection.open(filenames[0])
-
-    def __iter__(self):
-        return iter(self._reader)
-
-
-class LiveLogReader(object):
-    def __init__(self, job_uuid):
-        logger.debug('load stderr events for job %s', job_uuid)
-        self._filters = [
-            ['object_uuid', '=', job_uuid],
-            ['event_type', '=', 'stderr']]
-        self._buffer = collections.deque()
-        self._got = 0
-        self._label = job_uuid
-        self._last_id = 0
-        self._start_getting_next_page()
-
-    def _start_getting_next_page(self):
-        self._thread = threading.Thread(target=self._get_next_page)
-        self._thread.daemon = True
-        self._thread.start()
-
-    def _get_next_page(self):
-        page = arvados.api().logs().index(
-            limit=1000,
-            order=['id asc'],
-            filters=self._filters + [['id','>',str(self._last_id)]],
-        ).execute()
-        self._got += len(page['items'])
-        logger.debug(
-            '%s: received %d of %d log events',
-            self._label, self._got,
-            self._got + page['items_available'] - len(page['items']))
-        self._page = page
-
-    def _buffer_page(self):
-        """Wait for current worker, copy results to _buffer, start next worker.
-
-        Return True if anything was added to the buffer."""
-        if self._thread is None:
-            return False
-        self._thread.join()
-        self._thread = None
-        page = self._page
-        if len(page['items']) == 0:
-            return False
-        if len(page['items']) < page['items_available']:
-            self._start_getting_next_page()
-        for i in page['items']:
-            for line in i['properties']['text'].split('\n'):
-                self._buffer.append(line)
-            self._last_id = i['id']
-        return True
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        if len(self._buffer) == 0:
-            if not self._buffer_page():
-                raise StopIteration
-        return self._buffer.popleft() + '\n'
-
-
 class CollectionSummarizer(Summarizer):
     def __init__(self, collection_id, **kwargs):
         super(CollectionSummarizer, self).__init__(
-            CollectionReader(collection_id), **kwargs)
+            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
         self.label = collection_id
 
 
@@ -417,10 +343,10 @@ class JobSummarizer(Summarizer):
         else:
             self.job = job
         if self.job['log']:
-            rdr = CollectionReader(self.job['log'])
+            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
             label = self.job['uuid']
         else:
-            rdr = LiveLogReader(self.job['uuid'])
+            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
             label = self.job['uuid'] + ' (partial)'
         super(JobSummarizer, self).__init__(rdr, **kwargs)
         self.label = label

commit 584da9d4af5ce5928373c7166f320882649c1a6b
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 20:15:00 2016 -0500

    8341: Use a worker thread to get page N+1 of logs while parsing page N.

diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 8b9bfa3..ad96e52 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -9,6 +9,7 @@ import itertools
 import math
 import re
 import sys
+import threading
 
 from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
@@ -350,37 +351,54 @@ class LiveLogReader(object):
             ['event_type', '=', 'stderr']]
         self._buffer = collections.deque()
         self._got = 0
-        self._got_all = False
         self._label = job_uuid
         self._last_id = 0
+        self._start_getting_next_page()
+
+    def _start_getting_next_page(self):
+        self._thread = threading.Thread(target=self._get_next_page)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _get_next_page(self):
+        page = arvados.api().logs().index(
+            limit=1000,
+            order=['id asc'],
+            filters=self._filters + [['id','>',str(self._last_id)]],
+        ).execute()
+        self._got += len(page['items'])
+        logger.debug(
+            '%s: received %d of %d log events',
+            self._label, self._got,
+            self._got + page['items_available'] - len(page['items']))
+        self._page = page
+
+    def _buffer_page(self):
+        """Wait for current worker, copy results to _buffer, start next worker.
+
+        Return True if anything was added to the buffer."""
+        if self._thread is None:
+            return False
+        self._thread.join()
+        self._thread = None
+        page = self._page
+        if len(page['items']) == 0:
+            return False
+        if len(page['items']) < page['items_available']:
+            self._start_getting_next_page()
+        for i in page['items']:
+            for line in i['properties']['text'].split('\n'):
+                self._buffer.append(line)
+            self._last_id = i['id']
+        return True
 
     def __iter__(self):
         return self
 
     def next(self):
-        if self._buffer is None:
-            raise StopIteration
         if len(self._buffer) == 0:
-            if self._got_all:
+            if not self._buffer_page():
                 raise StopIteration
-            page = arvados.api().logs().index(
-                limit=1000,
-                order=['id asc'],
-                filters=self._filters + [['id','>',str(self._last_id)]],
-            ).execute()
-            self._got += len(page['items'])
-            logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
-            if len(page['items']) == 0:
-                self._got_all = True
-                self._buffer = None
-                raise StopIteration
-            elif len(page['items']) == page['items_available']:
-                # Don't try to fetch any more after this page
-                self._got_all = True
-            for i in page['items']:
-                for line in i['properties']['text'].split('\n'):
-                    self._buffer.append(line)
-                self._last_id = i['id']
         return self._buffer.popleft() + '\n'
 
 

commit b3cd2a91250de92f38dff98e21fcc5b4b2e2ffa7
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 19:43:02 2016 -0500

    8341: Get job log from logs API if the log has not been written to Keep yet.

diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 486f0e7..8b9bfa3 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -90,7 +90,7 @@ class Summarizer(object):
                 logger.debug('%s: done %s', self.label, uuid)
                 continue
 
-            m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
             if not m:
                 continue
 
@@ -327,8 +327,8 @@ class Summarizer(object):
             return '{}'.format(val)
 
 
-class CollectionSummarizer(Summarizer):
-    def __init__(self, collection_id, **kwargs):
+class CollectionReader(object):
+    def __init__(self, collection_id):
         logger.debug('load collection %s', collection_id)
         collection = arvados.collection.CollectionReader(collection_id)
         filenames = [filename for filename in collection]
@@ -336,24 +336,76 @@ class CollectionSummarizer(Summarizer):
             raise ValueError(
                 "collection {} has {} files; need exactly one".format(
                     collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self._filters = [
+            ['object_uuid', '=', job_uuid],
+            ['event_type', '=', 'stderr']]
+        self._buffer = collections.deque()
+        self._got = 0
+        self._got_all = False
+        self._label = job_uuid
+        self._last_id = 0
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self._buffer is None:
+            raise StopIteration
+        if len(self._buffer) == 0:
+            if self._got_all:
+                raise StopIteration
+            page = arvados.api().logs().index(
+                limit=1000,
+                order=['id asc'],
+                filters=self._filters + [['id','>',str(self._last_id)]],
+            ).execute()
+            self._got += len(page['items'])
+            logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
+            if len(page['items']) == 0:
+                self._got_all = True
+                self._buffer = None
+                raise StopIteration
+            elif len(page['items']) == page['items_available']:
+                # Don't try to fetch any more after this page
+                self._got_all = True
+            for i in page['items']:
+                for line in i['properties']['text'].split('\n'):
+                    self._buffer.append(line)
+                self._last_id = i['id']
+        return self._buffer.popleft() + '\n'
+
+
+class CollectionSummarizer(Summarizer):
+    def __init__(self, collection_id, **kwargs):
         super(CollectionSummarizer, self).__init__(
-            collection.open(filenames[0]), **kwargs)
+            CollectionReader(collection_id), **kwargs)
         self.label = collection_id
 
 
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
     def __init__(self, job, **kwargs):
         arv = arvados.api('v1')
         if isinstance(job, basestring):
             self.job = arv.jobs().get(uuid=job).execute()
         else:
             self.job = job
-        if not self.job['log']:
-            raise ValueError(
-                "job {} has no log; live summary not implemented".format(
-                    self.job['uuid']))
-        super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
-        self.label = self.job['uuid']
+        if self.job['log']:
+            rdr = CollectionReader(self.job['log'])
+            label = self.job['uuid']
+        else:
+            rdr = LiveLogReader(self.job['uuid'])
+            label = self.job['uuid'] + ' (partial)'
+        super(JobSummarizer, self).__init__(rdr, **kwargs)
+        self.label = label
         self.existing_constraints = self.job.get('runtime_constraints', {})
 
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list