[ARVADOS] updated: da63a23ebc1d936305324cbc66d9903b9129df94

git at public.curoverse.com git at public.curoverse.com
Thu Feb 5 13:44:32 EST 2015


Summary of changes:
 sdk/python/arvados/arvfile.py    |  44 ++++++++++-----
 sdk/python/arvados/collection.py | 116 ++++++++++++++++++++++-----------------
 sdk/python/tests/test_arvfile.py |  22 ++++----
 3 files changed, 110 insertions(+), 72 deletions(-)

       via  da63a23ebc1d936305324cbc66d9903b9129df94 (commit)
      from  fef791011864b1bd69dfd01bfcd26c4942d6d5a1 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit da63a23ebc1d936305324cbc66d9903b9129df94
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Feb 5 13:46:42 2015 -0500

    4823: Add root_collection() to make it easier to find the root.  Adjusted
    semantics of save(), renamed save_as() to save_new().  Now call save() on
    close() when in SYNC_LIVE.  readfrom() now releases the collection lock
    before actually fetching blocks.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 4edecf7..acb02a0 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -546,9 +546,10 @@ class BlockManager(object):
 
 
 class ArvadosFile(object):
-    """
-    ArvadosFile manages the underlying representation of a file in Keep as a sequence of
-    segments spanning a set of blocks, and implements random read/write access.
+    """ArvadosFile manages the underlying representation of a file in Keep as a
+    sequence of segments spanning a set of blocks, and implements random
+    read/write access.  This object may be accessed from multiple threads.
+
     """
 
     def __init__(self, parent, stream=[], segments=[]):
@@ -562,7 +563,7 @@ class ArvadosFile(object):
         self.parent = parent
         self._modified = True
         self._segments = []
-        self.lock = parent._root_lock()
+        self.lock = parent.root_collection().lock
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
@@ -649,19 +650,21 @@ class ArvadosFile(object):
         elif size > self.size():
             raise IOError("truncate() does not support extending the file size")
 
-    @synchronized
     def readfrom(self, offset, size, num_retries):
         """
         read upto `size` bytes from the file starting at `offset`.
         """
-        if size == 0 or offset >= self.size():
-            return ''
-        data = []
+        with self.lock:
+            if size == 0 or offset >= self.size():
+                return ''
+            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
+            readsegs = locators_and_ranges(self._segments, offset, size)
 
-        for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
+        for lr in prefetch:
             self.parent._my_block_manager().block_prefetch(lr.locator)
 
-        for lr in locators_and_ranges(self._segments, offset, size):
+        data = []
+        for lr in readsegs:
             d = self.parent._my_block_manager().get_block(lr.locator, num_retries=num_retries, cache_only=bool(data))
             if d:
                 data.append(d[lr.segment_offset:lr.segment_offset+lr.segment_size])
@@ -753,6 +756,11 @@ class ArvadosFile(object):
             return 0
 
 class ArvadosFileReader(ArvadosFileReaderBase):
+    """Wraps ArvadosFile in a file-like object supporting reading only.  Be aware
+    that this class is NOT thread safe as there is no locking around updating file
+    pointer.
+    """
+
     def __init__(self, arvadosfile, name, mode="r", num_retries=None):
         super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
         self.arvadosfile = arvadosfile
@@ -764,7 +772,7 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     @retry_method
     def read(self, size, num_retries=None):
         """Read up to `size` bytes from the stream, starting at the current file position"""
-        data = self.arvadosfile.readfrom(self._filepos, size, num_retries=num_retries)
+        data = self.arvadosfile.readfrom(self._filepos, size, num_retries)
         self._filepos += len(data)
         return data
 
@@ -779,6 +787,12 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
 
 class ArvadosFileWriter(ArvadosFileReader):
+    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
+    Be aware that this class is NOT thread safe as there is no locking around
+    updating file pointer.
+
+    """
+
     def __init__(self, arvadosfile, name, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
 
@@ -786,7 +800,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     @retry_method
     def write(self, data, num_retries=None):
         if self.mode[0] == "a":
-            self.arvadosfile.writeto(self.size(), data)
+            self.arvadosfile.writeto(self.size(), data, num_retries)
         else:
             self.arvadosfile.writeto(self._filepos, data, num_retries)
             self._filepos += len(data)
@@ -795,7 +809,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     @retry_method
     def writelines(self, seq, num_retries=None):
         for s in seq:
-            self.write(s)
+            self.write(s, num_retries)
 
     def truncate(self, size=None):
         if size is None:
@@ -803,3 +817,7 @@ class ArvadosFileWriter(ArvadosFileReader):
         self.arvadosfile.truncate(size)
         if self._filepos > self.size():
             self._filepos = self.size()
+
+    def close(self):
+        if self.arvadosfile.parent.sync_mode() == SYNC_LIVE:
+            self.arvadosfile.parent.root_collection().save()
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index b1a3050..42db95e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -662,15 +662,15 @@ class SynchronizedCollectionBase(CollectionBase):
     def _my_block_manager(self):
         raise NotImplementedError()
 
-    def _root_lock(self):
-        raise NotImplementedError()
-
     def _populate(self):
         raise NotImplementedError()
 
     def sync_mode(self):
         raise NotImplementedError()
 
+    def root_collection(self):
+        raise NotImplementedError()
+
     def notify(self, event, collection, name, item):
         raise NotImplementedError()
 
@@ -1057,7 +1057,7 @@ class SynchronizedCollectionBase(CollectionBase):
 
 class Collection(SynchronizedCollectionBase):
     """Store an Arvados collection, consisting of a set of files and
-    sub-collections.
+    sub-collections.  This object
     """
 
     def __init__(self, manifest_locator_or_text=None,
@@ -1125,8 +1125,8 @@ class Collection(SynchronizedCollectionBase):
             self._populate()
 
             if self._sync == SYNC_LIVE:
-                if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
-                    raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
+                if not self._has_collection_uuid():
+                    raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
                 self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
 
     @staticmethod
@@ -1135,8 +1135,8 @@ class Collection(SynchronizedCollectionBase):
         c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
         return c
 
-    def _root_lock(self):
-        return self.lock
+    def root_collection(self):
+        return self
 
     def sync_mode(self):
         return self._sync
@@ -1243,13 +1243,16 @@ class Collection(SynchronizedCollectionBase):
             # forego any further locking.
             self.lock = NoopLock()
 
+    def _has_collection_uuid(self):
+        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
+
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block"""
-        if self._sync != SYNC_READONLY:
-            self.save(allow_no_locator=True)
+        if self._sync != SYNC_READONLY and self._has_collection_uuid():
+            self.save()
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
@@ -1278,42 +1281,50 @@ class Collection(SynchronizedCollectionBase):
     @must_be_writable
     @synchronized
     @retry_method
-    def save(self, update=True, allow_no_locator=False, num_retries=None):
-        """Commit pending buffer blocks to Keep, write the manifest to Keep, and
-        update the collection record to Keep.
+    def save(self, merge=True, num_retries=None):
+        """Commit pending buffer blocks to Keep, merge with remote record (if
+        update=True), write the manifest to Keep, and update the collection
+        record.  Will raise AssertionError if not associated with a collection
+        record on the API server.  If you want to save a manifest to Keep only,
+        see `save_new()`.
 
         :update:
-          Update and merge remote changes before saving back.
+          Update and merge remote changes before saving.  Otherwise, any
+          remote changes will be ignored and overwritten.
 
-        :allow_no_locator:
-          If there is no collection uuid associated with this
-          Collection and `allow_no_locator` is False, raise an error.  If True,
-          do not raise an error.
         """
         if self.modified():
+            if not self._has_collection_uuid():
+                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
             self._my_block_manager().commit_all()
-            if update and self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
+            if merge:
                 self.update()
             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
-            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
-                self._api_response = self._my_api().collections().update(
-                    uuid=self._manifest_locator,
-                    body={'manifest_text': self.manifest_text(strip=False)}
-                    ).execute(
-                        num_retries=num_retries)
-            elif not allow_no_locator:
-                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
+
+            mt = self.manifest_text(strip=False)
+            self._api_response = self._my_api().collections().update(
+                uuid=self._manifest_locator,
+                body={'manifest_text': mt}
+                ).execute(
+                    num_retries=num_retries)
+            self._manifest_text = mt
             self.set_unmodified()
 
     @must_be_writable
     @synchronized
     @retry_method
-    def save_as(self, name, owner_uuid=None, ensure_unique_name=False, num_retries=None):
-        """Save a new collection record.
+    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
+        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
+        a new collection record (if create_collection_record True).  After
+        creating a new collection record, this Collection object will be
+        associated with the new record for `save()` and SYNC_LIVE updates.
 
         :name:
           The collection name.
 
+        :keep_only:
+          Only save the manifest to keep, do not create a collection record.
+
         :owner_uuid:
           the user, or project uuid that will own this collection.
           If None, defaults to the current user.
@@ -1326,20 +1337,28 @@ class Collection(SynchronizedCollectionBase):
         """
         self._my_block_manager().commit_all()
         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
-        body = {"manifest_text": self.manifest_text(strip=False),
-                "name": name}
-        if owner_uuid:
-            body["owner_uuid"] = owner_uuid
-        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+        mt = self.manifest_text(strip=False)
+
+        if create_collection_record:
+            if name is None:
+                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
 
-        if self.events:
-            self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+            body = {"manifest_text": mt,
+                    "name": name}
+            if owner_uuid:
+                body["owner_uuid"] = owner_uuid
 
-        self._manifest_locator = self._api_response["uuid"]
+            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
 
-        if self.events:
-            self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+            if self.events:
+                self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
 
+            self._manifest_locator = self._api_response["uuid"]
+
+            if self.events:
+                self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
+
+        self._manifest_text = mt
         self.set_unmodified()
 
     @synchronized
@@ -1361,33 +1380,32 @@ class Subcollection(SynchronizedCollectionBase):
 
     def __init__(self, parent):
         super(Subcollection, self).__init__(parent)
-        self.lock = parent._root_lock()
+        self.lock = self.root_collection().lock
 
-    def _root_lock(self):
-        return self.parent._root_lock()
+    def root_collection(self):
+        return self.parent.root_collection()
 
     def sync_mode(self):
-        return self.parent.sync_mode()
+        return self.root_collection().sync_mode()
 
     def _my_api(self):
-        return self.parent._my_api()
+        return self.root_collection()._my_api()
 
     def _my_keep(self):
-        return self.parent._my_keep()
+        return self.root_collection()._my_keep()
 
     def _my_block_manager(self):
-        return self.parent._my_block_manager()
+        return self.root_collection()._my_block_manager()
 
     def _populate(self):
-        self.parent._populate()
+        self.root_collection()._populate()
 
     def notify(self, event, collection, name, item):
-        self.parent.notify(event, collection, name, item)
+        return self.root_collection().notify(event, collection, name, item)
 
     @synchronized
     def clone(self, new_parent):
         c = Subcollection(new_parent)
-        c._items = {}
         self._cloneinto(c)
         return c
 
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 8c2af7b..704ec61 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -68,7 +68,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
 
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
-            c.save_as("test_truncate")
+            c.save_new("test_truncate")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
 
@@ -91,7 +91,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
             self.assertEqual(None, keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
-            c.save_as("test_append")
+            c.save_new("test_append")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
             self.assertEqual("foo", keep.get("acbd18db4cc2f85cedef654fccc4a4d8+3"))
@@ -171,7 +171,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
 
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
-            c.save_as("test_write_large")
+            c.save_new("test_write_large")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
 
@@ -231,7 +231,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
 
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
-            c.save_as("test_write_large")
+            c.save_new("test_write_large")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
 
@@ -249,7 +249,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
             self.assertEqual(None, keep.get("2e9ec317e197819358fbc43afca7d837+8"))
-            c.save_as("test_create")
+            c.save_new("test_create")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
             self.assertEqual("01234567", keep.get("2e9ec317e197819358fbc43afca7d837+8"))
@@ -263,7 +263,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         with arvados.Collection(api_client=api, keep_client=keep, sync=SYNC_EXPLICIT) as c:
             writer = c.open("foo/bar/count.txt", "w+")
             writer.write("01234567")
-            c.save_as("test_create")
+            c.save_new("test_create")
 
     def test_overwrite(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
@@ -279,7 +279,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
 
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
-            c.save_as("test_overwrite")
+            c.save_new("test_overwrite")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
 
@@ -309,7 +309,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.assertEqual(None, c._manifest_locator)
             self.assertEqual(True, c.modified())
             self.assertEqual(None, keep.get("2e9ec317e197819358fbc43afca7d837+8"))
-            c.save_as("test_create_multiple")
+            c.save_new("test_create_multiple")
             self.assertEqual("zzzzz-4zz18-mockcollection0", c._manifest_locator)
             self.assertEqual(False, c.modified())
             self.assertEqual("01234567", keep.get("2e9ec317e197819358fbc43afca7d837+8"))
@@ -333,6 +333,10 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
         def __init__(self, blocks, nocache):
             self.blocks = blocks
             self.nocache = nocache
+            self.lock = arvados.arvfile.NoopLock()
+
+        def root_collection(self):
+            return self
 
         def _my_block_manager(self):
             return ArvadosFileReaderTestCase.MockParent.MockBlockMgr(self.blocks, self.nocache)
@@ -340,8 +344,6 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
         def sync_mode(self):
             return SYNC_READONLY
 
-        def _root_lock(self):
-            return arvados.arvfile.NoopLock()
 
     def make_count_reader(self, nocache=False):
         stream = []

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list