[ARVADOS] updated: f4925bb0e7d9de484579b71087f181b9dbea5bff

git at public.curoverse.com git at public.curoverse.com
Thu Apr 10 20:07:50 EDT 2014


Summary of changes:
 services/datamanager/datamanager.py      |  163 ++++++++++++++++++++++++++----
 services/datamanager/datamanager_test.py |   41 ++++++++
 2 files changed, 184 insertions(+), 20 deletions(-)
 create mode 100755 services/datamanager/datamanager_test.py

       via  f4925bb0e7d9de484579b71087f181b9dbea5bff (commit)
       via  ab66542f7c9c2dd4909e1bf3d03ef5df5f95a6f9 (commit)
       via  ec4e2b3707c030dab6081db499adc5ac369e61bd (commit)
       via  b189f5ed7a6fd3585535b9d9d3d587115cf383e7 (commit)
       via  2fce84531f36daa2323276f0c6b147efe82852ec (commit)
       via  5254bebf574f0f72bb0321e944d089ad6f6c25e2 (commit)
      from  42c9be2a3187291502efcebefc09ce603fd31106 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit f4925bb0e7d9de484579b71087f181b9dbea5bff
Author: Misha Zatsman <misha at curoverse.com>
Date:   Thu Apr 10 23:56:26 2014 +0000

    Added disk usage logging! logs are written to api server.
    
    Switched logic for readers, actual disk space is no longer examined,
    instead it is the sum of the byte size of the collections (including
    the manifest size) they requested to read. In retrospect I should
    probably report both the requested size and the size of the blocks
    that actually appear on disk.
    
    split reportUserDiskUsage() into computeUserStorageUsage() and
    printUserStorageUsage(). Added logUserStorageUsage() to do the logging
    mentioned above.
    
    Moved command line argument parser out of the __main__ block so that
    it can be accessed by other packages who wish to support commandline
    arguments for this package even though they may have their own main
    block.
    
    Added new command line args for api server logging.
    
    refs #2579 #1757

diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index 53f5e7c..ad7ac9c 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -150,6 +150,9 @@ def readLinks():
   link_classes = set()
 
   for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
+    # TODO(misha): We may not be seing all the links, but since items
+    # available does not return an accurate number, I don't knos how
+    # to confirm that we saw all of them.
     collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
     link_classes.update([link['link_class'] for link in collection_links_response['items']])
     for link in collection_links_response['items']:
@@ -286,13 +289,14 @@ def blockPersistedWeightedUsage(user_uuid, block_uuid):
     computeWeightedReplicationCosts(
       persister_replication_for_block.values())[user_replication])
 
-def reportUserDiskUsage():
+
+def computeUserStorageUsage():
   for user, blocks in reader_to_blocks.items():
     user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
-        blockDiskUsage,
+        byteSizeFromValidUuid,
         blocks))
     user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
-        lambda block_uuid:(float(blockDiskUsage(block_uuid))/
+        lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
                                  len(block_to_readers[block_uuid])),
         blocks))
   for user, blocks in persister_to_blocks.items():
@@ -303,6 +307,7 @@ def reportUserDiskUsage():
         partial(blockPersistedWeightedUsage, user),
         blocks))
 
+def printUserStorageUsage():
   print ('user: unweighted readable block size, weighted readable block size, '
          'unweighted persisted block size, weighted persisted block size:')
   for user, usage in user_to_usage.items():
@@ -313,6 +318,24 @@ def reportUserDiskUsage():
             fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
             fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
 
+def logUserStorageUsage():
+  for user, usage in user_to_usage.items():
+    body = {}
+    # user could actually represent a user or a group. We don't set
+    # the object_type field since we don't know which we have.
+    body['object_uuid'] = user
+    body['event_type'] = args.user_storage_log_event_type
+    info = {}
+    info['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+    info['read_collections_weighted_bytes'] = usage[WEIGHTED_READ_SIZE_COL]
+    info['persisted_collections_total_bytes'] = (
+      usage[UNWEIGHTED_PERSIST_SIZE_COL])
+    info['persisted_collections_weighted_bytes'] = (
+      usage[WEIGHTED_PERSIST_SIZE_COL])
+    body['info'] = info
+    # TODO(misha): Confirm that this will throw an exception if it
+    # fails to create the log entry.
+    arv.logs().create(body=body).execute()
 
 def getKeepServers():
   response = arv.keep_disks().list().execute()
@@ -338,9 +361,55 @@ def computeReplication(keep_blocks):
 
 # This is the main flow here
 
+parser = argparse.ArgumentParser(description='Report on keep disks.')
+"""The command line argument parser we use.
+
+We only use it in the __main__ block, but leave it outside the block
+in case another package wants to use it or customize it by specifying
+it as a parent to their commandline parser.
+"""
+parser.add_argument('-m',
+                    '--max-api-results',
+                    type=int,
+                    default=5000,
+                    help=('The max results to get at once.'))
+parser.add_argument('-p',
+                    '--port',
+                    type=int,
+                    default=9090,
+                    help=('The port number to serve on. 0 means no server.'))
+parser.add_argument('-v',
+                    '--verbose',
+                    help='increase output verbosity',
+                    action='store_true')
+parser.add_argument('-u',
+                    '--uuid',
+                    help='uuid of specific collection to process')
+parser.add_argument('--require-admin-user',
+                    action='store_true',
+                    default=True,
+                    help='Fail if the user is not an admin [default]')
+parser.add_argument('--no-require-admin-user',
+                    dest='require_admin_user',
+                    action='store_false',
+                    help=('Allow users without admin permissions with '
+                          'only a warning.'))
+parser.add_argument('--log-to-workbench',
+                    action='store_true',
+                    default=False,
+                    help='Log findings to workbench')
+parser.add_argument('--no-log-to-workbench',
+                    dest='log_to_workbench',
+                    action='store_false',
+                    help='Don\'t log findings to workbench [default]')
+parser.add_argument('--user-storage-log-event-type',
+                    default='user-storage-report',
+                    help=('The event type to set when logging user '
+                          'storage usage to workbench.'))
 
 args = None
 
+# TODO(misha): Think about moving some of this to the __main__ block.
 log = logging.getLogger('arvados.services.datamanager')
 stderr_handler = logging.StreamHandler()
 log.setLevel(logging.INFO)
@@ -412,7 +481,10 @@ def loadAllData():
 
   log.info('average replication level is %f', (float(sum(block_to_replication.values())) / len(block_to_replication)))
 
-  reportUserDiskUsage()
+  computeUserStorageUsage()
+  printUserStorageUsage()
+  if args.log_to_workbench:
+    logUserStorageUsage()
 
   global all_data_loaded
   all_data_loaded = True
@@ -587,34 +659,6 @@ class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
   """Handle requests in a separate thread."""
 
 if __name__ == '__main__':
-  parser = argparse.ArgumentParser(description='Report on keep disks.')
-  parser.add_argument('-m',
-                      '--max-api-results',
-                      type=int,
-                      default=5000,
-                      help=('The max results to get at once.'))
-  parser.add_argument('-p',
-                      '--port',
-                      type=int,
-                      default=9090,
-                      help=('The port number to serve on. 0 means no server.'))
-  parser.add_argument('-v',
-                      '--verbose',
-                      help='increase output verbosity',
-                      action='store_true')
-  parser.add_argument('-u',
-                      '--uuid',
-                      help='uuid of specific collection to process')
-  parser.add_argument('--require-admin-user',
-                      action='store_true',
-                      default=True,
-                      help='Fail if the user is not an admin [default]')
-  parser.add_argument('--no-require-admin-user',
-                      dest='require_admin_user',
-                      action='store_false',
-                      help='Allow users without admin permissions with only a warning.')
-
-
   args = parser.parse_args()
 
 

commit ab66542f7c9c2dd4909e1bf3d03ef5df5f95a6f9
Author: Misha Zatsman <misha at curoverse.com>
Date:   Thu Apr 10 00:09:47 2014 +0000

    Added weighted persist disk usage. Removed debugging line I had forgotten. refs #2572.

diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index be2d81b..53f5e7c 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -229,7 +229,8 @@ def computeWeightedReplicationCosts(replication_levels):
   """Computes the relative cost of varied replication levels.
 
   replication_levels: a tuple of integers representing the desired
-  replication level.
+  replication level. If n users want a replication level of x then x
+  should appear n times in replication_levels.
 
   Returns a dictionary from replication level to cost.
 
@@ -275,9 +276,16 @@ def computeWeightedReplicationCosts(replication_levels):
       total_interested -= count
     memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
 
-  print memo_computeWeightedReplicationCosts
   return memo_computeWeightedReplicationCosts[memo_key]
 
+def blockPersistedWeightedUsage(user_uuid, block_uuid):
+  persister_replication_for_block = block_to_persister_replication[block_uuid]
+  user_replication = persister_replication_for_block[user_uuid]
+  return (
+    byteSizeFromValidUuid(block_uuid) *
+    computeWeightedReplicationCosts(
+      persister_replication_for_block.values())[user_replication])
+
 def reportUserDiskUsage():
   for user, blocks in reader_to_blocks.items():
     user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
@@ -292,9 +300,9 @@ def reportUserDiskUsage():
         partial(blockPersistedUsage, user),
         blocks))
     user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
-        lambda block_uuid:(float(blockDiskUsage(block_uuid))/
-                                 len(block_to_persisters[block_uuid])),
+        partial(blockPersistedWeightedUsage, user),
         blocks))
+
   print ('user: unweighted readable block size, weighted readable block size, '
          'unweighted persisted block size, weighted persisted block size:')
   for user, usage in user_to_usage.items():

commit ec4e2b3707c030dab6081db499adc5ac369e61bd
Author: Misha Zatsman <misha at curoverse.com>
Date:   Wed Apr 9 23:44:40 2014 +0000

    Added memoization to computeWeightedReplicationCosts.

diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index bec43ef..be2d81b 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -224,6 +224,7 @@ def blockPersistedUsage(user_uuid, block_uuid):
   return (byteSizeFromValidUuid(block_uuid) *
           block_to_persister_replication[block_uuid].get(user_uuid, 0))
 
+memo_computeWeightedReplicationCosts = {}
 def computeWeightedReplicationCosts(replication_levels):
   """Computes the relative cost of varied replication levels.
 
@@ -235,7 +236,7 @@ def computeWeightedReplicationCosts(replication_levels):
   The basic thinking is that the cost of replicating at level x should
   be shared by everyone who wants replication of level x or higher.
 
-  For example, if I have two users who want 1 copy, one user who
+  For example, if we have two users who want 1 copy, one user who
   wants 3 copies and two users who want 6 copies:
   the input would be [1, 1, 3, 6, 6] (or any permutation)
 
@@ -256,22 +257,26 @@ def computeWeightedReplicationCosts(replication_levels):
   computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
   """
   replication_level_counts = sorted(Counter(replication_levels).items())
-  # The above, written to a string, could also serve as a hash key if
-  # we want to save on computation
-
-  last_level = 0
-  current_cost = 0
-  total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
-  cost_for_level = {}
-  for replication_level, count in replication_level_counts:
-    copies_added = replication_level - last_level
-    # compute marginal cost from last level and add it to the last cost
-    current_cost += copies_added / total_interested
-    cost_for_level[replication_level] = current_cost
-    # update invariants
-    last_level = replication_level
-    total_interested -= count
-  return cost_for_level
+
+  memo_key = str(replication_level_counts)
+
+  if not memo_key in memo_computeWeightedReplicationCosts:
+    last_level = 0
+    current_cost = 0
+    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
+    cost_for_level = {}
+    for replication_level, count in replication_level_counts:
+      copies_added = replication_level - last_level
+      # compute marginal cost from last level and add it to the last cost
+      current_cost += copies_added / total_interested
+      cost_for_level[replication_level] = current_cost
+      # update invariants
+      last_level = replication_level
+      total_interested -= count
+    memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
+
+  print memo_computeWeightedReplicationCosts
+  return memo_computeWeightedReplicationCosts[memo_key]
 
 def reportUserDiskUsage():
   for user, blocks in reader_to_blocks.items():

commit b189f5ed7a6fd3585535b9d9d3d587115cf383e7
Author: Misha Zatsman <misha at curoverse.com>
Date:   Wed Apr 9 22:23:52 2014 +0000

    Added unittests for computeWeightedReplicationCosts. Moved arg parsing to main block so that I can use commandline flags for tests when testing. Fixed lots of bugs surfaced by tests.

diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index 666df97..bec43ef 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -255,22 +255,23 @@ def computeWeightedReplicationCosts(replication_levels):
   computeWeightedReplicationCosts([1,3]) -> {1:0.5,2:2.5}
   computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
   """
-  replication_level_counts = sorted(Counter(replication_levels))
+  replication_level_counts = sorted(Counter(replication_levels).items())
   # The above, written to a string, could also serve as a hash key if
   # we want to save on computation
 
   last_level = 0
+  current_cost = 0
   total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
   cost_for_level = {}
   for replication_level, count in replication_level_counts:
-    # compute cost
     copies_added = replication_level - last_level
-    cost_for_level[replication_level] = (
-      cost_for_level[last_level] +
-      (total_interested * copies_added) / count)
+    # compute marginal cost from last level and add it to the last cost
+    current_cost += copies_added / total_interested
+    cost_for_level[replication_level] = current_cost
     # update invariants
     last_level = replication_level
     total_interested -= count
+  return cost_for_level
 
 def reportUserDiskUsage():
   for user, blocks in reader_to_blocks.items():
@@ -324,33 +325,8 @@ def computeReplication(keep_blocks):
 
 # This is the main flow here
 
-parser = argparse.ArgumentParser(description='Report on keep disks.')
-parser.add_argument('-m',
-                    '--max-api-results',
-                    type=int,
-                    default=5000,
-                    help=('The max results to get at once.'))
-parser.add_argument('-p',
-                    '--port',
-                    type=int,
-                    default=9090,
-                    help=('The port number to serve on. 0 means no server.'))
-parser.add_argument('-v',
-                    '--verbose',
-                    help='increase output verbosity',
-                    action='store_true')
-parser.add_argument('-u',
-                    '--uuid',
-                    help='uuid of specific collection to process')
-parser.add_argument('--require-admin-user',
-                    action='store_true',
-                    default=True,
-                    help='Fail if the user is not an admin [default]')
-parser.add_argument('--no-require-admin-user',
-                    dest='require_admin_user',
-                    action='store_false',
-                    help='Allow users without admin permissions with only a warning.')
-args = parser.parse_args()
+
+args = None
 
 log = logging.getLogger('arvados.services.datamanager')
 stderr_handler = logging.StreamHandler()
@@ -598,6 +574,37 @@ class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
   """Handle requests in a separate thread."""
 
 if __name__ == '__main__':
+  parser = argparse.ArgumentParser(description='Report on keep disks.')
+  parser.add_argument('-m',
+                      '--max-api-results',
+                      type=int,
+                      default=5000,
+                      help=('The max results to get at once.'))
+  parser.add_argument('-p',
+                      '--port',
+                      type=int,
+                      default=9090,
+                      help=('The port number to serve on. 0 means no server.'))
+  parser.add_argument('-v',
+                      '--verbose',
+                      help='increase output verbosity',
+                      action='store_true')
+  parser.add_argument('-u',
+                      '--uuid',
+                      help='uuid of specific collection to process')
+  parser.add_argument('--require-admin-user',
+                      action='store_true',
+                      default=True,
+                      help='Fail if the user is not an admin [default]')
+  parser.add_argument('--no-require-admin-user',
+                      dest='require_admin_user',
+                      action='store_false',
+                      help='Allow users without admin permissions with only a warning.')
+
+
+  args = parser.parse_args()
+
+
   if args.port == 0:
     loadAllData()
   else:
diff --git a/services/datamanager/datamanager_test.py b/services/datamanager/datamanager_test.py
new file mode 100755
index 0000000..0842c16
--- /dev/null
+++ b/services/datamanager/datamanager_test.py
@@ -0,0 +1,41 @@
+#! /usr/bin/env python
+
+import datamanager
+import unittest
+
+class TestComputeWeightedReplicationCosts(unittest.TestCase):
+  def test_obvious(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,]),
+                     {1:1.0})
+
+  def test_simple(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,]),
+                     {2:2.0})
+
+  def test_even_split(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1]),
+                     {1:0.5})
+
+  def test_even_split_bigger(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,2]),
+                     {2:1.0})
+
+  def test_uneven_split(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,2]),
+                     {1:0.5, 2:1.5})
+
+  def test_uneven_split_bigger(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3]),
+                     {1:0.5, 3:2.5})
+
+  def test_uneven_split_jumble(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3,6,6,10]),
+                     {1:0.2, 3:0.7, 6:1.7, 10:5.7})
+
+  def test_documentation_example(self):
+    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1,3,6,6]),
+                     {1:0.2, 3: 0.2 + 2.0 / 3, 6: 0.2 + 2.0 / 3 + 1.5})
+
+
+if __name__ == '__main__':
+  unittest.main()

commit 2fce84531f36daa2323276f0c6b147efe82852ec
Author: Misha Zatsman <misha at curoverse.com>
Date:   Wed Apr 9 20:44:50 2014 +0000

    Added method to compute the weighted cost of different replication levels. Added main method to aid in unit testing.

diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index a016fb8..666df97 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -12,7 +12,7 @@ import threading
 import urllib2
 
 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from collections import defaultdict
+from collections import defaultdict, Counter
 from functools import partial
 from operator import itemgetter
 from SocketServer import ThreadingMixIn
@@ -224,6 +224,54 @@ def blockPersistedUsage(user_uuid, block_uuid):
   return (byteSizeFromValidUuid(block_uuid) *
           block_to_persister_replication[block_uuid].get(user_uuid, 0))
 
+def computeWeightedReplicationCosts(replication_levels):
+  """Computes the relative cost of varied replication levels.
+
+  replication_levels: a tuple of integers representing the desired
+  replication level.
+
+  Returns a dictionary from replication level to cost.
+
+  The basic thinking is that the cost of replicating at level x should
+  be shared by everyone who wants replication of level x or higher.
+
+  For example, if I have two users who want 1 copy, one user who
+  wants 3 copies and two users who want 6 copies:
+  the input would be [1, 1, 3, 6, 6] (or any permutation)
+
+  The cost of the first copy is shared by all 5 users, so they each
+  pay 1 copy / 5 users = 0.2.
+  The cost of the second and third copies shared by 3 users, so they
+  each pay 2 copies / 3 users = 0.67 (plus the above costs)
+  The cost of the fourth, fifth and sixth copies is shared by two
+  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
+
+  Here are some sample other examples:
+  computeWeightedReplicationCosts([1,]) -> {1:1.0}
+  computeWeightedReplicationCosts([2,]) -> {2:2.0}
+  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
+  computeWeightedReplicationCosts([2,2]) -> {1:1.0}
+  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
+  computeWeightedReplicationCosts([1,3]) -> {1:0.5,2:2.5}
+  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
+  """
+  replication_level_counts = sorted(Counter(replication_levels))
+  # The above, written to a string, could also serve as a hash key if
+  # we want to save on computation
+
+  last_level = 0
+  total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
+  cost_for_level = {}
+  for replication_level, count in replication_level_counts:
+    # compute cost
+    copies_added = replication_level - last_level
+    cost_for_level[replication_level] = (
+      cost_for_level[last_level] +
+      (total_interested * copies_added) / count)
+    # update invariants
+    last_level = replication_level
+    total_interested -= count
+
 def reportUserDiskUsage():
   for user, blocks in reader_to_blocks.items():
     user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
@@ -549,13 +597,12 @@ class DataManagerHandler(BaseHTTPRequestHandler):
 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
   """Handle requests in a separate thread."""
 
-#if __name__ == '__main__':
-
-if args.port == 0:
-  loadAllData()
-else:
-  loader = threading.Thread(target = loadAllData, name = 'loader')
-  loader.start()
+if __name__ == '__main__':
+  if args.port == 0:
+    loadAllData()
+  else:
+    loader = threading.Thread(target = loadAllData, name = 'loader')
+    loader.start()
 
-  server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
-  server.serve_forever()
+    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
+    server.serve_forever()

commit 5254bebf574f0f72bb0321e944d089ad6f6c25e2
Author: Misha Zatsman <misha at curoverse.com>
Date:   Wed Apr 9 16:56:55 2014 +0000

    Started reporting unweighted persist usage based on requested replication, not actual replication. refs #2572

diff --git a/services/datamanager/datamanager.py b/services/datamanager/datamanager.py
index a5931b6..a016fb8 100755
--- a/services/datamanager/datamanager.py
+++ b/services/datamanager/datamanager.py
@@ -13,6 +13,7 @@ import urllib2
 
 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
 from collections import defaultdict
+from functools import partial
 from operator import itemgetter
 from SocketServer import ThreadingMixIn
 
@@ -32,6 +33,11 @@ class maxdict(dict):
   """A dictionary that holds the largest value entered for each key."""
   def addValue(self, key, value):
     dict.__setitem__(self, key, max(dict.get(self, key), value))
+  def addValues(self, kv_pairs):
+    for key,value in kv_pairs:
+      self.addValue(key, value)
+  def addDict(self, d):
+    self.addValues(d.items())
 
 class CollectionInfo:
   DEFAULT_PERSISTER_REPLICATION_LEVEL=2
@@ -180,6 +186,8 @@ def buildMaps():
       block_to_collections[block_uuid].add(collection_uuid)
       block_to_readers[block_uuid].update(collection_info.reader_uuids)
       block_to_persisters[block_uuid].update(collection_info.persister_uuids)
+      block_to_persister_replication[block_uuid].addDict(
+        collection_info.persister_replication)
     for reader_uuid in collection_info.reader_uuids:
       reader_to_collections[reader_uuid].add(collection_uuid)
       reader_to_blocks[reader_uuid].update(block_uuids)
@@ -212,6 +220,9 @@ def blockDiskUsage(block_uuid):
   """
   return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
 
+def blockPersistedUsage(user_uuid, block_uuid):
+  return (byteSizeFromValidUuid(block_uuid) *
+          block_to_persister_replication[block_uuid].get(user_uuid, 0))
 
 def reportUserDiskUsage():
   for user, blocks in reader_to_blocks.items():
@@ -224,7 +235,7 @@ def reportUserDiskUsage():
         blocks))
   for user, blocks in persister_to_blocks.items():
     user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
-        blockDiskUsage,
+        partial(blockPersistedUsage, user),
         blocks))
     user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
         lambda block_uuid:(float(blockDiskUsage(block_uuid))/
@@ -309,6 +320,7 @@ reader_to_collections = defaultdict(set)  # collection(s) for which the user has
 persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
 block_to_readers = defaultdict(set)
 block_to_persisters = defaultdict(set)
+block_to_persister_replication = defaultdict(maxdict)
 reader_to_blocks = defaultdict(set)
 persister_to_blocks = defaultdict(set)
 
@@ -480,7 +492,7 @@ class DataManagerHandler(BaseHTTPRequestHandler):
           replication_to_users[replication].add(user)
         replication_levels = sorted(replication_to_users.keys())
 
-        self.wfile.write('<P>%d persisters in %d replication levels maxing '
+        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
                          'out at %dx replication:\n' %
                          (len(collection.persister_replication),
                           len(replication_levels),

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list