[ARVADOS] created: d0bf7a1ff103285e54433d3bcb67c2138b534542
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 1 11:51:53 EDT 2014
at d0bf7a1ff103285e54433d3bcb67c2138b534542 (commit)
commit d0bf7a1ff103285e54433d3bcb67c2138b534542
Merge: bb6f6fb 9c5dc27
Author: Misha Zatsman <misha at curoverse.com>
Date: Tue Apr 1 15:39:18 2014 +0000
Merge branch 'master' of git.curoverse.com:arvados into 1968-monitor-disk-usage
commit bb6f6fb4021045b86ef58b3a975cf60b58807fa2
Author: Misha Zatsman <misha at curoverse.com>
Date: Thu Mar 27 00:45:02 2014 +0000
Deleted unused code, added code to read blocks on keep server.
diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index 4251987..847aa7e 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -5,6 +5,7 @@ import arvados
import argparse
import pprint
import re
+import urllib2
from collections import defaultdict
from math import log
@@ -55,54 +56,6 @@ class CollectionInfo:
return CollectionInfo.all_by_uuid[uuid]
-def nonNegativeIntegerFromString(s, base):
- """Returns the value s represents in the specified base as long as it is greater than zero, otherwise None."""
- try:
- value = int(s, base)
- if value >= 0:
- return value
- except ValueError:
- pass # We'll cover this in the next line.
- return None
-
-# TODO(misha): Add tests.
-BLOCK_HASH_LENGTH = 32
-def hashAndSizeFromBlockLocator(candidate):
- """Returns the [integer hash, integer size] from a block locator.
-
- The returned hash will be None iff this is not a valid block locator.
- The returned size will be None if this is not a valid block locator
- or it is a valid block locator which does not specify the size.
-
- A valid block locator is a non-negative integer (the hash) encoded
- in 32 hex digits. After that it may be followed by a '+' in which
- case the '+' should be followed by the non-negative size (decimal
- encoded). Potentially followed by a '+' and more stuff.
-
- Source: https://arvados.org/projects/arvados/wiki/Keep_manifest_format
-"""
- NOT_A_BLOCK_LOCATOR = [None,None]
- block_locator_tokens = candidate.split('+')
-
- if len(block_locator_tokens) == 0:
- return NOT_A_BLOCK_LOCATOR
-
- hash_candidate = block_locator_tokens[0]
- if len(hash_candidate) != BLOCK_HASH_LENGTH:
- return NOT_A_BLOCK_LOCATOR
-
- block_hash = nonNegativeIntegerFromString(hash_candidate, 16)
- if not block_hash:
- return NOT_A_BLOCK_LOCATOR
-
- block_size = None
- if len(block_locator_tokens) > 1:
- block_size = nonNegativeIntegerFromString(block_locator_tokens[1], 10)
- if not block_size:
- return NOT_A_BLOCK_LOCATOR
-
- return block_hash, block_size
-
def extractUuid(candidate):
""" Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
@@ -255,6 +208,29 @@ def reportUserDiskUsage():
fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
+def getKeepServers():
+ response = arv.keep_disks().list().execute()
+ return [[keep_server['service_host'], keep_server['service_port']]
+ for keep_server in response['items']]
+
+
+def getKeepBlocks(keep_servers):
+ blocks = []
+ for host,port in keep_servers:
+ response = urllib2.urlopen('http://%s:%d/index' % (host, port))
+ blocks.append([line.split(' ')
+ for line in response.read().split('\n')
+ if line])
+ return blocks
+
+
+def computeReplication(keep_blocks):
+ block_to_replication = defaultdict(lambda: 0)
+ for server_blocks in keep_blocks:
+ for block_uuid, _ in server_blocks:
+ block_to_replication[block_uuid] += 1
+ return block_to_replication
+
# This is the main flow here
@@ -320,3 +296,15 @@ NUM_COLS = 4
user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
reportUserDiskUsage()
+
+print 'Getting Keep Servers'
+keep_servers = getKeepServers()
+
+print keep_servers
+
+print 'Getting Blocks from each Keep Server.'
+keep_blocks = getKeepBlocks(keep_servers)
+
+block_to_replication = computeReplication(keep_blocks)
+
+print 'average replication level is %f' % (float(sum(block_to_replication.values())) / len(block_to_replication))
commit 264c7fdb6c651ef73cfe4f0a280bf6672846eada
Author: Misha Zatsman <misha at curoverse.com>
Date: Wed Mar 26 23:11:43 2014 +0000
Updated message to be clearer.
diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index 6784c75..4251987 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -128,7 +128,7 @@ def buildCollectionsList():
else:
collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
- print ('Returned %d of %d results' %
+ print ('Returned %d of %d collections.' %
(len(collections_list_response['items']),
collections_list_response['items_available']))
commit 1ea7294edb4aa80f13d805ef29875a39caab471c
Author: Misha Zatsman <misha at curoverse.com>
Date: Wed Mar 26 22:49:01 2014 +0000
Initial commit of datamanager.py command line utility. So far all information comes from the python SDK. Future versions will talk to the keep servers as well.
diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
new file mode 100755
index 0000000..6784c75
--- /dev/null
+++ b/services/datamanager/datamanager.py
@@ -0,0 +1,322 @@
+#! /usr/bin/env python
+
+import arvados
+
+import argparse
+import pprint
+import re
+
+from collections import defaultdict
+from math import log
+from operator import itemgetter
+
+arv = arvados.api('v1')
+
+# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
+byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
+def fileSizeFormat(value):
+ exponent = 0 if value == 0 else int(log(value, 1024))
+ return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
+ byteunits[exponent])
+
+def byteSizeFromValidUuid(valid_uuid):
+ return int(valid_uuid.split('+')[1])
+
+class CollectionInfo:
+ all_by_uuid = {}
+
+ def __init__(self, uuid):
+ if CollectionInfo.all_by_uuid.has_key(uuid):
+ raise ValueError('Collection for uuid "%s" already exists.' % uuid)
+ self.uuid = uuid
+ self.block_uuids = set() # uuids of keep blocks in this collection
+ self.reader_uuids = set() # uuids of users who can read this collection
+ self.persister_uuids = set() # uuids of users who want this collection saved
+ CollectionInfo.all_by_uuid[uuid] = self
+
+ def byte_size(self):
+ return sum(map(byteSizeFromValidUuid, self.block_uuids))
+
+ def __str__(self):
+ return ('CollectionInfo uuid: %s\n'
+ ' %d block(s) containing %s\n'
+ ' reader_uuids: %s\n'
+ ' persister_uuids: %s' %
+ (self.uuid,
+ len(self.block_uuids),
+ fileSizeFormat(self.byte_size()),
+ pprint.pformat(self.reader_uuids, indent = 15),
+ pprint.pformat(self.persister_uuids, indent = 15)))
+
+ @staticmethod
+ def get(uuid):
+ if not CollectionInfo.all_by_uuid.has_key(uuid):
+ CollectionInfo(uuid)
+ return CollectionInfo.all_by_uuid[uuid]
+
+
+def nonNegativeIntegerFromString(s, base):
+ """Returns the value s represents in the specified base as long as it is greater than zero, otherwise None."""
+ try:
+ value = int(s, base)
+ if value >= 0:
+ return value
+ except ValueError:
+ pass # We'll cover this in the next line.
+ return None
+
+# TODO(misha): Add tests.
+BLOCK_HASH_LENGTH = 32
+def hashAndSizeFromBlockLocator(candidate):
+ """Returns the [integer hash, integer size] from a block locator.
+
+ The returned hash will be None iff this is not a valid block locator.
+ The returned size will be None if this is not a valid block locator
+ or it is a valid block locator which does not specify the size.
+
+ A valid block locator is a non-negative integer (the hash) encoded
+ in 32 hex digits. After that it may be followed by a '+' in which
+ case the '+' should be followed by the non-negative size (decimal
+ encoded). Potentially followed by a '+' and more stuff.
+
+ Source: https://arvados.org/projects/arvados/wiki/Keep_manifest_format
+"""
+ NOT_A_BLOCK_LOCATOR = [None,None]
+ block_locator_tokens = candidate.split('+')
+
+ if len(block_locator_tokens) == 0:
+ return NOT_A_BLOCK_LOCATOR
+
+ hash_candidate = block_locator_tokens[0]
+ if len(hash_candidate) != BLOCK_HASH_LENGTH:
+ return NOT_A_BLOCK_LOCATOR
+
+ block_hash = nonNegativeIntegerFromString(hash_candidate, 16)
+ if not block_hash:
+ return NOT_A_BLOCK_LOCATOR
+
+ block_size = None
+ if len(block_locator_tokens) > 1:
+ block_size = nonNegativeIntegerFromString(block_locator_tokens[1], 10)
+ if not block_size:
+ return NOT_A_BLOCK_LOCATOR
+
+ return block_hash, block_size
+
+def extractUuid(candidate):
+ """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
+ match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
+ return match and match.group(1)
+
+def checkUserIsAdmin():
+ current_user = arv.users().current().execute()
+
+ if not current_user['is_admin']:
+ # TODO(misha): Use a logging framework here
+ print ('Warning current user %s (%s - %s) does not have admin access '
+ 'and will not see much of the data.' %
+ (current_user['full_name'],
+ current_user['email'],
+ current_user['uuid']))
+ if args.require_admin_user:
+ print 'Exiting, rerun with --no-require-admin-user if you wish to continue.'
+ exit(1)
+
+def buildCollectionsList():
+ if args.uuid:
+ return [args.uuid,]
+ else:
+ collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
+
+ print ('Returned %d of %d results' %
+ (len(collections_list_response['items']),
+ collections_list_response['items_available']))
+
+ return [item['uuid'] for item in collections_list_response['items']]
+
+
+def readCollections(collection_uuids):
+ for collection_uuid in collection_uuids:
+ collection_block_uuids = set()
+ collection_response = arv.collections().get(uuid=collection_uuid).execute()
+ collection_info = CollectionInfo.get(collection_uuid)
+ manifest_lines = collection_response['manifest_text'].split('\n')
+
+ if args.verbose:
+ print 'Manifest text for %s:' % collection_uuid
+ pprint.pprint(manifest_lines)
+
+ for manifest_line in manifest_lines:
+ if manifest_line:
+ manifest_tokens = manifest_line.split(' ')
+ if args.verbose:
+ print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
+ stream_name = manifest_tokens[0]
+
+ line_block_uuids = set(filter(None,
+ [extractUuid(candidate)
+ for candidate in manifest_tokens[1:]]))
+ collection_info.block_uuids.update(line_block_uuids)
+
+ # file_tokens = [token
+ # for token in manifest_tokens[1:]
+ # if extractUuid(token) is None]
+
+ # # Sort file tokens by start position in case they aren't already
+ # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
+
+ # if args.verbose:
+ # print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
+ # print 'file_tokens: ' + pprint.pformat(file_tokens)
+
+
+def readLinks():
+ link_classes = set()
+
+ for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
+ collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
+ link_classes.update([link['link_class'] for link in collection_links_response['items']])
+ for link in collection_links_response['items']:
+ if link['link_class'] == 'permission':
+ collection_info.reader_uuids.add(link['tail_uuid'])
+ elif link['link_class'] == 'resources':
+ collection_info.persister_uuids.add(link['tail_uuid'])
+
+ print 'Found the following link classes:'
+ pprint.pprint(link_classes)
+
+def reportMostPopularCollections():
+ most_popular_collections = sorted(
+ CollectionInfo.all_by_uuid.values(),
+ key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_uuids),
+ reverse=True)[:10]
+
+ print 'Most popular Collections:'
+ for collection_info in most_popular_collections:
+ print collection_info
+
+
+def buildMaps():
+ for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
+ for block_uuid in collection_info.block_uuids:
+ block_to_collections[block_uuid].add(collection_uuid)
+ block_to_readers[block_uuid].update(collection_info.reader_uuids)
+ block_to_persisters[block_uuid].update(collection_info.persister_uuids)
+ for reader_uuid in collection_info.reader_uuids:
+ reader_to_collections[reader_uuid].add(collection_uuid)
+ reader_to_blocks[reader_uuid].update(collection_info.block_uuids)
+ for persister_uuid in collection_info.persister_uuids:
+ persister_to_collections[persister_uuid].add(collection_uuid)
+ persister_to_blocks[persister_uuid].update(collection_info.block_uuids)
+
+
+def itemsByValueLength(original):
+ return sorted(original.items(),
+ key=lambda item:len(item[1]),
+ reverse=True)
+
+
+def reportBusiestUsers():
+ busiest_readers = itemsByValueLength(reader_to_collections)
+ print 'The busiest readers are:'
+ for reader,collections in busiest_readers:
+ print '%s reading %d collections.' % (reader, len(collections))
+ busiest_persisters = itemsByValueLength(persister_to_collections)
+ print 'The busiest persisters are:'
+ for persister,collections in busiest_persisters:
+ print '%s reading %d collections.' % (persister, len(collections))
+
+
+def reportUserDiskUsage():
+ for user, blocks in reader_to_blocks.items():
+ user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
+ byteSizeFromValidUuid,
+ blocks))
+ user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
+ lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
+ len(block_to_readers[block_uuid])),
+ blocks))
+ for user, blocks in persister_to_blocks.items():
+ user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
+ byteSizeFromValidUuid,
+ blocks))
+ user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
+ lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
+ len(block_to_persisters[block_uuid])),
+ blocks))
+ print ('user: unweighted readable block size, weighted readable block size, '
+ 'unweighted persisted block size, weighted persisted block size:')
+ for user, usage in user_to_usage.items():
+ print ('%s: %s %s %s %s' %
+ (user,
+ fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
+ fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
+ fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
+
+
+
+# This is the main flow here
+
+parser = argparse.ArgumentParser(description='Report on keep disks.')
+parser.add_argument('-m',
+ '--max-api-results',
+ type=int,
+ default=5000,
+ help=('The max results to get at once.'))
+parser.add_argument('-v',
+ '--verbose',
+ help='increase output verbosity',
+ action='store_true')
+parser.add_argument('-u',
+ '--uuid',
+ help='uuid of specific collection to process')
+parser.add_argument('--require-admin-user',
+ action='store_true',
+ help='Fail if the user is not an admin [default]')
+parser.add_argument('--no-require-admin-user',
+ dest='require_admin_user',
+ action='store_false',
+ help='Allow users without admin permissions with only a warning.')
+args = parser.parse_args()
+
+checkUserIsAdmin()
+
+print 'Building Collection List'
+collection_uuids = filter(None, [extractUuid(candidate)
+ for candidate in buildCollectionsList()])
+
+print 'Reading Collections'
+readCollections(collection_uuids)
+
+if args.verbose:
+ pprint.pprint(CollectionInfo.all_by_uuid)
+
+print 'Reading Links'
+readLinks()
+
+reportMostPopularCollections()
+
+# These maps all map from uuids to a set of uuids
+# The sets all contain collection uuids.
+block_to_collections = defaultdict(set) # keep blocks
+reader_to_collections = defaultdict(set) # collection(s) for which the user has read access
+persister_to_collections = defaultdict(set) # collection(s) which the user has persisted
+block_to_readers = defaultdict(set)
+block_to_persisters = defaultdict(set)
+reader_to_blocks = defaultdict(set)
+persister_to_blocks = defaultdict(set)
+
+print 'Building Maps'
+buildMaps()
+
+reportBusiestUsers()
+
+UNWEIGHTED_READ_SIZE_COL = 0
+WEIGHTED_READ_SIZE_COL = 1
+UNWEIGHTED_PERSIST_SIZE_COL = 2
+WEIGHTED_PERSIST_SIZE_COL = 3
+NUM_COLS = 4
+user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
+
+reportUserDiskUsage()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list