[ARVADOS] created: dea15524cdbe9f791722fc956e939ce593e450b6
Git user
git at public.curoverse.com
Thu Jun 23 18:52:17 EDT 2016
at dea15524cdbe9f791722fc956e939ce593e450b6 (commit)
commit dea15524cdbe9f791722fc956e939ce593e450b6
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Jun 23 19:47:19 2016 -0300
One last method before calling it a day
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e27885e..5d511e4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -606,6 +606,13 @@ class KeepClient(object):
def start(self):
for worker in self.workers:
worker.start()
+
+        def join(self):
+            """Block until all queued tasks are done and every worker exits."""
+            task_queue = self.queue
+            # Wait until each task taken from the queue has been marked done.
+            task_queue.join()
+            # Wake workers waiting on the retries condition; they loop back
+            # and re-check whether the queue is empty.
+            wakeup = task_queue.retries_notification
+            with wakeup:
+                wakeup.notify_all()
+            for thread in self.workers:
+                thread.join()
class KeepWriterThreadNew(threading.Thread):
commit 09d1049dab1fcfbdad8332f1d74a091aebfa0e66
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Jun 23 19:38:23 2016 -0300
Ready to start testing code
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 9e9fb00..e27885e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -564,6 +564,98 @@ class KeepClient(object):
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
+
+
+    class KeepWriterQueue(Queue.Queue):
+        """Task queue that also tracks replication progress for one write.
+
+        Besides the queued tasks it keeps the number of copies wanted, the
+        number of copies stored so far, and a retry counter that is
+        protected by a condition variable.
+        """
+
+        def __init__(self, copies):
+            """Create an empty queue aiming for ``copies`` stored replicas."""
+            # Queue.Queue is an old-style class on Python 2, so super()
+            # would raise TypeError; call the base initializer directly.
+            Queue.Queue.__init__(self)
+            self.wanted_copies = copies
+            self.successful_copies = 0
+            self.successful_copies_lock = threading.Lock()
+            # The retry counter starts at the number of wanted copies.
+            self.retries = copies
+            self.retries_notification = threading.Condition()
+
+        def write_success(self, replicas_nr):
+            """Add ``replicas_nr`` to the count of successfully stored copies."""
+            with self.successful_copies_lock:
+                self.successful_copies += replicas_nr
+
+        def write_fail(self, ks, status_code):
+            """Grant one more retry and wake one thread waiting for it.
+
+            ``ks`` and ``status_code`` are accepted but currently unused.
+            """
+            with self.retries_notification:
+                self.retries += 1
+                self.retries_notification.notify()
+
+        def pending_copies(self):
+            """Return how many copies are still missing (may be negative)."""
+            # Read under the same lock that guards updates to the counter.
+            with self.successful_copies_lock:
+                return self.wanted_copies - self.successful_copies
+
+
+    class KeepWriterThreadPool(object):
+        """A fixed set of writer threads that share one KeepWriterQueue."""
+
+        def __init__(self, data, data_hash, num_threads, copies=2):
+            """Build the shared queue and ``num_threads`` worker threads.
+
+            The workers are only created here; call start() to run them.
+            """
+            self.wanted_copies = copies
+            shared_queue = KeepClient.KeepWriterQueue(copies)
+            self.queue = shared_queue
+            self.workers = [
+                KeepClient.KeepWriterThreadNew(shared_queue, data, data_hash)
+                for _ in range(num_threads)
+            ]
+
+        def add_task(self, ks, service_root):
+            """Enqueue a ``(ks, service_root)`` pair for the workers."""
+            task = (ks, service_root)
+            self.queue.put(task)
+
+        def successful_copies(self):
+            """Return the queue's current count of stored copies."""
+            return self.queue.successful_copies
+
+        def start(self):
+            """Start every worker thread in the pool."""
+            for thread in self.workers:
+                thread.start()
+
+
+    class KeepWriterThreadNew(threading.Thread):
+        """Daemon thread that takes services from a queue and PUTs one block."""
+
+        def __init__(self, queue, data, data_hash):
+            """Remember the shared queue and the block to write.
+
+            queue -- object providing the task queue plus pending_copies(),
+                     retries, retries_notification, write_success() and
+                     write_fail(), as used by run()
+            data -- the block contents passed to each service's put()
+            data_hash -- the identifier passed along with ``data``
+            """
+            super(KeepClient.KeepWriterThreadNew, self).__init__()
+            self.queue = queue
+            self.data = data
+            self.data_hash = data_hash
+            self.daemon = True
+
+        def run(self):
+            """Process queued ``(service, service_root)`` tasks until empty.
+
+            While copies are still pending, a task is attempted only after
+            taking one unit from the queue's retry counter; once enough
+            copies exist the remaining tasks are discarded. Every task that
+            is removed from the queue is marked done so that the queue can
+            be join()ed.
+            """
+            while not self.queue.empty():
+                if self.queue.pending_copies() > 0:
+                    # Avoid overreplication, wait for some needed retry
+                    with self.queue.retries_notification:
+                        if self.queue.retries <= 0:
+                            self.queue.retries_notification.wait()
+                            continue  # try again when awake
+                        self.queue.retries -= 1
+
+                    # Get to work. Another worker may have drained the queue
+                    # since the empty() check, so never block here.
+                    try:
+                        service, service_root = self.queue.get_nowait()
+                    except Queue.Empty:
+                        continue
+
+                    # Use the service taken from the queue; this thread has
+                    # no service attribute of its own.
+                    success = bool(service.put(self.data_hash,
+                                               self.data,
+                                               timeout=None))
+                    result = service.last_result()
+                    if success:
+                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                                      str(threading.current_thread()),
+                                      self.data_hash,
+                                      len(self.data),
+                                      service_root)
+                        try:
+                            replicas_stored = int(
+                                result['headers']['x-keep-replicas-stored'])
+                        except (KeyError, ValueError):
+                            # Header missing or not a number: count one copy.
+                            replicas_stored = 1
+                        self.queue.write_success(replicas_stored)
+                    else:
+                        status_code = result.get('status_code', None)
+                        if status_code:
+                            _logger.debug("Request fail: PUT %s => %s %s",
+                                          self.data_hash,
+                                          result['status_code'],
+                                          result['body'])
+                        # Schedule a retry
+                        self.queue.write_fail(service, status_code)
+                else:
+                    # Remove the task from the queue anyways
+                    try:
+                        self.queue.get_nowait()
+                    except Queue.Empty:
+                        continue
+                # Mark as done so the queue can be join()ed
+                self.queue.task_done()
class KeepWriterThread(threading.Thread):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list