[ARVADOS] updated: 4077a9af0985d3c85f2f2de2bb7a0f6be581e71e

git at public.curoverse.com git at public.curoverse.com
Tue Apr 21 11:44:44 EDT 2015


Summary of changes:
 sdk/python/arvados/arvfile.py          |  89 +++++++++++++++++----------
 sdk/python/arvados/collection.py       | 109 ++++++++++++++++++++++++---------
 sdk/python/tests/test_arvfile.py       |   8 +--
 sdk/python/tests/test_collections.py   |  30 +++++++--
 services/fuse/arvados_fuse/__init__.py |  17 ++---
 services/fuse/tests/test_mount.py      |   2 +
 6 files changed, 174 insertions(+), 81 deletions(-)

       via  4077a9af0985d3c85f2f2de2bb7a0f6be581e71e (commit)
      from  32131dfa999fe658e5e61f465a5badf71271e2d2 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 4077a9af0985d3c85f2f2de2bb7a0f6be581e71e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Apr 21 11:44:38 2015 -0400

    3198: Implement rename() (efficient move within/between collections).

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index eb17bd4..95acb9c 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -441,11 +441,17 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self._prefetch_queue = None
 
-    def commit_bufferblock(self, block):
+    def commit_bufferblock(self, block, wait):
         """Initiate a background upload of a bufferblock.
 
-        This will block if the upload queue is at capacity, otherwise it will
-        return immediately.
+        :block:
+          The block object to upload
+
+        :wait:
+          If `wait` is True, upload the block synchronously.
+          If `wait` is False, upload the block asynchronously.  This will
+          return immediately unless the upload queue is at capacity, in
+          which case it will wait on an upload queue slot.
 
         """
 
@@ -467,32 +473,37 @@ class _BlockManager(object):
                     if self._put_queue is not None:
                         self._put_queue.task_done()
 
-        with self.lock:
-            if self._put_threads is None:
-                # Start uploader threads.
-
-                # If we don't limit the Queue size, the upload queue can quickly
-                # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
-                # servers.
-                #
-                # With two upload threads and a queue size of 2, this means up to 4
-                # blocks pending.  If they are full 64 MiB blocks, that means up to
-                # 256 MiB of internal buffering, which is the same size as the
-                # default download block cache in KeepClient.
-                self._put_queue = Queue.Queue(maxsize=2)
-                self._put_errors = Queue.Queue()
-
-                self._put_threads = []
-                for i in xrange(0, self.num_put_threads):
-                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
-                    self._put_threads.append(thread)
-                    thread.daemon = True
-                    thread.start()
-
-        # Mark the block as PENDING so to disallow any more appends.
-        block.set_state(_BufferBlock.PENDING)
-        self._put_queue.put(block)
+        if wait:
+            block.set_state(_BufferBlock.PENDING)
+            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+            block.set_state(_BufferBlock.COMMITTED, loc)
+        else:
+            with self.lock:
+                if self._put_threads is None:
+                    # Start uploader threads.
+
+                    # If we don't limit the Queue size, the upload queue can quickly
+                    # grow to take up gigabytes of RAM if the writing process is
+                    # generating data more quickly than it can be send to the Keep
+                    # servers.
+                    #
+                    # With two upload threads and a queue size of 2, this means up to 4
+                    # blocks pending.  If they are full 64 MiB blocks, that means up to
+                    # 256 MiB of internal buffering, which is the same size as the
+                    # default download block cache in KeepClient.
+                    self._put_queue = Queue.Queue(maxsize=2)
+                    self._put_errors = Queue.Queue()
+
+                    self._put_threads = []
+                    for i in xrange(0, self.num_put_threads):
+                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+                        self._put_threads.append(thread)
+                        thread.daemon = True
+                        thread.start()
+
+            # Mark the block as PENDING so to disallow any more appends.
+            block.set_state(_BufferBlock.PENDING)
+            self._put_queue.put(block)
 
     @synchronized
     def get_bufferblock(self, locator):
@@ -530,7 +541,7 @@ class _BlockManager(object):
 
         for k,v in items:
             if v.state() == _BufferBlock.WRITABLE:
-                self.commit_bufferblock(v)
+                self.commit_bufferblock(v, False)
 
         with self.lock:
             if self._put_queue is not None:
@@ -605,13 +616,13 @@ class ArvadosFile(object):
           a list of Range objects representing segments
         """
         self.parent = parent
+        self.name = name
         self._modified = True
         self._segments = []
         self.lock = parent.root_collection().lock
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
-        self.name = name
 
     def writable(self):
         return self.parent.writable()
@@ -800,7 +811,7 @@ class ArvadosFile(object):
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
             self._repack_writes()
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         self._current_bblock.append(data)
@@ -812,11 +823,11 @@ class ArvadosFile(object):
         return len(data)
 
     @synchronized
-    def flush(self):
+    def flush(self, wait=True):
         if self.modified():
             if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
                 self._repack_writes()
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
             self.parent.notify(MOD, self.parent, self.name, (self, self))
 
     @must_be_writable
@@ -863,6 +874,16 @@ class ArvadosFile(object):
         buf += "\n"
         return buf
 
+    @must_be_writable
+    @synchronized
+    def reparent(self, newparent, newname):  # move this file under newparent, renamed to newname
+        self.flush()  # flush() defaults to wait=True, so a writable buffer block is uploaded synchronously
+        self.parent.remove(self.name)  # detach from the old parent collection
+        # Attach to the new parent; the lock comes from its root collection, as in __init__.
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock  # NOTE(review): replaced while @synchronized holds the old lock; confirm the decorator releases the object it acquired
+        self._modified = True
 
 class ArvadosFileReader(ArvadosFileReaderBase):
     """Wraps ArvadosFile in a file-like object supporting reading only.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index d610c35..0ecc34a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -741,8 +741,8 @@ class RichCollectionBase(CollectionBase):
 
     @must_be_writable
     @synchronized
-    def add(self, source_obj, target_name, overwrite=False):
-        """Copy a file or subcollection to this collection.
+    def add(self, source_obj, target_name, overwrite=False, reparent=False):
+        """Copy or move a file or subcollection to this collection.
 
         :source_obj:
           An ArvadosFile, or Subcollection object
@@ -754,6 +754,11 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
 
+        :reparent:
+          If True, source_obj will be moved from its parent collection to this collection.
+          If False, source_obj will be copied and the parent collection will be
+          unmodified.
+
         """
 
         if target_name in self and not overwrite:
@@ -763,15 +768,60 @@ class RichCollectionBase(CollectionBase):
         if target_name in self:
             modified_from = self[target_name]
 
-        # Actually make the copy.
-        dup = source_obj.clone(self, target_name)
-        self._items[target_name] = dup
+        # Actually make the move or copy.
+        if reparent:
+            source_obj.reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
         self._modified = True
 
         if modified_from:
-            self.notify(MOD, self, target_name, (modified_from, dup))
+            self.notify(MOD, self, target_name, (modified_from, item))
+        else:
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
+        """Return (source_obj, target_dir, target_name) for copy() and rename()."""
+        if source_collection is None:
+            source_collection = self
+        # Find the source object (a path string is resolved in source_collection).
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError(errno.ENOENT, "File not found")
+            sourcecomponents = source.split("/")
+        else:
+            source_obj = source
+            sourcecomponents = None
+
+        # Split the target path into parent-directory components plus a final name.
+        targetcomponents = target_path.split("/")
+        # Use the final target component; if it is empty (trailing "/"), fall back to
+        # the source's basename, which is only known when source is a path string.
+        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+
+        # create_dest: create missing target directories (copy) or require them to exist (rename).
+        if create_dest:
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
         else:
-            self.notify(ADD, self, target_name, dup)
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
+
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found.")
+        # If the target names an existing subcollection of target_dir, place the item inside it.
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
+
+        return (source_obj, target_dir, target_name)
 
     @must_be_writable
     @synchronized
@@ -793,35 +843,35 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
         """
-        if source_collection is None:
-            source_collection = self
 
-        # Find the object to copy
-        if isinstance(source, basestring):
-            source_obj = source_collection.find(source)
-            if source_obj is None:
-                raise IOError(errno.ENOENT, "File not found")
-            sourcecomponents = source.split("/")
-        else:
-            source_obj = source
-            sourcecomponents = None
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+        target_dir.add(source_obj, target_name, overwrite, False)
 
-        # Find parent collection the target path
-        targetcomponents = target_path.split("/")
+    @must_be_writable
+    @synchronized
+    def rename(self, source, target_path, source_collection=None, overwrite=False):
+        """Move a file or subcollection from `source_collection` to a new path in this collection.
 
-        # Determine the name to use.
-        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        :source:
+          A string with a path to source file or subcollection.
 
-        if not target_name:
-            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
 
-        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        :source_collection:
+          Collection to move `source` from (default `self`)
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
-            target_dir = target_dir[target_name]
-            target_name = sourcecomponents[-1]
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """
 
-        target_dir.add(source_obj, target_name, overwrite)
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+        if not source_obj.writable():
+            raise IOError(errno.EROFS, "Source collection must be writable.")
+        target_dir.add(source_obj, target_name, overwrite, True)
 
     @synchronized
     def manifest_text(self, stream_name=".", strip=False, normalize=False):
@@ -1337,6 +1387,7 @@ class Collection(RichCollectionBase):
         if create_collection_record:
             if name is None:
                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                ensure_unique_name = True
 
             body = {"manifest_text": text,
                     "name": name}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 92c778e..3041e28 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -450,8 +450,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
     def test__eq__from_writes(self):
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
             with Collection() as c2:
-                with c2.open("count1.txt", "w") as f:
-                    f.write("0123456789")
+                f = c2.open("count1.txt", "w")
+                f.write("0123456789")
 
                 self.assertTrue(c1["count1.txt"] == c2["count1.txt"])
                 self.assertFalse(c1["count1.txt"] != c2["count1.txt"])
@@ -459,8 +459,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
     def test__ne__(self):
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
             with Collection() as c2:
-                with c2.open("count1.txt", "w") as f:
-                    f.write("1234567890")
+                f = c2.open("count1.txt", "w")
+                f.write("1234567890")
 
                 self.assertTrue(c1["count1.txt"] != c2["count1.txt"])
                 self.assertFalse(c1["count1.txt"] == c2["count1.txt"])
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 8cf34f0..95b6dbe 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -886,6 +886,24 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         c.copy("count1.txt", "foo/")
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
 
+    def test_rename_file(self):
+        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c.rename("count1.txt", "count2.txt")
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+    def test_move_file_to_dir(self):
+        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c.mkdirs("foo")
+        c.rename("count1.txt", "foo/count2.txt")
+        self.assertEqual("./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+    def test_move_file_to_other(self):
+        c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c2 = Collection()
+        c2.rename("count1.txt", "count2.txt", source_collection=c1)
+        self.assertEqual("", c1.manifest_text())
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c2.manifest_text())
+
     def test_clone(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
         cl = c.clone()
@@ -985,8 +1003,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         d = c1.diff(c2)
         self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
                              ('add', './count2.txt', c2["count2.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 changed, so it should not be deleted.
         c1.apply(d)
@@ -997,8 +1015,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
         d = c1.diff(c2)
         self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 changed, so c2 mod will go to a conflict file
         c1.apply(d)
@@ -1010,8 +1028,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         d = c1.diff(c2)
         self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]),
                              ('add', './count1.txt', c2["count1.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 added count1.txt, so c2 add will go to a conflict file
         c1.apply(d)
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index e5d8128..e140672 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -472,14 +472,15 @@ class Operations(llfuse.Operations):
     def rmdir(self, inode_parent, name):
         self.unlink(inode_parent, name)
 
-    # @catch_exceptions
-    # def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
-    #     src = self._check_writable(inode_parent_old)
-    #     dest = self._check_writable(inode_parent_new)
-    #
-    #     with llfuse.lock_released:
-    #         dest.collection.copy(name_old, name_new, source_collection=src.collection, overwrite=True)
-    #         src.collection.remove(name_old)
+    @catch_exceptions
+    def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):  # move name_old (old dir) to name_new (new dir)
+        src = self._check_writable(inode_parent_old)
+        dest = self._check_writable(inode_parent_new)
+        # Do the collection move and flushes with the llfuse global lock released.
+        with llfuse.lock_released:
+            dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)  # overwrite=True: an existing target is replaced
+            dest.flush()
+            src.flush()  # NOTE(review): presumably redundant when src and dest are the same directory
 
     @catch_exceptions
     def flush(self, fh):
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index ff641ca..70cfbef 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -462,6 +462,7 @@ class FuseWriteFileTest(MountTestBase):
         # Forturnately the multiprocessing module makes this relatively easy.
         pool = multiprocessing.Pool(1)
         self.assertTrue(pool.apply(fuseWriteFileTestHelper, (self.mounttmp,)))
+        pool.close()
 
         collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
         self.assertRegexpMatches(collection2["manifest_text"],
@@ -503,6 +504,7 @@ class FuseUpdateFileTest(MountTestBase):
         pool = multiprocessing.Pool(1)
         self.assertTrue(pool.apply(fuseUpdateFileTestHelper1, (self.mounttmp,)))
         self.assertTrue(pool.apply(fuseUpdateFileTestHelper2, (self.mounttmp,)))
+        pool.close()
 
         collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
         self.assertRegexpMatches(collection2["manifest_text"],

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list