[ARVADOS] updated: b691f2fad8598f12d21d87b772e6de3314ee2c1a
git at public.curoverse.com
git at public.curoverse.com
Sun Feb 7 20:26:20 EST 2016
Summary of changes:
tools/crunchstat-summary/crunchstat_summary/command.py | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
discards 886865621dc1abb5d5666dddc9e43866397e1f5d (commit)
discards 584da9d4af5ce5928373c7166f320882649c1a6b (commit)
discards b3cd2a91250de92f38dff98e21fcc5b4b2e2ffa7 (commit)
via b691f2fad8598f12d21d87b772e6de3314ee2c1a (commit)
via 11cc9a6f3f2c282979473db6afe7da5676f8f230 (commit)
via 0cba5993c14a095f6edcbbbebb301caffa8914ec (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (886865621dc1abb5d5666dddc9e43866397e1f5d)
            \
             N -- N -- N (b691f2fad8598f12d21d87b772e6de3314ee2c1a)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit b691f2fad8598f12d21d87b772e6de3314ee2c1a
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 20:19:45 2016 -0500
8341: Move reader classes to reader.py.
diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644
index 0000000..52d7e80
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -0,0 +1,81 @@
+from __future__ import print_function
+
+import arvados
+import collections
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+ def __init__(self, collection_id):
+ logger.debug('load collection %s', collection_id)
+ collection = arvados.collection.CollectionReader(collection_id)
+ filenames = [filename for filename in collection]
+ if len(filenames) != 1:
+ raise ValueError(
+ "collection {} has {} files; need exactly one".format(
+ collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self._filters = [
+ ['object_uuid', '=', job_uuid],
+ ['event_type', '=', 'stderr']]
+ self._buffer = collections.deque()
+ self._got = 0
+ self._label = job_uuid
+ self._last_id = 0
+ self._start_getting_next_page()
+
+ def _start_getting_next_page(self):
+ self._thread = threading.Thread(target=self._get_next_page)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def _get_next_page(self):
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self._label, self._got,
+ self._got + page['items_available'] - len(page['items']))
+ self._page = page
+
+ def _buffer_page(self):
+ """Wait for current worker, copy results to _buffer, start next worker.
+
+ Return True if anything was added to the buffer."""
+ if self._thread is None:
+ return False
+ self._thread.join()
+ self._thread = None
+ page = self._page
+ if len(page['items']) == 0:
+ return False
+ if len(page['items']) < page['items_available']:
+ self._start_getting_next_page()
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return True
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if len(self._buffer) == 0:
+ if not self._buffer_page():
+ raise StopIteration
+ return self._buffer.popleft() + '\n'
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ad96e52..48bec6a 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,13 +3,13 @@ from __future__ import print_function
import arvados
import collections
import crunchstat_summary.chartjs
+import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
-import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
@@ -328,84 +328,10 @@ class Summarizer(object):
return '{}'.format(val)
-class CollectionReader(object):
- def __init__(self, collection_id):
- logger.debug('load collection %s', collection_id)
- collection = arvados.collection.CollectionReader(collection_id)
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- collection_id, len(filenames)))
- self._reader = collection.open(filenames[0])
-
- def __iter__(self):
- return iter(self._reader)
-
-
-class LiveLogReader(object):
- def __init__(self, job_uuid):
- logger.debug('load stderr events for job %s', job_uuid)
- self._filters = [
- ['object_uuid', '=', job_uuid],
- ['event_type', '=', 'stderr']]
- self._buffer = collections.deque()
- self._got = 0
- self._label = job_uuid
- self._last_id = 0
- self._start_getting_next_page()
-
- def _start_getting_next_page(self):
- self._thread = threading.Thread(target=self._get_next_page)
- self._thread.daemon = True
- self._thread.start()
-
- def _get_next_page(self):
- page = arvados.api().logs().index(
- limit=1000,
- order=['id asc'],
- filters=self._filters + [['id','>',str(self._last_id)]],
- ).execute()
- self._got += len(page['items'])
- logger.debug(
- '%s: received %d of %d log events',
- self._label, self._got,
- self._got + page['items_available'] - len(page['items']))
- self._page = page
-
- def _buffer_page(self):
- """Wait for current worker, copy results to _buffer, start next worker.
-
- Return True if anything was added to the buffer."""
- if self._thread is None:
- return False
- self._thread.join()
- self._thread = None
- page = self._page
- if len(page['items']) == 0:
- return False
- if len(page['items']) < page['items_available']:
- self._start_getting_next_page()
- for i in page['items']:
- for line in i['properties']['text'].split('\n'):
- self._buffer.append(line)
- self._last_id = i['id']
- return True
-
- def __iter__(self):
- return self
-
- def next(self):
- if len(self._buffer) == 0:
- if not self._buffer_page():
- raise StopIteration
- return self._buffer.popleft() + '\n'
-
-
class CollectionSummarizer(Summarizer):
def __init__(self, collection_id, **kwargs):
super(CollectionSummarizer, self).__init__(
- CollectionReader(collection_id), **kwargs)
+ crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
self.label = collection_id
@@ -417,10 +343,10 @@ class JobSummarizer(Summarizer):
else:
self.job = job
if self.job['log']:
- rdr = CollectionReader(self.job['log'])
+ rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
label = self.job['uuid']
else:
- rdr = LiveLogReader(self.job['uuid'])
+ rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
label = self.job['uuid'] + ' (partial)'
super(JobSummarizer, self).__init__(rdr, **kwargs)
self.label = label
commit 11cc9a6f3f2c282979473db6afe7da5676f8f230
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 20:15:00 2016 -0500
8341: Use a worker thread to get page N+1 of logs while parsing page N.
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 8b9bfa3..ad96e52 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -9,6 +9,7 @@ import itertools
import math
import re
import sys
+import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
@@ -350,37 +351,54 @@ class LiveLogReader(object):
['event_type', '=', 'stderr']]
self._buffer = collections.deque()
self._got = 0
- self._got_all = False
self._label = job_uuid
self._last_id = 0
+ self._start_getting_next_page()
+
+ def _start_getting_next_page(self):
+ self._thread = threading.Thread(target=self._get_next_page)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def _get_next_page(self):
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self._label, self._got,
+ self._got + page['items_available'] - len(page['items']))
+ self._page = page
+
+ def _buffer_page(self):
+ """Wait for current worker, copy results to _buffer, start next worker.
+
+ Return True if anything was added to the buffer."""
+ if self._thread is None:
+ return False
+ self._thread.join()
+ self._thread = None
+ page = self._page
+ if len(page['items']) == 0:
+ return False
+ if len(page['items']) < page['items_available']:
+ self._start_getting_next_page()
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return True
def __iter__(self):
return self
def next(self):
- if self._buffer is None:
- raise StopIteration
if len(self._buffer) == 0:
- if self._got_all:
+ if not self._buffer_page():
raise StopIteration
- page = arvados.api().logs().index(
- limit=1000,
- order=['id asc'],
- filters=self._filters + [['id','>',str(self._last_id)]],
- ).execute()
- self._got += len(page['items'])
- logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
- if len(page['items']) == 0:
- self._got_all = True
- self._buffer = None
- raise StopIteration
- elif len(page['items']) == page['items_available']:
- # Don't try to fetch any more after this page
- self._got_all = True
- for i in page['items']:
- for line in i['properties']['text'].split('\n'):
- self._buffer.append(line)
- self._last_id = i['id']
return self._buffer.popleft() + '\n'
commit 0cba5993c14a095f6edcbbbebb301caffa8914ec
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 19:43:02 2016 -0500
8341: Get job log from logs API if the log has not been written to Keep yet.
diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index a9dfc83..78638c6 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -13,7 +13,8 @@ class ArgumentParser(argparse.ArgumentParser):
src = self.add_mutually_exclusive_group()
src.add_argument(
'--job', type=str, metavar='UUID',
- help='Look up the specified job and read its log data from Keep')
+ help='Look up the specified job and read its log data from Keep'
+ ' (or from the Arvados event log, if the job is still running)')
src.add_argument(
'--pipeline-instance', type=str, metavar='UUID',
help='Summarize each component of the given pipeline instance')
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 486f0e7..8b9bfa3 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -90,7 +90,7 @@ class Summarizer(object):
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
@@ -327,8 +327,8 @@ class Summarizer(object):
return '{}'.format(val)
-class CollectionSummarizer(Summarizer):
- def __init__(self, collection_id, **kwargs):
+class CollectionReader(object):
+ def __init__(self, collection_id):
logger.debug('load collection %s', collection_id)
collection = arvados.collection.CollectionReader(collection_id)
filenames = [filename for filename in collection]
@@ -336,24 +336,76 @@ class CollectionSummarizer(Summarizer):
raise ValueError(
"collection {} has {} files; need exactly one".format(
collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self._filters = [
+ ['object_uuid', '=', job_uuid],
+ ['event_type', '=', 'stderr']]
+ self._buffer = collections.deque()
+ self._got = 0
+ self._got_all = False
+ self._label = job_uuid
+ self._last_id = 0
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self._buffer is None:
+ raise StopIteration
+ if len(self._buffer) == 0:
+ if self._got_all:
+ raise StopIteration
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
+ if len(page['items']) == 0:
+ self._got_all = True
+ self._buffer = None
+ raise StopIteration
+ elif len(page['items']) == page['items_available']:
+ # Don't try to fetch any more after this page
+ self._got_all = True
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return self._buffer.popleft() + '\n'
+
+
+class CollectionSummarizer(Summarizer):
+ def __init__(self, collection_id, **kwargs):
super(CollectionSummarizer, self).__init__(
- collection.open(filenames[0]), **kwargs)
+ CollectionReader(collection_id), **kwargs)
self.label = collection_id
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
def __init__(self, job, **kwargs):
arv = arvados.api('v1')
if isinstance(job, basestring):
self.job = arv.jobs().get(uuid=job).execute()
else:
self.job = job
- if not self.job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.job['uuid']))
- super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
- self.label = self.job['uuid']
+ if self.job['log']:
+ rdr = CollectionReader(self.job['log'])
+ label = self.job['uuid']
+ else:
+ rdr = LiveLogReader(self.job['uuid'])
+ label = self.job['uuid'] + ' (partial)'
+ super(JobSummarizer, self).__init__(rdr, **kwargs)
+ self.label = label
self.existing_constraints = self.job.get('runtime_constraints', {})
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list