[ARVADOS] updated: f8f654db2a5efc85a358dfd126be4ccb1abb322d
git at public.curoverse.com
Thu Feb 12 17:10:50 EST 2015
Summary of changes:
sdk/python/arvados/__init__.py | 2 +-
sdk/python/arvados/arvfile.py | 254 ++++++++++++++++++-----------------
sdk/python/arvados/collection.py | 131 ++++++++++++------
sdk/python/arvados/keep.py | 20 ++-
sdk/python/arvados/safeapi.py | 6 +-
sdk/python/tests/test_arvfile.py | 5 +-
sdk/python/tests/test_collections.py | 4 +-
7 files changed, 245 insertions(+), 177 deletions(-)
via f8f654db2a5efc85a358dfd126be4ccb1abb322d (commit)
from 8ae4b9342772ce4693b5961da5e83f54a36ad0e7 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit f8f654db2a5efc85a358dfd126be4ccb1abb322d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Feb 12 17:12:57 2015 -0500
4823: Style cleanup on docstrings, renamed some single-letter variables to be
more descriptive.
diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index 19a7ad8..59a1a44 100644
--- a/sdk/python/arvados/__init__.py
+++ b/sdk/python/arvados/__init__.py
@@ -22,7 +22,7 @@ from api import *
from collection import *
from keep import *
from stream import *
-from arvfile import *
+from arvfile import StreamFileReader
import errors
import util
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index acb02a0..456602f 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -11,12 +11,14 @@ import threading
import Queue
import copy
import errno
+from .errors import KeepWriteError
def split(path):
- """split(path) -> streamname, filename
+ """Separate the stream name and file name in a /-separated stream path and
+ return a tuple (stream_name, file_name).
- Separate the stream name and file name in a /-separated stream path.
If no stream name is available, assume '.'.
+
"""
try:
stream_name, file_name = path.rsplit('/', 1)
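As documented, split() maps a stream path to a (stream_name, file_name) pair; a minimal standalone sketch of the documented behavior (illustration only, not the SDK module):

    def split(path):
        """Return (stream_name, file_name) for a /-separated stream path."""
        try:
            stream_name, file_name = path.rsplit('/', 1)
        except ValueError:
            # No '/' in the path: assume the stream is '.'
            stream_name, file_name = '.', path
        return stream_name, file_name

    assert split("foo/bar/baz.txt") == ("foo/bar", "baz.txt")
    assert split("baz.txt") == (".", "baz.txt")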
@@ -33,11 +35,11 @@ class ArvadosFileBase(object):
@staticmethod
def _before_close(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def before_close_wrapper(self, *args, **kwargs):
if self.closed:
raise ValueError("I/O operation on closed stream file")
return orig_func(self, *args, **kwargs)
- return wrapper
+ return before_close_wrapper
def __enter__(self):
return self
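The _before_close guard above is a reusable decorator idiom; a self-contained sketch of the same pattern applied to a toy class:

    import functools

    def before_close(orig_func):
        """Raise if the wrapped method is called after the object is closed."""
        @functools.wraps(orig_func)
        def before_close_wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed stream file")
            return orig_func(self, *args, **kwargs)
        return before_close_wrapper

    class ToyFile(object):
        def __init__(self):
            self.closed = False
        @before_close
        def read(self):
            return "data"

    f = ToyFile()
    f.read()          # returns "data"
    f.closed = True
    # f.read()        # would now raise ValueError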
@@ -219,10 +221,10 @@ class StreamFileReader(ArvadosFileReaderBase):
class BufferBlock(object):
- """
- A BufferBlock is a stand-in for a Keep block that is in the process of being
- written. Writers can append to it, get the size, and compute the Keep locator.
+ """A BufferBlock is a stand-in for a Keep block that is in the process of being
+ written.
+ Writers can append to it, get the size, and compute the Keep locator.
There are three valid states:
WRITABLE
@@ -236,7 +238,9 @@ class BufferBlock(object):
released, fetching the block will fetch it via keep client (since we
discarded the internal copy), and identifiers referring to the BufferBlock
can be replaced with the block locator.
+
"""
+
WRITABLE = 0
PENDING = 1
COMMITTED = 2
@@ -251,6 +255,7 @@ class BufferBlock(object):
:owner:
ArvadosFile that owns this block
+
"""
self.blockid = blockid
self.buffer_block = bytearray(starting_capacity)
@@ -261,10 +266,11 @@ class BufferBlock(object):
self.owner = owner
def append(self, data):
- """
- Append some data to the buffer. Only valid if the block is in WRITABLE
- state. Implements an expanding buffer, doubling capacity as needed to
- accomdate all the data.
+ """Append some data to the buffer.
+
+ Only valid if the block is in WRITABLE state. Implements an expanding
+ buffer, doubling capacity as needed to accommodate all the data.
+
"""
if self.state == BufferBlock.WRITABLE:
while (self.write_pointer+len(data)) > len(self.buffer_block):
@@ -279,7 +285,7 @@ class BufferBlock(object):
raise AssertionError("Buffer block is not writable")
def size(self):
- """Amount of data written to the buffer"""
+ """The amount of data written to the buffer."""
return self.write_pointer
def locator(self):
@@ -289,23 +295,12 @@ class BufferBlock(object):
return self._locator
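The expanding-buffer strategy append() describes gives amortized O(1) appends; a standalone sketch of the doubling behavior (illustrative, not the BufferBlock class itself):

    class GrowingBuffer(object):
        def __init__(self, starting_capacity=2**14):
            self.buffer_block = bytearray(starting_capacity)
            self.write_pointer = 0

        def append(self, data):
            # Double capacity until the new data fits, then copy it into place.
            while (self.write_pointer + len(data)) > len(self.buffer_block):
                new_buffer = bytearray(len(self.buffer_block) * 2)
                new_buffer[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
                self.buffer_block = new_buffer
            end = self.write_pointer + len(data)
            self.buffer_block[self.write_pointer:end] = data
            self.write_pointer = end

        def size(self):
            return self.write_pointer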
-class AsyncKeepWriteErrors(Exception):
- """
- Roll up one or more Keep write exceptions (generated by background
- threads) into a single one.
- """
- def __init__(self, errors):
- self.errors = errors
-
- def __repr__(self):
- return "\n".join(self.errors)
-
def synchronized(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def synchronized_wrapper(self, *args, **kwargs):
with self.lock:
return orig_func(self, *args, **kwargs)
- return wrapper
+ return synchronized_wrapper
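Both synchronized (above) and must_be_writable (below) follow the same method-decorator shape; a short usage sketch of the locking variant:

    import functools
    import threading

    def synchronized(orig_func):
        @functools.wraps(orig_func)
        def synchronized_wrapper(self, *args, **kwargs):
            with self.lock:
                return orig_func(self, *args, **kwargs)
        return synchronized_wrapper

    class Counter(object):
        def __init__(self):
            self.lock = threading.RLock()
            self.value = 0

        @synchronized
        def increment(self):
            # Atomic with respect to other @synchronized methods on self.
            self.value += 1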
class NoopLock(object):
def __enter__(self):
@@ -326,17 +321,17 @@ SYNC_LIVE = 3
def must_be_writable(orig_func):
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def must_be_writable_wrapper(self, *args, **kwargs):
if self.sync_mode() == SYNC_READONLY:
raise IOError((errno.EROFS, "Collection is read only"))
return orig_func(self, *args, **kwargs)
- return wrapper
+ return must_be_writable_wrapper
class BlockManager(object):
- """
- BlockManager handles buffer blocks, background block uploads, and
- background block prefetch for a Collection of ArvadosFiles.
+ """BlockManager handles buffer blocks, background block uploads, and background
+ block prefetch for a Collection of ArvadosFiles.
+
"""
def __init__(self, keep):
"""keep: KeepClient object to use"""
@@ -354,8 +349,7 @@ class BlockManager(object):
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
- """
- Allocate a new, empty bufferblock in WRITABLE state and return it.
+ """Allocate a new, empty bufferblock in WRITABLE state and return it.
:blockid:
optional block identifier, otherwise one will be automatically assigned
@@ -365,40 +359,41 @@ class BlockManager(object):
:owner:
ArvadosFile that owns this block
+
"""
if blockid is None:
blockid = "bufferblock%i" % len(self._bufferblocks)
- bb = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
- self._bufferblocks[bb.blockid] = bb
- return bb
+ bufferblock = BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
+ self._bufferblocks[bufferblock.blockid] = bufferblock
+ return bufferblock
@synchronized
def dup_block(self, blockid, owner):
- """
- Create a new bufferblock in WRITABLE state, initialized with the content of an existing bufferblock.
+ """Create a new bufferblock in WRITABLE state, initialized with the content of
+ an existing bufferblock.
:blockid:
the block to copy. May be an existing buffer block id.
:owner:
ArvadosFile that owns the new block
+
"""
new_blockid = "bufferblock%i" % len(self._bufferblocks)
block = self._bufferblocks[blockid]
- bb = BufferBlock(new_blockid, len(block), owner)
- bb.append(block)
- self._bufferblocks[bb.blockid] = bb
- return bb
+ bufferblock = BufferBlock(new_blockid, len(block), owner)
+ bufferblock.append(block)
+ self._bufferblocks[bufferblock.blockid] = bufferblock
+ return bufferblock
@synchronized
- def is_bufferblock(self, id):
- return id in self._bufferblocks
+ def is_bufferblock(self, locator):
+ return locator in self._bufferblocks
@synchronized
def stop_threads(self):
- """
- Shut down and wait for background upload and download threads to finish.
- """
+ """Shut down and wait for background upload and download threads to finish."""
+
if self._put_threads is not None:
for t in self._put_threads:
self._put_queue.put(None)
@@ -417,27 +412,28 @@ class BlockManager(object):
self._prefetch_queue = None
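stop_threads() uses the standard sentinel-shutdown idiom: push one None per worker, then join. A compact sketch of that protocol:

    def stop_workers(threads, work_queue):
        # One None sentinel per worker; each worker exits when it dequeues None.
        for _ in threads:
            work_queue.put(None)
        for thread in threads:
            thread.join()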
def commit_bufferblock(self, block):
+ """Initiate a background upload of a bufferblock.
+
+ This will block if the upload queue is at capacity, otherwise it will
+ return immediately.
+
"""
- Initiate a background upload of a bufferblock. This will block if the
- upload queue is at capacity, otherwise it will return immediately.
- """
- def worker(self):
- """
- Background uploader thread.
- """
+ def commit_bufferblock_worker(self):
+ """Background uploader thread."""
+
while True:
try:
- b = self._put_queue.get()
- if b is None:
+ bufferblock = self._put_queue.get()
+ if bufferblock is None:
return
- b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
- b.state = BufferBlock.COMMITTED
- b.buffer_view = None
- b.buffer_block = None
+ bufferblock._locator = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ bufferblock.state = BufferBlock.COMMITTED
+ bufferblock.buffer_view = None
+ bufferblock.buffer_block = None
except Exception as e:
print e
- self._put_errors.put(e)
+ self._put_errors.put((bufferblock.locator(), e))
finally:
if self._put_queue is not None:
self._put_queue.task_done()
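The worker body above is the usual queue-draining daemon-thread shape; a generic sketch (names illustrative):

    def make_worker(work_queue, error_queue, upload):
        def worker():
            while True:
                item = work_queue.get()
                try:
                    if item is None:
                        return          # sentinel: shut this worker down
                    upload(item)
                except Exception as e:
                    error_queue.put((item, e))
                finally:
                    work_queue.task_done()
        return worker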
@@ -460,35 +456,41 @@ class BlockManager(object):
self._put_threads = []
for i in xrange(0, self.num_put_threads):
- t = threading.Thread(target=worker, args=(self,))
- self._put_threads.append(t)
- t.daemon = True
- t.start()
+ thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
# Mark the block as PENDING so as to disallow any more appends.
block.state = BufferBlock.PENDING
self._put_queue.put(block)
def get_block(self, locator, num_retries, cache_only=False):
- """
- Fetch a block. First checks to see if the locator is a BufferBlock and
- return that, if not, passes the request through to KeepClient.get().
+ """Fetch a block.
+
+ First checks to see if the locator is a BufferBlock and return that, if
+ not, passes the request through to KeepClient.get().
+
"""
with self.lock:
if locator in self._bufferblocks:
- bb = self._bufferblocks[locator]
- if bb.state != BufferBlock.COMMITTED:
- return bb.buffer_view[0:bb.write_pointer].tobytes()
+ bufferblock = self._bufferblocks[locator]
+ if bufferblock.state != BufferBlock.COMMITTED:
+ return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
else:
- locator = bb._locator
- return self._keep.get(locator, num_retries=num_retries, cache_only=cache_only)
+ locator = bufferblock._locator
+ if cache_only:
+ return self._keep.get_from_cache(locator)
+ else:
+ return self._keep.get(locator, num_retries=num_retries)
def commit_all(self):
- """
- Commit all outstanding buffer blocks. Unlike commit_bufferblock(), this
- is a synchronous call, and will not return until all buffer blocks are
- uploaded. Raises AsyncKeepWriteErrors() if any blocks failed to
- upload.
+ """Commit all outstanding buffer blocks.
+
+ Unlike commit_bufferblock(), this is a synchronous call, and will not
+ return until all buffer blocks are uploaded. Raises
+ KeepWriteError() if any blocks failed to upload.
+
"""
with self.lock:
items = self._bufferblocks.items()
@@ -501,27 +503,29 @@ class BlockManager(object):
if self._put_queue is not None:
self._put_queue.join()
if not self._put_errors.empty():
- e = []
+ err = []
try:
while True:
- e.append(self._put_errors.get(False))
+ err.append(self._put_errors.get(False))
except Queue.Empty:
pass
- raise AsyncKeepWriteErrors(e)
+ raise KeepWriteError("Error writing some blocks", err)
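The error roll-up in commit_all() is a non-blocking drain of the error queue followed by one aggregate exception; the drain step in isolation (Queue is the Python 2 module name):

    import Queue

    def drain_errors(error_queue):
        # Collect every queued (locator, exception) pair without blocking.
        errors = []
        try:
            while True:
                errors.append(error_queue.get(False))
        except Queue.Empty:
            pass
        return errors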
def block_prefetch(self, locator):
- """
- Initiate a background download of a block. This assumes that the
- underlying KeepClient implements a block cache, so repeated requests
- for the same block will not result in repeated downloads (unless the
- block is evicted from the cache.) This method does not block.
+ """Initiate a background download of a block.
+
+ This assumes that the underlying KeepClient implements a block cache,
+ so repeated requests for the same block will not result in repeated
+ downloads (unless the block is evicted from the cache.) This method
+ does not block.
+
"""
if not self.prefetch_enabled:
return
- def worker(self):
- """Background downloader thread."""
+ def block_prefetch_worker(self):
+ """The background downloader thread."""
while True:
try:
b = self._prefetch_queue.get()
@@ -538,22 +542,26 @@ class BlockManager(object):
self._prefetch_queue = Queue.Queue()
self._prefetch_threads = []
for i in xrange(0, self.num_get_threads):
- t = threading.Thread(target=worker, args=(self,))
- self._prefetch_threads.append(t)
- t.daemon = True
- t.start()
+ thread = threading.Thread(target=block_prefetch_worker, args=(self,))
+ self._prefetch_threads.append(thread)
+ thread.daemon = True
+ thread.start()
self._prefetch_queue.put(locator)
class ArvadosFile(object):
"""ArvadosFile manages the underlying representation of a file in Keep as a
sequence of segments spanning a set of blocks, and implements random
- read/write access. This object may be accessed from multiple threads.
+ read/write access.
+
+ This object may be accessed from multiple threads.
"""
def __init__(self, parent, stream=[], segments=[]):
"""
+ ArvadosFile constructor.
+
:stream:
a list of Range objects representing a block stream
@@ -625,10 +633,12 @@ class ArvadosFile(object):
@must_be_writable
@synchronized
def truncate(self, size):
- """
- Adjust the size of the file. If `size` is less than the size of the file,
- the file contents after `size` will be discarded. If `size` is greater
- than the current size of the file, an IOError will be raised.
+ """Shrink the size of the file.
+
+ If `size` is less than the size of the file, the file contents after
+ `size` will be discarded. If `size` is greater than the current size
+ of the file, an IOError will be raised.
+
"""
if size < self.size():
new_segs = []
@@ -651,9 +661,8 @@ class ArvadosFile(object):
raise IOError("truncate() does not support extending the file size")
def readfrom(self, offset, size, num_retries):
- """
- read upto `size` bytes from the file starting at `offset`.
- """
+ """Read upto `size` bytes from the file starting at `offset`."""
+
with self.lock:
if size == 0 or offset >= self.size():
return ''
@@ -665,19 +674,21 @@ class ArvadosFile(object):
data = []
for lr in readsegs:
- d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
- if d:
- data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ block = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ if block:
+ data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
else:
break
return ''.join(data)
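Note the cache_only=bool(data) argument in the loop above: only the first block may block on the network; each later block is appended only if it is already cached, so readfrom() returns whatever contiguous prefix is cheaply available. A sketch of that idea against an assumed get_block(locator, cache_only) interface that returns None on a cache miss:

    def read_available(get_block, locators):
        data = []
        for locator in locators:
            block = get_block(locator, cache_only=bool(data))
            if block is None:
                break           # a later block is not cached yet; stop early
            data.append(block)
        return ''.join(data)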
def _repack_writes(self):
- """
- Test if the buffer block has more data than is referenced by actual segments
- (this happens when a buffered write over-writes a file range written in
- a previous buffered write). Re-pack the buffer block for efficiency
+ """Test if the buffer block has more data than is referenced by actual
+ segments.
+
+ This happens when a buffered write over-writes a file range written in
+ a previous buffered write. Re-pack the buffer block for efficiency
and to avoid leaking information.
+
"""
segs = self._segments
@@ -699,9 +710,11 @@ class ArvadosFile(object):
@must_be_writable
@synchronized
def writeto(self, offset, data, num_retries):
- """
- Write `data` to the file starting at `offset`. This will update
- existing bytes and/or extend the size of the file as necessary.
+ """Write `data` to the file starting at `offset`.
+
+ This will update existing bytes and/or extend the size of the file as
+ necessary.
+
"""
if len(data) == 0:
return
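writeto()'s update-or-extend semantics can be modeled on a flat string, ignoring the segment bookkeeping (a toy model only, not the segment-based implementation):

    def writeto(buf, offset, data):
        # Overwrite bytes at offset, extending the buffer if data runs past the end.
        assert offset <= len(buf), "offset past end of file"
        return buf[:offset] + data + buf[offset + len(data):]

    assert writeto("hello world", 6, "there") == "hello there"
    assert writeto("hi", 2, " there") == "hi there"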
@@ -730,16 +743,14 @@ class ArvadosFile(object):
@must_be_writable
@synchronized
def add_segment(self, blocks, pos, size):
- """
- Add a segment to the end of the file, with `pos` and `offset` referencing a
+ """Add a segment to the end of the file, with `pos` and `offset` referencing a
section of the stream described by `blocks` (a list of Range objects)
+
"""
self._add_segment(blocks, pos, size)
def _add_segment(self, blocks, pos, size):
- """
- (Internal version.)
- """
+ """Internal implementation of add_segment."""
self._modified = True
for lr in locators_and_ranges(blocks, pos, size):
last = self._segments[-1] if self._segments else Range(0, 0, 0)
@@ -748,7 +759,7 @@ class ArvadosFile(object):
@synchronized
def size(self):
- """Get the file size"""
+ """Get the file size."""
if self._segments:
n = self._segments[-1]
return n.range_start + n.range_size
@@ -756,9 +767,11 @@ class ArvadosFile(object):
return 0
class ArvadosFileReader(ArvadosFileReaderBase):
- """Wraps ArvadosFile in a file-like object supporting reading only. Be aware
- that this class is NOT thread safe as there is no locking around updating file
- pointer.
+ """Wraps ArvadosFile in a file-like object supporting reading only.
+
+ Be aware that this class is NOT thread safe as there is no locking around
+ updating file pointer.
+
"""
def __init__(self, arvadosfile, name, mode="r", num_retries=None):
@@ -771,7 +784,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
@ArvadosFileBase._before_close
@retry_method
def read(self, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position"""
+ """Read up to `size` bytes from the stream, starting at the current file position."""
data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
self._filepos += len(data)
return data
@@ -779,7 +792,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
@ArvadosFileBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None):
- """Read up to `size` bytes from the stream, starting at the current file position"""
+ """Read up to `size` bytes from the stream, starting at the current file position."""
return self.arvadosfile.readfrom(offset, size, num_retries)
def flush(self):
@@ -788,6 +801,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
class ArvadosFileWriter(ArvadosFileReader):
"""Wraps ArvadosFile in a file-like object supporting both reading and writing.
+
Be aware that this class is NOT thread safe as there is no locking around
updating file pointer.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 82708cc..a64dd34 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -12,7 +12,7 @@ from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, Arv
from keep import *
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
-from .safeapi import SafeApi
+from .safeapi import ThreadSafeApiCache
import config
import errors
import util
@@ -159,11 +159,11 @@ class CollectionReader(CollectionBase):
def _populate_first(orig_func):
# Decorator for methods that read actual Collection data.
@functools.wraps(orig_func)
- def wrapper(self, *args, **kwargs):
+ def populate_first_wrapper(self, *args, **kwargs):
if self._streams is None:
self._populate()
return orig_func(self, *args, **kwargs)
- return wrapper
+ return populate_first_wrapper
@_populate_first
def api_response(self):
@@ -659,10 +659,16 @@ class ResumableCollectionWriter(CollectionWriter):
ADD = "add"
DEL = "del"
MOD = "mod"
+FILE = "file"
+COLLECTION = "collection"
class SynchronizedCollectionBase(CollectionBase):
- """Base class for Collections and Subcollections. Implements the majority of
- functionality relating to accessing items in the Collection."""
+ """Base class for Collections and Subcollections.
+
+ Implements the majority of functionality relating to accessing items in the
+ Collection.
+
+ """
def __init__(self, parent=None):
self.parent = parent
@@ -691,9 +697,13 @@ class SynchronizedCollectionBase(CollectionBase):
raise NotImplementedError()
@synchronized
- def find(self, path, create=False, create_collection=False):
- """Recursively search the specified file path. May return either a Collection
- or ArvadosFile.
+ def _find(self, path, create, create_collection):
+ """Internal method. Don't use this. Use `find()` or `find_or_create()`
+ instead.
+
+ Recursively search the specified file path. May return either a
+ Collection or ArvadosFile. Return None if not found and create=False.
+ Will create a new item if create=True.
:create:
If true, create path components (i.e. Collections) that are
@@ -708,6 +718,7 @@ class SynchronizedCollectionBase(CollectionBase):
component.
"""
+
if create and self.sync_mode() == SYNC_READONLY:
raise IOError((errno.EROFS, "Collection is read only"))
@@ -738,12 +749,37 @@ class SynchronizedCollectionBase(CollectionBase):
self.notify(ADD, self, p[0], item)
del p[0]
if isinstance(item, SynchronizedCollectionBase):
- return item.find("/".join(p), create=create)
+ return item._find("/".join(p), create, create_collection)
else:
raise errors.ArgumentError("Interior path components must be subcollection")
else:
return self
+ def find(self, path):
+ """Recursively search the specified file path.
+
+ May return either a Collection or ArvadosFile. Return None if not
+ found.
+
+ """
+ return self._find(path, False, False)
+
+ def find_or_create(self, path, create_type):
+ """Recursively search the specified file path.
+
+ May return either a Collection or ArvadosFile. Will create a new item
+ at the specified path if none exists.
+
+ :create_type:
+    One of `arvados.collection.FILE` or
+    `arvados.collection.COLLECTION`. If the path is not found and the value
+    of create_type is FILE, then create and return a new ArvadosFile for
+ the last path component. If COLLECTION, then create and return a new
+ Collection for the last path component.
+
+ """
+ return self._find(path, True, (create_type == COLLECTION))
+
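A hedged usage sketch of the new find()/find_or_create() split, assuming an empty WritableCollection as constructed in the tests further down:

    import arvados
    from arvados.collection import FILE, COLLECTION

    c = arvados.WritableCollection()
    f = c.find_or_create("foo/bar.txt", FILE)   # creates ./foo and the file
    d = c.find_or_create("foo", COLLECTION)     # returns the existing subcollection
    assert c.find("foo/bar.txt") is f
    assert c.find("missing") is None            # find() never creates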
def open(self, path, mode):
"""Open a file-like object for access.
@@ -770,7 +806,7 @@ class SynchronizedCollectionBase(CollectionBase):
if create and self.sync_mode() == SYNC_READONLY:
raise IOError((errno.EROFS, "Collection is read only"))
- f = self.find(path, create=create)
+ f = self._find(path, create, False)
if f is None:
raise IOError((errno.ENOENT, "File not found"))
@@ -798,7 +834,7 @@ class SynchronizedCollectionBase(CollectionBase):
@synchronized
def set_unmodified(self):
- """Recursively clear modified flag"""
+ """Recursively clear modified flag."""
self._modified = False
for k,v in self._items.items():
v.set_unmodified()
@@ -806,7 +842,7 @@ class SynchronizedCollectionBase(CollectionBase):
@synchronized
def __iter__(self):
"""Iterate over names of files and collections contained in this collection."""
- return self._items.keys().__iter__()
+ return iter(self._items.keys())
@synchronized
def iterkeys(self):
@@ -823,12 +859,12 @@ class SynchronizedCollectionBase(CollectionBase):
@synchronized
def __contains__(self, k):
"""If there is a file or collection a directly contained by this collection
- with name "k"."""
+ with name `k`."""
return k in self._items
@synchronized
def __len__(self):
- """Get the number of items directly contained in this collection"""
+ """Get the number of items directly contained in this collection."""
return len(self._items)
@must_be_writable
@@ -855,14 +891,15 @@ class SynchronizedCollectionBase(CollectionBase):
return self._items.items()
def exists(self, path):
- """Test if there is a file or collection at "path" """
+ """Test if there is a file or collection at `path`."""
return self.find(path) != None
@must_be_writable
@synchronized
- def remove(self, path, rm_r=False):
+ def remove(self, path, recursive=False):
"""Remove the file or subcollection (directory) at `path`.
- :rm_r:
+
+ :recursive:
Specify whether to remove non-empty subcollections (True), or raise an error (False).
"""
p = path.split("/")
@@ -875,7 +912,7 @@ class SynchronizedCollectionBase(CollectionBase):
if item is None:
raise IOError((errno.ENOENT, "File not found"))
if len(p) == 1:
- if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
+ if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not recursive:
raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
d = self._items[p[0]]
del self._items[p[0]]
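The rm_r to recursive rename is exercised by the updated test near the end of this commit; in usage terms:

    import arvados

    c = arvados.WritableCollection(
        '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n'
        './foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
    try:
        c.remove("foo")                 # non-empty subcollection: raises IOError
    except IOError:
        pass
    c.remove("foo", recursive=True)     # removes the subcollection and its contents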
@@ -936,7 +973,7 @@ class SynchronizedCollectionBase(CollectionBase):
if not target_name:
raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
- target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
+ target_dir = self.find_or_create("/".join(tp[0:-1]), COLLECTION)
with target_dir.lock:
if target_name in target_dir:
@@ -1008,9 +1045,11 @@ class SynchronizedCollectionBase(CollectionBase):
@must_be_writable
@synchronized
def apply(self, changes):
- """
- Apply changes from `diff`. If a change conflicts with a local change, it
- will be saved to an alternate path indicating the conflict.
+ """Apply changes from `diff`.
+
+ If a change conflicts with a local change, it will be saved to an
+ alternate path indicating the conflict.
+
"""
for c in changes:
path = c[1]
@@ -1044,7 +1083,7 @@ class SynchronizedCollectionBase(CollectionBase):
elif c[0] == DEL:
if local == initial:
# Local item matches "initial" value, so it is safe to remove.
- self.remove(path, rm_r=True)
+ self.remove(path, recursive=True)
# else, the file is modified or already removed, in either
# case we don't want to try to remove it.
@@ -1115,7 +1154,9 @@ class CollectionRoot(SynchronizedCollectionBase):
num_retries=None,
block_manager=None,
sync=None):
- """:manifest_locator_or_text:
+ """CollectionRoot constructor.
+
+ :manifest_locator_or_text:
One of Arvados collection UUID, block locator of
a manifest, raw manifest text, or None (to create an empty collection).
:parent:
@@ -1210,6 +1251,10 @@ class CollectionRoot(SynchronizedCollectionBase):
@synchronized
@retry_method
def update(self, other=None, num_retries=None):
+ """Fetch the latest collection record on the API server and merge it with the
+ current collection contents.
+
+ """
if other is None:
if self._manifest_locator is None:
raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
@@ -1221,7 +1266,7 @@ class CollectionRoot(SynchronizedCollectionBase):
@synchronized
def _my_api(self):
if self._api_client is None:
- self._api_client = arvados.SafeApi(self._config)
+ self._api_client = ThreadSafeApiCache(self._config)
self._keep_client = self._api_client.keep
return self._api_client
@@ -1313,7 +1358,7 @@ class CollectionRoot(SynchronizedCollectionBase):
return self
def __exit__(self, exc_type, exc_value, traceback):
- """Support scoped auto-commit in a with: block"""
+ """Support scoped auto-commit in a with: block."""
if self._sync != SYNC_READONLY and self._has_collection_uuid():
self.save()
if self._block_manager is not None:
@@ -1332,12 +1377,11 @@ class CollectionRoot(SynchronizedCollectionBase):
@synchronized
def api_response(self):
- """
- api_response() -> dict or None
+ """Returns information about this Collection fetched from the API server.
- Returns information about this Collection fetched from the API server.
If the Collection exists in Keep but not the API server, currently
returns None. Future versions may provide a synthetic response.
+
"""
return self._api_response
@@ -1347,9 +1391,11 @@ class CollectionRoot(SynchronizedCollectionBase):
def save(self, merge=True, num_retries=None):
"""Commit pending buffer blocks to Keep, merge with remote record (if
merge=True), write the manifest to Keep, and update the collection
- record. Will raise AssertionError if not associated with a collection
- record on the API server. If you want to save a manifest to Keep only,
- see `save_new()`.
+ record.
+
+ Will raise AssertionError if not associated with a collection record on
+ the API server. If you want to save a manifest to Keep only, see
+ `save_new()`.
:merge:
Update and merge remote changes before saving. Otherwise, any
@@ -1378,8 +1424,9 @@ class CollectionRoot(SynchronizedCollectionBase):
@retry_method
def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
"""Commit pending buffer blocks to Keep, write the manifest to Keep, and create
- a new collection record (if create_collection_record True). After
- creating a new collection record, this Collection object will be
+    a new collection record (if create_collection_record is True).
+
+ After creating a new collection record, this Collection object will be
associated with the new record for `save()` and SYNC_LIVE updates.
:name:
@@ -1452,7 +1499,11 @@ def LiveCollection(*args, **kwargs):
class Subcollection(SynchronizedCollectionBase):
"""This is a subdirectory within a collection that doesn't have its own API
- server record. It falls under the umbrella of the root collection."""
+ server record.
+
+ It falls under the umbrella of the root collection.
+
+ """
def __init__(self, parent):
super(Subcollection, self).__init__(parent)
@@ -1491,7 +1542,7 @@ def import_manifest(manifest_text,
keep=None,
num_retries=None,
sync=SYNC_READONLY):
- """Import a manifest into a `Collection`.
+ """Import a manifest into a `CollectionRoot`.
:manifest_text:
The manifest text to import from.
@@ -1557,7 +1608,7 @@ def import_manifest(manifest_text,
pos = long(s.group(1))
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
- f = c.find("%s/%s" % (stream_name, name), create=True)
+ f = c.find_or_create("%s/%s" % (stream_name, name), FILE)
f.add_segment(blocks, pos, size)
else:
# error!
@@ -1572,9 +1623,10 @@ def import_manifest(manifest_text,
return c
def export_manifest(item, stream_name=".", portable_locators=False):
- """
+ """Export a manifest from the contents of a SynchronizedCollectionBase.
+
:item:
- Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If
+ Create a manifest for `item` (must be a `SynchronizedCollectionBase` or `ArvadosFile`). If
`item` is a `Collection`, this will also export subcollections.
:stream_name:
@@ -1583,6 +1635,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
:portable_locators:
If True, strip any permission hints on block locators.
If False, use block locators as-is.
+
"""
buf = ""
if isinstance(item, SynchronizedCollectionBase):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index cc7dbd4..f59ec71 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -585,8 +585,16 @@ class KeepClient(object):
else:
return None
+ def get_from_cache(self, loc):
+ """Fetch a block only if is in the cache, otherwise return None."""
+ slot = self.block_cache.get(loc)
+    if slot is not None and slot.ready.is_set():
+ return slot.get()
+ else:
+ return None
+
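Callers that want "cached or fetch" can combine the two entry points; a sketch (note the slot is not None guard above: block_cache.get() is assumed to return None on a cache miss):

    def get_cached_or_fetch(keep, locator, num_retries=2):
        # Non-blocking cache probe first, then fall back to a normal blocking get.
        data = keep.get_from_cache(locator)
        if data is None:
            data = keep.get(locator, num_retries=num_retries)
        return data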
@retry.retry_method
- def get(self, loc_s, num_retries=None, cache_only=False):
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
@@ -605,21 +613,11 @@ class KeepClient(object):
to fetch data from every available Keep service, along with any
that are named in location hints in the locator. The default value
is set when the KeepClient is initialized.
- * cache_only: If true, return the block data only if already present in
- cache, otherwise return None.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
-
- if cache_only:
- slot = self.block_cache.get(expect_hash)
- if slot.ready.is_set():
- return slot.get()
- else:
- return None
-
slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
diff --git a/sdk/python/arvados/safeapi.py b/sdk/python/arvados/safeapi.py
index 539f1e6..9737da1 100644
--- a/sdk/python/arvados/safeapi.py
+++ b/sdk/python/arvados/safeapi.py
@@ -4,8 +4,8 @@ import keep
import config
import copy
-class SafeApi(object):
- """Threadsafe wrapper for API object. This stores and returns a different api
+class ThreadSafeApiCache(object):
+ """Threadsafe wrapper for API objects. This stores and returns a different api
object per thread, because httplib2, which underlies apiclient, is not
threadsafe.
"""
@@ -29,4 +29,4 @@ class SafeApi(object):
try:
return getattr(self.localapi(), name)
except AttributeError:
- return super(SafeApi, self).__getattr__(name)
+ return super(ThreadSafeApiCache, self).__getattr__(name)
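The pattern behind ThreadSafeApiCache is one lazily-built client per thread via threading.local; a self-contained sketch of that shape (factory and names are illustrative):

    import threading

    class PerThreadClient(object):
        def __init__(self, factory):
            self.factory = factory
            self.local = threading.local()

        def localapi(self):
            # Each thread lazily builds, then reuses, its own client instance.
            if not hasattr(self.local, 'api'):
                self.local.api = self.factory()
            return self.local.api

        def __getattr__(self, name):
            # Delegate unknown attributes to this thread's client.
            return getattr(self.localapi(), name)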
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index a1995d8..52de0a3 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -20,7 +20,10 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
- def get(self, locator, num_retries=0, cache_only=False):
+ def get(self, locator, num_retries=0):
+ self.requests.append(locator)
+ return self.blocks.get(locator)
+ def get_from_cache(self, locator):
self.requests.append(locator)
return self.blocks.get(locator)
def put(self, data, num_retries=None):
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 2af3622..5baf30a 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -721,10 +721,10 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
def test_write_three_replicas(self):
client = mock.MagicMock(name='api_client')
self.mock_keep_services(client, status=200, service_type='disk', count=6)
- writer = self.foo_writer(api_client=client, replication=3)
with self.mock_keep(
None, 200, 500, 200, 500, 200, 200,
**{'x-keep-replicas-stored': 1}) as keepmock:
+ writer = self.foo_writer(api_client=client, replication=3)
writer.manifest_text()
self.assertEqual(5, keepmock.call_count)
@@ -851,7 +851,7 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
with self.assertRaises(IOError):
c.remove("foo")
- c.remove("foo", rm_r=True)
+ c.remove("foo", recursive=True)
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", export_manifest(c))
def test_copy_to_dir1(self):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list