[ARVADOS] updated: 6c950260b253d28e78579b493c8eb3eabe0add11

git at public.curoverse.com
Mon Feb 8 14:33:03 EST 2016


       via  6c950260b253d28e78579b493c8eb3eabe0add11 (commit)
       via  d7e8f7c787b7706937f95c3ed2a5086616d48514 (commit)
       via  3e7037e6383f57b0d1b4b627cd9feb27f05af13d (commit)
       via  5bda5bf3aedc0621abeed901a608adb7db6030e6 (commit)
       via  f6bb371a170d0a74db6abf9df0f65aabe08d7cf9 (commit)
      from  a13c14cfc7fabe4f4da48edd57a086d2d8953a03 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 6c950260b253d28e78579b493c8eb3eabe0add11
Merge: a13c14c d7e8f7c
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Feb 8 14:32:45 2016 -0500

    Merge branch '8341-live-crunchstat-summary' refs #8341


commit d7e8f7c787b7706937f95c3ed2a5086616d48514
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Feb 8 14:18:42 2016 -0500

    8341: Use a Queue of lines and one thread, instead of a succession of threads and a deque of buffers.

diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
index 52d7e80..049b48f 100644
--- a/tools/crunchstat-summary/crunchstat_summary/reader.py
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -1,7 +1,7 @@
 from __future__ import print_function
 
 import arvados
-import collections
+import Queue
 import threading
 
 from crunchstat_summary import logger
@@ -23,59 +23,47 @@ class CollectionReader(object):
 
 
 class LiveLogReader(object):
+    EOF = None
+
     def __init__(self, job_uuid):
         logger.debug('load stderr events for job %s', job_uuid)
         self._filters = [
             ['object_uuid', '=', job_uuid],
             ['event_type', '=', 'stderr']]
-        self._buffer = collections.deque()
-        self._got = 0
         self._label = job_uuid
-        self._last_id = 0
-        self._start_getting_next_page()
-
-    def _start_getting_next_page(self):
-        self._thread = threading.Thread(target=self._get_next_page)
-        self._thread.daemon = True
-        self._thread.start()
 
-    def _get_next_page(self):
-        page = arvados.api().logs().index(
-            limit=1000,
-            order=['id asc'],
-            filters=self._filters + [['id','>',str(self._last_id)]],
-        ).execute()
-        self._got += len(page['items'])
-        logger.debug(
-            '%s: received %d of %d log events',
-            self._label, self._got,
-            self._got + page['items_available'] - len(page['items']))
-        self._page = page
-
-    def _buffer_page(self):
-        """Wait for current worker, copy results to _buffer, start next worker.
-
-        Return True if anything was added to the buffer."""
-        if self._thread is None:
-            return False
-        self._thread.join()
-        self._thread = None
-        page = self._page
-        if len(page['items']) == 0:
-            return False
-        if len(page['items']) < page['items_available']:
-            self._start_getting_next_page()
-        for i in page['items']:
-            for line in i['properties']['text'].split('\n'):
-                self._buffer.append(line)
-            self._last_id = i['id']
-        return True
+    def _get_all_pages(self):
+        got = 0
+        last_id = 0
+        while True:
+            page = arvados.api().logs().index(
+                limit=1000,
+                order=['id asc'],
+                filters=self._filters + [['id','>',str(last_id)]],
+            ).execute(num_retries=2)
+            got += len(page['items'])
+            logger.debug(
+                '%s: received %d of %d log events',
+                self._label, got,
+                got + page['items_available'] - len(page['items']))
+            for i in page['items']:
+                for line in i['properties']['text'].split('\n'):
+                    self._queue.put(line+'\n')
+                last_id = i['id']
+            if (len(page['items']) == 0 or
+                len(page['items']) >= page['items_available']):
+                break
+        self._queue.put(self.EOF)
 
     def __iter__(self):
+        self._queue = Queue.Queue()
+        self._thread = threading.Thread(target=self._get_all_pages)
+        self._thread.daemon = True
+        self._thread.start()
         return self
 
     def next(self):
-        if len(self._buffer) == 0:
-            if not self._buffer_page():
-                raise StopIteration
-        return self._buffer.popleft() + '\n'
+        line = self._queue.get()
+        if line is self.EOF:
+            raise StopIteration
+        return line
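
The shape this commit settles on, one producer thread feeding a Queue of
lines with None as an end-of-stream sentinel, is a standard
producer/consumer pattern. Here is a minimal standalone sketch in the
same Python 2 style as the code above; the paged API fetch is replaced
by a plain list of lines, so everything apart from the threading is
hypothetical:

    from __future__ import print_function

    import Queue
    import threading

    class LineStream(object):
        """Iterate over lines pushed onto a Queue by one producer thread."""
        EOF = None  # sentinel: None is never a real line

        def __init__(self, source):
            self._source = source  # any iterable of lines

        def _produce(self):
            # Producer: push every line, then the EOF sentinel exactly once.
            for line in self._source:
                self._queue.put(line)
            self._queue.put(self.EOF)

        def __iter__(self):
            # Start the producer lazily, as LiveLogReader.__iter__ does above.
            self._queue = Queue.Queue()
            self._thread = threading.Thread(target=self._produce)
            self._thread.daemon = True  # don't keep the interpreter alive
            self._thread.start()
            return self

        def next(self):
            # Consumer: Queue.get() blocks until a line (or EOF) arrives,
            # so no polling or explicit joins are needed here.
            line = self._queue.get()
            if line is self.EOF:
                raise StopIteration
            return line

    for line in LineStream(['a\n', 'b\n']):
        print(line, end='')

Because the producer enqueues EOF exactly once, the consumer's
termination test is a single identity check, and the daemon flag means
an abandoned iterator cannot keep the process alive.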

commit 3e7037e6383f57b0d1b4b627cd9feb27f05af13d
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 20:19:45 2016 -0500

    8341: Move reader classes to reader.py.

diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644
index 0000000..52d7e80
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -0,0 +1,81 @@
+from __future__ import print_function
+
+import arvados
+import collections
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+    def __init__(self, collection_id):
+        logger.debug('load collection %s', collection_id)
+        collection = arvados.collection.CollectionReader(collection_id)
+        filenames = [filename for filename in collection]
+        if len(filenames) != 1:
+            raise ValueError(
+                "collection {} has {} files; need exactly one".format(
+                    collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self._filters = [
+            ['object_uuid', '=', job_uuid],
+            ['event_type', '=', 'stderr']]
+        self._buffer = collections.deque()
+        self._got = 0
+        self._label = job_uuid
+        self._last_id = 0
+        self._start_getting_next_page()
+
+    def _start_getting_next_page(self):
+        self._thread = threading.Thread(target=self._get_next_page)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _get_next_page(self):
+        page = arvados.api().logs().index(
+            limit=1000,
+            order=['id asc'],
+            filters=self._filters + [['id','>',str(self._last_id)]],
+        ).execute()
+        self._got += len(page['items'])
+        logger.debug(
+            '%s: received %d of %d log events',
+            self._label, self._got,
+            self._got + page['items_available'] - len(page['items']))
+        self._page = page
+
+    def _buffer_page(self):
+        """Wait for current worker, copy results to _buffer, start next worker.
+
+        Return True if anything was added to the buffer."""
+        if self._thread is None:
+            return False
+        self._thread.join()
+        self._thread = None
+        page = self._page
+        if len(page['items']) == 0:
+            return False
+        if len(page['items']) < page['items_available']:
+            self._start_getting_next_page()
+        for i in page['items']:
+            for line in i['properties']['text'].split('\n'):
+                self._buffer.append(line)
+            self._last_id = i['id']
+        return True
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if len(self._buffer) == 0:
+            if not self._buffer_page():
+                raise StopIteration
+        return self._buffer.popleft() + '\n'
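
With the readers now in their own module, callers reach them through the
package path, as the summarizer.py hunk below does; for example (the
collection UUID here is a placeholder):

    import crunchstat_summary.reader

    rdr = crunchstat_summary.reader.CollectionReader(
        'zzzzz-4zz18-xxxxxxxxxxxxxxx')
    for line in rdr:
        pass  # each iteration yields one log line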
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ad96e52..48bec6a 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,13 +3,13 @@ from __future__ import print_function
 import arvados
 import collections
 import crunchstat_summary.chartjs
+import crunchstat_summary.reader
 import datetime
 import functools
 import itertools
 import math
 import re
 import sys
-import threading
 
 from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
@@ -328,84 +328,10 @@ class Summarizer(object):
             return '{}'.format(val)
 
 
-class CollectionReader(object):
-    def __init__(self, collection_id):
-        logger.debug('load collection %s', collection_id)
-        collection = arvados.collection.CollectionReader(collection_id)
-        filenames = [filename for filename in collection]
-        if len(filenames) != 1:
-            raise ValueError(
-                "collection {} has {} files; need exactly one".format(
-                    collection_id, len(filenames)))
-        self._reader = collection.open(filenames[0])
-
-    def __iter__(self):
-        return iter(self._reader)
-
-
-class LiveLogReader(object):
-    def __init__(self, job_uuid):
-        logger.debug('load stderr events for job %s', job_uuid)
-        self._filters = [
-            ['object_uuid', '=', job_uuid],
-            ['event_type', '=', 'stderr']]
-        self._buffer = collections.deque()
-        self._got = 0
-        self._label = job_uuid
-        self._last_id = 0
-        self._start_getting_next_page()
-
-    def _start_getting_next_page(self):
-        self._thread = threading.Thread(target=self._get_next_page)
-        self._thread.daemon = True
-        self._thread.start()
-
-    def _get_next_page(self):
-        page = arvados.api().logs().index(
-            limit=1000,
-            order=['id asc'],
-            filters=self._filters + [['id','>',str(self._last_id)]],
-        ).execute()
-        self._got += len(page['items'])
-        logger.debug(
-            '%s: received %d of %d log events',
-            self._label, self._got,
-            self._got + page['items_available'] - len(page['items']))
-        self._page = page
-
-    def _buffer_page(self):
-        """Wait for current worker, copy results to _buffer, start next worker.
-
-        Return True if anything was added to the buffer."""
-        if self._thread is None:
-            return False
-        self._thread.join()
-        self._thread = None
-        page = self._page
-        if len(page['items']) == 0:
-            return False
-        if len(page['items']) < page['items_available']:
-            self._start_getting_next_page()
-        for i in page['items']:
-            for line in i['properties']['text'].split('\n'):
-                self._buffer.append(line)
-            self._last_id = i['id']
-        return True
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        if len(self._buffer) == 0:
-            if not self._buffer_page():
-                raise StopIteration
-        return self._buffer.popleft() + '\n'
-
-
 class CollectionSummarizer(Summarizer):
     def __init__(self, collection_id, **kwargs):
         super(CollectionSummarizer, self).__init__(
-            CollectionReader(collection_id), **kwargs)
+            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
         self.label = collection_id
 
 
@@ -417,10 +343,10 @@ class JobSummarizer(Summarizer):
         else:
             self.job = job
         if self.job['log']:
-            rdr = CollectionReader(self.job['log'])
+            rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
             label = self.job['uuid']
         else:
-            rdr = LiveLogReader(self.job['uuid'])
+            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
             label = self.job['uuid'] + ' (partial)'
         super(JobSummarizer, self).__init__(rdr, **kwargs)
         self.label = label

commit 5bda5bf3aedc0621abeed901a608adb7db6030e6
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 20:15:00 2016 -0500

    8341: Use a worker thread to get page N+1 of logs while parsing page N.

diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 8b9bfa3..ad96e52 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -9,6 +9,7 @@ import itertools
 import math
 import re
 import sys
+import threading
 
 from arvados.api import OrderedJsonModel
 from crunchstat_summary import logger
@@ -350,37 +351,54 @@ class LiveLogReader(object):
             ['event_type', '=', 'stderr']]
         self._buffer = collections.deque()
         self._got = 0
-        self._got_all = False
         self._label = job_uuid
         self._last_id = 0
+        self._start_getting_next_page()
+
+    def _start_getting_next_page(self):
+        self._thread = threading.Thread(target=self._get_next_page)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _get_next_page(self):
+        page = arvados.api().logs().index(
+            limit=1000,
+            order=['id asc'],
+            filters=self._filters + [['id','>',str(self._last_id)]],
+        ).execute()
+        self._got += len(page['items'])
+        logger.debug(
+            '%s: received %d of %d log events',
+            self._label, self._got,
+            self._got + page['items_available'] - len(page['items']))
+        self._page = page
+
+    def _buffer_page(self):
+        """Wait for current worker, copy results to _buffer, start next worker.
+
+        Return True if anything was added to the buffer."""
+        if self._thread is None:
+            return False
+        self._thread.join()
+        self._thread = None
+        page = self._page
+        if len(page['items']) == 0:
+            return False
+        if len(page['items']) < page['items_available']:
+            self._start_getting_next_page()
+        for i in page['items']:
+            for line in i['properties']['text'].split('\n'):
+                self._buffer.append(line)
+            self._last_id = i['id']
+        return True
 
     def __iter__(self):
         return self
 
     def next(self):
-        if self._buffer is None:
-            raise StopIteration
         if len(self._buffer) == 0:
-            if self._got_all:
+            if not self._buffer_page():
                 raise StopIteration
-            page = arvados.api().logs().index(
-                limit=1000,
-                order=['id asc'],
-                filters=self._filters + [['id','>',str(self._last_id)]],
-            ).execute()
-            self._got += len(page['items'])
-            logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
-            if len(page['items']) == 0:
-                self._got_all = True
-                self._buffer = None
-                raise StopIteration
-            elif len(page['items']) == page['items_available']:
-                # Don't try to fetch any more after this page
-                self._got_all = True
-            for i in page['items']:
-                for line in i['properties']['text'].split('\n'):
-                    self._buffer.append(line)
-                self._last_id = i['id']
         return self._buffer.popleft() + '\n'
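
The lookahead this commit adds, starting the fetch of page N+1 in a
worker thread and joining it only once page N is exhausted, can be
sketched independently of the Arvados client. fetch_page and FAKE_PAGES
below are hypothetical stand-ins for the logs().index(...) pagination
shown above:

    import threading

    # Hypothetical stand-in for the paged API call: returns (items, more),
    # i.e. the items with id > last_id and whether another page may follow.
    FAKE_PAGES = {0: ([{'id': 1}], True), 1: ([{'id': 2}], False)}

    def fetch_page(last_id):
        return FAKE_PAGES.get(last_id, ([], False))

    class Prefetcher(object):
        """Fetch page N+1 in the background while the caller parses page N."""

        def __init__(self):
            self._last_id = 0
            self._start_fetch()

        def _start_fetch(self):
            self._thread = threading.Thread(target=self._fetch)
            self._thread.daemon = True
            self._thread.start()

        def _fetch(self):
            self._page = fetch_page(self._last_id)

        def next_page(self):
            # Join the in-flight fetch, then start the next one before
            # returning, so network wait overlaps with parsing.
            if self._thread is None:
                return None
            self._thread.join()
            self._thread = None
            items, more = self._page
            if items:
                self._last_id = items[-1]['id']
            if more:
                self._start_fetch()
            return items

    pf = Prefetcher()
    while True:
        items = pf.next_page()
        if not items:
            break
        for item in items:
            print(item['id'])  # prints 1, then 2

No lock is needed: _page is written before join() returns and read only
after, so the join itself provides the synchronization.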
 
 

commit f6bb371a170d0a74db6abf9df0f65aabe08d7cf9
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Feb 7 19:43:02 2016 -0500

    8341: Get job log from logs API if the log has not been written to Keep yet.

diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index a9dfc83..78638c6 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -13,7 +13,8 @@ class ArgumentParser(argparse.ArgumentParser):
         src = self.add_mutually_exclusive_group()
         src.add_argument(
             '--job', type=str, metavar='UUID',
-            help='Look up the specified job and read its log data from Keep')
+            help='Look up the specified job and read its log data from Keep'
+            ' (or from the Arvados event log, if the job is still running)')
         src.add_argument(
             '--pipeline-instance', type=str, metavar='UUID',
             help='Summarize each component of the given pipeline instance')
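
With this fallback, one invocation pattern covers both cases; for
example (the job UUID here is a placeholder):

    crunchstat-summary --job zzzzz-8i9sb-xxxxxxxxxxxxxxx

reads a finished job's log from Keep, while a still-running job is read
from the event log and labeled '(partial)', as the JobSummarizer hunk
below shows.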
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 486f0e7..8b9bfa3 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -90,7 +90,7 @@ class Summarizer(object):
                 logger.debug('%s: done %s', self.label, uuid)
                 continue
 
-            m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
             if not m:
                 continue
 
@@ -327,8 +327,8 @@ class Summarizer(object):
             return '{}'.format(val)
 
 
-class CollectionSummarizer(Summarizer):
-    def __init__(self, collection_id, **kwargs):
+class CollectionReader(object):
+    def __init__(self, collection_id):
         logger.debug('load collection %s', collection_id)
         collection = arvados.collection.CollectionReader(collection_id)
         filenames = [filename for filename in collection]
@@ -336,24 +336,76 @@ class CollectionSummarizer(Summarizer):
             raise ValueError(
                 "collection {} has {} files; need exactly one".format(
                     collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self._filters = [
+            ['object_uuid', '=', job_uuid],
+            ['event_type', '=', 'stderr']]
+        self._buffer = collections.deque()
+        self._got = 0
+        self._got_all = False
+        self._label = job_uuid
+        self._last_id = 0
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self._buffer is None:
+            raise StopIteration
+        if len(self._buffer) == 0:
+            if self._got_all:
+                raise StopIteration
+            page = arvados.api().logs().index(
+                limit=1000,
+                order=['id asc'],
+                filters=self._filters + [['id','>',str(self._last_id)]],
+            ).execute()
+            self._got += len(page['items'])
+            logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
+            if len(page['items']) == 0:
+                self._got_all = True
+                self._buffer = None
+                raise StopIteration
+            elif len(page['items']) == page['items_available']:
+                # Don't try to fetch any more after this page
+                self._got_all = True
+            for i in page['items']:
+                for line in i['properties']['text'].split('\n'):
+                    self._buffer.append(line)
+                self._last_id = i['id']
+        return self._buffer.popleft() + '\n'
+
+
+class CollectionSummarizer(Summarizer):
+    def __init__(self, collection_id, **kwargs):
         super(CollectionSummarizer, self).__init__(
-            collection.open(filenames[0]), **kwargs)
+            CollectionReader(collection_id), **kwargs)
         self.label = collection_id
 
 
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
     def __init__(self, job, **kwargs):
         arv = arvados.api('v1')
         if isinstance(job, basestring):
             self.job = arv.jobs().get(uuid=job).execute()
         else:
             self.job = job
-        if not self.job['log']:
-            raise ValueError(
-                "job {} has no log; live summary not implemented".format(
-                    self.job['uuid']))
-        super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
-        self.label = self.job['uuid']
+        if self.job['log']:
+            rdr = CollectionReader(self.job['log'])
+            label = self.job['uuid']
+        else:
+            rdr = LiveLogReader(self.job['uuid'])
+            label = self.job['uuid'] + ' (partial)'
+        super(JobSummarizer, self).__init__(rdr, **kwargs)
+        self.label = label
         self.existing_constraints = self.job.get('runtime_constraints', {})
 
 

-----------------------------------------------------------------------

Summary of changes:
 .../crunchstat_summary/command.py                  |  3 +-
 .../crunchstat_summary/reader.py                   | 69 ++++++++++++++++++++++
 .../crunchstat_summary/summarizer.py               | 28 ++++-----
 3 files changed, 83 insertions(+), 17 deletions(-)
 create mode 100644 tools/crunchstat-summary/crunchstat_summary/reader.py

hooks/post-receive