[ARVADOS] created: dea15524cdbe9f791722fc956e939ce593e450b6

Git user git at public.curoverse.com
Thu Jun 23 18:52:17 EDT 2016


        at  dea15524cdbe9f791722fc956e939ce593e450b6 (commit)


commit dea15524cdbe9f791722fc956e939ce593e450b6
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Jun 23 19:47:19 2016 -0300

    One last method before calling it a day

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e27885e..5d511e4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -606,6 +606,13 @@ class KeepClient(object):
         def start(self):
             for worker in self.workers:
                 worker.start()
+        
+        def join(self):
+            self.queue.join()
+            with self.queue.retries_notification:
+                self.queue.retries_notification.notify_all()
+            for worker in self.workers:
+                worker.join()
     
     
     class KeepWriterThreadNew(threading.Thread):

commit 09d1049dab1fcfbdad8332f1d74a091aebfa0e66
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Jun 23 19:38:23 2016 -0300

    Ready to start testing code

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 9e9fb00..e27885e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -564,6 +564,98 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
+    
+
+    class KeepWriterQueue(Queue.Queue):
+        def __init__(self, copies):
+            super(KeepClient.KeepWriterQueue, self).__init__()
+            self.wanted_copies = copies
+            self.successful_copies = 0
+            self.successful_copies_lock = threading.Lock()
+            self.retries = copies
+            self.retries_notification = threading.Condition()
+        
+        def write_success(self, replicas_nr):
+            with self.successful_copies_lock:
+                self.successful_copies += replicas_nr
+        
+        def write_fail(self, ks, status_code):
+            with self.retries_notification:
+                self.retries += 1
+                self.retries_notification.notify()
+        
+        def pending_copies(self):
+            return self.wanted_copies - self.successful_copies
+    
+    
+    class KeepWriterThreadPool(object):
+        def __init__(self, data, data_hash, num_threads, copies=2):
+            self.wanted_copies = copies
+            self.workers = []
+            self.queue = KeepClient.KeepWriterQueue(copies)
+            # Start workers
+            for _ in range(num_threads):
+                self.workers.append(KeepClient.KeepWriterThreadNew(self.queue, data, data_hash))
+        
+        def add_task(self, ks, service_root):
+            self.queue.put((ks, service_root))
+        
+        def successful_copies(self):
+            return self.queue.successful_copies
+        
+        def start(self):
+            for worker in self.workers:
+                worker.start()
+    
+    
+    class KeepWriterThreadNew(threading.Thread):
+        def __init__(self, queue, data, data_hash):
+            super(KeepClient.KeepWriterThreadNew, self).__init__()
+            self.queue = queue
+            self.data = data
+            self.data_hash = data_hash
+            self.daemon = True
+        
+        def run(self):
+            while not self.queue.empty():
+                if self.queue.pending_copies() > 0:
+                    # Avoid over-replication: wait until a retry is needed
+                    with self.queue.retries_notification:
+                        if not self.queue.retries > 0:
+                            self.queue.retries_notification.wait()
+                            continue # try again when awake
+                        self.queue.retries -= 1
+
+                    # Get to work
+                    service, service_root = self.queue.get()
+                    
+                    success = bool(self.service.put(self.data_hash,
+                                                    self.data,
+                                                    timeout=None))
+                    result = service.last_result()
+                    if success:
+                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                                      str(threading.current_thread()),
+                                      self.data_hash,
+                                      len(self.data),
+                                      service_root)
+                        try:
+                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+                        except (KeyError, ValueError):
+                            replicas_stored = 1
+                        self.queue.write_success(replicas_stored)
+                    else:
+                        if result.get('status_code', None):
+                            _logger.debug("Request fail: PUT %s => %s %s",
+                                          self.data_hash,
+                                          result['status_code'],
+                                          result['body'])
+                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
+                else:
+                    # Remove the task from the queue anyway
+                    self.queue.get()
+                # Mark as done so the queue can be join()ed
+                self.queue.task_done()
 
 
     class KeepWriterThread(threading.Thread):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list