[ARVADOS] updated: 0ea1f67f70f942c4732f2269e31c3ddb7d63fc9e

Git user git at public.curoverse.com
Mon Jun 27 14:36:34 EDT 2016


Summary of changes:
 sdk/python/arvados/keep.py | 35 +++++++++++++++++++----------------
 1 file changed, 19 insertions(+), 16 deletions(-)

       via  0ea1f67f70f942c4732f2269e31c3ddb7d63fc9e (commit)
      from  bef2f8e7bc3bd70b286971885294e5da2729da7b (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 0ea1f67f70f942c4732f2269e31c3ddb7d63fc9e
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Mon Jun 27 15:36:17 2016 -0300

    9446: Applying Peter's review suggestions. refs #9446

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 682ca0e..778b909 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -489,8 +489,8 @@ class KeepClient(object):
             self.successful_copies = 0
             self.response = None
             self.successful_copies_lock = threading.Lock()
-            self.retries = copies
-            self.retries_notification = threading.Condition()
+            self.pending_tries = copies
+            self.pending_tries_notification = threading.Condition()
         
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
@@ -498,12 +498,13 @@ class KeepClient(object):
                 self.response = response
         
         def write_fail(self, ks, status_code):
-            with self.retries_notification:
-                self.retries += 1
-                self.retries_notification.notify()
+            with self.pending_tries_notification:
+                self.pending_tries += 1
+                self.pending_tries_notification.notify()
         
         def pending_copies(self):
-            return self.wanted_copies - self.successful_copies
+            with self.successful_copies_lock:
+                return self.wanted_copies - self.successful_copies
     
     
     class KeepWriterThreadPool(object):
@@ -535,8 +536,8 @@ class KeepClient(object):
                 worker.start()
             # Wait for finished work
             self.queue.join()
-            with self.queue.retries_notification:
-                self.queue.retries_notification.notify_all()
+            with self.queue.pending_tries_notification:
+                self.queue.pending_tries_notification.notify_all()
             for worker in self.workers:
                 worker.join()
         
@@ -555,12 +556,12 @@ class KeepClient(object):
         def run(self):
             while not self.queue.empty():
                 if self.queue.pending_copies() > 0:
-                    # Avoid overreplication, wait for some needed retry
-                    with self.queue.retries_notification:
-                        if not self.queue.retries > 0:
-                            self.queue.retries_notification.wait()
+                    # Avoid overreplication, wait for some needed re-attempt
+                    with self.queue.pending_tries_notification:
+                        if self.queue.pending_tries <= 0:
+                            self.queue.pending_tries_notification.wait()
                             continue # try again when awake
-                        self.queue.retries -= 1
+                        self.queue.pending_tries -= 1
 
                     # Get to work
                     try:
@@ -592,15 +593,17 @@ class KeepClient(object):
                                           self.data_hash,
                                           result['status_code'],
                                           result['body'])
-                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
+                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
+                    # Mark as done so the queue can be join()ed
+                    self.queue.task_done()
                 else:
                     # Remove the task from the queue anyways
                     try:
                         self.queue.get_nowait()
+                        # Mark as done so the queue can be join()ed
+                        self.queue.task_done()
                     except Queue.Empty:
                         continue
-                # Mark as done so the queue can be join()ed
-                self.queue.task_done()
 
 
     def __init__(self, api_client=None, proxy=None,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list