[ARVADOS] updated: b691f2fad8598f12d21d87b772e6de3314ee2c1a

git at public.curoverse.com git at public.curoverse.com
Sun Feb 7 20:26:20 EST 2016


Summary of changes:
 tools/crunchstat-summary/crunchstat_summary/command.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

  discards  886865621dc1abb5d5666dddc9e43866397e1f5d (commit)
  discards  584da9d4af5ce5928373c7166f320882649c1a6b (commit)
  discards  b3cd2a91250de92f38dff98e21fcc5b4b2e2ffa7 (commit)
       via  b691f2fad8598f12d21d87b772e6de3314ee2c1a (commit)
       via  11cc9a6f3f2c282979473db6afe7da5676f8f230 (commit)
       via  0cba5993c14a095f6edcbbbebb301caffa8914ec (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (886865621dc1abb5d5666dddc9e43866397e1f5d)
            \
             N -- N -- N (b691f2fad8598f12d21d87b772e6de3314ee2c1a)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b691f2fad8598f12d21d87b772e6de3314ee2c1a
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 20:19:45 2016 -0500

    8341: Move reader classes to reader.py.

diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644
index 0000000..52d7e80
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -0,0 +1,81 @@
+from __future__ import print_function
+
+import arvados
+import collections
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+    def __init__(self, collection_id):
+        logger.debug('load collection %s', collection_id)
+        collection = arvados.collection.CollectionReader(collection_id)
+        filenames = [filename for filename in collection]
+        if len(filenames) != 1:
+            raise ValueError(
+                "collection {} has {} files; need exactly one".format(
+                    collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self._filters = [
+            ['object_uuid', '=', job_uuid],
+            ['event_type', '=', 'stderr']]
+        self._buffer = collections.deque()
+        self._got = 0
+        self._label = job_uuid
+        self._last_id = 0
+        self._start_getting_next_page()
+
+    def _start_getting_next_page(self):
+        self._thread = threading.Thread(target=self._get_next_page)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _get_next_page(self):
+        page = arvados.api().logs().index(
+            limit=1000,
+            order=['id asc'],
+            filters=self._filters + [['id','>',str(self._last_id)]],
+        ).execute()
+        self._got += len(page['items'])
+        logger.debug(
+            '%s: received %d of %d log events',
+            self._label, self._got,
+            self._got + page['items_available'] - len(page['items']))
+        self._page = page
+
+    def _buffer_page(self):
+        """Wait for current worker, copy results to _buffer, start next worker.
+
+        Return True if anything was added to the buffer."""
+        if self._thread is None:
+            return False
+        self._thread.join()
+        self._thread = None
+        page = self._page
+        if len(page['items']) == 0:
+            return False
+        if len(page['items']) < page['items_available']:
+            self._start_getting_next_page()
+        for i in page['items']:
+            for line in i['properties']['text'].split('\n'):
+                self._buffer.append(line)
+            self._last_id = i['id']
+        return True
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if len(self._buffer) == 0:
+            if not self._buffer_page():
+                raise StopIteration
+        return self._buffer.popleft() + '\n'
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ad96e52..48bec6a 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,13 +3,13 @@ from __future__ import print_function
 import arvados
 import collections
 import crunchstat_summary.chartjs
+import crunchstat_summary.reader
 import datetime
 import functools
 import itertools
 import math
 import re
 import sys
-import threading
 
 from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
@@ -328,84 +328,10 @@ class Summarizer(object):
             return '{}'.format(val)
 
 
-class CollectionReader(object):
-    def __init__(self, collection_id):
-        logger.debug('load collection %s', collection_id)
-        collection = arvados.collection.CollectionReader(collection_id)
-        filenames = [filename for filename in collection]
-        if len(filenames) != 1:
-            raise ValueError(
-                "collection {} has {} files; need exactly one".format(
-                    collection_id, len(filenames)))
-        self._reader = collection.open(filenames[0])
-
-    def __iter__(self):
-        return iter(self._reader)
-
-
-class LiveLogReader(object):
-    def __init__(self, job_uuid):
-        logger.debug('load stderr events for job %s', job_uuid)
-        self._filters = [
-            ['object_uuid', '=', job_uuid],
-            ['event_type', '=', 'stderr']]
-        self._buffer = collections.deque()
-        self._got = 0
-        self._label = job_uuid
-        self._last_id = 0
-        self._start_getting_next_page()
-
-    def _start_getting_next_page(self):
-        self._thread = threading.Thread(target=self._get_next_page)
-        self._thread.daemon = True
-        self._thread.start()
-
-    def _get_next_page(self):
-        page = arvados.api().logs().index(
-            limit=1000,
-            order=['id asc'],
-            filters=self._filters + [['id','>',str(self._last_id)]],
-        ).execute()
-        self._got += len(page['items'])
-        logger.debug(
-            '%s: received %d of %d log events',
-            self._label, self._got,
-            self._got + page['items_available'] - len(page['items']))
-        self._page = page
-
-    def _buffer_page(self):
-        """Wait for current worker, copy results to _buffer, start next worker.
-
-        Return True if anything was added to the buffer."""
-        if self._thread is None:
-            return False
-        self._thread.join()
-        self._thread = None
-        page = self._page
-        if len(page['items']) == 0:
-            return False
-        if len(page['items']) < page['items_available']:
-            self._start_getting_next_page()
-        for i in page['items']:
-            for line in i['properties']['text'].split('\n'):
-                self._buffer.append(line)
-            self._last_id = i['id']
-        return True
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        if len(self._buffer) == 0:
-            if not self._buffer_page():
-                raise StopIteration
-        return self._buffer.popleft() + '\n'
-
-
 class CollectionSummarizer(Summarizer):
     def __init__(self, collection_id, **kwargs):
         super(CollectionSummarizer, self).__init__(
-            CollectionReader(collection_id), **kwargs)
+            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
         self.label = collection_id
 
 
@@ -417,10 +343,10 @@ class JobSummarizer(Summarizer):
         else:
             self.job = job
         if self.job['log']:
-            rdr = CollectionReader(self.job['log'])
+            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
             label = self.job['uuid']
         else:
-            rdr = LiveLogReader(self.job['uuid'])
+            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
             label = self.job['uuid'] + ' (partial)'
         super(JobSummarizer, self).__init__(rdr, **kwargs)
         self.label = label

commit 11cc9a6f3f2c282979473db6afe7da5676f8f230
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 20:15:00 2016 -0500

    8341: Use a worker thread to get page N+1 of logs while parsing page N.

diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 8b9bfa3..ad96e52 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -9,6 +9,7 @@ import itertools
 import math
 import re
 import sys
+import threading
 
 from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
@@ -350,37 +351,54 @@ class LiveLogReader(object):
             ['event_type', '=', 'stderr']]
         self._buffer = collections.deque()
         self._got = 0
-        self._got_all = False
         self._label = job_uuid
         self._last_id = 0
+        self._start_getting_next_page()
+
+    def _start_getting_next_page(self):
+        self._thread = threading.Thread(target=self._get_next_page)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _get_next_page(self):
+        page = arvados.api().logs().index(
+            limit=1000,
+            order=['id asc'],
+            filters=self._filters + [['id','>',str(self._last_id)]],
+        ).execute()
+        self._got += len(page['items'])
+        logger.debug(
+            '%s: received %d of %d log events',
+            self._label, self._got,
+            self._got + page['items_available'] - len(page['items']))
+        self._page = page
+
+    def _buffer_page(self):
+        """Wait for current worker, copy results to _buffer, start next worker.
+
+        Return True if anything was added to the buffer."""
+        if self._thread is None:
+            return False
+        self._thread.join()
+        self._thread = None
+        page = self._page
+        if len(page['items']) == 0:
+            return False
+        if len(page['items']) < page['items_available']:
+            self._start_getting_next_page()
+        for i in page['items']:
+            for line in i['properties']['text'].split('\n'):
+                self._buffer.append(line)
+            self._last_id = i['id']
+        return True
 
     def __iter__(self):
         return self
 
     def next(self):
-        if self._buffer is None:
-            raise StopIteration
         if len(self._buffer) == 0:
-            if self._got_all:
+            if not self._buffer_page():
                 raise StopIteration
-            page = arvados.api().logs().index(
-                limit=1000,
-                order=['id asc'],
-                filters=self._filters + [['id','>',str(self._last_id)]],
-            ).execute()
-            self._got += len(page['items'])
-            logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
-            if len(page['items']) == 0:
-                self._got_all = True
-                self._buffer = None
-                raise StopIteration
-            elif len(page['items']) == page['items_available']:
-                # Don't try to fetch any more after this page
-                self._got_all = True
-            for i in page['items']:
-                for line in i['properties']['text'].split('\n'):
-                    self._buffer.append(line)
-                self._last_id = i['id']
         return self._buffer.popleft() + '\n'
 
 

commit 0cba5993c14a095f6edcbbbebb301caffa8914ec
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 19:43:02 2016 -0500

    8341: Get job log from logs API if the log has not been written to Keep yet.

diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index a9dfc83..78638c6 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -13,7 +13,8 @@ class ArgumentParser(argparse.ArgumentParser):
         src = self.add_mutually_exclusive_group()
         src.add_argument(
             '--job', type=str, metavar='UUID',
-            help='Look up the specified job and read its log data from Keep')
+            help='Look up the specified job and read its log data from Keep'
+            ' (or from the Arvados event log, if the job is still running)')
         src.add_argument(
             '--pipeline-instance', type=str, metavar='UUID',
             help='Summarize each component of the given pipeline instance')
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 486f0e7..8b9bfa3 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -90,7 +90,7 @@ class Summarizer(object):
                 logger.debug('%s: done %s', self.label, uuid)
                 continue
 
-            m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
             if not m:
                 continue
 
@@ -327,8 +327,8 @@ class Summarizer(object):
             return '{}'.format(val)
 
 
-class CollectionSummarizer(Summarizer):
-    def __init__(self, collection_id, **kwargs):
+class CollectionReader(object):
+    def __init__(self, collection_id):
         logger.debug('load collection %s', collection_id)
         collection = arvados.collection.CollectionReader(collection_id)
         filenames = [filename for filename in collection]
@@ -336,24 +336,76 @@ class CollectionSummarizer(Summarizer):
             raise ValueError(
                 "collection {} has {} files; need exactly one".format(
                     collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self._filters = [
+            ['object_uuid', '=', job_uuid],
+            ['event_type', '=', 'stderr']]
+        self._buffer = collections.deque()
+        self._got = 0
+        self._got_all = False
+        self._label = job_uuid
+        self._last_id = 0
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self._buffer is None:
+            raise StopIteration
+        if len(self._buffer) == 0:
+            if self._got_all:
+                raise StopIteration
+            page = arvados.api().logs().index(
+                limit=1000,
+                order=['id asc'],
+                filters=self._filters + [['id','>',str(self._last_id)]],
+            ).execute()
+            self._got += len(page['items'])
+            logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
+            if len(page['items']) == 0:
+                self._got_all = True
+                self._buffer = None
+                raise StopIteration
+            elif len(page['items']) == page['items_available']:
+                # Don't try to fetch any more after this page
+                self._got_all = True
+            for i in page['items']:
+                for line in i['properties']['text'].split('\n'):
+                    self._buffer.append(line)
+                self._last_id = i['id']
+        return self._buffer.popleft() + '\n'
+
+
+class CollectionSummarizer(Summarizer):
+    def __init__(self, collection_id, **kwargs):
         super(CollectionSummarizer, self).__init__(
-            collection.open(filenames[0]), **kwargs)
+            CollectionReader(collection_id), **kwargs)
         self.label = collection_id
 
 
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
     def __init__(self, job, **kwargs):
         arv = arvados.api('v1')
         if isinstance(job, basestring):
             self.job = arv.jobs().get(uuid=job).execute()
         else:
             self.job = job
-        if not self.job['log']:
-            raise ValueError(
-                "job {} has no log; live summary not implemented".format(
-                    self.job['uuid']))
-        super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
-        self.label = self.job['uuid']
+        if self.job['log']:
+            rdr = CollectionReader(self.job['log'])
+            label = self.job['uuid']
+        else:
+            rdr = LiveLogReader(self.job['uuid'])
+            label = self.job['uuid'] + ' (partial)'
+        super(JobSummarizer, self).__init__(rdr, **kwargs)
+        self.label = label
         self.existing_constraints = self.job.get('runtime_constraints', {})
 
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list