[ARVADOS] updated: f8f654db2a5efc85a358dfd126be4ccb1abb322d

git at public.curoverse.com git at public.curoverse.com
Thu Feb 12 17:10:50 EST 2015


Summary of changes:
 sdk/python/arvados/__init__.py       |   2 +-
 sdk/python/arvados/arvfile.py        | 254 ++++++++++++++++++-----------------
 sdk/python/arvados/collection.py     | 131 ++++++++++++------
 sdk/python/arvados/keep.py           |  20 ++-
 sdk/python/arvados/safeapi.py        |   6 +-
 sdk/python/tests/test_arvfile.py     |   5 +-
 sdk/python/tests/test_collections.py |   4 +-
 7 files changed, 245 insertions(+), 177 deletions(-)

       via  f8f654db2a5efc85a358dfd126be4ccb1abb322d (commit)
      from  8ae4b9342772ce4693b5961da5e83f54a36ad0e7 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit f8f654db2a5efc85a358dfd126be4ccb1abb322d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 12 17:12:57 2015 -0500

    4823: Style cleanup on docstrings, renamed some single-letter variables to be
    more descriptive.

diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index 19a7ad8..59a1a44 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -22,7 +22,7 @@ from api import *
 from collection import *
 from keep import *
 from stream import *
-from arvfile import *
+from arvfile import StreamFileReader
 import errors
 import util
 
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index acb02a0..456602f 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -11,12 +11,14 @@ import threading
 import Queue
 import copy
 import errno
+from .errors import KeepWriteError
 
 def split(path):
-    """split(path) -> streamname, filename
+    """Separate the stream name and file name in a /-separated stream path and
+    return a tuple (stream_name, file_name).
 
-    Separate the stream name and file name in a /-separated stream path.
     If no stream name is available, assume '.'.
+
     """
     try:
         stream_name, file_name = path.rsplit('/', 1)
@@ -33,11 +35,11 @@ class ArvadosFileBase(object):
     @staticmethod
     def _before_close(orig_func):
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def before_close_wrapper(self, *args, **kwargs):
             if self.closed:
                 raise ValueError("I/O operation on closed stream file")
             return orig_func(self, *args, **kwargs)
-        return wrapper
+        return before_close_wrapper
 
     def __enter__(self):
         return self
@@ -219,10 +221,10 @@ class StreamFileReader(ArvadosFileReaderBase):
 
 
 class BufferBlock(object):
-    """
-    A BufferBlock is a stand-in for a Keep block that is in the process of being
-    written.  Writers can append to it, get the size, and compute the Keep locator.
+    """A BufferBlock is a stand-in for a Keep block that is in the process of being
+    written.
 
+    Writers can append to it, get the size, and compute the Keep locator.
     There are three valid states:
 
     WRITABLE
@@ -236,7 +238,9 @@ class BufferBlock(object):
       released, fetching the block will fetch it via keep client (since we
       discarded the internal copy), and identifiers referring to the BufferBlock
       can be replaced with the block locator.
+
     """
+
     WRITABLE = 0
     PENDING = 1
     COMMITTED = 2
@@ -251,6 +255,7 @@ class BufferBlock(object):
 
         :owner:
           ArvadosFile that owns this block
+
         """
         self.blockid = blockid
         self.buffer_block = bytearray(starting_capacity)
@@ -261,10 +266,11 @@ class BufferBlock(object):
         self.owner = owner
 
     def append(self, data):
-        """
-        Append some data to the buffer.  Only valid if the block is in WRITABLE
-        state.  Implements an expanding buffer, doubling capacity as needed to
-        accomdate all the data.
+        """Append some data to the buffer.
+
+        Only valid if the block is in WRITABLE state.  Implements an expanding
+        buffer, doubling capacity as needed to accommodate all the data.
+
         """
         if self.state == BufferBlock.WRITABLE:
             while (self.write_pointer+len(data)) > len(self.buffer_block):
@@ -279,7 +285,7 @@ class BufferBlock(object):
             raise AssertionError("Buffer block is not writable")
 
     def size(self):
-        """Amount of data written to the buffer"""
+        """The amount of data written to the buffer."""
         return self.write_pointer
 
     def locator(self):
@@ -289,23 +295,12 @@ class BufferBlock(object):
         return self._locator
 
 
-class AsyncKeepWriteErrors(Exception):
-    """
-    Roll up one or more Keep write exceptions (generated by background
-    threads) into a single one.
-    """
-    def __init__(self, errors):
-        self.errors = errors
-
-    def __repr__(self):
-        return "\n".join(self.errors)
-
 def synchronized(orig_func):
     @functools.wraps(orig_func)
-    def wrapper(self, *args, **kwargs):
+    def synchronized_wrapper(self, *args, **kwargs):
         with self.lock:
             return orig_func(self, *args, **kwargs)
-    return wrapper
+    return synchronized_wrapper
 
 class NoopLock(object):
     def __enter__(self):
@@ -326,17 +321,17 @@ SYNC_LIVE = 3
 
 def must_be_writable(orig_func):
     @functools.wraps(orig_func)
-    def wrapper(self, *args, **kwargs):
+    def must_be_writable_wrapper(self, *args, **kwargs):
         if self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))
         return orig_func(self, *args, **kwargs)
-    return wrapper
+    return must_be_writable_wrapper
 
 
 class BlockManager(object):
-    """
-    BlockManager handles buffer blocks, background block uploads, and
-    background block prefetch for a Collection of ArvadosFiles.
+    """BlockManager handles buffer blocks, background block uploads, and background
+    block prefetch for a Collection of ArvadosFiles.
+
     """
     def __init__(self, keep):
         """keep: KeepClient object to use"""
@@ -354,8 +349,7 @@ class BlockManager(object):
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
-        """
-        Allocate a new, empty bufferblock in WRITABLE state and return it.
+        """Allocate a new, empty bufferblock in WRITABLE state and return it.
 
         :blockid:
           optional block identifier, otherwise one will be automatically assigned
@@ -365,40 +359,41 @@ class BlockManager(object):
 
         :owner:
           ArvadosFile that owns this block
+
         """
         if blockid is None:
             blockid = "bufferblock%i" % len(self._bufferblocks)
-        bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
-        self._bufferblocks[bb.blockid] = bb
-        return bb
+        bufferblock = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
+        self._bufferblocks[bufferblock.blockid] = bufferblock
+        return bufferblock
 
     @synchronized
     def dup_block(self, blockid, owner):
-        """
-        Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
+        """Create a new bufferblock in WRITABLE state, initialized with the content of
+        an existing bufferblock.
 
         :blockid:
           the block to copy.  May be an existing buffer block id.
 
         :owner:
           ArvadosFile that owns the new block
+
         """
         new_blockid = "bufferblock%i" % len(self._bufferblocks)
         block = self._bufferblocks[blockid]
-        bb = BufferBlock(new_blockid, len(block), owner)
-        bb.append(block)
-        self._bufferblocks[bb.blockid] = bb
-        return bb
+        bufferblock = BufferBlock(new_blockid, len(block), owner)
+        bufferblock.append(block)
+        self._bufferblocks[bufferblock.blockid] = bufferblock
+        return bufferblock
 
     @synchronized
-    def is_bufferblock(self, id):
-        return id in self._bufferblocks
+    def is_bufferblock(self, locator):
+        return locator in self._bufferblocks
 
     @synchronized
     def stop_threads(self):
-        """
-        Shut down and wait for background upload and download threads to finish.
-        """
+        """Shut down and wait for background upload and download threads to finish."""
+
         if self._put_threads is not None:
             for t in self._put_threads:
                 self._put_queue.put(None)
@@ -417,27 +412,28 @@ class BlockManager(object):
         self._prefetch_queue = None
 
     def commit_bufferblock(self, block):
+        """Initiate a background upload of a bufferblock.
+
+        This will block if the upload queue is at capacity, otherwise it will
+        return immediately.
+
         """
-        Initiate a background upload of a bufferblock.  This will block if the
-        upload queue is at capacity, otherwise it will return immediately.
-        """
 
-        def worker(self):
-            """
-            Background uploader thread.
-            """
+        def commit_bufferblock_worker(self):
+            """Background uploader thread."""
+
             while True:
                 try:
-                    b = self._put_queue.get()
-                    if b is None:
+                    bufferblock = self._put_queue.get()
+                    if bufferblock is None:
                         return
-                    b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
-                    b.state = BufferBlock.COMMITTED
-                    b.buffer_view = None
-                    b.buffer_block = None
+                    bufferblock._locator = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                    bufferblock.state = BufferBlock.COMMITTED
+                    bufferblock.buffer_view = None
+                    bufferblock.buffer_block = None
                 except Exception as e:
                     print e
-                    self._put_errors.put(e)
+                    self._put_errors.put((bufferblock.locator(), e))
                 finally:
                     if self._put_queue is not None:
                         self._put_queue.task_done()
@@ -460,35 +456,41 @@ class BlockManager(object):
 
                 self._put_threads = []
                 for i in xrange(0, self.num_put_threads):
-                    t = threading.Thread(target=worker, args=(self,))
-                    self._put_threads.append(t)
-                    t.daemon = True
-                    t.start()
+                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
         # Mark the block as PENDING so to disallow any more appends.
         block.state = BufferBlock.PENDING
         self._put_queue.put(block)
 
     def get_block(self, locator, num_retries, cache_only=False):
-        """
-        Fetch a block.  First checks to see if the locator is a BufferBlock and
-        return that, if not, passes the request through to KeepClient.get().
+        """Fetch a block.
+
+        First checks to see if the locator is a BufferBlock and return that, if
+        not, passes the request through to KeepClient.get().
+
         """
         with self.lock:
             if locator in self._bufferblocks:
-                bb = self._bufferblocks[locator]
-                if bb.state != BufferBlock.COMMITTED:
-                    return bb.buffer_view[0:bb.write_pointer].tobytes()
+                bufferblock = self._bufferblocks[locator]
+                if bufferblock.state != BufferBlock.COMMITTED:
+                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
                 else:
-                    locator = bb._locator
-        return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
+                    locator = bufferblock._locator
+        if cache_only:
+            return self._keep.get_from_cache(locator)
+        else:
+            return self._keep.get(locator, num_retries=num_retries)
 
     def commit_all(self):
-        """
-        Commit all outstanding buffer blocks.  Unlike commit_bufferblock(), this
-        is a synchronous call, and will not return until all buffer blocks are
-        uploaded.  Raises AsyncKeepWriteErrors() if any blocks failed to
-        upload.
+        """Commit all outstanding buffer blocks.
+
+        Unlike commit_bufferblock(), this is a synchronous call, and will not
+        return until all buffer blocks are uploaded.  Raises
+        KeepWriteError() if any blocks failed to upload.
+
         """
         with self.lock:
             items = self._bufferblocks.items()
@@ -501,27 +503,29 @@ class BlockManager(object):
             if self._put_queue is not None:
                 self._put_queue.join()
                 if not self._put_errors.empty():
-                    e = []
+                    err = []
                     try:
                         while True:
-                            e.append(self._put_errors.get(False))
+                            err.append(self._put_errors.get(False))
                     except Queue.Empty:
                         pass
-                    raise AsyncKeepWriteErrors(e)
+                    raise KeepWriteError("Error writing some blocks", err)
 
     def block_prefetch(self, locator):
-        """
-        Initiate a background download of a block.  This assumes that the
-        underlying KeepClient implements a block cache, so repeated requests
-        for the same block will not result in repeated downloads (unless the
-        block is evicted from the cache.)  This method does not block.
+        """Initiate a background download of a block.
+
+        This assumes that the underlying KeepClient implements a block cache,
+        so repeated requests for the same block will not result in repeated
+        downloads (unless the block is evicted from the cache.)  This method
+        does not block.
+
         """
 
         if not self.prefetch_enabled:
             return
 
-        def worker(self):
-            """Background downloader thread."""
+        def block_prefetch_worker(self):
+            """The background downloader thread."""
             while True:
                 try:
                     b = self._prefetch_queue.get()
@@ -538,22 +542,26 @@ class BlockManager(object):
                 self._prefetch_queue = Queue.Queue()
                 self._prefetch_threads = []
                 for i in xrange(0, self.num_get_threads):
-                    t = threading.Thread(target=worker, args=(self,))
-                    self._prefetch_threads.append(t)
-                    t.daemon = True
-                    t.start()
+                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
+                    self._prefetch_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
         self._prefetch_queue.put(locator)
 
 
 class ArvadosFile(object):
     """ArvadosFile manages the underlying representation of a file in Keep as a
     sequence of segments spanning a set of blocks, and implements random
-    read/write access.  This object may be accessed from multiple threads.
+    read/write access.
+
+    This object may be accessed from multiple threads.
 
     """
 
     def __init__(self, parent, stream=[], segments=[]):
         """
+        ArvadosFile constructor.
+
         :stream:
           a list of Range objects representing a block stream
 
@@ -625,10 +633,12 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def truncate(self, size):
-        """
-        Adjust the size of the file.  If `size` is less than the size of the file,
-        the file contents after `size` will be discarded.  If `size` is greater
-        than the current size of the file, an IOError will be raised.
+        """Shrink the size of the file.
+
+        If `size` is less than the size of the file, the file contents after
+        `size` will be discarded.  If `size` is greater than the current size
+        of the file, an IOError will be raised.
+
         """
         if size < self.size():
             new_segs = []
@@ -651,9 +661,8 @@ class ArvadosFile(object):
             raise IOError("truncate() does not support extending the file size")
 
     def readfrom(self, offset, size, num_retries):
-        """
-        read upto `size` bytes from the file starting at `offset`.
-        """
+        """Read up to `size` bytes from the file starting at `offset`."""
+
         with self.lock:
             if size == 0 or offset >= self.size():
                 return ''
@@ -665,19 +674,21 @@ class ArvadosFile(object):
 
         data = []
         for lr in readsegs:
-            d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
-            if d:
-                data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
+            block = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+            if block:
+                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
             else:
                 break
         return ''.join(data)
 
     def _repack_writes(self):
-        """
-        Test if the buffer block has more data than is referenced by actual segments
-        (this happens when a buffered write over-writes a file range written in
-        a previous buffered write).  Re-pack the buffer block for efficiency
+        """Test if the buffer block has more data than is referenced by actual
+        segments.
+
+        This happens when a buffered write over-writes a file range written in
+        a previous buffered write.  Re-pack the buffer block for efficiency
         and to avoid leaking information.
+
         """
         segs = self._segments
 
@@ -699,9 +710,11 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def writeto(self, offset, data, num_retries):
-        """
-        Write `data` to the file starting at `offset`.  This will update
-        existing bytes and/or extend the size of the file as necessary.
+        """Write `data` to the file starting at `offset`.
+
+        This will update existing bytes and/or extend the size of the file as
+        necessary.
+
         """
         if len(data) == 0:
             return
@@ -730,16 +743,14 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def add_segment(self, blocks, pos, size):
-        """
-        Add a segment to the end of the file, with `pos` and `offset` referencing a
+        """Add a segment to the end of the file, with `pos` and `size` referencing a
         section of the stream described by `blocks` (a list of Range objects)
+
         """
         self._add_segment(blocks, pos, size)
 
     def _add_segment(self, blocks, pos, size):
-        """
-        (Internal version.)
-        """
+        """Internal implementation of add_segment."""
         self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
             last = self._segments[-1] if self._segments else Range(0, 0, 0)
@@ -748,7 +759,7 @@ class ArvadosFile(object):
 
     @synchronized
     def size(self):
-        """Get the file size"""
+        """Get the file size."""
         if self._segments:
             n = self._segments[-1]
             return n.range_start + n.range_size
@@ -756,9 +767,11 @@ class ArvadosFile(object):
             return 0
 
 class ArvadosFileReader(ArvadosFileReaderBase):
-    """Wraps ArvadosFile in a file-like object supporting reading only.  Be aware
-    that this class is NOT thread safe as there is no locking around updating file
-    pointer.
+    """Wraps ArvadosFile in a file-like object supporting reading only.
+
+    Be aware that this class is NOT thread safe as there is no locking around
+    updating file pointer.
+
     """
 
     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
@@ -771,7 +784,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @ArvadosFileBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
-        """Read up to `size` bytes from the stream, starting at the current file position"""
+        """Read up to `size` bytes from the stream, starting at the current file position."""
         data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
         self._filepos += len(data)
         return data
@@ -779,7 +792,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @ArvadosFileBase._before_close
     @retry_method
     def readfrom(self, offset, size, num_retries=None):
-        """Read up to `size` bytes from the stream, starting at the current file position"""
+        """Read up to `size` bytes from the stream, starting at `offset`."""
         return self.arvadosfile.readfrom(offset, size, num_retries)
 
     def flush(self):
@@ -788,6 +801,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
 class ArvadosFileWriter(ArvadosFileReader):
     """Wraps ArvadosFile in a file-like object supporting both reading and writing.
+
     Be aware that this class is NOT thread safe as there is no locking around
     updating file pointer.
 
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 82708cc..a64dd34 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -12,7 +12,7 @@ from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, Arv
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
-from .safeapi import SafeApi
+from .safeapi import ThreadSafeApiCache
 import config
 import errors
 import util
@@ -159,11 +159,11 @@ class CollectionReader(CollectionBase):
     def _populate_first(orig_func):
         # Decorator for methods that read actual Collection data.
         @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
+        def populate_first_wrapper(self, *args, **kwargs):
             if self._streams is None:
                 self._populate()
             return orig_func(self, *args, **kwargs)
-        return wrapper
+        return populate_first_wrapper
 
     @_populate_first
     def api_response(self):
@@ -659,10 +659,16 @@ class ResumableCollectionWriter(CollectionWriter):
 ADD = "add"
 DEL = "del"
 MOD = "mod"
+FILE = "file"
+COLLECTION = "collection"
 
 class SynchronizedCollectionBase(CollectionBase):
-    """Base class for Collections and Subcollections.  Implements the majority of
-    functionality relating to accessing items in the Collection."""
+    """Base class for Collections and Subcollections.
+
+    Implements the majority of functionality relating to accessing items in the
+    Collection.
+
+    """
 
     def __init__(self, parent=None):
         self.parent = parent
@@ -691,9 +697,13 @@ class SynchronizedCollectionBase(CollectionBase):
         raise NotImplementedError()
 
     @synchronized
-    def find(self, path, create=False, create_collection=False):
-        """Recursively search the specified file path.  May return either a Collection
-        or ArvadosFile.
+    def _find(self, path, create, create_collection):
+        """Internal method.  Don't use this.  Use `find()` or `find_or_create()`
+        instead.
+
+        Recursively search the specified file path.  May return either a
+        Collection or ArvadosFile.  Return None if not found and create=False.
+        Will create a new item if create=True.
 
         :create:
           If true, create path components (i.e. Collections) that are
@@ -708,6 +718,7 @@ class SynchronizedCollectionBase(CollectionBase):
           component.
 
         """
+
         if create and self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))
 
@@ -738,12 +749,37 @@ class SynchronizedCollectionBase(CollectionBase):
                     self.notify(ADD, self, p[0], item)
                 del p[0]
                 if isinstance(item, SynchronizedCollectionBase):
-                    return item.find("/".join(p), create=create)
+                    return item._find("/".join(p), create, create_collection)
                 else:
                     raise errors.ArgumentError("Interior path components must be subcollection")
         else:
             return self
 
+    def find(self, path):
+        """Recursively search the specified file path.
+
+        May return either a Collection or ArvadosFile.  Return None if not
+        found.
+
+        """
+        return self._find(path, False, False)
+
+    def find_or_create(self, path, create_type):
+        """Recursively search the specified file path.
+
+        May return either a Collection or ArvadosFile.  Will create a new item
+        at the specified path if none exists.
+
+        :create_type:
+          One of `arvados.collection.FILE` or
+          `arvados.collection.COLLECTION`.  If the path is not found, and value
+          of create_type is FILE then create and return a new ArvadosFile for
+          the last path component.  If COLLECTION, then create and return a new
+          Collection for the last path component.
+
+        """
+        return self._find(path, True, (create_type == COLLECTION))
+
     def open(self, path, mode):
         """Open a file-like object for access.
 
@@ -770,7 +806,7 @@ class SynchronizedCollectionBase(CollectionBase):
         if create and self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))
 
-        f = self.find(path, create=create)
+        f = self._find(path, create, False)
 
         if f is None:
             raise IOError((errno.ENOENT, "File not found"))
@@ -798,7 +834,7 @@ class SynchronizedCollectionBase(CollectionBase):
 
     @synchronized
     def set_unmodified(self):
-        """Recursively clear modified flag"""
+        """Recursively clear modified flag."""
         self._modified = False
         for k,v in self._items.items():
             v.set_unmodified()
@@ -806,7 +842,7 @@ class SynchronizedCollectionBase(CollectionBase):
     @synchronized
     def __iter__(self):
         """Iterate over names of files and collections contained in this collection."""
-        return self._items.keys().__iter__()
+        return iter(self._items.keys())
 
     @synchronized
     def iterkeys(self):
@@ -823,12 +859,12 @@ class SynchronizedCollectionBase(CollectionBase):
     @synchronized
     def __contains__(self, k):
         """If there is a file or collection a directly contained by this collection
-        with name "k"."""
+        with name `k`."""
         return k in self._items
 
     @synchronized
     def __len__(self):
-        """Get the number of items directly contained in this collection"""
+        """Get the number of items directly contained in this collection."""
         return len(self._items)
 
     @must_be_writable
@@ -855,14 +891,15 @@ class SynchronizedCollectionBase(CollectionBase):
         return self._items.items()
 
     def exists(self, path):
-        """Test if there is a file or collection at "path" """
+        """Test if there is a file or collection at `path`."""
         return self.find(path) != None
 
     @must_be_writable
     @synchronized
-    def remove(self, path, rm_r=False):
+    def remove(self, path, recursive=False):
         """Remove the file or subcollection (directory) at `path`.
-        :rm_r:
+
+        :recursive:
           Specify whether to remove non-empty subcollections (True), or raise an error (False).
         """
         p = path.split("/")
@@ -875,7 +912,7 @@ class SynchronizedCollectionBase(CollectionBase):
             if item is None:
                 raise IOError((errno.ENOENT, "File not found"))
             if len(p) == 1:
-                if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
+                if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not recursive:
                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
                 d = self._items[p[0]]
                 del self._items[p[0]]
@@ -936,7 +973,7 @@ class SynchronizedCollectionBase(CollectionBase):
         if not target_name:
             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 
-        target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
+        target_dir = self.find_or_create("/".join(tp[0:-1]), COLLECTION)
 
         with target_dir.lock:
             if target_name in target_dir:
@@ -1008,9 +1045,11 @@ class SynchronizedCollectionBase(CollectionBase):
     @must_be_writable
     @synchronized
     def apply(self, changes):
-        """
-        Apply changes from `diff`.  If a change conflicts with a local change, it
-        will be saved to an alternate path indicating the conflict.
+        """Apply changes from `diff`.
+
+        If a change conflicts with a local change, it will be saved to an
+        alternate path indicating the conflict.
+
         """
         for c in changes:
             path = c[1]
@@ -1044,7 +1083,7 @@ class SynchronizedCollectionBase(CollectionBase):
             elif c[0] == DEL:
                 if local == initial:
                     # Local item matches "initial" value, so it is safe to remove.
-                    self.remove(path, rm_r=True)
+                    self.remove(path, recursive=True)
                 # else, the file is modified or already removed, in either
                 # case we don't want to try to remove it.
 
@@ -1115,7 +1154,9 @@ class CollectionRoot(SynchronizedCollectionBase):
                  num_retries=None,
                  block_manager=None,
                  sync=None):
-        """:manifest_locator_or_text:
+        """CollectionRoot constructor.
+
+        :manifest_locator_or_text:
           One of Arvados collection UUID, block locator of
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
@@ -1210,6 +1251,10 @@ class CollectionRoot(SynchronizedCollectionBase):
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
+        """Fetch the latest collection record on the API server and merge it with the
+        current collection contents.
+
+        """
         if other is None:
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
@@ -1221,7 +1266,7 @@ class CollectionRoot(SynchronizedCollectionBase):
     @synchronized
     def _my_api(self):
         if self._api_client is None:
-            self._api_client = arvados.SafeApi(self._config)
+            self._api_client = ThreadSafeApiCache(self._config)
             self._keep_client = self._api_client.keep
         return self._api_client
 
@@ -1313,7 +1358,7 @@ class CollectionRoot(SynchronizedCollectionBase):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        """Support scoped auto-commit in a with: block"""
+        """Support scoped auto-commit in a with: block."""
         if self._sync != SYNC_READONLY and self._has_collection_uuid():
             self.save()
         if self._block_manager is not None:
@@ -1332,12 +1377,11 @@ class CollectionRoot(SynchronizedCollectionBase):
 
     @synchronized
     def api_response(self):
-        """
-        api_response() -> dict or None
+        """Returns information about this Collection fetched from the API server.
 
-        Returns information about this Collection fetched from the API server.
         If the Collection exists in Keep but not the API server, currently
         returns None.  Future versions may provide a synthetic response.
+
         """
         return self._api_response
 
@@ -1347,9 +1391,11 @@ class CollectionRoot(SynchronizedCollectionBase):
     def save(self, merge=True, num_retries=None):
         """Commit pending buffer blocks to Keep, merge with remote record (if
         update=True), write the manifest to Keep, and update the collection
-        record.  Will raise AssertionError if not associated with a collection
-        record on the API server.  If you want to save a manifest to Keep only,
-        see `save_new()`.
+        record.
+
+        Will raise AssertionError if not associated with a collection record on
+        the API server.  If you want to save a manifest to Keep only, see
+        `save_new()`.
 
         :update:
           Update and merge remote changes before saving.  Otherwise, any
@@ -1378,8 +1424,9 @@ class CollectionRoot(SynchronizedCollectionBase):
     @retry_method
     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
         """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
-        a new collection record (if create_collection_record True).  After
-        creating a new collection record, this Collection object will be
+        a new collection record (if create_collection_record True).
+
+        After creating a new collection record, this Collection object will be
         associated with the new record for `save()` and SYNC_LIVE updates.
 
         :name:
@@ -1452,7 +1499,11 @@ def LiveCollection(*args, **kwargs):
 
 class Subcollection(SynchronizedCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
-    server record.  It falls under the umbrella of the root collection."""
+    server record.
+
+    It falls under the umbrella of the root collection.
+
+    """
 
     def __init__(self, parent):
         super(Subcollection, self).__init__(parent)
@@ -1491,7 +1542,7 @@ def import_manifest(manifest_text,
                     keep=None,
                     num_retries=None,
                     sync=SYNC_READONLY):
-    """Import a manifest into a `Collection`.
+    """Import a manifest into a `CollectionRoot`.
 
     :manifest_text:
       The manifest text to import from.
@@ -1557,7 +1608,7 @@ def import_manifest(manifest_text,
                 pos = long(s.group(1))
                 size = long(s.group(2))
                 name = s.group(3).replace('\\040', ' ')
-                f = c.find("%s/%s" % (stream_name, name), create=True)
+                f = c.find_or_create("%s/%s" % (stream_name, name), FILE)
                 f.add_segment(blocks, pos, size)
             else:
                 # error!
@@ -1572,9 +1623,10 @@ def import_manifest(manifest_text,
     return c
 
 def export_manifest(item, stream_name=".", portable_locators=False):
-    """
+    """Export a manifest from the contents of a SynchronizedCollectionBase.
+
     :item:
-      Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
+      Create a manifest for `item` (must be a `SynchronizedCollectionBase` or `ArvadosFile`).  If
       `item` is a is a `Collection`, this will also export subcollections.
 
     :stream_name:
@@ -1583,6 +1635,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
     :portable_locators:
       If True, strip any permission hints on block locators.
       If False, use block locators as-is.
+
     """
     buf = ""
     if isinstance(item, SynchronizedCollectionBase):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index cc7dbd4..f59ec71 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -585,8 +585,16 @@ class KeepClient(object):
         else:
             return None
 
+    def get_from_cache(self, loc):
+        """Fetch a block only if it is in the cache, otherwise return None."""
+        slot = self.block_cache.get(loc)
+        if slot.ready.is_set():
+            return slot.get()
+        else:
+            return None
+
     @retry.retry_method
-    def get(self, loc_s, num_retries=None, cache_only=False):
+    def get(self, loc_s, num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -605,21 +613,11 @@ class KeepClient(object):
           to fetch data from every available Keep service, along with any
           that are named in location hints in the locator.  The default value
           is set when the KeepClient is initialized.
-        * cache_only: If true, return the block data only if already present in
-          cache, otherwise return None.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
-
-        if cache_only:
-            slot = self.block_cache.get(expect_hash)
-            if slot.ready.is_set():
-                return slot.get()
-            else:
-                return None
-
         slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
diff --git a/sdk/python/arvados/safeapi.py b/sdk/python/arvados/safeapi.py
index 539f1e6..9737da1 100644
--- a/sdk/python/arvados/safeapi.py
+++ b/sdk/python/arvados/safeapi.py
@@ -4,8 +4,8 @@ import keep
 import config
 import copy
 
-class SafeApi(object):
-    """Threadsafe wrapper for API object.  This stores and returns a different api
+class ThreadSafeApiCache(object):
+    """Threadsafe wrapper for API objects.  This stores and returns a different api
     object per thread, because httplib2 which underlies apiclient is not
     threadsafe.
     """
@@ -29,4 +29,4 @@ class SafeApi(object):
         try:
             return getattr(self.localapi(), name)
         except AttributeError:
-            return super(SafeApi, self).__getattr__(name)
+            return super(ThreadSafeApiCache, self).__getattr__(name)
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index a1995d8..52de0a3 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -20,7 +20,10 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def __init__(self, blocks):
             self.blocks = blocks
             self.requests = []
-        def get(self, locator, num_retries=0, cache_only=False):
+        def get(self, locator, num_retries=0):
+            self.requests.append(locator)
+            return self.blocks.get(locator)
+        def get_from_cache(self, locator):
             self.requests.append(locator)
             return self.blocks.get(locator)
         def put(self, data, num_retries=None):
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 2af3622..5baf30a 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -721,10 +721,10 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
     def test_write_three_replicas(self):
         client = mock.MagicMock(name='api_client')
         self.mock_keep_services(client, status=200, service_type='disk', count=6)
-        writer = self.foo_writer(api_client=client, replication=3)
         with self.mock_keep(
                 None, 200, 500, 200, 500, 200, 200,
                 **{'x-keep-replicas-stored': 1}) as keepmock:
+            writer = self.foo_writer(api_client=client, replication=3)
             writer.manifest_text()
             self.assertEqual(5, keepmock.call_count)
 
@@ -851,7 +851,7 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
         with self.assertRaises(IOError):
             c.remove("foo")
-        c.remove("foo", rm_r=True)
+        c.remove("foo", recursive=True)
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", export_manifest(c))
 
     def test_copy_to_dir1(self):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list