[ARVADOS] updated: 42c9be2a3187291502efcebefc09ce603fd31106
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 8 20:00:30 EDT 2014
Summary of changes:
services/datamanager/datamanager.py | 69 +++++++++++++++++++++++++++++------
1 files changed, 57 insertions(+), 12 deletions(-)
via 42c9be2a3187291502efcebefc09ce603fd31106 (commit)
from b586267c7ff0a5973ecf2f83b3fa5eb452669cfc (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 42c9be2a3187291502efcebefc09ce603fd31106
Author: Misha Zatsman <misha at curoverse.com>
Date: Tue Apr 8 17:30:27 2014 +0000
Added ability to disable server by specifying port 0. Started keeping api response just in case. Started tracking replication level for persisters (although still working on using it everywhere). Added maxdict to make replication tracking easier.
diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index ae9fd0b..a5931b6 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -28,7 +28,13 @@ def fileSizeFormat(value):
def byteSizeFromValidUuid(valid_uuid):
return int(valid_uuid.split('+')[1])
+class maxdict(dict):
+ """A dictionary that holds the largest value entered for each key."""
+ def addValue(self, key, value):
+ dict.__setitem__(self, key, max(dict.get(self, key), value))
+
class CollectionInfo:
+ DEFAULT_PERSISTER_REPLICATION_LEVEL=2
all_by_uuid = {}
def __init__(self, uuid):
@@ -38,6 +44,11 @@ class CollectionInfo:
self.block_uuids = set() # uuids of keep blocks in this collection
self.reader_uuids = set() # uuids of users who can read this collection
self.persister_uuids = set() # uuids of users who want this collection saved
+ # map from user uuid to replication level they desire
+ self.persister_replication = maxdict()
+
+ # The whole api response in case we need anything else later.
+ self.api_response = []
CollectionInfo.all_by_uuid[uuid] = self
def byteSize(self):
@@ -47,12 +58,12 @@ class CollectionInfo:
return ('CollectionInfo uuid: %s\n'
' %d block(s) containing %s\n'
' reader_uuids: %s\n'
- ' persister_uuids: %s' %
+ ' persister_replication: %s' %
(self.uuid,
len(self.block_uuids),
fileSizeFormat(self.byteSize()),
pprint.pformat(self.reader_uuids, indent = 15),
- pprint.pformat(self.persister_uuids, indent = 15)))
+ pprint.pformat(self.persister_replication, indent = 15)))
@staticmethod
def get(uuid):
@@ -98,6 +109,7 @@ def readCollections(collection_uuids):
collection_block_uuids = set()
collection_response = arv.collections().get(uuid=collection_uuid).execute()
collection_info = CollectionInfo.get(collection_uuid)
+ collection_info.api_response = collection_response
manifest_lines = collection_response['manifest_text'].split('\n')
if args.verbose:
@@ -138,6 +150,12 @@ def readLinks():
if link['link_class'] == 'permission':
collection_info.reader_uuids.add(link['tail_uuid'])
elif link['link_class'] == 'resources':
+ replication_level = link['properties'].get(
+ 'replication',
+ CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
+ collection_info.persister_replication.addValue(
+ link['tail_uuid'],
+ replication_level)
collection_info.persister_uuids.add(link['tail_uuid'])
print 'Found the following link classes:'
@@ -146,7 +164,7 @@ def readLinks():
def reportMostPopularCollections():
most_popular_collections = sorted(
CollectionInfo.all_by_uuid.values(),
- key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_uuids),
+ key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
reverse=True)[:10]
print 'Most popular Collections:'
@@ -257,7 +275,7 @@ parser.add_argument('-p',
'--port',
type=int,
default=9090,
- help=('The port number to serve on.'))
+ help=('The port number to serve on. 0 means no server.'))
parser.add_argument('-v',
'--verbose',
help='increase output verbosity',
@@ -453,9 +471,32 @@ class DataManagerHandler(BaseHTTPRequestHandler):
fileSizeFormat(collection.byteSize()))
self.wfile.write('<P>Readers: %s\n' %
', '.join(map(self.userLink, collection.reader_uuids)))
- self.wfile.write('<P>Persisters: %s\n' %
- ', '.join(map(self.userLink,
- collection.persister_uuids)))
+
+ if len(collection.persister_replication) == 0:
+ self.wfile.write('<P>No persisters\n')
+ else:
+ replication_to_users = defaultdict(set)
+ for user,replication in collection.persister_replication.items():
+ replication_to_users[replication].add(user)
+ replication_levels = sorted(replication_to_users.keys())
+
+ self.wfile.write('<P>%d persisters in %d replication levels maxing '
+ 'out at %dx replication:\n' %
+ (len(collection.persister_replication),
+ len(replication_levels),
+ replication_levels[-1]))
+
+ # TODO(misha): This code is used twice, let's move it to a method.
+ self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
+ '<TH>'.join(['Replication Level ' + str(x)
+ for x in replication_levels]))
+ self.wfile.write('<TR>\n')
+ for replication_level in replication_levels:
+ users = replication_to_users[replication_level]
+ self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
+ map(self.userLink, users)))
+ self.wfile.write('</TR></TABLE>\n')
+
replication_to_blocks = defaultdict(set)
for block in collection.block_uuids:
replication_to_blocks[block_to_replication[block]].add(block)
@@ -463,7 +504,8 @@ class DataManagerHandler(BaseHTTPRequestHandler):
self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
(len(collection.block_uuids), len(replication_levels)))
self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
- '<TH>'.join(['Replication Level ' + str(x) for x in replication_levels]))
+ '<TH>'.join(['Replication Level ' + str(x)
+ for x in replication_levels]))
self.wfile.write('<TR>\n')
for replication_level in replication_levels:
blocks = replication_to_blocks[replication_level]
@@ -497,8 +539,11 @@ class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
#if __name__ == '__main__':
-loader = threading.Thread(target = loadAllData, name = 'loader')
-loader.start()
+if args.port == 0:
+ loadAllData()
+else:
+ loader = threading.Thread(target = loadAllData, name = 'loader')
+ loader.start()
-server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
-server.serve_forever()
+ server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
+ server.serve_forever()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list