[ARVADOS] updated: 20f65ff9a7c6c6f73b152d122b70fb44ea5a21a4

git at public.curoverse.com git at public.curoverse.com
Thu Jan 1 21:30:02 EST 2015


Summary of changes:
 sdk/python/arvados/arvfile.py    | 12 +++++++-----
 sdk/python/arvados/collection.py | 10 +++++++---
 sdk/python/arvados/keep.py       | 19 +++++++++++++------
 3 files changed, 27 insertions(+), 14 deletions(-)

       via  20f65ff9a7c6c6f73b152d122b70fb44ea5a21a4 (commit)
      from  68cd78ca1a20d80e0ab90d125df305f30b606f85 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 20f65ff9a7c6c6f73b152d122b70fb44ea5a21a4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jan 1 21:31:25 2015 -0500

    3198: KeepClient creates a requests session to reuse connections.  export_manifest correctly strips hints when portable_locators=True.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 61ba3e0..5c6916e 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -245,6 +245,9 @@ class AsyncKeepWriteErrors(Exception):
     def __init__(self, errors):
         self.errors = errors
 
+    def __repr__(self):
+        return "\n".join(self.errors)
+
 class BlockManager(object):
     def __init__(self, keep):
         self._keep = keep
@@ -299,12 +302,11 @@ class BlockManager(object):
                         self._put_queue.task_done()
 
         if self._put_threads is None:
-            self._put_queue = Queue.Queue()
+            self._put_queue = Queue.Queue(maxsize=2)
             self._put_errors = Queue.Queue()
-            self._put_threads = [threading.Thread(target=worker, args=(self,)),
-                                threading.Thread(target=worker, args=(self,))]
-            self._put_threads[0].start()
-            self._put_threads[1].start()
+            self._put_threads = [threading.Thread(target=worker, args=(self,)), threading.Thread(target=worker, args=(self,))]
+            for t in self._put_threads:
+                t.start()
 
         block.state = BufferBlock.PENDING
         self._put_queue.put(block)
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 5857fb2..b07a643 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -922,14 +922,14 @@ class Collection(CollectionBase):
             self.set_unmodified()
 
     @_populate_first
-    def save_as(self, name, owner_uuid=None):
+    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
         self._my_block_manager().commit_all()
         self._my_keep().put(self.manifest_text(strip=True))
         body = {"manifest_text": self.manifest_text(strip=False),
                 "name": name}
         if owner_uuid:
             body["owner_uuid"] = owner_uuid
-        self._api_response = self._my_api().collections().create(body=body).execute(num_retries=self.num_retries)
+        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
         self._manifest_locator = self._api_response["uuid"]
         self.set_unmodified()
 
@@ -1002,6 +1002,8 @@ def export_manifest(item, stream_name=".", portable_locators=False):
                 loc = s.locator
                 if loc.startswith("bufferblock"):
                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+                if portable_locators:
+                    loc = KeepLocator(loc).stripped()
                 st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
             stream[k] = st
@@ -1009,13 +1011,15 @@ def export_manifest(item, stream_name=".", portable_locators=False):
             buf += ' '.join(normalize_stream(stream_name, stream))
             buf += "\n"
         for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
-            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
+            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
     elif isinstance(item, ArvadosFile):
         st = []
         for s in item._segments:
             loc = s.locator
             if loc.startswith("bufferblock"):
                 loc = item._bufferblocks[loc].calculate_locator()
+            if portable_locators:
+                loc = KeepLocator(loc).stripped()
             st.append(LocatorAndRange(loc, locator_block_size(loc),
                                  s.segment_offset, s.range_size))
         stream[stream_name] = st
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 0c5eac8..7080465 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -58,6 +58,9 @@ class KeepLocator(object):
                              self.permission_hint()] + self.hints
             if s is not None)
 
+    def stripped(self):
+        return "%s+%i" % (self.md5sum, self.size)
+
     def _make_hex_prop(name, length):
         # Build and return a new property with the given name that
         # must be a hex string of the given length.
@@ -273,10 +276,11 @@ class KeepClient(object):
         HTTP_ERRORS = (requests.exceptions.RequestException,
                        socket.error, ssl.SSLError)
 
-        def __init__(self, root, **headers):
+        def __init__(self, root, session, **headers):
             self.root = root
             self.last_result = None
             self.success_flag = None
+            self.session = session
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
@@ -299,7 +303,7 @@ class KeepClient(object):
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
-                    result = requests.get(url.encode('utf-8'),
+                    result = self.session.get(url.encode('utf-8'),
                                           headers=self.get_headers,
                                           timeout=timeout)
             except self.HTTP_ERRORS as e:
@@ -325,7 +329,7 @@ class KeepClient(object):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
-                result = requests.put(url.encode('utf-8'),
+                result = self.session.put(url.encode('utf-8'),
                                       data=body,
                                       headers=self.put_headers,
                                       timeout=timeout)
@@ -365,9 +369,10 @@ class KeepClient(object):
         def run_with_limiter(self, limiter):
             if self.service.finished():
                 return
-            _logger.debug("KeepWriterThread %s proceeding %s %s",
+            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                           str(threading.current_thread()),
                           self.args['data_hash'],
+                          len(self.args['data']),
                           self.args['service_root'])
             self._success = bool(self.service.put(
                 self.args['data_hash'],
@@ -376,9 +381,10 @@ class KeepClient(object):
             status = self.service.last_status()
             if self._success:
                 result = self.service.last_result
-                _logger.debug("KeepWriterThread %s succeeded %s %s",
+                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
+                              len(self.args['data']),
                               self.args['service_root'])
                 # Tick the 'done' counter for the number of replica
                 # reported stored by the server, for the case that
@@ -456,6 +462,7 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.session = requests.Session()
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -547,7 +554,7 @@ class KeepClient(object):
         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
-                roots_map[root] = self.KeepService(root, **headers)
+                roots_map[root] = self.KeepService(root, self.session, **headers)
         return local_roots
 
     @staticmethod

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list