[ARVADOS] updated: 4077a9af0985d3c85f2f2de2bb7a0f6be581e71e
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 21 11:44:44 EDT 2015
Summary of changes:
sdk/python/arvados/arvfile.py | 89 +++++++++++++++++----------
sdk/python/arvados/collection.py | 109 ++++++++++++++++++++++++---------
sdk/python/tests/test_arvfile.py | 8 +--
sdk/python/tests/test_collections.py | 30 +++++++--
services/fuse/arvados_fuse/__init__.py | 17 ++---
services/fuse/tests/test_mount.py | 2 +
6 files changed, 174 insertions(+), 81 deletions(-)
via 4077a9af0985d3c85f2f2de2bb7a0f6be581e71e (commit)
from 32131dfa999fe658e5e61f465a5badf71271e2d2 (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.
commit 4077a9af0985d3c85f2f2de2bb7a0f6be581e71e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Apr 21 11:44:38 2015 -0400
3198: Implement rename() (efficient move within/between collections).
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index eb17bd4..95acb9c 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -441,11 +441,17 @@ class _BlockManager(object):
self._prefetch_threads = None
self._prefetch_queue = None
- def commit_bufferblock(self, block):
+ def commit_bufferblock(self, block, wait):
"""Initiate a background upload of a bufferblock.
- This will block if the upload queue is at capacity, otherwise it will
- return immediately.
+ :block:
+ The block object to upload
+
+ :wait:
+ If `wait` is True, upload the block synchronously.
+ If `wait` is False, upload the block asynchronously. This will
+ return immediately unless if the upload queue is at capacity, in
+ which case it will wait on an upload queue slot.
"""
@@ -467,32 +473,37 @@ class _BlockManager(object):
if self._put_queue is not None:
self._put_queue.task_done()
- with self.lock:
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
-
- # Mark the block as PENDING so to disallow any more appends.
- block.set_state(_BufferBlock.PENDING)
- self._put_queue.put(block)
+ if wait:
+ block.set_state(_BufferBlock.PENDING)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ block.set_state(_BufferBlock.COMMITTED, loc)
+ else:
+ with self.lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be send to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+ self._put_errors = Queue.Queue()
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
+
+ # Mark the block as PENDING so to disallow any more appends.
+ block.set_state(_BufferBlock.PENDING)
+ self._put_queue.put(block)
@synchronized
def get_bufferblock(self, locator):
@@ -530,7 +541,7 @@ class _BlockManager(object):
for k,v in items:
if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v)
+ self.commit_bufferblock(v, False)
with self.lock:
if self._put_queue is not None:
@@ -605,13 +616,13 @@ class ArvadosFile(object):
a list of Range objects representing segments
"""
self.parent = parent
+ self.name = name
self._modified = True
self._segments = []
self.lock = parent.root_collection().lock
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
- self.name = name
def writable(self):
return self.parent.writable()
@@ -800,7 +811,7 @@ class ArvadosFile(object):
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self._repack_writes()
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
self._current_bblock.append(data)
@@ -812,11 +823,11 @@ class ArvadosFile(object):
return len(data)
@synchronized
- def flush(self):
+ def flush(self, wait=True):
if self.modified():
if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
self._repack_writes()
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
self.parent.notify(MOD, self.parent, self.name, (self, self))
@must_be_writable
@@ -863,6 +874,16 @@ class ArvadosFile(object):
buf += "\n"
return buf
+ @must_be_writable
+ @synchronized
+ def reparent(self, newparent, newname):
+ self.flush()
+ self.parent.remove(self.name)
+
+ self.parent = newparent
+ self.name = newname
+ self.lock = self.parent.root_collection().lock
+ self._modified = True
class ArvadosFileReader(ArvadosFileReaderBase):
"""Wraps ArvadosFile in a file-like object supporting reading only.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index d610c35..0ecc34a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -741,8 +741,8 @@ class RichCollectionBase(CollectionBase):
@must_be_writable
@synchronized
- def add(self, source_obj, target_name, overwrite=False):
- """Copy a file or subcollection to this collection.
+ def add(self, source_obj, target_name, overwrite=False, reparent=False):
+ """Copy or move a file or subcollection to this collection.
:source_obj:
An ArvadosFile, or Subcollection object
@@ -754,6 +754,11 @@ class RichCollectionBase(CollectionBase):
:overwrite:
Whether to overwrite target file if it already exists.
+ :reparent:
+ If True, source_obj will be moved from its parent collection to this collection.
+ If False, source_obj will be copied and the parent collection will be
+ unmodified.
+
"""
if target_name in self and not overwrite:
@@ -763,15 +768,60 @@ class RichCollectionBase(CollectionBase):
if target_name in self:
modified_from = self[target_name]
- # Actually make the copy.
- dup = source_obj.clone(self, target_name)
- self._items[target_name] = dup
+ # Actually make the move or copy.
+ if reparent:
+ source_obj.reparent(self, target_name)
+ item = source_obj
+ else:
+ item = source_obj.clone(self, target_name)
+
+ self._items[target_name] = item
self._modified = True
if modified_from:
- self.notify(MOD, self, target_name, (modified_from, dup))
+ self.notify(MOD, self, target_name, (modified_from, item))
+ else:
+ self.notify(ADD, self, target_name, item)
+
+ def _get_src_target(self, source, target_path, source_collection, create_dest):
+ if source_collection is None:
+ source_collection = self
+
+ # Find the object
+ if isinstance(source, basestring):
+ source_obj = source_collection.find(source)
+ if source_obj is None:
+ raise IOError(errno.ENOENT, "File not found")
+ sourcecomponents = source.split("/")
+ else:
+ source_obj = source
+ sourcecomponents = None
+
+ # Find parent collection the target path
+ targetcomponents = target_path.split("/")
+
+ # Determine the name to use.
+ target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
+
+ if not target_name:
+ raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
+
+ if create_dest:
+ target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
else:
- self.notify(ADD, self, target_name, dup)
+ if len(targetcomponents) > 1:
+ target_dir = self.find("/".join(targetcomponents[0:-1]))
+ else:
+ target_dir = self
+
+ if target_dir is None:
+ raise IOError(errno.ENOENT, "Target directory not found.")
+
+ if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+ target_dir = target_dir[target_name]
+ target_name = sourcecomponents[-1]
+
+ return (source_obj, target_dir, target_name)
@must_be_writable
@synchronized
@@ -793,35 +843,35 @@ class RichCollectionBase(CollectionBase):
:overwrite:
Whether to overwrite target file if it already exists.
"""
- if source_collection is None:
- source_collection = self
- # Find the object to copy
- if isinstance(source, basestring):
- source_obj = source_collection.find(source)
- if source_obj is None:
- raise IOError(errno.ENOENT, "File not found")
- sourcecomponents = source.split("/")
- else:
- source_obj = source
- sourcecomponents = None
+ source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+ target_dir.add(source_obj, target_name, overwrite, False)
- # Find parent collection the target path
- targetcomponents = target_path.split("/")
+ @must_be_writable
+ @synchronized
+ def rename(self, source, target_path, source_collection=None, overwrite=False):
+ """Move a file or subcollection from `source_collection` to a new path in this collection.
- # Determine the name to use.
- target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+ :source:
+ A string with a path to source file or subcollection.
- if not target_name:
- raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
+ :target_path:
+ Destination file or path. If the target path already exists and is a
+ subcollection, the item will be placed inside the subcollection. If
+ the target path already exists and is a file, this will raise an error
+ unless you specify `overwrite=True`.
- target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+ :source_collection:
+ Collection to copy `source_path` from (default `self`)
- if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
- target_dir = target_dir[target_name]
- target_name = sourcecomponents[-1]
+ :overwrite:
+ Whether to overwrite target file if it already exists.
+ """
- target_dir.add(source_obj, target_name, overwrite)
+ source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+ if not source_obj.writable():
+ raise IOError(errno.EROFS, "Source collection must be writable.")
+ target_dir.add(source_obj, target_name, overwrite, True)
@synchronized
def manifest_text(self, stream_name=".", strip=False, normalize=False):
@@ -1337,6 +1387,7 @@ class Collection(RichCollectionBase):
if create_collection_record:
if name is None:
name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+ ensure_unique_name = True
body = {"manifest_text": text,
"name": name}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 92c778e..3041e28 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -450,8 +450,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
def test__eq__from_writes(self):
with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
with Collection() as c2:
- with c2.open("count1.txt", "w") as f:
- f.write("0123456789")
+ f = c2.open("count1.txt", "w")
+ f.write("0123456789")
self.assertTrue(c1["count1.txt"] == c2["count1.txt"])
self.assertFalse(c1["count1.txt"] != c2["count1.txt"])
@@ -459,8 +459,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
def test__ne__(self):
with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
with Collection() as c2:
- with c2.open("count1.txt", "w") as f:
- f.write("1234567890")
+ f = c2.open("count1.txt", "w")
+ f.write("1234567890")
self.assertTrue(c1["count1.txt"] != c2["count1.txt"])
self.assertFalse(c1["count1.txt"] == c2["count1.txt"])
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 8cf34f0..95b6dbe 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -886,6 +886,24 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
c.copy("count1.txt", "foo/")
self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
+ def test_rename_file(self):
+ c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+ c.rename("count1.txt", "count2.txt")
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+ def test_move_file_to_dir(self):
+ c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+ c.mkdirs("foo")
+ c.rename("count1.txt", "foo/count2.txt")
+ self.assertEqual("./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+ def test_move_file_to_other(self):
+ c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+ c2 = Collection()
+ c2.rename("count1.txt", "count2.txt", source_collection=c1)
+ self.assertEqual("", c1.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c2.manifest_text())
+
def test_clone(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
cl = c.clone()
@@ -985,8 +1003,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
d = c1.diff(c2)
self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
('add', './count2.txt', c2["count2.txt"])])
- with c1.open("count1.txt", "w") as f:
- f.write("zzzzz")
+ f = c1.open("count1.txt", "w")
+ f.write("zzzzz")
# c1 changed, so it should not be deleted.
c1.apply(d)
@@ -997,8 +1015,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
d = c1.diff(c2)
self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
- with c1.open("count1.txt", "w") as f:
- f.write("zzzzz")
+ f = c1.open("count1.txt", "w")
+ f.write("zzzzz")
# c1 changed, so c2 mod will go to a conflict file
c1.apply(d)
@@ -1010,8 +1028,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
d = c1.diff(c2)
self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]),
('add', './count1.txt', c2["count1.txt"])])
- with c1.open("count1.txt", "w") as f:
- f.write("zzzzz")
+ f = c1.open("count1.txt", "w")
+ f.write("zzzzz")
# c1 added count1.txt, so c2 add will go to a conflict file
c1.apply(d)
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index e5d8128..e140672 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -472,14 +472,15 @@ class Operations(llfuse.Operations):
def rmdir(self, inode_parent, name):
self.unlink(inode_parent, name)
- # @catch_exceptions
- # def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
- # src = self._check_writable(inode_parent_old)
- # dest = self._check_writable(inode_parent_new)
- #
- # with llfuse.lock_released:
- # dest.collection.copy(name_old, name_new, source_collection=src.collection, overwrite=True)
- # src.collection.remove(name_old)
+ @catch_exceptions
+ def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
+ src = self._check_writable(inode_parent_old)
+ dest = self._check_writable(inode_parent_new)
+
+ with llfuse.lock_released:
+ dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
+ dest.flush()
+ src.flush()
@catch_exceptions
def flush(self, fh):
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index ff641ca..70cfbef 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -462,6 +462,7 @@ class FuseWriteFileTest(MountTestBase):
# Forturnately the multiprocessing module makes this relatively easy.
pool = multiprocessing.Pool(1)
self.assertTrue(pool.apply(fuseWriteFileTestHelper, (self.mounttmp,)))
+ pool.close()
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertRegexpMatches(collection2["manifest_text"],
@@ -503,6 +504,7 @@ class FuseUpdateFileTest(MountTestBase):
pool = multiprocessing.Pool(1)
self.assertTrue(pool.apply(fuseUpdateFileTestHelper1, (self.mounttmp,)))
self.assertTrue(pool.apply(fuseUpdateFileTestHelper2, (self.mounttmp,)))
+ pool.close()
collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
self.assertRegexpMatches(collection2["manifest_text"],
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list