[ARVADOS] updated: 6c950260b253d28e78579b493c8eb3eabe0add11
git at public.curoverse.com
Mon Feb 8 14:33:03 EST 2016
via 6c950260b253d28e78579b493c8eb3eabe0add11 (commit)
via d7e8f7c787b7706937f95c3ed2a5086616d48514 (commit)
via 3e7037e6383f57b0d1b4b627cd9feb27f05af13d (commit)
via 5bda5bf3aedc0621abeed901a608adb7db6030e6 (commit)
via f6bb371a170d0a74db6abf9df0f65aabe08d7cf9 (commit)
from a13c14cfc7fabe4f4da48edd57a086d2d8953a03 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 6c950260b253d28e78579b493c8eb3eabe0add11
Merge: a13c14c d7e8f7c
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Feb 8 14:32:45 2016 -0500
Merge branch '8341-live-crunchstat-summary' refs #8341
commit d7e8f7c787b7706937f95c3ed2a5086616d48514
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Feb 8 14:18:42 2016 -0500
8341: Use a Queue of lines and one thread, instead of a succession of threads and a deque of buffers.
diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
index 52d7e80..049b48f 100644
--- a/tools/crunchstat-summary/crunchstat_summary/reader.py
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -1,7 +1,7 @@
from __future__ import print_function
import arvados
-import collections
+import Queue
import threading
from crunchstat_summary import logger
@@ -23,59 +23,47 @@ class CollectionReader(object):
class LiveLogReader(object):
+ EOF = None
+
def __init__(self, job_uuid):
logger.debug('load stderr events for job %s', job_uuid)
self._filters = [
['object_uuid', '=', job_uuid],
['event_type', '=', 'stderr']]
- self._buffer = collections.deque()
- self._got = 0
self._label = job_uuid
- self._last_id = 0
- self._start_getting_next_page()
-
- def _start_getting_next_page(self):
- self._thread = threading.Thread(target=self._get_next_page)
- self._thread.daemon = True
- self._thread.start()
- def _get_next_page(self):
- page = arvados.api().logs().index(
- limit=1000,
- order=['id asc'],
- filters=self._filters + [['id','>',str(self._last_id)]],
- ).execute()
- self._got += len(page['items'])
- logger.debug(
- '%s: received %d of %d log events',
- self._label, self._got,
- self._got + page['items_available'] - len(page['items']))
- self._page = page
-
- def _buffer_page(self):
- """Wait for current worker, copy results to _buffer, start next worker.
-
- Return True if anything was added to the buffer."""
- if self._thread is None:
- return False
- self._thread.join()
- self._thread = None
- page = self._page
- if len(page['items']) == 0:
- return False
- if len(page['items']) < page['items_available']:
- self._start_getting_next_page()
- for i in page['items']:
- for line in i['properties']['text'].split('\n'):
- self._buffer.append(line)
- self._last_id = i['id']
- return True
+ def _get_all_pages(self):
+ got = 0
+ last_id = 0
+ while True:
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(last_id)]],
+ ).execute(num_retries=2)
+ got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self._label, got,
+ got + page['items_available'] - len(page['items']))
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._queue.put(line+'\n')
+ last_id = i['id']
+ if (len(page['items']) == 0 or
+ len(page['items']) >= page['items_available']):
+ break
+ self._queue.put(self.EOF)
def __iter__(self):
+ self._queue = Queue.Queue()
+ self._thread = threading.Thread(target=self._get_all_pages)
+ self._thread.daemon = True
+ self._thread.start()
return self
def next(self):
- if len(self._buffer) == 0:
- if not self._buffer_page():
- raise StopIteration
- return self._buffer.popleft() + '\n'
+ line = self._queue.get()
+ if line is self.EOF:
+ raise StopIteration
+ return line
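The pattern this commit lands on -- one producer thread feeding a Queue.Queue of lines, with None as an end-of-stream sentinel -- can be sketched in isolation. This is a minimal illustration, not the Arvados code itself; fake_pages below is a hypothetical stand-in for the paged logs API:

    from __future__ import print_function
    import Queue
    import threading

    EOF = None  # sentinel: a Queue cannot otherwise signal end-of-stream

    class LineStream(object):
        """Iterate over lines produced by a background worker thread."""
        def __init__(self, fetch_pages):
            self._fetch_pages = fetch_pages  # callable yielding lists of lines

        def _produce(self):
            for page in self._fetch_pages():
                for line in page:
                    self._queue.put(line + '\n')
            self._queue.put(EOF)  # always mark the end, even with no pages

        def __iter__(self):
            self._queue = Queue.Queue()
            thread = threading.Thread(target=self._produce)
            thread.daemon = True  # don't keep the interpreter alive
            thread.start()
            return self

        def next(self):  # Python 2 iterator protocol, as in the commit
            line = self._queue.get()  # blocks until the producer catches up
            if line is EOF:
                raise StopIteration
            return line

    def fake_pages():  # hypothetical producer standing in for the API
        yield ['first', 'second']
        yield ['third']

    for line in LineStream(fake_pages):
        print(line, end='')

Compared with the deque-of-buffers approach it replaces, the consumer never joins threads or tracks paging state: Queue.get simply blocks until a line (or the sentinel) arrives.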
commit 3e7037e6383f57b0d1b4b627cd9feb27f05af13d
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 20:19:45 2016 -0500
8341: Move reader classes to reader.py.
diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644
index 0000000..52d7e80
--- /dev/null
+++ b/tools/crunchstat-summary/crunchstat_summary/reader.py
@@ -0,0 +1,81 @@
+from __future__ import print_function
+
+import arvados
+import collections
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+ def __init__(self, collection_id):
+ logger.debug('load collection %s', collection_id)
+ collection = arvados.collection.CollectionReader(collection_id)
+ filenames = [filename for filename in collection]
+ if len(filenames) != 1:
+ raise ValueError(
+ "collection {} has {} files; need exactly one".format(
+ collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self._filters = [
+ ['object_uuid', '=', job_uuid],
+ ['event_type', '=', 'stderr']]
+ self._buffer = collections.deque()
+ self._got = 0
+ self._label = job_uuid
+ self._last_id = 0
+ self._start_getting_next_page()
+
+ def _start_getting_next_page(self):
+ self._thread = threading.Thread(target=self._get_next_page)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def _get_next_page(self):
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self._label, self._got,
+ self._got + page['items_available'] - len(page['items']))
+ self._page = page
+
+ def _buffer_page(self):
+ """Wait for current worker, copy results to _buffer, start next worker.
+
+ Return True if anything was added to the buffer."""
+ if self._thread is None:
+ return False
+ self._thread.join()
+ self._thread = None
+ page = self._page
+ if len(page['items']) == 0:
+ return False
+ if len(page['items']) < page['items_available']:
+ self._start_getting_next_page()
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return True
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if len(self._buffer) == 0:
+ if not self._buffer_page():
+ raise StopIteration
+ return self._buffer.popleft() + '\n'
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index ad96e52..48bec6a 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -3,13 +3,13 @@ from __future__ import print_function
import arvados
import collections
import crunchstat_summary.chartjs
+import crunchstat_summary.reader
import datetime
import functools
import itertools
import math
import re
import sys
-import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
@@ -328,84 +328,10 @@ class Summarizer(object):
return '{}'.format(val)
-class CollectionReader(object):
- def __init__(self, collection_id):
- logger.debug('load collection %s', collection_id)
- collection = arvados.collection.CollectionReader(collection_id)
- filenames = [filename for filename in collection]
- if len(filenames) != 1:
- raise ValueError(
- "collection {} has {} files; need exactly one".format(
- collection_id, len(filenames)))
- self._reader = collection.open(filenames[0])
-
- def __iter__(self):
- return iter(self._reader)
-
-
-class LiveLogReader(object):
- def __init__(self, job_uuid):
- logger.debug('load stderr events for job %s', job_uuid)
- self._filters = [
- ['object_uuid', '=', job_uuid],
- ['event_type', '=', 'stderr']]
- self._buffer = collections.deque()
- self._got = 0
- self._label = job_uuid
- self._last_id = 0
- self._start_getting_next_page()
-
- def _start_getting_next_page(self):
- self._thread = threading.Thread(target=self._get_next_page)
- self._thread.daemon = True
- self._thread.start()
-
- def _get_next_page(self):
- page = arvados.api().logs().index(
- limit=1000,
- order=['id asc'],
- filters=self._filters + [['id','>',str(self._last_id)]],
- ).execute()
- self._got += len(page['items'])
- logger.debug(
- '%s: received %d of %d log events',
- self._label, self._got,
- self._got + page['items_available'] - len(page['items']))
- self._page = page
-
- def _buffer_page(self):
- """Wait for current worker, copy results to _buffer, start next worker.
-
- Return True if anything was added to the buffer."""
- if self._thread is None:
- return False
- self._thread.join()
- self._thread = None
- page = self._page
- if len(page['items']) == 0:
- return False
- if len(page['items']) < page['items_available']:
- self._start_getting_next_page()
- for i in page['items']:
- for line in i['properties']['text'].split('\n'):
- self._buffer.append(line)
- self._last_id = i['id']
- return True
-
- def __iter__(self):
- return self
-
- def next(self):
- if len(self._buffer) == 0:
- if not self._buffer_page():
- raise StopIteration
- return self._buffer.popleft() + '\n'
-
-
class CollectionSummarizer(Summarizer):
def __init__(self, collection_id, **kwargs):
super(CollectionSummarizer, self).__init__(
- CollectionReader(collection_id), **kwargs)
+ crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
self.label = collection_id
@@ -417,10 +343,10 @@ class JobSummarizer(Summarizer):
else:
self.job = job
if self.job['log']:
- rdr = CollectionReader(self.job['log'])
+ rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
label = self.job['uuid']
else:
- rdr = LiveLogReader(self.job['uuid'])
+ rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
label = self.job['uuid'] + ' (partial)'
super(JobSummarizer, self).__init__(rdr, **kwargs)
self.label = label
commit 5bda5bf3aedc0621abeed901a608adb7db6030e6
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 20:15:00 2016 -0500
8341: Use a worker thread to get page N+1 of logs while parsing page N.
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 8b9bfa3..ad96e52 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -9,6 +9,7 @@ import itertools
import math
import re
import sys
+import threading
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
@@ -350,37 +351,54 @@ class LiveLogReader(object):
['event_type', '=', 'stderr']]
self._buffer = collections.deque()
self._got = 0
- self._got_all = False
self._label = job_uuid
self._last_id = 0
+ self._start_getting_next_page()
+
+ def _start_getting_next_page(self):
+ self._thread = threading.Thread(target=self._get_next_page)
+ self._thread.daemon = True
+ self._thread.start()
+
+ def _get_next_page(self):
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug(
+ '%s: received %d of %d log events',
+ self._label, self._got,
+ self._got + page['items_available'] - len(page['items']))
+ self._page = page
+
+ def _buffer_page(self):
+ """Wait for current worker, copy results to _buffer, start next worker.
+
+ Return True if anything was added to the buffer."""
+ if self._thread is None:
+ return False
+ self._thread.join()
+ self._thread = None
+ page = self._page
+ if len(page['items']) == 0:
+ return False
+ if len(page['items']) < page['items_available']:
+ self._start_getting_next_page()
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return True
def __iter__(self):
return self
def next(self):
- if self._buffer is None:
- raise StopIteration
if len(self._buffer) == 0:
- if self._got_all:
+ if not self._buffer_page():
raise StopIteration
- page = arvados.api().logs().index(
- limit=1000,
- order=['id asc'],
- filters=self._filters + [['id','>',str(self._last_id)]],
- ).execute()
- self._got += len(page['items'])
- logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
- if len(page['items']) == 0:
- self._got_all = True
- self._buffer = None
- raise StopIteration
- elif len(page['items']) == page['items_available']:
- # Don't try to fetch any more after this page
- self._got_all = True
- for i in page['items']:
- for line in i['properties']['text'].split('\n'):
- self._buffer.append(line)
- self._last_id = i['id']
return self._buffer.popleft() + '\n'
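The prefetch idea in this commit -- overlap the network round-trip for page N+1 with parsing of page N, using a single daemon worker thread -- generalizes beyond this reader. A rough sketch, where fetch_page is a hypothetical callable standing in for the paged logs().index() request, not the real Arvados client API:

    import threading

    def prefetched_items(fetch_page):
        """Yield items page by page, fetching page N+1 in the background
        while the caller consumes page N.

        fetch_page(last_id) -> (items, last_id, more) is an assumed
        interface for illustration only.
        """
        box = {}
        def worker(last_id):
            box['page'] = fetch_page(last_id)
        thread = threading.Thread(target=worker, args=(0,))
        thread.daemon = True
        thread.start()
        while thread is not None:
            thread.join()                # wait for the in-flight request
            items, last_id, more = box['page']
            if more:                     # start the next request first...
                thread = threading.Thread(target=worker, args=(last_id,))
                thread.daemon = True
                thread.start()
            else:
                thread = None
            for item in items:           # ...then parse page N while it runs
                yield item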
commit f6bb371a170d0a74db6abf9df0f65aabe08d7cf9
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Feb 7 19:43:02 2016 -0500
8341: Get job log from logs API if the log has not been written to Keep yet.
diff --git a/tools/crunchstat-summary/crunchstat_summary/command.py b/tools/crunchstat-summary/crunchstat_summary/command.py
index a9dfc83..78638c6 100644
--- a/tools/crunchstat-summary/crunchstat_summary/command.py
+++ b/tools/crunchstat-summary/crunchstat_summary/command.py
@@ -13,7 +13,8 @@ class ArgumentParser(argparse.ArgumentParser):
src = self.add_mutually_exclusive_group()
src.add_argument(
'--job', type=str, metavar='UUID',
- help='Look up the specified job and read its log data from Keep')
+ help='Look up the specified job and read its log data from Keep'
+ ' (or from the Arvados event log, if the job is still running)')
src.add_argument(
'--pipeline-instance', type=str, metavar='UUID',
help='Summarize each component of the given pipeline instance')
diff --git a/tools/crunchstat-summary/crunchstat_summary/summarizer.py b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
index 486f0e7..8b9bfa3 100644
--- a/tools/crunchstat-summary/crunchstat_summary/summarizer.py
+++ b/tools/crunchstat-summary/crunchstat_summary/summarizer.py
@@ -90,7 +90,7 @@ class Summarizer(object):
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>\S+) (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
if not m:
continue
@@ -327,8 +327,8 @@ class Summarizer(object):
return '{}'.format(val)
-class CollectionSummarizer(Summarizer):
- def __init__(self, collection_id, **kwargs):
+class CollectionReader(object):
+ def __init__(self, collection_id):
logger.debug('load collection %s', collection_id)
collection = arvados.collection.CollectionReader(collection_id)
filenames = [filename for filename in collection]
@@ -336,24 +336,76 @@ class CollectionSummarizer(Summarizer):
raise ValueError(
"collection {} has {} files; need exactly one".format(
collection_id, len(filenames)))
+ self._reader = collection.open(filenames[0])
+
+ def __iter__(self):
+ return iter(self._reader)
+
+
+class LiveLogReader(object):
+ def __init__(self, job_uuid):
+ logger.debug('load stderr events for job %s', job_uuid)
+ self._filters = [
+ ['object_uuid', '=', job_uuid],
+ ['event_type', '=', 'stderr']]
+ self._buffer = collections.deque()
+ self._got = 0
+ self._got_all = False
+ self._label = job_uuid
+ self._last_id = 0
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self._buffer is None:
+ raise StopIteration
+ if len(self._buffer) == 0:
+ if self._got_all:
+ raise StopIteration
+ page = arvados.api().logs().index(
+ limit=1000,
+ order=['id asc'],
+ filters=self._filters + [['id','>',str(self._last_id)]],
+ ).execute()
+ self._got += len(page['items'])
+ logger.debug('%s: received %d of %d log events', self._label, self._got, self._got + page['items_available'] - len(page['items']))
+ if len(page['items']) == 0:
+ self._got_all = True
+ self._buffer = None
+ raise StopIteration
+ elif len(page['items']) == page['items_available']:
+ # Don't try to fetch any more after this page
+ self._got_all = True
+ for i in page['items']:
+ for line in i['properties']['text'].split('\n'):
+ self._buffer.append(line)
+ self._last_id = i['id']
+ return self._buffer.popleft() + '\n'
+
+
+class CollectionSummarizer(Summarizer):
+ def __init__(self, collection_id, **kwargs):
super(CollectionSummarizer, self).__init__(
- collection.open(filenames[0]), **kwargs)
+ CollectionReader(collection_id), **kwargs)
self.label = collection_id
-class JobSummarizer(CollectionSummarizer):
+class JobSummarizer(Summarizer):
def __init__(self, job, **kwargs):
arv = arvados.api('v1')
if isinstance(job, basestring):
self.job = arv.jobs().get(uuid=job).execute()
else:
self.job = job
- if not self.job['log']:
- raise ValueError(
- "job {} has no log; live summary not implemented".format(
- self.job['uuid']))
- super(JobSummarizer, self).__init__(self.job['log'], **kwargs)
- self.label = self.job['uuid']
+ if self.job['log']:
+ rdr = CollectionReader(self.job['log'])
+ label = self.job['uuid']
+ else:
+ rdr = LiveLogReader(self.job['uuid'])
+ label = self.job['uuid'] + ' (partial)'
+ super(JobSummarizer, self).__init__(rdr, **kwargs)
+ self.label = label
self.existing_constraints = self.job.get('runtime_constraints', {})
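The regex change in summarizer.py above is worth a second look: event-log lines can carry fractional seconds on the timestamp, which the old (?P<timestamp>\S+) would have swallowed into the named group. The new pattern captures the timestamp up to the dot and discards the fraction. A quick check -- the sample line below is hypothetical, shaped like the crunchstat lines the pattern expects:

    from __future__ import print_function
    import re

    pattern = (r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ '
               r'(?P<seq>\d+) stderr crunchstat: (?P<category>\S+) '
               r'(?P<current>.*?)( -- interval (?P<interval>.*))?\n')
    line = ('2016-02-08_19:30:00.123456789 zzzzz-8i9sb-012345678901234 '
            '12345 0 stderr crunchstat: mem 1234 rss\n')
    m = re.search(pattern, line)
    print(m.group('timestamp'))  # 2016-02-08_19:30:00 -- fraction dropped
    print(m.group('category'), m.group('current'))  # mem 1234 rss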
-----------------------------------------------------------------------
Summary of changes:
 .../crunchstat_summary/command.py    |  3 +-
 .../crunchstat_summary/reader.py     | 69 ++++++++++++++++++++++
 .../crunchstat_summary/summarizer.py | 28 ++++-----
 3 files changed, 83 insertions(+), 17 deletions(-)
 create mode 100644 tools/crunchstat-summary/crunchstat_summary/reader.py
hooks/post-receive
--
More information about the arvados-commits
mailing list