[ARVADOS] updated: bef2f8e7bc3bd70b286971885294e5da2729da7b

Git user git@public.curoverse.com
Fri Jun 24 18:10:35 EDT 2016


Summary of changes:
 sdk/python/arvados/keep.py           | 244 ++++++++---------------------------
 sdk/python/tests/test_keep_client.py | 103 +++++++--------
 2 files changed, 102 insertions(+), 245 deletions(-)

       via  bef2f8e7bc3bd70b286971885294e5da2729da7b (commit)
       via  9ccdea630f00035e96eb2bac539a4bd0d3df196d (commit)
      from  dea15524cdbe9f791722fc956e939ce593e450b6 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit bef2f8e7bc3bd70b286971885294e5da2729da7b
Author: Lucas Di Pentima <lucas@curoverse.com>
Date:   Fri Jun 24 19:10:22 2016 -0300

    PySDK put() refactoring ready, all local tests worked OK

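For context, this refactoring replaces the ThreadLimiter/KeepWriterThread pair
with a KeepWriterThreadPool that pulls (service, service_root) tasks from a
queue. As in the old limiter, the pool starts ceil(copies / max_service_replicas)
worker threads, so a single service that can store all desired replicas (e.g. a
proxy) gets only one thread. Below is a minimal sketch of how the new pool is
driven; StubKeepService is a hypothetical stand-in modeled on the FakeKeepService
used in the tests, while the real put() path passes actual KeepService objects:

    import arvados

    class StubKeepService(object):
        """Hypothetical stand-in for a Keep service (illustration only)."""
        def __init__(self):
            self._result = {'headers': {'x-keep-replicas-stored': '1'},
                            'body': 'acbd18db4cc2f85cedef654fccc4a4d8+3'}

        def put(self, data_hash, data, timeout):
            return True              # pretend every PUT succeeds

        def last_result(self):
            return self._result

        def finished(self):
            return False

    pool = arvados.KeepClient.KeepWriterThreadPool(
        data='foo',
        data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
        copies=2,
        max_service_replicas=1,
        timeout=None)
    for _ in range(4):
        pool.add_task(StubKeepService(), None)
    pool.join()   # starts the worker threads and waits for the queue to drain
    print("%d copies written, locator %s" % (pool.done(), pool.response()))
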
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e0e7797..682ca0e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -237,90 +237,6 @@ class KeepClient(object):
     DEFAULT_TIMEOUT = (2, 256, 32768)
     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
 
-    class ThreadLimiter(object):
-        """Limit the number of threads writing to Keep at once.
-
-        This ensures that only a number of writer threads that could
-        potentially achieve the desired replication level run at once.
-        Once the desired replication level is achieved, queued threads
-        are instructed not to run.
-
-        Should be used in a "with" block.
-        """
-        def __init__(self, want_copies, max_service_replicas):
-            self._started = 0
-            self._want_copies = want_copies
-            self._done = 0
-            self._thread_failures = 0
-            self._response = None
-            self._start_lock = threading.Condition()
-            if (not max_service_replicas) or (max_service_replicas >= want_copies):
-                max_threads = 1
-            else:
-                max_threads = math.ceil(float(want_copies) / max_service_replicas)
-            _logger.debug("Limiter max threads is %d", max_threads)
-            self._todo_lock = threading.Semaphore(max_threads)
-            self._done_lock = threading.Lock()
-            self._thread_failures_lock = threading.Lock()
-            self._local = threading.local()
-
-        def __enter__(self):
-            self._start_lock.acquire()
-            if getattr(self._local, 'sequence', None) is not None:
-                # If the calling thread has used set_sequence(N), then
-                # we wait here until N other threads have started.
-                while self._started < self._local.sequence:
-                    self._start_lock.wait()
-            self._todo_lock.acquire()
-            self._started += 1
-            self._start_lock.notifyAll()
-            self._start_lock.release()
-            return self
-
-        def __exit__(self, type, value, traceback):
-            with self._thread_failures_lock:
-                if self._thread_failures > 0:
-                    self._thread_failures -= 1
-                    self._todo_lock.release()
-
-            # If work is finished, release al pending threads
-            if not self.shall_i_proceed():
-                self._todo_lock.release()
-
-        def set_sequence(self, sequence):
-            self._local.sequence = sequence
-
-        def shall_i_proceed(self):
-            """
-            Return true if the current thread should write to Keep.
-            Return false otherwise.
-            """
-            with self._done_lock:
-                return (self._done < self._want_copies)
-
-        def save_response(self, response_body, replicas_stored):
-            """
-            Records a response body (a locator, possibly signed) returned by
-            the Keep server, and the number of replicas it stored.
-            """
-            if replicas_stored == 0:
-                # Failure notification, should start a new thread to try to reach full replication
-                with self._thread_failures_lock:
-                    self._thread_failures += 1
-            else:
-                with self._done_lock:
-                    self._done += replicas_stored
-                    self._response = response_body
-
-        def response(self):
-            """Return the body from the response to a PUT request."""
-            with self._done_lock:
-                return self._response
-
-        def done(self):
-            """Return the total number of replicas successfully stored."""
-            with self._done_lock:
-                return self._done
 
     class KeepService(object):
         """Make requests to a single Keep service, and track results.
@@ -571,13 +487,15 @@ class KeepClient(object):
             Queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             self.successful_copies = 0
+            self.response = None
             self.successful_copies_lock = threading.Lock()
             self.retries = copies
             self.retries_notification = threading.Condition()
         
-        def write_success(self, replicas_nr):
+        def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
+                self.response = response
         
         def write_fail(self, ks, status_code):
             with self.retries_notification:
@@ -589,38 +507,47 @@ class KeepClient(object):
     
     
     class KeepWriterThreadPool(object):
-        def __init__(self, data, data_hash, copies=2, num_threads=None):
+        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
+            self.total_task_nr = 0
             self.wanted_copies = copies
-            if num_threads is None:
-                num_threads = copies
+            if (not max_service_replicas) or (max_service_replicas >= copies):
+                num_threads = 1
+            else:
+                num_threads = int(math.ceil(float(copies) / max_service_replicas))
+            _logger.debug("Pool max threads is %d", num_threads)
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
-            # Start workers
+            # Create workers
             for _ in range(num_threads):
-                w = KeepClient.KeepWriterThreadNew(self.queue, data, data_hash)
+                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
         
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
+            self.total_task_nr += 1
         
         def done(self):
             return self.queue.successful_copies
         
-        def start(self):
+        def join(self):
+            # Start workers
             for worker in self.workers:
                 worker.start()
-        
-        def join(self):
+            # Wait for finished work
             self.queue.join()
             with self.queue.retries_notification:
                 self.queue.retries_notification.notify_all()
             for worker in self.workers:
                 worker.join()
+        
+        def response(self):
+            return self.queue.response
     
     
-    class KeepWriterThreadNew(threading.Thread):
-        def __init__(self, queue, data, data_hash):
-            super(KeepClient.KeepWriterThreadNew, self).__init__()
+    class KeepWriterThread(threading.Thread):
+        def __init__(self, queue, data, data_hash, timeout=None):
+            super(KeepClient.KeepWriterThread, self).__init__()
+            self.timeout = timeout
             self.queue = queue
             self.data = data
             self.data_hash = data_hash
@@ -636,11 +563,16 @@ class KeepClient(object):
                         self.queue.retries -= 1
 
                     # Get to work
-                    service, service_root = self.queue.get()
-                    
+                    try:
+                        service, service_root = self.queue.get_nowait()
+                    except Queue.Empty:
+                        continue
+                    if service.finished():
+                        self.queue.task_done()
+                        continue
                     success = bool(service.put(self.data_hash,
                                                 self.data,
-                                                timeout=None))
+                                                timeout=self.timeout))
                     result = service.last_result()
                     if success:
                         _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
@@ -652,7 +584,8 @@ class KeepClient(object):
                             replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                         except (KeyError, ValueError):
                             replicas_stored = 1
-                        self.queue.write_success(replicas_stored)
+                        
+                        self.queue.write_success(result['body'].strip(), replicas_stored)
                     else:
                         if result.get('status_code', None):
                             _logger.debug("Request fail: PUT %s => %s %s",
@@ -662,77 +595,14 @@ class KeepClient(object):
                         self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
                 else:
                     # Remove the task from the queue anyways
-                    self.queue.get()
+                    try:
+                        self.queue.get_nowait()
+                    except Queue.Empty:
+                        continue
                 # Mark as done so the queue can be join()ed
                 self.queue.task_done()
 
 
-    class KeepWriterThread(threading.Thread):
-        """
-        Write a blob of data to the given Keep server. On success, call
-        save_response() of the given ThreadLimiter to save the returned
-        locator.
-        """
-        def __init__(self, keep_service, **kwargs):
-            super(KeepClient.KeepWriterThread, self).__init__()
-            self.service = keep_service
-            self.args = kwargs
-            self._success = False
-
-        def success(self):
-            return self._success
-
-        def run(self):
-            limiter = self.args['thread_limiter']
-            sequence = self.args['thread_sequence']
-            if sequence is not None:
-                limiter.set_sequence(sequence)
-            with limiter:
-                if not limiter.shall_i_proceed():
-                    # My turn arrived, but the job has been done without
-                    # me.
-                    return
-                self.run_with_limiter(limiter)
-
-        def run_with_limiter(self, limiter):
-            if self.service.finished():
-                return
-            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
-                          str(threading.current_thread()),
-                          self.args['data_hash'],
-                          len(self.args['data']),
-                          self.args['service_root'])
-            self._success = bool(self.service.put(
-                self.args['data_hash'],
-                self.args['data'],
-                timeout=self.args.get('timeout', None)))
-            result = self.service.last_result()
-            if self._success:
-                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                              str(threading.current_thread()),
-                              self.args['data_hash'],
-                              len(self.args['data']),
-                              self.args['service_root'])
-                # Tick the 'done' counter for the number of replica
-                # reported stored by the server, for the case that
-                # we're talking to a proxy or other backend that
-                # stores to multiple copies for us.
-                try:
-                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
-                except (KeyError, ValueError):
-                    replicas_stored = 1
-                limiter.save_response(result['body'].strip(), replicas_stored)
-            elif result.get('status_code', None):
-                _logger.debug("Request fail: PUT %s => %s %s",
-                              self.args['data_hash'],
-                              result['status_code'],
-                              result['body'])
-            if not self._success:
-                # Notify the failure so that the Thread limiter allows
-                # a new one to run.
-                limiter.save_response(None, 0)
-
-
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
@@ -1169,30 +1039,22 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            thread_limiter = KeepClient.ThreadLimiter(
-                copies - done, self.max_replicas_per_service)
-            threads = []
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+                                                        data_hash=data_hash,
+                                                        copies=copies - done,
+                                                        max_service_replicas=self.max_replicas_per_service,
+                                                        timeout=self.current_timeout(num_retries - tries_left))
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
-                t = KeepClient.KeepWriterThread(
-                    ks,
-                    data=data,
-                    data_hash=data_hash,
-                    service_root=service_root,
-                    thread_limiter=thread_limiter,
-                    timeout=self.current_timeout(num_retries-tries_left),
-                    thread_sequence=len(threads))
-                t.start()
-                threads.append(t)
-            for t in threads:
-                t.join()
-            done += thread_limiter.done()
-            loop.save_result((done >= copies, len(threads)))
+                writer_pool.add_task(ks, service_root)
+            writer_pool.join()
+            done += writer_pool.done()
+            loop.save_result((done >= copies, writer_pool.total_task_nr))
 
         if loop.success():
-            return thread_limiter.response()
+            return writer_pool.response()
         if not roots_map:
             raise arvados.errors.KeepWriteError(
                 "failed to write {}: no Keep services available ({})".format(
@@ -1203,7 +1065,7 @@ class KeepClient(object):
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
+                    data_hash, copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 314b93d..3befc09 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1065,7 +1065,7 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_exception(copies=2, num_retries=3)
 
 
-class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.ApiClientMock):
+class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
     
     
     class FakeKeepService(object):
@@ -1075,6 +1075,7 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
             self._result = {}
             self._result['headers'] = {}
             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+            self._result['body'] = 'foobar'
         
         def put(self, data_hash, data, timeout):
             time.sleep(self.delay)
@@ -1082,6 +1083,9 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
         
         def last_result(self):
             return self._result
+        
+        def finished(self):
+            return False
     
     
     def test_only_write_enough_on_success(self):
@@ -1089,12 +1093,12 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
         pool = arvados.KeepClient.KeepWriterThreadPool(
             data = 'foo',
             data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+            max_service_replicas = copies,
             copies = copies
         )
         for i in range(10):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             pool.add_task(ks, None)
-        pool.start()
         pool.join()
         self.assertEqual(pool.done(), copies)
 
@@ -1103,6 +1107,7 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
         pool = arvados.KeepClient.KeepWriterThreadPool(
             data = 'foo',
             data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+            max_service_replicas = copies,
             copies = copies
         )
         for i in range(5):
@@ -1110,72 +1115,10 @@ class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.A
             pool.add_task(ks, None)
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             pool.add_task(ks, None)
-        pool.start()
         pool.join()
         self.assertEqual(pool.done(), copies)
     
 
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-
-
-    class KeepFakeWriterThread(threading.Thread):
-        """
-        Just Simulating the real KeepClient.KeepWriterThread, to test the ThreadLimiter.
-        """
-        def __init__(self, delay, will_succeed, thread_limiter):
-            super(KeepClientAvoidClientOverreplicationTestCase.KeepFakeWriterThread, self).__init__()
-            self.delay = delay # in seconds
-            self.success = will_succeed
-            self.limiter = thread_limiter
-
-        def run(self):
-            with self.limiter:
-                if not self.limiter.shall_i_proceed():
-                    return
-                time.sleep(self.delay)
-                if self.success:
-                    self.limiter.save_response('foo', 1)
-                else:
-                    self.limiter.save_response(None, 0)
-
-    def test_only_write_enough_on_success(self):
-        copies = 3
-        threads = []
-        limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
-        # Setting up fake writer threads with different delays so that the bug is revealed
-        for i in range(copies*2):
-            t = self.KeepFakeWriterThread(
-                    delay=i/10.0,
-                    will_succeed=True,
-                    thread_limiter=limiter)
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
-        self.assertEqual(limiter.done(), copies)
-
-    def test_only_write_enough_on_partial_failure(self):
-        copies = 3
-        threads = []
-        limiter = arvados.KeepClient.ThreadLimiter(want_copies=copies, max_service_replicas=1)
-        for i in range(copies):
-            t = self.KeepFakeWriterThread(
-                    delay=i/10.0,
-                    will_succeed=False,
-                    thread_limiter=limiter)
-            t.start()
-            threads.append(t)
-            t = self.KeepFakeWriterThread(
-                    delay=i/10.0,
-                    will_succeed=True,
-                    thread_limiter=limiter)
-            t.start()
-            threads.append(t)
-        for t in threads:
-            t.join()
-        self.assertEqual(limiter.done(), copies)
-
-
 @tutil.skip_sleep
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
     # Test put()s that need two distinct servers to succeed, possibly

commit 9ccdea630f00035e96eb2bac539a4bd0d3df196d
Author: Lucas Di Pentima <lucas@curoverse.com>
Date:   Fri Jun 24 12:34:55 2016 -0300

    Some tests done on new code

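One of the fixes below replaces super() with a direct call to the parent
initializer. The reason, sketched here for reference, is that Queue.Queue is
an old-style class on Python 2, so super() cannot be used with a class derived
from it:

    import Queue  # Python 2 stdlib module (renamed "queue" in Python 3)

    class KeepWriterQueue(Queue.Queue):
        def __init__(self, copies):
            # super(KeepWriterQueue, self).__init__() raises
            # "TypeError: ... must be type, not classobj" here, because
            # Queue.Queue is an old-style class; call the parent
            # initializer directly instead.
            Queue.Queue.__init__(self)
            self.wanted_copies = copies
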
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 5d511e4..e0e7797 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -568,7 +568,7 @@ class KeepClient(object):
 
     class KeepWriterQueue(Queue.Queue):
         def __init__(self, copies):
-            super(KeepClient.KeepWriterQueue, self).__init__()
+            Queue.Queue.__init__(self) # Old-style superclass
             self.wanted_copies = copies
             self.successful_copies = 0
             self.successful_copies_lock = threading.Lock()
@@ -589,18 +589,21 @@ class KeepClient(object):
     
     
     class KeepWriterThreadPool(object):
-        def __init__(self, data, data_hash, num_threads, copies=2):
+        def __init__(self, data, data_hash, copies=2, num_threads=None):
             self.wanted_copies = copies
+            if num_threads is None:
+                num_threads = copies
             self.workers = []
             self.queue = KeepClient.KeepWriterQueue(copies)
             # Start workers
             for _ in range(num_threads):
-                self.workers.append(KeepClient.KeepWriterThreadNew(self.queue, data, data_hash))
+                w = KeepClient.KeepWriterThreadNew(self.queue, data, data_hash)
+                self.workers.append(w)
         
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
         
-        def successful_copies(self):
+        def done(self):
             return self.queue.successful_copies
         
         def start(self):
@@ -621,7 +624,6 @@ class KeepClient(object):
             self.queue = queue
             self.data = data
             self.data_hash = data_hash
-            self.daemon = True
         
         def run(self):
             while not self.queue.empty():
@@ -636,9 +638,9 @@ class KeepClient(object):
                     # Get to work
                     service, service_root = self.queue.get()
                     
-                    success = bool(self.service.put(self.data_hash,
-                                                    self.data,
-                                                    timeout=None))
+                    success = bool(service.put(self.data_hash,
+                                                self.data,
+                                                timeout=None))
                     result = service.last_result()
                     if success:
                         _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
@@ -1153,7 +1155,7 @@ class KeepClient(object):
 
         headers = {}
         # Tell the proxy how many copies we want it to store
-        headers['X-Keep-Desired-Replication'] = str(copies)
+        headers['X-Keep-Desired-Replicas'] = str(copies)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 977f05e..314b93d 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -1065,6 +1065,56 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_exception(copies=2, num_retries=3)
 
 
+class KeepClientAvoidClientOverreplicationTestCaseNew(unittest.TestCase, tutil.ApiClientMock):
+    
+    
+    class FakeKeepService(object):
+        def __init__(self, delay, will_succeed, replicas=1):
+            self.delay = delay
+            self.success = will_succeed
+            self._result = {}
+            self._result['headers'] = {}
+            self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+        
+        def put(self, data_hash, data, timeout):
+            time.sleep(self.delay)
+            return self.success
+        
+        def last_result(self):
+            return self._result
+    
+    
+    def test_only_write_enough_on_success(self):
+        copies = 3
+        pool = arvados.KeepClient.KeepWriterThreadPool(
+            data = 'foo',
+            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+            copies = copies
+        )
+        for i in range(10):
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            pool.add_task(ks, None)
+        pool.start()
+        pool.join()
+        self.assertEqual(pool.done(), copies)
+
+    def test_only_write_enough_on_partial_success(self):
+        copies = 3
+        pool = arvados.KeepClient.KeepWriterThreadPool(
+            data = 'foo',
+            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+            copies = copies
+        )
+        for i in range(5):
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
+            pool.add_task(ks, None)
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            pool.add_task(ks, None)
+        pool.start()
+        pool.join()
+        self.assertEqual(pool.done(), copies)
+    
+
 class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
 
 

-----------------------------------------------------------------------


hooks/post-receive
-- 