[ARVADOS] updated: 5f905666581dd7ccd8f7e05d1c8c4a6eedff0da9

git at public.curoverse.com
Mon Feb 16 16:47:08 EST 2015


Summary of changes:
 crunch_scripts/split-fastq.py        |  72 +------
 sdk/python/arvados/arvfile.py        |  51 +++--
 sdk/python/arvados/collection.py     | 372 ++++++++++++++++++++---------------
 sdk/python/arvados/ranges.py         |  71 ++++---
 sdk/python/arvados/stream.py         |   2 +-
 sdk/python/tests/test_arvfile.py     | 102 +++++++++-
 sdk/python/tests/test_collections.py |  65 +++++-
 services/fuse/bin/arv-mount          |   4 +-
 8 files changed, 460 insertions(+), 279 deletions(-)

       via  5f905666581dd7ccd8f7e05d1c8c4a6eedff0da9 (commit)
      from  446785a11237f832e9bac540a04e7dd238b45a9e (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 5f905666581dd7ccd8f7e05d1c8c4a6eedff0da9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Feb 16 16:49:18 2015 -0500

    4823: Add new tests for BufferBlock, BlockManager, saving, updating.  Stylistic
    changes to variable naming.  Remove splitting code from split-fastq.py.

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index 17aabf2..1c7a368 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -16,8 +16,6 @@ inp = arvados.CollectionReader(arvados.getjobparam('reads'))
 
 manifest_list = []
 
-chunking = False #arvados.getjobparam('chunking')
-
 def nextline(reader, start):
     n = -1
     while True:
@@ -31,63 +29,6 @@ def nextline(reader, start):
             start += 128
     return n
 
-# Chunk a fastq into approximately 64 MiB chunks.  Requires that the input data
-# be decompressed ahead of time, such as using decompress-all.py.  Generates a
-# new manifest, but doesn't actually move any data around.  Handles paired
-# reads by ensuring that each chunk of a pair gets the same number of records.
-#
-# This works, but in practice is so slow that potential gains in alignment
-# performance are lost in the prep time, which is why it is currently disabled.
-#
-# A better algorithm would seek to a file position a bit less than the desired
-# chunk size and then scan ahead for the next record, making sure that record
-# was matched by the read pair.
-def splitfastq(p):
-    for i in xrange(0, len(p)):
-        p[i]["start"] = 0
-        p[i]["end"] = 0
-
-    count = 0
-    recordsize = [0, 0]
-
-    global piece
-    finish = False
-    while not finish:
-        for i in xrange(0, len(p)):
-            recordsize[i] = 0
-
-        # read next 4 lines
-        for i in xrange(0, len(p)):
-            for ln in xrange(0, 4):
-                r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i])
-                if r == -1:
-                    finish = True
-                    break
-                recordsize[i] += (r+1)
-
-        splitnow = finish
-        for i in xrange(0, len(p)):
-            if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024):
-                splitnow = True
-
-        if splitnow:
-            for i in xrange(0, len(p)):
-                global manifest_list
-                print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
-                manifest = []
-                manifest.extend(["./_" + str(piece)])
-                manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
-                manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
-                manifest_list.append(manifest)
-                p[i]["start"] = p[i]["end"]
-            piece += 1
-        else:
-            for i in xrange(0, len(p)):
-                p[i]["end"] += recordsize[i]
-            count += 1
-            if count % 10000 == 0:
-                print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
-
 prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
 
 # Look for fastq files
@@ -115,14 +56,11 @@ for s in inp.all_streams():
                 p[0]["reader"] = s.files()[name_pieces.group(0)]
 
             if p is not None:
-                if chunking:
-                    splitfastq(p)
-                else:
-                    for i in xrange(0, len(p)):
-                        m = p[i]["reader"].as_manifest().split()
-                        m[0] = "./_" + str(piece)
-                        manifest_list.append(m)
-                    piece += 1
+                for i in xrange(0, len(p)):
+                    m = p[i]["reader"].as_manifest().split()
+                    m[0] = "./_" + str(piece)
+                    manifest_list.append(m)
+                piece += 1
 
 manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
 
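The removed splitfastq function above chunked fastq input by scanning
every record with nextline(), which its own comment notes was too slow to
be worthwhile.  The comment's proposed alternative is to seek to just
under the desired chunk size and scan forward to the next record
boundary.  A minimal sketch of that idea (a hypothetical helper, not part
of this commit; assumes an uncompressed, seekable fastq file object):

    def next_record_boundary(fq, pos):
        # Seek just below the desired chunk boundary, then scan forward
        # for the start of the next fastq record.  A record start is a
        # line beginning with '@' whose third following line begins with
        # '+'; checking both avoids mistaking a quality line that happens
        # to start with '@' for a header.
        fq.seek(pos)
        fq.readline()              # discard the (likely partial) current line
        while True:
            candidate = fq.tell()
            lines = [fq.readline() for _ in xrange(4)]
            if not lines[0]:
                return None        # end of file
            if lines[0].startswith('@') and lines[2].startswith('+'):
                return candidate
            fq.seek(candidate)     # not record-aligned; slide forward one line
            fq.readline()

For paired reads the same scan would have to keep the record counts of
both files in step, which is the part the removed comment identifies as
the remaining hard case.
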
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 456602f..0bc70a7 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -11,7 +11,7 @@ import threading
 import Queue
 import copy
 import errno
-from .errors import KeepWriteError
+from .errors import KeepWriteError, AssertionError
 
 def split(path):
     """Separate the stream name and file name in a /-separated stream path and
@@ -26,7 +26,7 @@ def split(path):
         stream_name, file_name = '.', path
     return stream_name, file_name
 
-class ArvadosFileBase(object):
+class FileLikeObjectBase(object):
     def __init__(self, name, mode):
         self.name = name
         self.mode = mode
@@ -55,7 +55,7 @@ class ArvadosFileBase(object):
         self.closed = True
 
 
-class ArvadosFileReaderBase(ArvadosFileBase):
+class ArvadosFileReaderBase(FileLikeObjectBase):
     class _NameAttribute(str):
         # The Python file API provides a plain .name attribute.
         # Older SDK provided a name() method.
@@ -79,7 +79,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
     def decompressed_name(self):
         return re.sub('\.(bz2|gz)$', '', self.name)
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def seek(self, pos, whence=os.SEEK_CUR):
         if whence == os.SEEK_CUR:
             pos += self._filepos
@@ -90,7 +90,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
     def tell(self):
         return self._filepos
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
         while True:
@@ -99,7 +99,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
                 break
             yield data
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readline(self, size=float('inf'), num_retries=None):
         cache_pos, cache_data = self._readline_cache
@@ -123,7 +123,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
         self._readline_cache = (self.tell(), data[nextline_index:])
         return data[:nextline_index]
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def decompress(self, decompress, size, num_retries=None):
         for segment in self.readall(size, num_retries):
@@ -131,7 +131,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
             if data:
                 yield data
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readall_decompressed(self, size=2**20, num_retries=None):
         self.seek(0)
@@ -146,7 +146,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
         else:
             return self.readall(size, num_retries=num_retries)
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readlines(self, sizehint=float('inf'), num_retries=None):
         data = []
@@ -181,7 +181,7 @@ class StreamFileReader(ArvadosFileReaderBase):
         n = self.segments[-1]
         return n.range_start + n.range_size
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
@@ -199,7 +199,7 @@ class StreamFileReader(ArvadosFileReaderBase):
         self._filepos += len(data)
         return data
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -381,8 +381,11 @@ class BlockManager(object):
         """
         new_blockid = "bufferblock%i" % len(self._bufferblocks)
         block = self._bufferblocks[blockid]
-        bufferblock = BufferBlock(new_blockid, len(block), owner)
-        bufferblock.append(block)
+        if block.state != BufferBlock.WRITABLE:
+            raise AssertionError("Can only duplicate a writable buffer block")
+
+        bufferblock = BufferBlock(new_blockid, block.size(), owner)
+        bufferblock.append(block.buffer_view[0:block.size()])
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
 
@@ -465,7 +468,11 @@ class BlockManager(object):
         block.state = BufferBlock.PENDING
         self._put_queue.put(block)
 
-    def get_block(self, locator, num_retries, cache_only=False):
+    def get_bufferblock(self, locator):
+        with self.lock:
+            return self._bufferblocks.get(locator)
+
+    def get_block_contents(self, locator, num_retries, cache_only=False):
         """Fetch a block.
 
         First checks to see if the locator is a BufferBlock and return that, if
@@ -593,7 +600,11 @@ class ArvadosFile(object):
             new_loc = r.locator
             if self.parent._my_block_manager().is_bufferblock(r.locator):
                 if r.locator not in map_loc:
-                    map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
+                    bufferblock = self.parent._my_block_manager().get_bufferblock(r.locator)
+                    if bufferblock.state == BufferBlock.COMMITTED:
+                        map_loc[r.locator] = bufferblock.locator()
+                    else:
+                        map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
                 new_loc = map_loc[r.locator]
 
             cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
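
The remapping above is what makes clone() copy-on-write with respect to
uncommitted data: permanent Keep locators are shared, committed buffer
blocks collapse to their permanent locators, and only still-writable
buffer blocks are physically duplicated.  The map_loc cache guarantees
one duplicate per source block even when several segments reference the
same block.  Restated as a standalone rule (an illustrative sketch using
the names introduced in this diff):

    from arvados.arvfile import BufferBlock

    def remap_locator(locator, block_manager, owner, map_loc):
        # map_loc caches one result per source buffer block, so a file
        # whose segments reference the same writable block twice still
        # clones it only once.
        if not block_manager.is_bufferblock(locator):
            return locator                     # permanent block: share it
        if locator not in map_loc:
            block = block_manager.get_bufferblock(locator)
            if block.state == BufferBlock.COMMITTED:
                map_loc[locator] = block.locator()  # already saved to Keep
            else:
                map_loc[locator] = block_manager.dup_block(locator, owner).blockid
        return map_loc[locator]
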
@@ -674,7 +685,7 @@ class ArvadosFile(object):
 
         data = []
         for lr in readsegs:
-            block = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
             if block:
                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
             else:
@@ -781,7 +792,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     def size(self):
         return self.arvadosfile.size()
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to `size` bytes from the stream, starting at the current file position."""
@@ -789,7 +800,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
         self._filepos += len(data)
         return data
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def readfrom(self, offset, size, num_retries=None):
         """Read up to `size` bytes from the stream, starting at the current file position."""
@@ -810,7 +821,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, name, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def write(self, data, num_retries=None):
         if self.mode[0] == "a":
@@ -819,7 +830,7 @@ class ArvadosFileWriter(ArvadosFileReader):
             self.arvadosfile.writeto(self._filepos, data, num_retries)
             self._filepos += len(data)
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     @retry_method
     def writelines(self, seq, num_retries=None):
         for s in seq:
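
Most of the churn in this file is the rename of ArvadosFileBase to
FileLikeObjectBase, which owns the _before_close decorator applied
throughout.  The decorator itself is unchanged and not shown in this
diff; its effect is to make every decorated method refuse to run once
the object is closed, roughly (a sketch of the guard pattern, not the
exact SDK source):

    import functools

    class FileLikeObjectBase(object):
        def __init__(self, name, mode):
            self.name = name
            self.mode = mode
            self.closed = False

        @staticmethod
        def _before_close(orig_func):
            # Wrap a method so it raises once the file has been closed.
            @functools.wraps(orig_func)
            def before_close_wrapper(self, *args, **kwargs):
                if self.closed:
                    raise ValueError("I/O operation on closed stream file")
                return orig_func(self, *args, **kwargs)
            return before_close_wrapper

The new name reflects that the class carries no Arvados-specific
behavior; it is a generic file-like base that _WriterFile in
collection.py reuses as well.
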
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a64dd34..33af0c2 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -8,7 +8,7 @@ import time
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
+from .arvfile import split, FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
@@ -244,7 +244,7 @@ class CollectionReader(CollectionBase):
             return self._manifest_text
 
 
-class _WriterFile(ArvadosFileBase):
+class _WriterFile(FileLikeObjectBase):
     def __init__(self, coll_writer, name):
         super(_WriterFile, self).__init__(name, 'wb')
         self.dest = coll_writer
@@ -253,16 +253,16 @@ class _WriterFile(ArvadosFileBase):
         super(_WriterFile, self).close()
         self.dest.finish_current_file()
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def write(self, data):
         self.dest.write(data)
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def writelines(self, seq):
         for data in seq:
             self.write(data)
 
-    @ArvadosFileBase._before_close
+    @FileLikeObjectBase._before_close
     def flush(self):
         self.dest.flush_data()
 
@@ -696,65 +696,62 @@ class SynchronizedCollectionBase(CollectionBase):
     def notify(self, event, collection, name, item):
         raise NotImplementedError()
 
+    @must_be_writable
     @synchronized
-    def _find(self, path, create, create_collection):
-        """Internal method.  Don't use this.  Use `find()` or `find_or_create()`
-        instead.
-
-        Recursively search the specified file path.  May return either a
-        Collection or ArvadosFile.  Return None if not found and create=False.
-        Will create a new item if create=True.
-
-        :create:
-          If true, create path components (i.e. Collections) that are
-          missing.  If "create" is False, return None if a path component is
-          not found.
-
-        :create_collection:
-          If the path is not found, "create" is True, and
-          "create_collection" is False, then create and return a new
-          ArvadosFile for the last path component.  If "create_collection" is
-          True, then create and return a new Collection for the last path
-          component.
+    def find_or_create(self, path, create_type):
+        """Recursively search the specified file path.
+
+        May return either a `Collection` or `ArvadosFile`.  If not found, will
+        create a new item at the specified path based on `create_type`.  Will
+        create intermediate subcollections needed to contain the final item in
+        the path.
+
+        :create_type:
+          One of `arvados.collection.FILE` or
+          `arvados.collection.COLLECTION`.  If the path is not found and
+          create_type is FILE, create and return a new ArvadosFile for the
+          last path component.  If COLLECTION, create and return a new
+          Collection for the last path component.
 
         """
 
-        if create and self.sync_mode() == SYNC_READONLY:
+        if self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))
 
-        p = path.split("/")
-        if p[0] == '.':
-            del p[0]
+        pathcomponents = path.split("/")
+        if pathcomponents[0] == '.':
+            del pathcomponents[0]
 
-        if p and p[0]:
-            item = self._items.get(p[0])
-            if len(p) == 1:
+        if pathcomponents and pathcomponents[0]:
+            item = self._items.get(pathcomponents[0])
+            if len(pathcomponents) == 1:
                 # item must be a file
-                if item is None and create:
+                if item is None:
                     # create new file
-                    if create_collection:
+                    if create_type == COLLECTION:
                         item = Subcollection(self)
                     else:
                         item = ArvadosFile(self)
-                    self._items[p[0]] = item
+                    self._items[pathcomponents[0]] = item
                     self._modified = True
-                    self.notify(ADD, self, p[0], item)
+                    self.notify(ADD, self, pathcomponents[0], item)
                 return item
             else:
-                if item is None and create:
+                if item is None:
                     # create new collection
                     item = Subcollection(self)
-                    self._items[p[0]] = item
+                    self._items[pathcomponents[0]] = item
                     self._modified = True
-                    self.notify(ADD, self, p[0], item)
-                del p[0]
+                    self.notify(ADD, self, pathcomponents[0], item)
+                del pathcomponents[0]
                 if isinstance(item, SynchronizedCollectionBase):
-                    return item._find("/".join(p), create, create_collection)
+                    return item.find_or_create("/".join(pathcomponents), create_type)
                 else:
                     raise errors.ArgumentError("Interior path components must be subcollection")
         else:
             return self
 
+    @synchronized
     def find(self, path):
         """Recursively search the specified file path.
 
@@ -762,23 +759,32 @@ class SynchronizedCollectionBase(CollectionBase):
         found.
 
         """
-        return self._find(path, False, False)
+        pathcomponents = path.split("/")
+        if pathcomponents[0] == '.':
+            del pathcomponents[0]
 
-    def find_or_create(self, path, create_type):
-        """Recursively search the specified file path.
+        if pathcomponents and pathcomponents[0]:
+            item = self._items.get(pathcomponents[0])
+            if len(pathcomponents) == 1:
+                # item must be a file
+                return item
+            else:
+                del pathcomponents[0]
+                if isinstance(item, SynchronizedCollectionBase):
+                    return item.find("/".join(pathcomponents))
+                else:
+                    raise errors.ArgumentError("Interior path components must be subcollection")
+        else:
+            return self
 
-        May return either a Collection or ArvadosFile.  Will create a new item
-        at the specified path if none exists.
+    def mkdirs(self, path):
+        """Recursive subcollection create.
 
-        :create_type:
-          One of `arvado.collection.FILE` or
-          `arvado.collection.COLLECTION`.  If the path is not found, and value
-          of create_type is FILE then create and return a new ArvadosFile for
-          the last path component.  If COLLECTION, then create and return a new
-          Collection for the last path component.
+        Like `os.makedirs()`.  Will create intermediate subcollections needed to
+        contain the leaf subcollection path.
 
         """
-        return self._find(path, True, (create_type == COLLECTION))
+        return self.find_or_create(path, COLLECTION)
 
     def open(self, path, mode):
         """Open a file-like object for access.
@@ -806,20 +812,23 @@ class SynchronizedCollectionBase(CollectionBase):
         if create and self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))
 
-        f = self._find(path, create, False)
+        if create:
+            arvfile = self.find_or_create(path, FILE)
+        else:
+            arvfile = self.find(path)
 
-        if f is None:
+        if arvfile is None:
             raise IOError((errno.ENOENT, "File not found"))
-        if not isinstance(f, ArvadosFile):
+        if not isinstance(arvfile, ArvadosFile):
             raise IOError((errno.EISDIR, "Path must refer to a file."))
 
         if mode[0] == "w":
-            f.truncate(0)
+            arvfile.truncate(0)
 
         if mode == "r":
-            return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, path, mode, num_retries=self.num_retries)
         else:
-            return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, path, mode, num_retries=self.num_retries)
 
     @synchronized
     def modified(self):
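
Splitting the old _find() into find() and find_or_create() removes the
boolean-flag API: find() is a pure lookup, find_or_create() takes an
explicit create_type, and open() chooses between them depending on
whether the mode implies creation.  A usage sketch (assuming the
module-level FILE and COLLECTION constants referenced in the docstrings
above):

    from arvados.collection import WritableCollection, FILE, COLLECTION

    c = WritableCollection()
    c.find_or_create("docs/readme.txt", FILE)  # creates docs/ and the file
    c.mkdirs("data/raw/batch1")                # nested subcollections
    assert c.find("missing.txt") is None       # find() never creates

    with c.open("docs/notes.txt", "w") as f:   # "w" mode implies create
        f.write("hello")
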
@@ -902,25 +911,25 @@ class SynchronizedCollectionBase(CollectionBase):
         :recursive:
           Specify whether to remove non-empty subcollections (True), or raise an error (False).
         """
-        p = path.split("/")
-        if p[0] == '.':
+        pathcomponents = path.split("/")
+        if pathcomponents[0] == '.':
             # Remove '.' from the front of the path
-            del p[0]
+            del pathcomponents[0]
 
-        if len(p) > 0:
-            item = self._items.get(p[0])
+        if len(pathcomponents) > 0:
+            item = self._items.get(pathcomponents[0])
             if item is None:
                 raise IOError((errno.ENOENT, "File not found"))
-            if len(p) == 1:
-                if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not recursive:
+            if len(pathcomponents) == 1:
+                if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
-                d = self._items[p[0]]
-                del self._items[p[0]]
+                deleteditem = self._items[pathcomponents[0]]
+                del self._items[pathcomponents[0]]
                 self._modified = True
-                self.notify(DEL, self, p[0], d)
+                self.notify(DEL, self, pathcomponents[0], deleteditem)
             else:
-                del p[0]
-                item.remove("/".join(p))
+                del pathcomponents[0]
+                item.remove("/".join(pathcomponents))
         else:
             raise IOError((errno.ENOENT, "File not found"))
 
@@ -959,41 +968,41 @@ class SynchronizedCollectionBase(CollectionBase):
             source_obj = source_collection.find(source)
             if source_obj is None:
                 raise IOError((errno.ENOENT, "File not found"))
-            sp = source.split("/")
+            sourcecomponents = source.split("/")
         else:
             source_obj = source
-            sp = None
+            sourcecomponents = None
 
         # Find parent collection the target path
-        tp = target_path.split("/")
+        targetcomponents = target_path.split("/")
 
         # Determine the name to use.
-        target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
+        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
 
         if not target_name:
             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 
-        target_dir = self.find_or_create("/".join(tp[0:-1]), COLLECTION)
+        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 
         with target_dir.lock:
             if target_name in target_dir:
-                if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
+                if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
                     target_dir = target_dir[target_name]
-                    target_name = sp[-1]
+                    target_name = sourcecomponents[-1]
                 elif not overwrite:
                     raise IOError((errno.EEXIST, "File already exists"))
 
-            mod = None
+            modified_from = None
             if target_name in target_dir:
-                mod = target_dir[target_name]
+                modified_from = target_dir[target_name]
 
             # Actually make the copy.
             dup = source_obj.clone(target_dir)
             target_dir._items[target_name] = dup
             target_dir._modified = True
 
-        if mod:
-            self.notify(MOD, target_dir, target_name, (mod, dup))
+        if modified_from:
+            self.notify(MOD, target_dir, target_name, (modified_from, dup))
         else:
             self.notify(ADD, target_dir, target_name, dup)
 
@@ -1028,7 +1037,7 @@ class SynchronizedCollectionBase(CollectionBase):
         """
         changes = []
         if holding_collection is None:
-            holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
+            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
         for k in self:
             if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
@@ -1051,13 +1060,14 @@ class SynchronizedCollectionBase(CollectionBase):
         alternate path indicating the conflict.
 
         """
-        for c in changes:
-            path = c[1]
-            initial = c[2]
+        for change in changes:
+            event_type = change[0]
+            path = change[1]
+            initial = change[2]
             local = self.find(path)
             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
                                                                     time.gmtime()))
-            if c[0] == ADD:
+            if event_type == ADD:
                 if local is None:
                     # No local file at path, safe to copy over new file
                     self.copy(initial, path)
@@ -1065,22 +1075,23 @@ class SynchronizedCollectionBase(CollectionBase):
                     # There is already local file and it is different:
                     # save change to conflict file.
                     self.copy(initial, conflictpath)
-            elif c[0] == MOD:
+            elif event_type == MOD:
+                final = change[3]
                 if local == initial:
                     # Local matches the "initial" item so it has not
                     # changed locally and is safe to update.
-                    if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
+                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                         # Replace contents of local file with new contents
-                        local.replace_contents(c[3])
+                        local.replace_contents(final)
                     else:
                         # Overwrite path with new item; this can happen if
                         # path was a file and is now a collection or vice versa
-                        self.copy(c[3], path, overwrite=True)
+                        self.copy(final, path, overwrite=True)
                 else:
                     # Local is missing (presumably deleted) or local doesn't
                     # match the "start" value, so save change to conflict file
-                    self.copy(c[3], conflictpath)
-            elif c[0] == DEL:
+                    self.copy(final, conflictpath)
+            elif event_type == DEL:
                 if local == initial:
                     # Local item matches "initial" value, so it is safe to remove.
                     self.remove(path, recursive=True)
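
Naming the change-tuple fields (event_type, path, initial, and for MOD
the final value) makes the merge rules above legible: an incoming change
is applied only when the local item still matches the baseline
("initial") value; otherwise the incoming version is written to a
path~conflict-<timestamp>~ file so no local edit is lost.  Combined with
diff(), this is the three-way update used by update() further down,
roughly (a sketch; `response` stands for the collections().get() result,
as in update()):

    remote = import_manifest(response["manifest_text"])  # current server state
    baseline = import_manifest(local._manifest_text)     # last synced state
    local.apply(remote.diff(baseline))   # conflicts become ~conflict-...~ files
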
@@ -1110,7 +1121,7 @@ class SynchronizedCollectionBase(CollectionBase):
     def __ne__(self, other):
         return not self.__eq__(other)
 
-class CollectionRoot(SynchronizedCollectionBase):
+class Collection(SynchronizedCollectionBase):
     """Represents the root of an Arvados Collection, which may be associated with
     an API server Collection record.
 
@@ -1154,7 +1165,7 @@ class CollectionRoot(SynchronizedCollectionBase):
                  num_retries=None,
                  block_manager=None,
                  sync=None):
-        """CollectionRoot constructor.
+        """Collection constructor.
 
         :manifest_locator_or_text:
           One of Arvados collection UUID, block locator of
@@ -1185,7 +1196,7 @@ class CollectionRoot(SynchronizedCollectionBase):
             background websocket events, on block write, or on file close.
 
         """
-        super(CollectionRoot, self).__init__(parent)
+        super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
@@ -1195,7 +1206,7 @@ class CollectionRoot(SynchronizedCollectionBase):
         else:
             self._config = config.settings()
 
-        self.num_retries = num_retries
+        self.num_retries = num_retries if num_retries is not None else 2
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
@@ -1220,13 +1231,7 @@ class CollectionRoot(SynchronizedCollectionBase):
                     "Argument to CollectionReader must be a manifest or a collection UUID")
 
             self._populate()
-
-            if self._sync == SYNC_LIVE:
-                if not self._has_collection_uuid():
-                    raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
-                self.events = events.subscribe(arvados.api(apiconfig=self._config),
-                                               [["object_uuid", "=", self._manifest_locator]],
-                                               self.on_message)
+            self._subscribe_events()
 
 
     def root_collection(self):
@@ -1235,19 +1240,18 @@ class CollectionRoot(SynchronizedCollectionBase):
     def sync_mode(self):
         return self._sync
 
+    def _subscribe_events(self):
+        if self._sync == SYNC_LIVE and self.events is None:
+            if not self._has_collection_uuid():
+                raise errors.ArgumentError("Cannot SYNC_LIVE unless associated with a collection uuid")
+            self.events = events.subscribe(arvados.api(apiconfig=self._config),
+                                           [["object_uuid", "=", self._manifest_locator]],
+                                           self.on_message)
+
     def on_message(self, event):
         if event.get("object_uuid") == self._manifest_locator:
             self.update()
 
-    @staticmethod
-    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None):
-        """Create a new empty Collection with associated collection record."""
-        c = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
-        c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
-        if sync == SYNC_LIVE:
-            c.events = events.subscribe(arvados.api(apiconfig=self._config), [["object_uuid", "=", c._manifest_locator]], c.on_message)
-        return c
-
     @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
@@ -1258,9 +1262,9 @@ class CollectionRoot(SynchronizedCollectionBase):
         if other is None:
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
-            n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
-            other = import_collection(n["manifest_text"])
-        baseline = import_collection(self._manifest_text)
+            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            other = import_manifest(response["manifest_text"])
+        baseline = import_manifest(self._manifest_text)
         self.apply(other.diff(baseline))
 
     @synchronized
@@ -1368,12 +1372,12 @@ class CollectionRoot(SynchronizedCollectionBase):
     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
         if new_config is None:
             new_config = self._config
-        c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync)
+        newcollection = Collection(parent=new_parent, apiconfig=new_config, sync=new_sync)
         if new_sync == SYNC_READONLY:
-            c.lock = NoopLock()
-        c._items = {}
-        self._cloneinto(c)
-        return c
+            newcollection.lock = NoopLock()
+        newcollection._items = {}
+        self._cloneinto(newcollection)
+        return newcollection
 
     @synchronized
     def api_response(self):
@@ -1410,13 +1414,13 @@ class CollectionRoot(SynchronizedCollectionBase):
                 self.update()
             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
 
-            mt = self.manifest_text(strip=False)
+            text = self.manifest_text(strip=False)
             self._api_response = self._my_api().collections().update(
                 uuid=self._manifest_locator,
-                body={'manifest_text': mt}
+                body={'manifest_text': text}
                 ).execute(
                     num_retries=num_retries)
-            self._manifest_text = mt
+            self._manifest_text = text
             self.set_unmodified()
 
     @must_be_writable
@@ -1447,13 +1451,13 @@ class CollectionRoot(SynchronizedCollectionBase):
         """
         self._my_block_manager().commit_all()
         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
-        mt = self.manifest_text(strip=False)
+        text = self.manifest_text(strip=False)
 
         if create_collection_record:
             if name is None:
                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
 
-            body = {"manifest_text": mt,
+            body = {"manifest_text": text,
                     "name": name}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
@@ -1468,7 +1472,7 @@ class CollectionRoot(SynchronizedCollectionBase):
             if self.events:
                 self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
 
-        self._manifest_text = mt
+        self._manifest_text = text
         self.set_unmodified()
 
     @synchronized
@@ -1485,17 +1489,75 @@ class CollectionRoot(SynchronizedCollectionBase):
             c(event, collection, name, item)
 
 def ReadOnlyCollection(*args, **kwargs):
+    """Create a read-only collection object from an api collection record locator,
+    a portable data hash of a manifest, or raw manifest text.
+
+    See `Collection` constructor for detailed options.
+
+    """
     kwargs["sync"] = SYNC_READONLY
-    return CollectionRoot(*args, **kwargs)
+    return Collection(*args, **kwargs)
 
 def WritableCollection(*args, **kwargs):
+    """Create a writable collection object from an api collection record locator,
+    a portable data hash of a manifest, or raw manifest text.
+
+    See `Collection` constructor for detailed options.
+
+    """
+
     kwargs["sync"] = SYNC_EXPLICIT
-    return CollectionRoot(*args, **kwargs)
+    return Collection(*args, **kwargs)
 
 def LiveCollection(*args, **kwargs):
+    """Create a writable, live updating collection object representing an existing
+    collection record on the API server.
+
+    See `Collection` constructor for detailed options.
+
+    """
     kwargs["sync"] = SYNC_LIVE
-    return CollectionRoot(*args, **kwargs)
+    return Collection(*args, **kwargs)
+
+def createWritableCollection(name, owner_uuid=None, apiconfig=None):
+    """Create an empty, writable collection object and create an associated api
+    collection record.
+
+    :name:
+      The collection name
+
+    :owner_uuid:
+      The parent project.
+
+    :apiconfig:
+      Optional alternate api configuration to use (to specify an API host
+      or token other than the default).
+
+    """
+    newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
+    newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
+    return newcollection
+
+def createLiveCollection(name, owner_uuid=None, apiconfig=None):
+    """Create an empty, writable, live updating Collection object and create an
+    associated collection record on the API server.
+
+    :name:
+      The collection name
+
+    :owner_uuid:
+      The parent project.
+
+    :apiconfig:
+      Optional alternate api configuration to use (to specify an API host
+      or token other than the default).
+
+    """
+    newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
+    newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
+    newcollection._sync = SYNC_LIVE
+    newcollection._subscribe_events()
+    return newcollection
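
These module-level factories replace the removed Collection.create()
static method.  createWritableCollection and createLiveCollection both
create the API record via save_new(); the live variant then switches to
SYNC_LIVE and subscribes to websocket events, which is why
_subscribe_events() was factored out of the constructor above.  Typical
use (assumes a configured API host and token; mirrors
test_create_and_save in the test changes below):

    import arvados.collection as collection

    c = collection.createWritableCollection("example data")
    with c.open("count.txt", "w") as f:
        f.write("0123456789")
    c.save()   # commits blocks to Keep and updates the API record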
 
 class Subcollection(SynchronizedCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
@@ -1542,7 +1604,7 @@ def import_manifest(manifest_text,
                     keep=None,
                     num_retries=None,
                     sync=SYNC_READONLY):
-    """Import a manifest into a `CollectionRoot`.
+    """Import a manifest into a `Collection`.
 
     :manifest_text:
       The manifest text to import from.
@@ -1566,12 +1628,11 @@ def import_manifest(manifest_text,
     if into_collection is not None:
         if len(into_collection) > 0:
             raise ArgumentError("Can only import manifest into an empty collection")
-        c = into_collection
     else:
-        c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
+        into_collection = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
 
-    save_sync = c.sync_mode()
-    c._sync = None
+    save_sync = into_collection.sync_mode()
+    into_collection._sync = None
 
     STREAM_NAME = 0
     BLOCKS = 1
@@ -1608,7 +1669,7 @@ def import_manifest(manifest_text,
                 pos = long(s.group(1))
                 size = long(s.group(2))
                 name = s.group(3).replace('\\040', ' ')
-                f = c.find_or_create("%s/%s" % (stream_name, name), FILE)
+                f = into_collection.find_or_create("%s/%s" % (stream_name, name), FILE)
                 f.add_segment(blocks, pos, size)
             else:
                 # error!
@@ -1618,9 +1679,9 @@ def import_manifest(manifest_text,
             stream_name = None
             state = STREAM_NAME
 
-    c.set_unmodified()
-    c._sync = save_sync
-    return c
+    into_collection.set_unmodified()
+    into_collection._sync = save_sync
+    return into_collection
 
 def export_manifest(item, stream_name=".", portable_locators=False):
     """Export a manifest from the contents of a SynchronizedCollectionBase.
@@ -1641,34 +1702,35 @@ def export_manifest(item, stream_name=".", portable_locators=False):
     if isinstance(item, SynchronizedCollectionBase):
         stream = {}
         sorted_keys = sorted(item.keys())
-        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
-            v = item[k]
-            st = []
-            for s in v.segments():
-                loc = s.locator
+        for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
+            # Build the segment list for file `filename` within this stream
+            arvfile = item[filename]
+            filestream = []
+            for segment in arvfile.segments():
+                loc = segment.locator
                 if loc.startswith("bufferblock"):
-                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+                    loc = arvfile.parent._my_block_manager()._bufferblocks[loc].locator()
                 if portable_locators:
                     loc = KeepLocator(loc).stripped()
-                st.append(LocatorAndRange(loc, locator_block_size(loc),
-                                     s.segment_offset, s.range_size))
-            stream[k] = st
+                filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+                                     segment.segment_offset, segment.range_size))
+            stream[filename] = filestream
         if stream:
             buf += ' '.join(normalize_stream(stream_name, stream))
             buf += "\n"
-        for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
-            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
+        for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
+            buf += export_manifest(item[dirname], stream_name=os.path.join(stream_name, dirname), portable_locators=portable_locators)
     elif isinstance(item, ArvadosFile):
-        st = []
-        for s in item.segments:
-            loc = s.locator
+        filestream = []
+        for segment in item.segments():
+            loc = segment.locator
             if loc.startswith("bufferblock"):
                 loc = item._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
-            st.append(LocatorAndRange(loc, locator_block_size(loc),
-                                 s.segment_offset, s.range_size))
-        stream[stream_name] = st
+            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+                                 segment.segment_offset, segment.range_size))
+        stream = {stream_name: filestream}
         buf += ' '.join(normalize_stream(stream_name, stream))
         buf += "\n"
     return buf
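
With the loop variables renamed, export_manifest reads as the inverse of
import_manifest: each file's segments become a per-stream list of
LocatorAndRange entries, bufferblock locators are resolved to permanent
ones, and subcollections recurse with an extended stream name.  For an
already-normalized manifest the round trip should be the identity, as
test_import_export_manifest below exercises:

    from arvados.collection import import_manifest, export_manifest

    m = ". 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt\n"
    c = import_manifest(m)
    assert export_manifest(c) == m
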
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
index 12941a1..2a08b3b 100644
--- a/sdk/python/arvados/ranges.py
+++ b/sdk/python/arvados/ranges.py
@@ -1,3 +1,7 @@
+import logging
+
+_logger = logging.getLogger('arvados.ranges')
+
 class Range(object):
     def __init__(self, locator, range_start, range_size, segment_offset=0):
         self.locator = locator
@@ -14,7 +18,7 @@ class Range(object):
                 self.range_size == other.range_size and
                 self.segment_offset == other.segment_offset)
 
-def first_block(data_locators, range_start, range_size, debug=False):
+def first_block(data_locators, range_start, range_size):
     block_start = 0L
 
     # range_start/block_start is the inclusive lower bound
@@ -26,7 +30,6 @@ def first_block(data_locators, range_start, range_size, debug=False):
     block_size = data_locators[i].range_size
     block_start = data_locators[i].range_start
     block_end = block_start + block_size
-    if debug: print '---'
 
     # perform a binary search for the first block
     # assumes that all of the blocks are contiguous, so range_start is guaranteed
@@ -40,7 +43,6 @@ def first_block(data_locators, range_start, range_size, debug=False):
         else:
             hi = i
         i = int((hi + lo) / 2)
-        if debug: print lo, i, hi
         block_size = data_locators[i].range_size
         block_start = data_locators[i].range_start
         block_end = block_start + block_size
@@ -63,13 +65,19 @@ class LocatorAndRange(object):
     def __repr__(self):
         return "LocatorAndRange(\"%s\", %i, %i, %i)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
 
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
-    '''
-    Get blocks that are covered by the range
-    data_locators: list of Range objects, assumes that blocks are in order and contigous
-    range_start: start of range
-    range_size: size of range
-    returns list of LocatorAndRange objects
+def locators_and_ranges(data_locators, range_start, range_size):
+    '''Get blocks that are covered by the range and return list of LocatorAndRange
+    objects.
+
+    :data_locators:
+      list of Range objects, assumes that blocks are in order and contiguous
+
+    :range_start:
+      start of range
+
+    :range_size:
+      size of range
+
     '''
     if range_size == 0:
         return []
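
A concrete example of the semantics documented above: for two contiguous
10-byte blocks, a read spanning the block boundary resolves to the tail
of the first block plus the head of the second (expected output inferred
from the code below; LocatorAndRange arguments are locator, block_size,
segment_offset, segment_size):

    from arvados.ranges import Range, locators_and_ranges

    blocks = [Range("A", 0, 10), Range("B", 10, 10)]
    for lr in locators_and_ranges(blocks, 5, 10):  # bytes 5..14 of the file
        print lr
    # expected:
    #   LocatorAndRange("A", 10, 5, 5)
    #   LocatorAndRange("B", 10, 0, 5)
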
@@ -78,7 +86,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
     range_size = long(range_size)
     range_end = range_start + range_size
 
-    i = first_block(data_locators, range_start, range_size, debug)
+    i = first_block(data_locators, range_start, range_size)
     if i is None:
         return []
 
@@ -87,15 +95,16 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         block_start = dl.range_start
         block_size = dl.range_size
         block_end = block_start + block_size
-        if debug:
-            print dl.locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+        _logger.debug("%s range_start %s block_start %s range_end %s block_end %s", dl.locator, range_start, block_start, range_end, block_end)
         if range_end <= block_start:
             # range ends before this block starts, so don't look at any more locators
             break
 
         #if range_start >= block_end:
-            # range starts after this block ends, so go to next block
-            # we should always start at the first block due to the binary above, so this test is redundant
+            # Range starts after this block ends, so go to next block.
+            # We should always start at the first block due to the binary
+            # search above, so this test is unnecessary but useful to help
+            # document the algorithm.
             #next
 
         if range_start >= block_start and range_end <= block_end:
@@ -114,23 +123,28 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
         i += 1
     return resp
 
-def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset, debug=False):
+def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset):
     '''
     Replace a file segment range with a new segment.
 
-    data_locators: list of Range objects, assumes that segments are in order and contigous
+    NOTE: data_locators will be updated in place.
 
-    new_range_start: start of range to replace in data_locators
+    :data_locators:
+      list of Range objects, assumes that segments are in order and contiguous
 
-    new_range_size: size of range to replace in data_locators
+    :new_range_start:
+      start of range to replace in data_locators
 
-    new_locator: locator for new segment to be inserted
+    :new_range_size:
+      size of range to replace in data_locators
 
-    new_segment_offset: segment offset within the locator
+    :new_locator:
+      locator for new segment to be inserted
 
-    debug: print debugging details.
+    :new_segment_offset:
+      segment offset within the locator
 
-    !!! data_locators will be updated in place !!!
     '''
     if new_range_size == 0:
         return
@@ -152,7 +166,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
             data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
         return
 
-    i = first_block(data_locators, new_range_start, new_range_size, debug)
+    i = first_block(data_locators, new_range_start, new_range_size)
     if i is None:
         return
 
@@ -160,15 +174,16 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
         dl = data_locators[i]
         old_segment_start = dl.range_start
         old_segment_end = old_segment_start + dl.range_size
-        if debug:
-            print locator, "range_start", new_range_start, "segment_start", old_segment_start, "range_end", new_range_end, "segment_end", old_segment_end
+        _logger.debug("%s range_start %s segment_start %s range_end %s segment_end %s", dl, new_range_start, old_segment_start, new_range_end, old_segment_end)
         if new_range_end <= old_segment_start:
             # range ends before this segment starts, so don't look at any more locators
             break
 
         #if range_start >= old_segment_end:
-            # range starts after this segment ends, so go to next segment
-            # we should always start at the first segment due to the binary above, so this test is redundant
+            # Range starts after this segment ends, so go to next segment.
+            # We should always start at the first segment due to the binary
+            # search above, so this test is unnecessary but useful to help
+            # document the algorithm.
             #next
 
         if  old_segment_start <= new_range_start and new_range_end <= old_segment_end:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 75b23cc..9cfceb8 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -7,7 +7,7 @@ import functools
 import copy
 
 from .ranges import *
-from .arvfile import ArvadosFileBase, StreamFileReader
+from .arvfile import StreamFileReader
 from arvados.retry import retry_method
 from keep import *
 import config
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 52de0a3..29775a6 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -75,7 +75,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
 
-    def test_append(self):
+    def test_append0(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
                                                  "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"},
@@ -83,22 +83,44 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         with arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
                              api_client=api, keep_client=keep) as c:
             writer = c.open("count.txt", "r+")
+            self.assertEqual(writer.size(), 10)
+
             writer.seek(5, os.SEEK_SET)
             self.assertEqual("56789", writer.read(8))
+
             writer.seek(10, os.SEEK_SET)
             writer.write("foo")
             self.assertEqual(writer.size(), 13)
+
             writer.seek(5, os.SEEK_SET)
             self.assertEqual("56789foo", writer.read(8))
 
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
             self.assertEqual(None, keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
+
             c.save_new("test_append")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
             self.assertEqual("foo", keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
 
+
+    def test_append1(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
+        c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
+        writer = c.open("count.txt", "a+")
+        self.assertEqual(writer.read(20), "0123456789")
+        writer.seek(0, os.SEEK_SET)
+
+        writer.write("hello")
+        self.assertEqual(writer.read(20), "0123456789hello")
+        writer.seek(0, os.SEEK_SET)
+
+        writer.write("world")
+        self.assertEqual(writer.read(20), "0123456789helloworld")
+
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", export_manifest(c))
+
     def test_write0(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
         with arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
@@ -328,7 +350,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
             def block_prefetch(self, loc):
                 pass
 
-            def get_block(self, loc, num_retries=0, cache_only=False):
+            def get_block_contents(self, loc, num_retries=0, cache_only=False):
                 if self.nocache and cache_only:
                     return None
                 return self.blocks[loc]
@@ -432,3 +454,79 @@ class ArvadosFileReadAllDecompressedTestCase(ArvadosFileReadTestCase):
 class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase):
     def read_for_test(self, reader, byte_count, **kwargs):
         return ''.join(reader.readlines(**kwargs))
+
+class BlockManagerTest(unittest.TestCase):
+    def test_bufferblock_append(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({})
+        blockmanager = arvados.arvfile.BlockManager(keep)
+        bufferblock = blockmanager.alloc_bufferblock()
+        bufferblock.append("foo")
+
+        self.assertEqual(bufferblock.size(), 3)
+        self.assertEqual(bufferblock.buffer_view[0:3], "foo")
+        self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+        bufferblock.append("bar")
+
+        self.assertEqual(bufferblock.size(), 6)
+        self.assertEqual(bufferblock.buffer_view[0:6], "foobar")
+        self.assertEqual(bufferblock.locator(), "3858f62230ac3c915f300c664312c63f+6")
+
+        bufferblock.state = arvados.arvfile.BufferBlock.PENDING
+        with self.assertRaises(arvados.errors.AssertionError):
+            bufferblock.append("bar")
+
+    def test_bufferblock_dup(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({})
+        blockmanager = arvados.arvfile.BlockManager(keep)
+        bufferblock = blockmanager.alloc_bufferblock()
+        bufferblock.append("foo")
+
+        self.assertEqual(bufferblock.size(), 3)
+        self.assertEqual(bufferblock.buffer_view[0:3], "foo")
+        self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3")
+        bufferblock.state = arvados.arvfile.BufferBlock.PENDING
+
+        bufferblock2 = blockmanager.dup_block(bufferblock.blockid, None)
+        self.assertNotEqual(bufferblock.blockid, bufferblock2.blockid)
+
+        bufferblock2.append("bar")
+
+        self.assertEqual(bufferblock2.size(), 6)
+        self.assertEqual(bufferblock2.buffer_view[0:6], "foobar")
+        self.assertEqual(bufferblock2.locator(), "3858f62230ac3c915f300c664312c63f+6")
+
+        self.assertEqual(bufferblock.size(), 3)
+        self.assertEqual(bufferblock.buffer_view[0:3], "foo")
+        self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+    def test_bufferblock_get(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
+        blockmanager = arvados.arvfile.BlockManager(keep)
+        bufferblock = blockmanager.alloc_bufferblock()
+        bufferblock.append("foo")
+
+        self.assertEqual(blockmanager.get_block_contents("781e5e245d69b566979b86e28d23f2c7+10", 1), "0123456789")
+        self.assertEqual(blockmanager.get_block_contents(bufferblock.blockid, 1), "foo")
+
+    def test_bufferblock_commit(self):
+        mockkeep = mock.MagicMock()
+        blockmanager = arvados.arvfile.BlockManager(mockkeep)
+        bufferblock = blockmanager.alloc_bufferblock()
+        bufferblock.append("foo")
+        blockmanager.commit_all()
+        self.assertTrue(mockkeep.put.called)
+        self.assertEqual(bufferblock.state, arvados.arvfile.BufferBlock.COMMITTED)
+        self.assertIsNone(bufferblock.buffer_view)
+
+
+    def test_bufferblock_commit_with_error(self):
+        mockkeep = mock.MagicMock()
+        mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
+        blockmanager = arvados.arvfile.BlockManager(mockkeep)
+        bufferblock = blockmanager.alloc_bufferblock()
+        bufferblock.append("foo")
+        with self.assertRaises(arvados.errors.KeepWriteError) as err:
+            blockmanager.commit_all()
+        self.assertEquals(str(err.exception), "Error writing some blocks: acbd18db4cc2f85cedef654fccc4a4d8+3 raised KeepWriteError (fail)")
+        self.assertEqual(bufferblock.state, arvados.arvfile.BufferBlock.PENDING)
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index fbec8b2..5e50357 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -816,12 +816,12 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
 
 
 class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
-    def test_import_manifest(self):
+    def test_import_export_manifest(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
 . 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
 """
-        self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.ReadOnlyCollection(m1)))
+        self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.import_manifest(m1)))
 
     def test_init_manifest(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
@@ -1023,10 +1023,67 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
                                  c1.manifest_text()))
 
     def test_notify1(self):
-        c1 = arvados.WritableCollection(sync=SYNC_EXPLICIT)
+        c1 = arvados.WritableCollection()
         events = []
         c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item)))
-        c1.find("")
+        f = c1.open("foo.txt", "w")
+        self.assertEqual(events[0], (arvados.collection.ADD, c1, "foo.txt", f.arvadosfile))
+
+    def test_open_w(self):
+        c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n')
+        self.assertEqual(c1["count.txt"].size(), 10)
+        c1.open("count.txt", "w").close()
+        self.assertEqual(c1["count.txt"].size(), 0)
+
+
+class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
+    KEEP_SERVER = {}
+
+    def test_create_and_save(self):
+        c = arvados.collection.createWritableCollection("hello world")
+        self.assertEquals(c.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
+        self.assertEquals(c.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0" )
+
+        with c.open("count.txt", "w") as f:
+            f.write("0123456789")
+
+        self.assertEquals(c.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
+
+        c.save()
+
+        c2 = arvados.api().collections().get(uuid=c._manifest_locator).execute()
+        self.assertTrue(re.match(r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count.txt$",
+                                 c2["manifest_text"]))
+
+
+    def test_create_and_update(self):
+        c1 = arvados.collection.createWritableCollection("hello world")
+        self.assertEquals(c1.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
+        self.assertEquals(c1.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0" )
+        with c1.open("count.txt", "w") as f:
+            f.write("0123456789")
+
+        self.assertEquals(c1.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
+
+        c1.save()
+
+        c2 = arvados.collection.WritableCollection(c1._manifest_locator)
+        with c2.open("count.txt", "w") as f:
+            f.write("abcdefg")
+
+        c2.save()
+
+        self.assertNotEqual(c1.portable_data_hash(), c2.portable_data_hash())
+
+        c1.update()
+
+        self.assertEqual(c1.portable_data_hash(), c2.portable_data_hash())
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 1e5e9f0..ca65133 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -11,7 +11,7 @@ import time
 
 import arvados.commands._util as arv_cmd
 from arvados_fuse import *
-from arvados.api import SafeApi
+from arvados.safeapi import ThreadSafeApiCache
 
 logger = logging.getLogger('arvados.arv-mount')
 
@@ -82,7 +82,7 @@ with "--".
     try:
         # Create the request handler
         operations = Operations(os.getuid(), os.getgid(), args.encoding)
-        api = SafeApi(arvados.config)
+        api = ThreadSafeApiCache(arvados.config)
 
         usr = api.users().current().execute(num_retries=args.retries)
         now = time.time()

-----------------------------------------------------------------------


hooks/post-receive
-- 
