[ARVADOS] updated: 5f905666581dd7ccd8f7e05d1c8c4a6eedff0da9
git at public.curoverse.com
Mon Feb 16 16:47:08 EST 2015
Summary of changes:
crunch_scripts/split-fastq.py | 72 +------
sdk/python/arvados/arvfile.py | 51 +++--
sdk/python/arvados/collection.py | 372 ++++++++++++++++++++---------------
sdk/python/arvados/ranges.py | 71 ++++---
sdk/python/arvados/stream.py | 2 +-
sdk/python/tests/test_arvfile.py | 102 +++++++++-
sdk/python/tests/test_collections.py | 65 +++++-
services/fuse/bin/arv-mount | 4 +-
8 files changed, 460 insertions(+), 279 deletions(-)
via 5f905666581dd7ccd8f7e05d1c8c4a6eedff0da9 (commit)
from 446785a11237f832e9bac540a04e7dd238b45a9e (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit 5f905666581dd7ccd8f7e05d1c8c4a6eedff0da9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Feb 16 16:49:18 2015 -0500
4823: Add new tests for BufferBlock, BlockManager, saving, updating. Stylistic
changes to variable naming. Remove splitting code from splitfastq.
diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index 17aabf2..1c7a368 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -16,8 +16,6 @@ inp = arvados.CollectionReader(arvados.getjobparam('reads'))
manifest_list = []
-chunking = False #arvados.getjobparam('chunking')
-
def nextline(reader, start):
n = -1
while True:
@@ -31,63 +29,6 @@ def nextline(reader, start):
start += 128
return n
-# Chunk a fastq into approximately 64 MiB chunks. Requires that the input data
-# be decompressed ahead of time, such as using decompress-all.py. Generates a
-# new manifest, but doesn't actually move any data around. Handles paired
-# reads by ensuring that each chunk of a pair gets the same number of records.
-#
-# This works, but in practice is so slow that potential gains in alignment
-# performance are lost in the prep time, which is why it is currently disabled.
-#
-# A better algorithm would seek to a file position a bit less than the desired
-# chunk size and then scan ahead for the next record, making sure that record
-# was matched by the read pair.
-def splitfastq(p):
- for i in xrange(0, len(p)):
- p[i]["start"] = 0
- p[i]["end"] = 0
-
- count = 0
- recordsize = [0, 0]
-
- global piece
- finish = False
- while not finish:
- for i in xrange(0, len(p)):
- recordsize[i] = 0
-
- # read next 4 lines
- for i in xrange(0, len(p)):
- for ln in xrange(0, 4):
- r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i])
- if r == -1:
- finish = True
- break
- recordsize[i] += (r+1)
-
- splitnow = finish
- for i in xrange(0, len(p)):
- if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024):
- splitnow = True
-
- if splitnow:
- for i in xrange(0, len(p)):
- global manifest_list
- print >>sys.stderr, "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
- manifest = []
- manifest.extend(["./_" + str(piece)])
- manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
- manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
- manifest_list.append(manifest)
- p[i]["start"] = p[i]["end"]
- piece += 1
- else:
- for i in xrange(0, len(p)):
- p[i]["end"] += recordsize[i]
- count += 1
- if count % 10000 == 0:
- print >>sys.stderr, "Record %s at %s" % (count, p[i]["end"])
-
prog = re.compile(r'(.*?)(_[12])?\.fastq(\.gz)?$')
# Look for fastq files
@@ -115,14 +56,11 @@ for s in inp.all_streams():
p[0]["reader"] = s.files()[name_pieces.group(0)]
if p is not None:
- if chunking:
- splitfastq(p)
- else:
- for i in xrange(0, len(p)):
- m = p[i]["reader"].as_manifest().split()
- m[0] = "./_" + str(piece)
- manifest_list.append(m)
- piece += 1
+ for i in xrange(0, len(p)):
+ m = p[i]["reader"].as_manifest().split()
+ m[0] = "./_" + str(piece)
+ manifest_list.append(m)
+ piece += 1
manifest_text = "\n".join(" ".join(m) for m in manifest_list) + "\n"
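The comment block deleted above describes a faster chunking strategy: seek
to just under the desired chunk size, then scan forward to the next record
boundary. A minimal sketch of that idea follows, assuming a plain
uncompressed file-like object; next_record_offset is a hypothetical helper,
not part of the Arvados SDK:

    def next_record_offset(fastq, approx):
        # Scan forward from byte offset `approx` to the start of the next
        # FASTQ record (four lines: @title, sequence, '+', quality).  A line
        # starting with '@' only begins a record if a '+' separator follows
        # two lines later, since quality lines may also start with '@'.
        fastq.seek(approx)
        fastq.readline()              # discard the (likely partial) current line
        while True:
            start = fastq.tell()
            title = fastq.readline()
            if not title:
                return start          # EOF: the chunk simply ends here
            if title.startswith('@'):
                fastq.readline()      # sequence line
                if fastq.readline().startswith('+'):
                    return start      # confirmed record boundary
                fastq.seek(start)     # '@' began a quality line; skip it, rescan
                fastq.readline()

Paired reads would still need the record-count check described in the
deleted comment so that both files of a pair split at the same record.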
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 456602f..0bc70a7 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -11,7 +11,7 @@ import threading
import Queue
import copy
import errno
-from .errors import KeepWriteError
+from .errors import KeepWriteError, AssertionError
def split(path):
"""Separate the stream name and file name in a /-separated stream path and
@@ -26,7 +26,7 @@ def split(path):
stream_name, file_name = '.', path
return stream_name, file_name
-class ArvadosFileBase(object):
+class FileLikeObjectBase(object):
def __init__(self, name, mode):
self.name = name
self.mode = mode
@@ -55,7 +55,7 @@ class ArvadosFileBase(object):
self.closed = True
-class ArvadosFileReaderBase(ArvadosFileBase):
+class ArvadosFileReaderBase(FileLikeObjectBase):
class _NameAttribute(str):
# The Python file API provides a plain .name attribute.
# Older SDK provided a name() method.
@@ -79,7 +79,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
def decompressed_name(self):
return re.sub('\.(bz2|gz)$', '', self.name)
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
def seek(self, pos, whence=os.SEEK_CUR):
if whence == os.SEEK_CUR:
pos += self._filepos
@@ -90,7 +90,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
def tell(self):
return self._filepos
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def readall(self, size=2**20, num_retries=None):
while True:
@@ -99,7 +99,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
break
yield data
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def readline(self, size=float('inf'), num_retries=None):
cache_pos, cache_data = self._readline_cache
@@ -123,7 +123,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
self._readline_cache = (self.tell(), data[nextline_index:])
return data[:nextline_index]
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def decompress(self, decompress, size, num_retries=None):
for segment in self.readall(size, num_retries):
@@ -131,7 +131,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
if data:
yield data
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def readall_decompressed(self, size=2**20, num_retries=None):
self.seek(0)
@@ -146,7 +146,7 @@ class ArvadosFileReaderBase(ArvadosFileBase):
else:
return self.readall(size, num_retries=num_retries)
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def readlines(self, sizehint=float('inf'), num_retries=None):
data = []
@@ -181,7 +181,7 @@ class StreamFileReader(ArvadosFileReaderBase):
n = self.segments[-1]
return n.range_start + n.range_size
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
@@ -199,7 +199,7 @@ class StreamFileReader(ArvadosFileReaderBase):
self._filepos += len(data)
return data
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
@@ -381,8 +381,11 @@ class BlockManager(object):
"""
new_blockid = "bufferblock%i" % len(self._bufferblocks)
block = self._bufferblocks[blockid]
- bufferblock = BufferBlock(new_blockid, len(block), owner)
- bufferblock.append(block)
+ if block.state != BufferBlock.WRITABLE:
+ raise AssertionError("Can only duplicate a writable buffer block")
+
+ bufferblock = BufferBlock(new_blockid, block.size(), owner)
+ bufferblock.append(block.buffer_view[0:block.size()])
self._bufferblocks[bufferblock.blockid] = bufferblock
return bufferblock
@@ -465,7 +468,11 @@ class BlockManager(object):
block.state = BufferBlock.PENDING
self._put_queue.put(block)
- def get_block(self, locator, num_retries, cache_only=False):
+ def get_bufferblock(self, locator):
+ with self.lock:
+ return self._bufferblocks.get(locator)
+
+ def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
First checks to see if the locator is a BufferBlock and return that, if
@@ -593,7 +600,11 @@ class ArvadosFile(object):
new_loc = r.locator
if self.parent._my_block_manager().is_bufferblock(r.locator):
if r.locator not in map_loc:
- map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
+ bufferblock = self.parent._my_block_manager().get_bufferblock(r.locator)
+ if bufferblock.state == BufferBlock.COMMITTED:
+ map_loc[r.locator] = bufferblock.locator()
+ else:
+ map_loc[r.locator] = self.parent._my_block_manager().dup_block(r.locator, cp).blockid
new_loc = map_loc[r.locator]
cp._segments.append(Range(new_loc, r.range_start, r.range_size, r.segment_offset))
@@ -674,7 +685,7 @@ class ArvadosFile(object):
data = []
for lr in readsegs:
- block = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
+ block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
if block:
data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
else:
@@ -781,7 +792,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
def size(self):
return self.arvadosfile.size()
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def read(self, size, num_retries=None):
"""Read up to `size` bytes from the stream, starting at the current file position."""
@@ -789,7 +800,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
self._filepos += len(data)
return data
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def readfrom(self, offset, size, num_retries=None):
"""Read up to `size` bytes from the stream, starting at the current file position."""
@@ -810,7 +821,7 @@ class ArvadosFileWriter(ArvadosFileReader):
def __init__(self, arvadosfile, name, mode, num_retries=None):
super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def write(self, data, num_retries=None):
if self.mode[0] == "a":
@@ -819,7 +830,7 @@ class ArvadosFileWriter(ArvadosFileReader):
self.arvadosfile.writeto(self._filepos, data, num_retries)
self._filepos += len(data)
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
@retry_method
def writelines(self, seq, num_retries=None):
for s in seq:
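Most of this hunk is the mechanical rename of ArvadosFileBase to
FileLikeObjectBase and of its _before_close guard decorator. For reference,
here is a minimal sketch of how such a guard works; the body of the real
decorator is not shown in this diff, so the wrapper and the ValueError are
assumptions:

    import functools

    class FileLikeObjectBase(object):
        def __init__(self, name, mode):
            self.name = name
            self.mode = mode
            self.closed = False

        @staticmethod
        def _before_close(orig_func):
            # Refuse to run a method once the object is closed, matching
            # the behavior of ordinary Python file objects.
            @functools.wraps(orig_func)
            def before_close_wrapper(self, *args, **kwargs):
                if self.closed:
                    raise ValueError("I/O operation on closed stream file")
                return orig_func(self, *args, **kwargs)
            return before_close_wrapper

        def close(self):
            self.closed = True

    class ExampleReader(FileLikeObjectBase):
        @FileLikeObjectBase._before_close
        def read(self, size):
            return 'x' * size  # stand-in for real data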
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a64dd34..33af0c2 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -8,7 +8,7 @@ import time
from collections import deque
from stat import *
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
+from .arvfile import split, FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
from keep import *
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
@@ -244,7 +244,7 @@ class CollectionReader(CollectionBase):
return self._manifest_text
-class _WriterFile(ArvadosFileBase):
+class _WriterFile(FileLikeObjectBase):
def __init__(self, coll_writer, name):
super(_WriterFile, self).__init__(name, 'wb')
self.dest = coll_writer
@@ -253,16 +253,16 @@ class _WriterFile(ArvadosFileBase):
super(_WriterFile, self).close()
self.dest.finish_current_file()
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
def write(self, data):
self.dest.write(data)
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
def writelines(self, seq):
for data in seq:
self.write(data)
- @ArvadosFileBase._before_close
+ @FileLikeObjectBase._before_close
def flush(self):
self.dest.flush_data()
@@ -696,65 +696,62 @@ class SynchronizedCollectionBase(CollectionBase):
def notify(self, event, collection, name, item):
raise NotImplementedError()
+ @must_be_writable
@synchronized
- def _find(self, path, create, create_collection):
- """Internal method. Don't use this. Use `find()` or `find_or_create()`
- instead.
-
- Recursively search the specified file path. May return either a
- Collection or ArvadosFile. Return None if not found and create=False.
- Will create a new item if create=True.
-
- :create:
- If true, create path components (i.e. Collections) that are
- missing. If "create" is False, return None if a path component is
- not found.
-
- :create_collection:
- If the path is not found, "create" is True, and
- "create_collection" is False, then create and return a new
- ArvadosFile for the last path component. If "create_collection" is
- True, then create and return a new Collection for the last path
- component.
+ def find_or_create(self, path, create_type):
+ """Recursively search the specified file path.
+
+ May return either a `Collection` or `ArvadosFile`. If not found, will
+ create a new item at the specified path based on `create_type`. Will
+ create intermediate subcollections needed to contain the final item in
+ the path.
+
+ :create_type:
+ One of `arvados.collection.FILE` or
+ `arvados.collection.COLLECTION`. If the path is not found and
+ create_type is FILE, create and return a new ArvadosFile for the
+ last path component. If COLLECTION, create and return a new
+ Collection for the last path component.
"""
- if create and self.sync_mode() == SYNC_READONLY:
+ if self.sync_mode() == SYNC_READONLY:
raise IOError((errno.EROFS, "Collection is read only"))
- p = path.split("/")
- if p[0] == '.':
- del p[0]
+ pathcomponents = path.split("/")
+ if pathcomponents[0] == '.':
+ del pathcomponents[0]
- if p and p[0]:
- item = self._items.get(p[0])
- if len(p) == 1:
+ if pathcomponents and pathcomponents[0]:
+ item = self._items.get(pathcomponents[0])
+ if len(pathcomponents) == 1:
# item must be a file
- if item is None and create:
+ if item is None:
# create new file
- if create_collection:
+ if create_type == COLLECTION:
item = Subcollection(self)
else:
item = ArvadosFile(self)
- self._items[p[0]] = item
+ self._items[pathcomponents[0]] = item
self._modified = True
- self.notify(ADD, self, p[0], item)
+ self.notify(ADD, self, pathcomponents[0], item)
return item
else:
- if item is None and create:
+ if item is None:
# create new collection
item = Subcollection(self)
- self._items[p[0]] = item
+ self._items[pathcomponents[0]] = item
self._modified = True
- self.notify(ADD, self, p[0], item)
- del p[0]
+ self.notify(ADD, self, pathcomponents[0], item)
+ del pathcomponents[0]
if isinstance(item, SynchronizedCollectionBase):
- return item._find("/".join(p), create, create_collection)
+ return item.find_or_create("/".join(pathcomponents), create_type)
else:
raise errors.ArgumentError("Interior path components must be subcollection")
else:
return self
+ @synchronized
def find(self, path):
"""Recursively search the specified file path.
@@ -762,23 +759,32 @@ class SynchronizedCollectionBase(CollectionBase):
found.
"""
- return self._find(path, False, False)
+ pathcomponents = path.split("/")
+ if pathcomponents[0] == '.':
+ del pathcomponents[0]
- def find_or_create(self, path, create_type):
- """Recursively search the specified file path.
+ if pathcomponents and pathcomponents[0]:
+ item = self._items.get(pathcomponents[0])
+ if len(pathcomponents) == 1:
+ # item must be a file
+ return item
+ else:
+ del pathcomponents[0]
+ if isinstance(item, SynchronizedCollectionBase):
+ return item.find("/".join(pathcomponents))
+ else:
+ raise errors.ArgumentError("Interior path components must be subcollection")
+ else:
+ return self
- May return either a Collection or ArvadosFile. Will create a new item
- at the specified path if none exists.
+ def mkdirs(self, path):
+ """Recursive subcollection create.
- :create_type:
- One of `arvado.collection.FILE` or
- `arvado.collection.COLLECTION`. If the path is not found, and value
- of create_type is FILE then create and return a new ArvadosFile for
- the last path component. If COLLECTION, then create and return a new
- Collection for the last path component.
+ Like `os.makedirs()`. Will create intermediate subcollections needed to
+ contain the leaf subcollection path.
"""
- return self._find(path, True, (create_type == COLLECTION))
+ return self.find_or_create(path, COLLECTION)
def open(self, path, mode):
"""Open a file-like object for access.
@@ -806,20 +812,23 @@ class SynchronizedCollectionBase(CollectionBase):
if create and self.sync_mode() == SYNC_READONLY:
raise IOError((errno.EROFS, "Collection is read only"))
- f = self._find(path, create, False)
+ if create:
+ arvfile = self.find_or_create(path, FILE)
+ else:
+ arvfile = self.find(path)
- if f is None:
+ if arvfile is None:
raise IOError((errno.ENOENT, "File not found"))
- if not isinstance(f, ArvadosFile):
+ if not isinstance(arvfile, ArvadosFile):
raise IOError((errno.EISDIR, "Path must refer to a file."))
if mode[0] == "w":
- f.truncate(0)
+ arvfile.truncate(0)
if mode == "r":
- return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
+ return ArvadosFileReader(arvfile, path, mode, num_retries=self.num_retries)
else:
- return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
+ return ArvadosFileWriter(arvfile, path, mode, num_retries=self.num_retries)
@synchronized
def modified(self):
@@ -902,25 +911,25 @@ class SynchronizedCollectionBase(CollectionBase):
:recursive:
Specify whether to remove non-empty subcollections (True), or raise an error (False).
"""
- p = path.split("/")
- if p[0] == '.':
+ pathcomponents = path.split("/")
+ if pathcomponents[0] == '.':
# Remove '.' from the front of the path
- del p[0]
+ del pathcomponents[0]
- if len(p) > 0:
- item = self._items.get(p[0])
+ if len(pathcomponents) > 0:
+ item = self._items.get(pathcomponents[0])
if item is None:
raise IOError((errno.ENOENT, "File not found"))
- if len(p) == 1:
- if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not recursive:
+ if len(pathcomponents) == 1:
+ if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
- d = self._items[p[0]]
- del self._items[p[0]]
+ deleteditem = self._items[pathcomponents[0]]
+ del self._items[pathcomponents[0]]
self._modified = True
- self.notify(DEL, self, p[0], d)
+ self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
- del p[0]
- item.remove("/".join(p))
+ del pathcomponents[0]
+ item.remove("/".join(pathcomponents))
else:
raise IOError((errno.ENOENT, "File not found"))
@@ -959,41 +968,41 @@ class SynchronizedCollectionBase(CollectionBase):
source_obj = source_collection.find(source)
if source_obj is None:
raise IOError((errno.ENOENT, "File not found"))
- sp = source.split("/")
+ sourcecomponents = source.split("/")
else:
source_obj = source
- sp = None
+ sourcecomponents = None
# Find the parent collection of the target path
- tp = target_path.split("/")
+ targetcomponents = target_path.split("/")
# Determine the name to use.
- target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
+ target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
if not target_name:
raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
- target_dir = self.find_or_create("/".join(tp[0:-1]), COLLECTION)
+ target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
with target_dir.lock:
if target_name in target_dir:
- if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
+ if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
target_dir = target_dir[target_name]
- target_name = sp[-1]
+ target_name = sourcecomponents[-1]
elif not overwrite:
raise IOError((errno.EEXIST, "File already exists"))
- mod = None
+ modified_from = None
if target_name in target_dir:
- mod = target_dir[target_name]
+ modified_from = target_dir[target_name]
# Actually make the copy.
dup = source_obj.clone(target_dir)
target_dir._items[target_name] = dup
target_dir._modified = True
- if mod:
- self.notify(MOD, target_dir, target_name, (mod, dup))
+ if modified_from:
+ self.notify(MOD, target_dir, target_name, (modified_from, dup))
else:
self.notify(ADD, target_dir, target_name, dup)
@@ -1028,7 +1037,7 @@ class SynchronizedCollectionBase(CollectionBase):
"""
changes = []
if holding_collection is None:
- holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
+ holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
for k in self:
if k not in end_collection:
changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
@@ -1051,13 +1060,14 @@ class SynchronizedCollectionBase(CollectionBase):
alternate path indicating the conflict.
"""
- for c in changes:
- path = c[1]
- initial = c[2]
+ for change in changes:
+ event_type = change[0]
+ path = change[1]
+ initial = change[2]
local = self.find(path)
conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
time.gmtime()))
- if c[0] == ADD:
+ if event_type == ADD:
if local is None:
# No local file at path, safe to copy over new file
self.copy(initial, path)
@@ -1065,22 +1075,23 @@ class SynchronizedCollectionBase(CollectionBase):
# There is already local file and it is different:
# save change to conflict file.
self.copy(initial, conflictpath)
- elif c[0] == MOD:
+ elif event_type == MOD:
+ final = change[3]
if local == initial:
# Local matches the "initial" item so it has not
# changed locally and is safe to update.
- if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
+ if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
# Replace contents of local file with new contents
- local.replace_contents(c[3])
+ local.replace_contents(final)
else:
# Overwrite path with new item; this can happen if
# path was a file and is now a collection or vice versa
- self.copy(c[3], path, overwrite=True)
+ self.copy(final, path, overwrite=True)
else:
# Local is missing (presumably deleted) or local doesn't
# match the "start" value, so save change to conflict file
- self.copy(c[3], conflictpath)
- elif c[0] == DEL:
+ self.copy(final, conflictpath)
+ elif event_type == DEL:
if local == initial:
# Local item matches "initial" value, so it is safe to remove.
self.remove(path, recursive=True)
@@ -1110,7 +1121,7 @@ class SynchronizedCollectionBase(CollectionBase):
def __ne__(self, other):
return not self.__eq__(other)
-class CollectionRoot(SynchronizedCollectionBase):
+class Collection(SynchronizedCollectionBase):
"""Represents the root of an Arvados Collection, which may be associated with
an API server Collection record.
@@ -1154,7 +1165,7 @@ class CollectionRoot(SynchronizedCollectionBase):
num_retries=None,
block_manager=None,
sync=None):
- """CollectionRoot constructor.
+ """Collection constructor.
:manifest_locator_or_text:
One of Arvados collection UUID, block locator of
@@ -1185,7 +1196,7 @@ class CollectionRoot(SynchronizedCollectionBase):
background websocket events, on block write, or on file close.
"""
- super(CollectionRoot, self).__init__(parent)
+ super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
self._block_manager = block_manager
@@ -1195,7 +1206,7 @@ class CollectionRoot(SynchronizedCollectionBase):
else:
self._config = config.settings()
- self.num_retries = num_retries
+ self.num_retries = num_retries if num_retries is not None else 2
self._manifest_locator = None
self._manifest_text = None
self._api_response = None
@@ -1220,13 +1231,7 @@ class CollectionRoot(SynchronizedCollectionBase):
"Argument to CollectionReader must be a manifest or a collection UUID")
self._populate()
-
- if self._sync == SYNC_LIVE:
- if not self._has_collection_uuid():
- raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
- self.events = events.subscribe(arvados.api(apiconfig=self._config),
- [["object_uuid", "=", self._manifest_locator]],
- self.on_message)
+ self._subscribe_events()
def root_collection(self):
@@ -1235,19 +1240,18 @@ class CollectionRoot(SynchronizedCollectionBase):
def sync_mode(self):
return self._sync
+ def _subscribe_events(self):
+ if self._sync == SYNC_LIVE and self.events is None:
+ if not self._has_collection_uuid():
+ raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
+ self.events = events.subscribe(arvados.api(apiconfig=self._config),
+ [["object_uuid", "=", self._manifest_locator]],
+ self.on_message)
+
def on_message(self, event):
if event.get("object_uuid") == self._manifest_locator:
self.update()
- @staticmethod
- def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None):
- """Create a new empty Collection with associated collection record."""
- c = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
- c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
- if sync == SYNC_LIVE:
- c.events = events.subscribe(arvados.api(apiconfig=self._config), [["object_uuid", "=", c._manifest_locator]], c.on_message)
- return c
-
@synchronized
@retry_method
def update(self, other=None, num_retries=None):
@@ -1258,9 +1262,9 @@ class CollectionRoot(SynchronizedCollectionBase):
if other is None:
if self._manifest_locator is None:
raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
- n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
- other = import_collection(n["manifest_text"])
- baseline = import_collection(self._manifest_text)
+ response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+ other = import_manifest(response["manifest_text"])
+ baseline = import_manifest(self._manifest_text)
self.apply(other.diff(baseline))
@synchronized
@@ -1368,12 +1372,12 @@ class CollectionRoot(SynchronizedCollectionBase):
def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
if new_config is None:
new_config = self._config
- c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync)
+ newcollection = Collection(parent=new_parent, apiconfig=new_config, sync=new_sync)
if new_sync == SYNC_READONLY:
- c.lock = NoopLock()
- c._items = {}
- self._cloneinto(c)
- return c
+ newcollection.lock = NoopLock()
+ newcollection._items = {}
+ self._cloneinto(newcollection)
+ return newcollection
@synchronized
def api_response(self):
@@ -1410,13 +1414,13 @@ class CollectionRoot(SynchronizedCollectionBase):
self.update()
self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
- mt = self.manifest_text(strip=False)
+ text = self.manifest_text(strip=False)
self._api_response = self._my_api().collections().update(
uuid=self._manifest_locator,
- body={'manifest_text': mt}
+ body={'manifest_text': text}
).execute(
num_retries=num_retries)
- self._manifest_text = mt
+ self._manifest_text = text
self.set_unmodified()
@must_be_writable
@@ -1447,13 +1451,13 @@ class CollectionRoot(SynchronizedCollectionBase):
"""
self._my_block_manager().commit_all()
self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
- mt = self.manifest_text(strip=False)
+ text = self.manifest_text(strip=False)
if create_collection_record:
if name is None:
name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
- body = {"manifest_text": mt,
+ body = {"manifest_text": text,
"name": name}
if owner_uuid:
body["owner_uuid"] = owner_uuid
@@ -1468,7 +1472,7 @@ class CollectionRoot(SynchronizedCollectionBase):
if self.events:
self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
- self._manifest_text = mt
+ self._manifest_text = text
self.set_unmodified()
@synchronized
@@ -1485,17 +1489,75 @@ class CollectionRoot(SynchronizedCollectionBase):
c(event, collection, name, item)
def ReadOnlyCollection(*args, **kwargs):
+ """Create a read-only collection object from an api collection record locator,
+ a portable data hash of a manifest, or raw manifest text.
+
+ See `Collection` constructor for detailed options.
+
+ """
kwargs["sync"] = SYNC_READONLY
- return CollectionRoot(*args, **kwargs)
+ return Collection(*args, **kwargs)
def WritableCollection(*args, **kwargs):
+ """Create a writable collection object from an api collection record locator,
+ a portable data hash of a manifest, or raw manifest text.
+
+ See `Collection` constructor for detailed options.
+
+ """
+
kwargs["sync"] = SYNC_EXPLICIT
- return CollectionRoot(*args, **kwargs)
+ return Collection(*args, **kwargs)
def LiveCollection(*args, **kwargs):
+ """Create a writable, live updating collection object representing an existing
+ collection record on the API server.
+
+ See `Collection` constructor for detailed options.
+
+ """
kwargs["sync"] = SYNC_LIVE
- return CollectionRoot(*args, **kwargs)
+ return Collection(*args, **kwargs)
+
+def createWritableCollection(name, owner_uuid=None, apiconfig=None):
+ """Create an empty, writable collection object and create an associated api
+ collection record.
+
+ :name:
+ The collection name
+
+ :owner_uuid:
+ The parent project.
+
+ :apiconfig:
+ Optional alternate API configuration to use (to specify an API host
+ or token other than the default).
+ """
+ newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
+ newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
+ return newcollection
+
+def createLiveCollection(name, owner_uuid=None, apiconfig=None):
+ """Create an empty, writable, live updating Collection object and create an
+ associated collection record on the API server.
+
+ :name:
+ The collection name
+
+ :owner_uuid:
+ The parent project.
+
+ :apiconfig:
+ Optional alternate API configuration to use (to specify an API host
+ or token other than the default).
+
+ """
+ newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
+ newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
+ newcollection._sync = SYNC_LIVE
+ newcollection._subscribe_events()
+ return newcollection
class Subcollection(SynchronizedCollectionBase):
"""This is a subdirectory within a collection that doesn't have its own API
@@ -1542,7 +1604,7 @@ def import_manifest(manifest_text,
keep=None,
num_retries=None,
sync=SYNC_READONLY):
- """Import a manifest into a `CollectionRoot`.
+ """Import a manifest into a `Collection`.
:manifest_text:
The manifest text to import from.
@@ -1566,12 +1628,11 @@ def import_manifest(manifest_text,
if into_collection is not None:
if len(into_collection) > 0:
raise ArgumentError("Can only import manifest into an empty collection")
- c = into_collection
else:
- c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
+ into_collection = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
- save_sync = c.sync_mode()
- c._sync = None
+ save_sync = into_collection.sync_mode()
+ into_collection._sync = None
STREAM_NAME = 0
BLOCKS = 1
@@ -1608,7 +1669,7 @@ def import_manifest(manifest_text,
pos = long(s.group(1))
size = long(s.group(2))
name = s.group(3).replace('\\040', ' ')
- f = c.find_or_create("%s/%s" % (stream_name, name), FILE)
+ f = into_collection.find_or_create("%s/%s" % (stream_name, name), FILE)
f.add_segment(blocks, pos, size)
else:
# error!
@@ -1618,9 +1679,9 @@ def import_manifest(manifest_text,
stream_name = None
state = STREAM_NAME
- c.set_unmodified()
- c._sync = save_sync
- return c
+ into_collection.set_unmodified()
+ into_collection._sync = save_sync
+ return into_collection
def export_manifest(item, stream_name=".", portable_locators=False):
"""Export a manifest from the contents of a SynchronizedCollectionBase.
@@ -1641,34 +1702,35 @@ def export_manifest(item, stream_name=".", portable_locators=False):
if isinstance(item, SynchronizedCollectionBase):
stream = {}
sorted_keys = sorted(item.keys())
- for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
- v = item[k]
- st = []
- for s in v.segments():
- loc = s.locator
+ for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
+ # Build the stream's segment list for file `filename`
+ arvfile = item[filename]
+ filestream = []
+ for segment in arvfile.segments():
+ loc = segment.locator
if loc.startswith("bufferblock"):
- loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+ loc = arvfile.parent._my_block_manager()._bufferblocks[loc].locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
- st.append(LocatorAndRange(loc, locator_block_size(loc),
- s.segment_offset, s.range_size))
- stream[k] = st
+ filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+ segment.segment_offset, segment.range_size))
+ stream[filename] = filestream
if stream:
buf += ' '.join(normalize_stream(stream_name, stream))
buf += "\n"
- for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
- buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
+ for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
+ buf += export_manifest(item[dirname], stream_name=os.path.join(stream_name, dirname), portable_locators=portable_locators)
elif isinstance(item, ArvadosFile):
- st = []
- for s in item.segments:
- loc = s.locator
+ filestream = []
+ for segment in item.segments:
+ loc = segment.locator
if loc.startswith("bufferblock"):
loc = item._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
- st.append(LocatorAndRange(loc, locator_block_size(loc),
- s.segment_offset, s.range_size))
- stream[stream_name] = st
+ filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+ segment.segment_offset, segment.range_size))
+ stream[stream_name] = filestream
buf += ' '.join(normalize_stream(stream_name, stream))
buf += "\n"
return buf
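The apply() rewrite above spells out the three-way merge rules: each change
from diff() carries an event type, a path, the "initial" item, and, for
modifications, the "final" item. A dict-backed sketch of the same decision
table, with string event names standing in for the ADD/MOD/DEL constants:

    def apply_change(local_items, change):
        event, path, initial = change[0], change[1], change[2]
        local = local_items.get(path)
        conflict = path + "~conflict~"
        if event == "add":
            if local is None or local == initial:
                local_items[path] = initial        # nothing different locally
            else:
                local_items[conflict] = initial    # keep both versions
        elif event == "mod":
            final = change[3]
            if local == initial:
                local_items[path] = final          # unchanged locally: safe update
            else:
                local_items[conflict] = final      # local edit kept; change parked
        elif event == "del":
            if local == initial:
                local_items.pop(path, None)        # unchanged locally: safe delete
            # a locally modified item survives the remote delete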
diff --git a/sdk/python/arvados/ranges.py b/sdk/python/arvados/ranges.py
index 12941a1..2a08b3b 100644
--- a/sdk/python/arvados/ranges.py
+++ b/sdk/python/arvados/ranges.py
@@ -1,3 +1,7 @@
+import logging
+
+_logger = logging.getLogger('arvados.ranges')
+
class Range(object):
def __init__(self, locator, range_start, range_size, segment_offset=0):
self.locator = locator
@@ -14,7 +18,7 @@ class Range(object):
self.range_size == other.range_size and
self.segment_offset == other.segment_offset)
-def first_block(data_locators, range_start, range_size, debug=False):
+def first_block(data_locators, range_start, range_size):
block_start = 0L
# range_start/block_start is the inclusive lower bound
@@ -26,7 +30,6 @@ def first_block(data_locators, range_start, range_size, debug=False):
block_size = data_locators[i].range_size
block_start = data_locators[i].range_start
block_end = block_start + block_size
- if debug: print '---'
# perform a binary search for the first block
# assumes that all of the blocks are contiguous, so range_start is guaranteed
@@ -40,7 +43,6 @@ def first_block(data_locators, range_start, range_size, debug=False):
else:
hi = i
i = int((hi + lo) / 2)
- if debug: print lo, i, hi
block_size = data_locators[i].range_size
block_start = data_locators[i].range_start
block_end = block_start + block_size
@@ -63,13 +65,19 @@ class LocatorAndRange(object):
def __repr__(self):
return "LocatorAndRange(\"%s\", %i, %i, %i)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
-def locators_and_ranges(data_locators, range_start, range_size, debug=False):
- '''
- Get blocks that are covered by the range
- data_locators: list of Range objects, assumes that blocks are in order and contigous
- range_start: start of range
- range_size: size of range
- returns list of LocatorAndRange objects
+def locators_and_ranges(data_locators, range_start, range_size):
+ '''Get blocks that are covered by the range and return list of LocatorAndRange
+ objects.
+
+ :data_locators:
+ list of Range objects, assumes that blocks are in order and contiguous
+
+ :range_start:
+ start of range
+
+ :range_size:
+ size of range
+
'''
if range_size == 0:
return []
@@ -78,7 +86,7 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
range_size = long(range_size)
range_end = range_start + range_size
- i = first_block(data_locators, range_start, range_size, debug)
+ i = first_block(data_locators, range_start, range_size)
if i is None:
return []
@@ -87,15 +95,16 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
block_start = dl.range_start
block_size = dl.range_size
block_end = block_start + block_size
- if debug:
- print dl.locator, "range_start", range_start, "block_start", block_start, "range_end", range_end, "block_end", block_end
+ _logger.debug("%s range_start %s block_start %s range_end %s block_end %s", dl.locator, range_start, block_start, range_end, block_end)
if range_end <= block_start:
# range ends before this block starts, so don't look at any more locators
break
#if range_start >= block_end:
- # range starts after this block ends, so go to next block
- # we should always start at the first block due to the binary above, so this test is redundant
+ # Range starts after this block ends, so go to next block.
+ # We should always start at the first block due to the binary
+ # search above, so this test is unnecessary but useful to help
+ # document the algorithm.
#next
if range_start >= block_start and range_end <= block_end:
@@ -114,23 +123,28 @@ def locators_and_ranges(data_locators, range_start, range_size, debug=False):
i += 1
return resp
-def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset, debug=False):
+def replace_range(data_locators, new_range_start, new_range_size, new_locator, new_segment_offset):
'''
Replace a file segment range with a new segment.
- data_locators: list of Range objects, assumes that segments are in order and contigous
+ NOTE::
+ data_locators will be updated in place
- new_range_start: start of range to replace in data_locators
+ :data_locators:
+ list of Range objects, assumes that segments are in order and contiguous
- new_range_size: size of range to replace in data_locators
+ :new_range_start:
+ start of range to replace in data_locators
- new_locator: locator for new segment to be inserted
+ :new_range_size:
+ size of range to replace in data_locators
- new_segment_offset: segment offset within the locator
+ :new_locator:
+ locator for new segment to be inserted
- debug: print debugging details.
+ :new_segment_offset:
+ segment offset within the locator
- !!! data_locators will be updated in place !!!
'''
if new_range_size == 0:
return
@@ -152,7 +166,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
return
- i = first_block(data_locators, new_range_start, new_range_size, debug)
+ i = first_block(data_locators, new_range_start, new_range_size)
if i is None:
return
@@ -160,15 +174,16 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
dl = data_locators[i]
old_segment_start = dl.range_start
old_segment_end = old_segment_start + dl.range_size
- if debug:
- print locator, "range_start", new_range_start, "segment_start", old_segment_start, "range_end", new_range_end, "segment_end", old_segment_end
+ _logger.debug("%s range_start %s segment_start %s range_end %s segment_end %s", dl, new_range_start, old_segment_start, new_range_end, old_segment_end)
if new_range_end <= old_segment_start:
# range ends before this segment starts, so don't look at any more locators
break
#if range_start >= old_segment_end:
- # range starts after this segment ends, so go to next segment
- # we should always start at the first segment due to the binary above, so this test is redundant
+ # Range starts after this segment ends, so go to next segment.
+ # We should always start at the first segment due to the binary
+ # search above, so this test is unnecessary but useful to help
+ # document the algorithm.
#next
if old_segment_start <= new_range_start and new_range_end <= old_segment_end:
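The locators_and_ranges() docstring above describes mapping a byte range
onto an ordered, contiguous list of blocks. A linear-scan illustration of
that mapping follows; the real code first finds the starting block with the
binary search in first_block():

    def covering_segments(blocks, range_start, range_size):
        # `blocks` is a list of (locator, block_size) pairs laid end to end;
        # returns (locator, offset_in_block, length) pieces covering the range.
        segments = []
        range_end = range_start + range_size
        block_start = 0
        for locator, block_size in blocks:
            block_end = block_start + block_size
            if block_start >= range_end:
                break  # past the end of the requested range
            if block_end > range_start:
                seg_start = max(range_start, block_start)
                seg_end = min(range_end, block_end)
                segments.append((locator, seg_start - block_start,
                                 seg_end - seg_start))
            block_start = block_end
        return segments

    # Two 10-byte blocks; bytes 5..15 straddle the block boundary:
    # covering_segments([("A", 10), ("B", 10)], 5, 10)
    #   -> [("A", 5, 5), ("B", 0, 5)]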
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 75b23cc..9cfceb8 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -7,7 +7,7 @@ import functools
import copy
from .ranges import *
-from .arvfile import ArvadosFileBase, StreamFileReader
+from .arvfile import StreamFileReader
from arvados.retry import retry_method
from keep import *
import config
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 52de0a3..29775a6 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -75,7 +75,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
- def test_append(self):
+ def test_append0(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
"manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"},
@@ -83,22 +83,44 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
with arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
api_client=api, keep_client=keep) as c:
writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 10)
+
writer.seek(5, os.SEEK_SET)
self.assertEqual("56789", writer.read(8))
+
writer.seek(10, os.SEEK_SET)
writer.write("foo")
self.assertEqual(writer.size(), 13)
+
writer.seek(5, os.SEEK_SET)
self.assertEqual("56789foo", writer.read(8))
self.assertEqual(None, c._manifest_locator)
self.assertEqual(True, c.modified())
self.assertEqual(None, keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
+
c.save_new("test_append")
self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
self.assertEqual(False, c.modified())
self.assertEqual("foo", keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
+
+ def test_append1(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
+ c = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
+ writer = c.open("count.txt", "a+")
+ self.assertEqual(writer.read(20), "0123456789")
+ writer.seek(0, os.SEEK_SET)
+
+ writer.write("hello")
+ self.assertEqual(writer.read(20), "0123456789hello")
+ writer.seek(0, os.SEEK_SET)
+
+ writer.write("world")
+ self.assertEqual(writer.read(20), "0123456789helloworld")
+
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", export_manifest(c))
+
def test_write0(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
with arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
@@ -328,7 +350,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
def block_prefetch(self, loc):
pass
- def get_block(self, loc, num_retries=0, cache_only=False):
+ def get_block_contents(self, loc, num_retries=0, cache_only=False):
if self.nocache and cache_only:
return None
return self.blocks[loc]
@@ -432,3 +454,79 @@ class ArvadosFileReadAllDecompressedTestCase(ArvadosFileReadTestCase):
class ArvadosFileReadlinesTestCase(ArvadosFileReadTestCase):
def read_for_test(self, reader, byte_count, **kwargs):
return ''.join(reader.readlines(**kwargs))
+
+class BlockManagerTest(unittest.TestCase):
+ def test_bufferblock_append(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ blockmanager = arvados.arvfile.BlockManager(keep)
+ bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.append("foo")
+
+ self.assertEqual(bufferblock.size(), 3)
+ self.assertEqual(bufferblock.buffer_view[0:3], "foo")
+ self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+ bufferblock.append("bar")
+
+ self.assertEqual(bufferblock.size(), 6)
+ self.assertEqual(bufferblock.buffer_view[0:6], "foobar")
+ self.assertEqual(bufferblock.locator(), "3858f62230ac3c915f300c664312c63f+6")
+
+ bufferblock.state = arvados.arvfile.BufferBlock.PENDING
+ with self.assertRaises(arvados.errors.AssertionError):
+ bufferblock.append("bar")
+
+ def test_bufferblock_dup(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ blockmanager = arvados.arvfile.BlockManager(keep)
+ bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.append("foo")
+
+ self.assertEqual(bufferblock.size(), 3)
+ self.assertEqual(bufferblock.buffer_view[0:3], "foo")
+ self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3")
+ bufferblock.state = arvados.arvfile.BufferBlock.PENDING
+
+ bufferblock2 = blockmanager.dup_block(bufferblock.blockid, None)
+ self.assertNotEqual(bufferblock.blockid, bufferblock2.blockid)
+
+ bufferblock2.append("bar")
+
+ self.assertEqual(bufferblock2.size(), 6)
+ self.assertEqual(bufferblock2.buffer_view[0:6], "foobar")
+ self.assertEqual(bufferblock2.locator(), "3858f62230ac3c915f300c664312c63f+6")
+
+ self.assertEqual(bufferblock.size(), 3)
+ self.assertEqual(bufferblock.buffer_view[0:3], "foo")
+ self.assertEqual(bufferblock.locator(), "acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+ def test_bufferblock_get(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
+ blockmanager = arvados.arvfile.BlockManager(keep)
+ bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.append("foo")
+
+ self.assertEqual(blockmanager.get_block_contents("781e5e245d69b566979b86e28d23f2c7+10", 1), "0123456789")
+ self.assertEqual(blockmanager.get_block_contents(bufferblock.blockid, 1), "foo")
+
+ def test_bufferblock_commit(self):
+ mockkeep = mock.MagicMock()
+ blockmanager = arvados.arvfile.BlockManager(mockkeep)
+ bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.append("foo")
+ blockmanager.commit_all()
+ self.assertTrue(mockkeep.put.called)
+ self.assertEqual(bufferblock.state, arvados.arvfile.BufferBlock.COMMITTED)
+ self.assertIsNone(bufferblock.buffer_view)
+
+
+ def test_bufferblock_commit_with_error(self):
+ mockkeep = mock.MagicMock()
+ mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
+ blockmanager = arvados.arvfile.BlockManager(mockkeep)
+ bufferblock = blockmanager.alloc_bufferblock()
+ bufferblock.append("foo")
+ with self.assertRaises(arvados.errors.KeepWriteError) as err:
+ blockmanager.commit_all()
+ self.assertEquals(str(err.exception), "Error writing some blocks: acbd18db4cc2f85cedef654fccc4a4d8+3 raised KeepWriteError (fail)")
+ self.assertEqual(bufferblock.state, arvados.arvfile.BufferBlock.PENDING)
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index fbec8b2..5e50357 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -816,12 +816,12 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
- def test_import_manifest(self):
+ def test_import_export_manifest(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
"""
- self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.ReadOnlyCollection(m1)))
+ self.assertEqual(". 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt\n", arvados.export_manifest(arvados.import_manifest(m1)))
def test_init_manifest(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
@@ -1023,10 +1023,67 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
c1.manifest_text()))
def test_notify1(self):
- c1 = arvados.WritableCollection(sync=SYNC_EXPLICIT)
+ c1 = arvados.WritableCollection()
events = []
c1.subscribe(lambda event, collection, name, item: events.append((event, collection, name, item)))
- c1.find("")
+ f = c1.open("foo.txt", "w")
+ self.assertEqual(events[0], (arvados.collection.ADD, c1, "foo.txt", f.arvadosfile))
+
+ def test_open_w(self):
+ c1 = arvados.WritableCollection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n')
+ self.assertEqual(c1["count.txt"].size(), 10)
+ c1.open("count.txt", "w").close()
+ self.assertEqual(c1["count.txt"].size(), 0)
+
+
+class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
+ MAIN_SERVER = {}
+ KEEP_SERVER = {}
+
+ def test_create_and_save(self):
+ c = arvados.collection.createWritableCollection("hello world")
+ self.assertEquals(c.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
+ self.assertEquals(c.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0" )
+
+ with c.open("count.txt", "w") as f:
+ f.write("0123456789")
+
+ self.assertEquals(c.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
+
+ c.save()
+
+ c2 = arvados.api().collections().get(uuid=c._manifest_locator).execute()
+ self.assertTrue(re.match(r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count.txt$",
+ c2["manifest_text"]))
+
+
+ def test_create_and_update(self):
+ c1 = arvados.collection.createWritableCollection("hello world")
+ self.assertEquals(c1.portable_data_hash(), "d41d8cd98f00b204e9800998ecf8427e+0")
+ self.assertEquals(c1.api_response()["portable_data_hash"], "d41d8cd98f00b204e9800998ecf8427e+0" )
+ with c1.open("count.txt", "w") as f:
+ f.write("0123456789")
+
+ self.assertEquals(c1.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
+
+ print c1.manifest_text()
+ c1.save()
+ print c1.manifest_text()
+
+ c2 = arvados.collection.WritableCollection(c1._manifest_locator)
+ with c2.open("count.txt", "w") as f:
+ f.write("abcdefg")
+
+ c2.save()
+
+ self.assertNotEqual(c1.portable_data_hash(), c2.portable_data_hash())
+
+ print c1.manifest_text()
+ c1.update()
+ print c1.manifest_text()
+
+ self.assertEqual(c1.portable_data_hash(), c2.portable_data_hash())
+
if __name__ == '__main__':
unittest.main()
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 1e5e9f0..ca65133 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -11,7 +11,7 @@ import time
import arvados.commands._util as arv_cmd
from arvados_fuse import *
-from arvados.api import SafeApi
+from arvados.safeapi import ThreadSafeApiCache
logger = logging.getLogger('arvados.arv-mount')
@@ -82,7 +82,7 @@ with "--".
try:
# Create the request handler
operations = Operations(os.getuid(), os.getgid(), args.encoding)
- api = SafeApi(arvados.config)
+ api = ThreadSafeApiCache(arvados.config)
usr = api.users().current().execute(num_retries=args.retries)
now = time.time()
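Taken together, the new tests exercise an API that reads roughly like the
sketch below (Python 2, as in the tests; assumes export_manifest is
importable from arvados.collection and that the referenced block is
reachable through Keep):

    import arvados
    from arvados.collection import export_manifest

    # Start from a one-block manifest, append through the file-like
    # interface, then export the updated manifest.
    c = arvados.WritableCollection(
        ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n")
    with c.open("count.txt", "a+") as f:
        f.write("hello")
    print export_manifest(c)
    # Expected (assuming the append lands in a single new buffer block):
    # . 781e5e245d69b566979b86e28d23f2c7+10 5d41402abc4b2a76b9719d911017c592+5 0:15:count.txt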
-----------------------------------------------------------------------
hooks/post-receive
--