[ARVADOS] updated: 886865621dc1abb5d5666dddc9e43866397e1f5d
git at public.curoverse.com
git at public.curoverse.com
Sun Feb 7 20:21:33 EST 2016
Summary of changes:
.../crunchstat_summary/reader.py | 81 ++++++++++++++++++++++
.../crunchstat_summary/summarizer.py | 61 ++--------------
2 files changed, 85 insertions(+), 57 deletions(-)
create mode 100644 tools/crunchstat-summary/crunchstat_summary/reader.py
discards 3d625ef22b42affa956f48948351bc60dd4298ac (commit)
via 886865621dc1abb5d5666dddc9e43866397e1f5d (commit)
via 584da9d4af5ce5928373c7166f320882649c1a6b (commit)
via b3cd2a91250de92f38dff98e21fcc5b4b2e2ffa7 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (3d625ef22b42affa956f48948351bc60dd4298ac)
           \
            N -- N -- N (886865621dc1abb5d5666dddc9e43866397e1f5d)
When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions in the N
branch since the common base, B.
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them in
full below.
commit 886865621dc1abb5d5666dddc9e43866397e1f5d
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 20:19:45 2016 -0500
8341: Move reader classes to reader.py.
diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644
index 0000000..52d7e80
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -0,0 +1,81 @@
+from __future__ import print_function
+
+import arvados
+import collections
+import threading
+
+from crunchstat_summary import logger
+
+
class CollectionReader(object):
    """Iterate over the lines of the single file stored in a collection.

    The collection identified by ``collection_id`` is opened with
    ``arvados.collection.CollectionReader``; it must contain exactly one
    file, otherwise ValueError is raised.
    """

    def __init__(self, collection_id):
        logger.debug('load collection %s', collection_id)
        coll = arvados.collection.CollectionReader(collection_id)
        names = list(coll)
        count = len(names)
        if count != 1:
            raise ValueError(
                "collection {} has {} files; need exactly one".format(
                    collection_id, count))
        # Keep the open file handle; iteration is delegated to it.
        self._reader = coll.open(names[0])

    def __iter__(self):
        return iter(self._reader)
+
+
class LiveLogReader(object):
    """Iterate over a job's stderr log lines fetched from the logs API.

    Log events whose object_uuid is the given job UUID and whose
    event_type is 'stderr' are requested in pages of up to 1000 events,
    ordered by ascending id. While the caller consumes one page, the next
    page is fetched in a background daemon thread. Each event's text is
    split on newlines and every resulting piece is returned from next()
    with a trailing newline appended.

    If fetching a page fails in the worker thread, the exception is
    re-raised from next() in the consuming thread rather than being lost.
    """

    def __init__(self, job_uuid):
        logger.debug('load stderr events for job %s', job_uuid)
        self._filters = [
            ['object_uuid', '=', job_uuid],
            ['event_type', '=', 'stderr']]
        self._buffer = collections.deque()
        self._got = 0
        self._label = job_uuid
        # Highest event id already handed to the buffer; next page
        # requests events with id greater than this.
        self._last_id = 0
        # Results written by the worker thread; only read after join().
        self._page = None
        self._error = None
        self._thread = None
        self._start_getting_next_page()

    def _start_getting_next_page(self):
        """Start a daemon worker that fetches events after self._last_id."""
        # Pass the cursor as an argument so its value is captured here,
        # instead of being read by the worker while this thread may still
        # be updating it.
        self._thread = threading.Thread(
            target=self._get_next_page, args=(self._last_id,))
        self._thread.daemon = True
        self._thread.start()

    def _get_next_page(self, last_id=None):
        """Fetch one page of events with id > last_id (worker thread).

        On success the API response is stored in self._page. On failure
        the exception is stored in self._error and self._page stays None,
        so _buffer_page can re-raise it in the consuming thread.
        """
        if last_id is None:
            last_id = self._last_id
        self._page = None
        self._error = None
        try:
            page = arvados.api().logs().index(
                limit=1000,
                order=['id asc'],
                filters=self._filters + [['id', '>', str(last_id)]],
            ).execute()
            self._got += len(page['items'])
            logger.debug(
                '%s: received %d of %d log events',
                self._label, self._got,
                self._got + page['items_available'] - len(page['items']))
        except Exception as error:
            # An exception escaping a thread is only printed; keep it so
            # the consumer sees the failure instead of stale/missing data.
            self._error = error
        else:
            self._page = page

    def _buffer_page(self):
        """Wait for current worker, copy results to _buffer, start next worker.

        Return True if anything was added to the buffer. Re-raise any
        exception the worker encountered while fetching the page.
        """
        if self._thread is None:
            return False
        self._thread.join()
        self._thread = None
        if self._error is not None:
            error, self._error = self._error, None
            raise error
        page = self._page
        items = page['items']
        if not items:
            return False
        # Advance the cursor BEFORE starting the next worker. Events are
        # ordered by ascending id, so the last item carries the highest id.
        self._last_id = items[-1]['id']
        if len(items) < page['items_available']:
            self._start_getting_next_page()
        for item in items:
            self._buffer.extend(item['properties']['text'].split('\n'))
        return True

    def __iter__(self):
        return self

    def next(self):
        """Return the next log line with a trailing newline."""
        if not self._buffer:
            if not self._buffer_page():
                raise StopIteration
        return self._buffer.popleft() + '\n'

    # Python 3 spelling of the iterator protocol method.
    __next__ = next
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ad96e52..48bec6a 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,13 +3,13 @@ from __future__ import print_function
import arvados
import collections
import crunchstat_summary.chartjs
+import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
-import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
@@ -328,84 +328,10 @@ class Summarizer(object):
return '{}'.format(val)
-class CollectionReader(object):
- def __init__(self, collection_id):
- logger.debug('load collection %s', collection_id)
- collection = arvados.collection.CollectionReader(collection_id)
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- collection_id, len(filenames)))
- self._reader = collection.open(filenames[0])
-
- def __iter__(self):
- return iter(self._reader)
-
-
-class LiveLogReader(object):
- def __init__(self, job_uuid):
- logger.debug('load stderr events for job %s', job_uuid)
- self._filters = [
- ['object_uuid', '=', job_uuid],
- ['event_type', '=', 'stderr']]
- self._buffer = collections.deque()
- self._got = 0
- self._label = job_uuid
- self._last_id = 0
- self._start_getting_next_page()
-
- def _start_getting_next_page(self):
- self._thread = threading.Thread(target=self._get_next_page)
- self._thread.daemon = True
- self._thread.start()
-
- def _get_next_page(self):
- page = arvados.api().logs().index(
- limit=1000,
- order=['id asc'],
- filters=self._filters + [['id','>',str(self._last_id)]],
- ).execute()
- self._got += len(page['items'])
- logger.debug(
- '%s: received %d of %d log events',
- self._label, self._got,
- self._got + page['items_available'] - len(page['items']))
- self._page = page
-
- def _buffer_page(self):
- """Wait for current worker, copy results to _buffer, start next worker.
-
- Return True if anything was added to the buffer."""
- if self._thread is None:
- return False
- self._thread.join()
- self._thread = None
- page = self._page
- if len(page['items']) == 0:
- return False
- if len(page['items']) < page['items_available']:
- self._start_getting_next_page()
- for i in page['items']:
- for line in i['properties']['text'].split('\n'):
- self._buffer.append(line)
- self._last_id = i['id']
- return True
-
- def __iter__(self):
- return self
-
- def next(self):
- if len(self._buffer) == 0:
- if not self._buffer_page():
- raise StopIteration
- return self._buffer.popleft() + '\n'
-
-
class CollectionSummarizer(Summarizer):
def __init__(self, collection_id, **kwargs):
super(CollectionSummarizer, self).__init__(
- CollectionReader(collection_id), **kwargs)
+ crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
self.label = collection_id
@@ -417,10 +343,10 @@ class JobSummarizer(Summarizer):
else:
self.job = job
if self.job['log']:
- rdr = CollectionReader(self.job['log'])
+ rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
label = self.job['uuid']
else:
- rdr = LiveLogReader(self.job['uuid'])
+ rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
label = self.job['uuid'] + ' (partial)'
super(JobSummarizer, self).__init__(rdr, **kwargs)
self.label = label
commit 584da9d4af5ce5928373c7166f320882649c1a6b
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 20:15:00 2016 -0500
8341: Use a worker thread to get page N+1 of logs while parsing page N.
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 8b9bfa3..ad96e52 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -9,6 +9,7 @@ import itertools
import math
import re
import sys
+import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
@@ -350,37 +351,54 @@ class LiveLogReader(object):
['event_type', '=', 'stderr']]
self._buffer = collections.deque()
self._got = 0
- self._got_all = False
self._label = job_uuid
self._last_id = 0
+ self._start_getting_next_page()
+
+ def _start_getting_next_page(self):
+ self._thread = threading.Thread(target=self._get_next_page)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def _get_next_page(self):
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self._label, self._got,
+ self._got + page['items_available'] - len(page['items']))
+ self._page = page
+
+ def _buffer_page(self):
+ """Wait for current worker, copy results to _buffer, start next worker.
+
+ Return True if anything was added to the buffer."""
+ if self._thread is None:
+ return False
+ self._thread.join()
+ self._thread = None
+ page = self._page
+ if len(page['items']) == 0:
+ return False
+ if len(page['items']) < page['items_available']:
+ self._start_getting_next_page()
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return True
def __iter__(self):
return self
def next(self):
- if self._buffer is None:
- raise StopIteration
if len(self._buffer) == 0:
- if self._got_all:
+ if not self._buffer_page():
raise StopIteration
- page = arvados.api().logs().index(
- limit=1000,
- order=['id asc'],
- filters=self._filters + [['id','>',str(self._last_id)]],
- ).execute()
- self._got += len(page['items'])
- logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
- if len(page['items']) == 0:
- self._got_all = True
- self._buffer = None
- raise StopIteration
- elif len(page['items']) == page['items_available']:
- # Don't try to fetch any more after this page
- self._got_all = True
- for i in page['items']:
- for line in i['properties']['text'].split('\n'):
- self._buffer.append(line)
- self._last_id = i['id']
return self._buffer.popleft() + '\n'
commit b3cd2a91250de92f38dff98e21fcc5b4b2e2ffa7
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 19:43:02 2016 -0500
8341: Get job log from logs API if the log has not been written to Keep yet.
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 486f0e7..8b9bfa3 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -90,7 +90,7 @@ class Summarizer(object):
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
@@ -327,8 +327,8 @@ class Summarizer(object):
return '{}'.format(val)
-class CollectionSummarizer(Summarizer):
- def __init__(self, collection_id, **kwargs):
+class CollectionReader(object):
+ def __init__(self, collection_id):
logger.debug('load collection %s', collection_id)
collection = arvados.collection.CollectionReader(collection_id)
filenames = [filename for filename in collection]
@@ -336,24 +336,76 @@ class CollectionSummarizer(Summarizer):
raise ValueError(
"collection {} has {} files; need exactly one".format(
collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self._filters = [
+ ['object_uuid', '=', job_uuid],
+ ['event_type', '=', 'stderr']]
+ self._buffer = collections.deque()
+ self._got = 0
+ self._got_all = False
+ self._label = job_uuid
+ self._last_id = 0
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self._buffer is None:
+ raise StopIteration
+ if len(self._buffer) == 0:
+ if self._got_all:
+ raise StopIteration
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
+ if len(page['items']) == 0:
+ self._got_all = True
+ self._buffer = None
+ raise StopIteration
+ elif len(page['items']) == page['items_available']:
+ # Don't try to fetch any more after this page
+ self._got_all = True
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return self._buffer.popleft() + '\n'
+
+
+class CollectionSummarizer(Summarizer):
+ def __init__(self, collection_id, **kwargs):
super(CollectionSummarizer, self).__init__(
- collection.open(filenames[0]), **kwargs)
+ CollectionReader(collection_id), **kwargs)
self.label = collection_id
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
def __init__(self, job, **kwargs):
arv = arvados.api('v1')
if isinstance(job, basestring):
self.job = arv.jobs().get(uuid=job).execute()
else:
self.job = job
- if not self.job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.job['uuid']))
- super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
- self.label = self.job['uuid']
+ if self.job['log']:
+ rdr = CollectionReader(self.job['log'])
+ label = self.job['uuid']
+ else:
+ rdr = LiveLogReader(self.job['uuid'])
+ label = self.job['uuid'] + ' (partial)'
+ super(JobSummarizer, self).__init__(rdr, **kwargs)
+ self.label = label
self.existing_constraints = self.job.get('runtime_constraints', {})
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list