[ARVADOS] updated: 42c9be2a3187291502efcebefc09ce603fd31106

git at public.curoverse.com git at public.curoverse.com
Tue Apr 8 20:00:30 EDT 2014


Summary of changes:
 services/datamanager/datamanager.py |   69 +++++++++++++++++++++++++++++------
 1 files changed, 57 insertions(+), 12 deletions(-)

       via  42c9be2a3187291502efcebefc09ce603fd31106 (commit)
      from  b586267c7ff0a5973ecf2f83b3fa5eb452669cfc (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 42c9be2a3187291502efcebefc09ce603fd31106
Author: Misha Zatsman <misha at curoverse.com>
Date:   Tue Apr 8 17:30:27 2014 +0000

    Added ability to disable server by specifying port 0. Started keeping api reponse just in case. Started tracking replication level for peristers (although still working on using it everywhere). Added maxdict to make replication tracking easier.

diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index ae9fd0b..a5931b6 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -28,7 +28,13 @@ def fileSizeFormat(value):
 def byteSizeFromValidUuid(valid_uuid):
   return int(valid_uuid.split('+')[1])
 
+class maxdict(dict):
+  """A dictionary that holds the largest value entered for each key."""
+  def addValue(self, key, value):
+    dict.__setitem__(self, key, max(dict.get(self, key), value))
+
 class CollectionInfo:
+  DEFAULT_PERSISTER_REPLICATION_LEVEL=2
   all_by_uuid = {}
 
   def __init__(self, uuid):
@@ -38,6 +44,11 @@ class CollectionInfo:
     self.block_uuids = set()  # uuids of keep blocks in this collection
     self.reader_uuids = set()  # uuids of users who can read this collection
     self.persister_uuids = set()  # uuids of users who want this collection saved
+    # map from user uuid to replication level they desire
+    self.persister_replication = maxdict()
+
+    # The whole api response in case we need anything else later.
+    self.api_response = []
     CollectionInfo.all_by_uuid[uuid] = self
 
   def byteSize(self):
@@ -47,12 +58,12 @@ class CollectionInfo:
     return ('CollectionInfo uuid: %s\n'
             '               %d block(s) containing %s\n'
             '               reader_uuids: %s\n'
-            '               persister_uuids: %s' %
+            '               persister_replication: %s' %
             (self.uuid,
              len(self.block_uuids),
              fileSizeFormat(self.byteSize()),
              pprint.pformat(self.reader_uuids, indent = 15),
-             pprint.pformat(self.persister_uuids, indent = 15)))
+             pprint.pformat(self.persister_replication, indent = 15)))
 
   @staticmethod
   def get(uuid):
@@ -98,6 +109,7 @@ def readCollections(collection_uuids):
     collection_block_uuids = set()
     collection_response = arv.collections().get(uuid=collection_uuid).execute()
     collection_info = CollectionInfo.get(collection_uuid)
+    collection_info.api_response = collection_response
     manifest_lines = collection_response['manifest_text'].split('\n')
 
     if args.verbose:
@@ -138,6 +150,12 @@ def readLinks():
       if link['link_class'] == 'permission':
         collection_info.reader_uuids.add(link['tail_uuid'])
       elif link['link_class'] == 'resources':
+        replication_level = link['properties'].get(
+          'replication',
+          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
+        collection_info.persister_replication.addValue(
+          link['tail_uuid'],
+          replication_level)
         collection_info.persister_uuids.add(link['tail_uuid'])
 
   print 'Found the following link classes:'
@@ -146,7 +164,7 @@ def readLinks():
 def reportMostPopularCollections():
   most_popular_collections = sorted(
     CollectionInfo.all_by_uuid.values(),
-    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_uuids),
+    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
     reverse=True)[:10]
 
   print 'Most popular Collections:'
@@ -257,7 +275,7 @@ parser.add_argument('-p',
                     '--port',
                     type=int,
                     default=9090,
-                    help=('The port number to serve on.'))
+                    help=('The port number to serve on. 0 means no server.'))
 parser.add_argument('-v',
                     '--verbose',
                     help='increase output verbosity',
@@ -453,9 +471,32 @@ class DataManagerHandler(BaseHTTPRequestHandler):
                        fileSizeFormat(collection.byteSize()))
       self.wfile.write('<P>Readers: %s\n' %
                        ', '.join(map(self.userLink, collection.reader_uuids)))
-      self.wfile.write('<P>Persisters: %s\n' %
-                       ', '.join(map(self.userLink,
-                                     collection.persister_uuids)))
+
+      if len(collection.persister_replication) == 0:
+        self.wfile.write('<P>No persisters\n')
+      else:
+        replication_to_users = defaultdict(set)
+        for user,replication in collection.persister_replication.items():
+          replication_to_users[replication].add(user)
+        replication_levels = sorted(replication_to_users.keys())
+
+        self.wfile.write('<P>%d persisters in %d replication levels maxing '
+                         'out at %dx replication:\n' %
+                         (len(collection.persister_replication),
+                          len(replication_levels),
+                          replication_levels[-1]))
+
+        # TODO(misha): This code is used twice, let's move it to a method.
+        self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
+                         '<TH>'.join(['Replication Level ' + str(x)
+                                      for x in replication_levels]))
+        self.wfile.write('<TR>\n')
+        for replication_level in replication_levels:
+          users = replication_to_users[replication_level]
+          self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
+              map(self.userLink, users)))
+        self.wfile.write('</TR></TABLE>\n')
+
       replication_to_blocks = defaultdict(set)
       for block in collection.block_uuids:
         replication_to_blocks[block_to_replication[block]].add(block)
@@ -463,7 +504,8 @@ class DataManagerHandler(BaseHTTPRequestHandler):
       self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                        (len(collection.block_uuids), len(replication_levels)))
       self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
-                       '<TH>'.join(['Replication Level ' + str(x) for x in replication_levels]))
+                       '<TH>'.join(['Replication Level ' + str(x)
+                                    for x in replication_levels]))
       self.wfile.write('<TR>\n')
       for replication_level in replication_levels:
         blocks = replication_to_blocks[replication_level]
@@ -497,8 +539,11 @@ class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
 
 #if __name__ == '__main__':
 
-loader = threading.Thread(target = loadAllData, name = 'loader')
-loader.start()
+if args.port == 0:
+  loadAllData()
+else:
+  loader = threading.Thread(target = loadAllData, name = 'loader')
+  loader.start()
 
-server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
-server.serve_forever()
+  server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
+  server.serve_forever()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list