[ARVADOS] updated: c30d3a7cc1c647ed5339ac15c8d8df07a76c712d

git at public.curoverse.com git at public.curoverse.com
Wed Feb 4 13:20:25 EST 2015


Summary of changes:
 sdk/python/arvados/arvfile.py    |   7 +++
 sdk/python/arvados/collection.py | 112 ++++++++++++++++++++++++++++++---------
 2 files changed, 93 insertions(+), 26 deletions(-)

       via  c30d3a7cc1c647ed5339ac15c8d8df07a76c712d (commit)
      from  130b8d3147af5240ba3118c56b0aa1f19d2d5c59 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit c30d3a7cc1c647ed5339ac15c8d8df07a76c712d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Feb 4 13:22:34 2015 -0500

    4823: Implemented diff/apply/update methods for Collection synchronization.  Needs tests.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2edd2e9..7ad763a 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -596,6 +596,13 @@ class ArvadosFile(object):
 
         return cp
 
+    @_must_be_writable
+    @_synchronized
+    def replace_contents(self, other):
+        """Replace this file's segments with `other.segments()` (another `ArvadosFile`) and mark it modified."""
+        self._segments = other.segments()
+        self._modified = True
+
     @_synchronized
     def __eq__(self, other):
         if type(other) != ArvadosFile:
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 1f2974b..292d421 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -644,6 +644,7 @@ class ResumableCollectionWriter(CollectionWriter):
 
 ADD = "add"
 DEL = "del"
+MOD = "mod"  # change type emitted by diff() for a path present on both sides with differing items
 
 class SynchronizedCollectionBase(CollectionBase):
     def __init__(self, parent=None):
@@ -716,7 +717,10 @@ class SynchronizedCollectionBase(CollectionBase):
                     self._items[p[0]] = item
                     self.notify(self, ADD, p[0], item)
                 del p[0]
-                return item.find("/".join(p), create=create)
+                if isinstance(item, SynchronizedCollectionBase):
+                    return item.find("/".join(p), create=create)
+                else:  # a non-collection item cannot contain further path components
+                    raise errors.ArgumentError("Interior path components must be subcollection")
         else:
             return self
 
@@ -866,11 +870,11 @@ class SynchronizedCollectionBase(CollectionBase):
 
     @_must_be_writable
     @_synchronized
-    def copy(self, source_path, target_path, source_collection=None, overwrite=False):
+    def copy(self, source, target_path, source_collection=None, overwrite=False):
         """Copy a file or subcollection to a new path in this collection.
 
-        :source_path:
-          Source file or subcollection
+        :source:
+          An ArvadosFile, Subcollection, or string with a path to source file or subcollection
 
         :target_path:
           Destination file or path.  If the target path already exists and is a
@@ -888,20 +892,28 @@ class SynchronizedCollectionBase(CollectionBase):
             source_collection = self
 
         # Find the object to copy
-        sp = source_path.split("/")
-        source_obj = source_collection.find(source_path)
-        if source_obj is None:
-            raise IOError((errno.ENOENT, "File not found"))
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError((errno.ENOENT, "File not found"))
+            sp = source.split("/")
+        else:
+            source_obj = source
+            sp = None  # no source path, so target_path alone must supply the destination name
 
         # Find parent collection the target path
         tp = target_path.split("/")
-        target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
 
         # Determine the name to use.
-        target_name = tp[-1] if tp[-1] else sp[-1]
+        target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
+
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+
+        target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)  # after the name check, so a rejected call creates no directories
 
         if target_name in target_dir:
-            if isinstance(target_dir[target_name], SynchronizedCollectionBase):
+            if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
                 target_dir = target_dir[target_name]
                 target_name = sp[-1]
             elif not overwrite:
@@ -938,22 +950,67 @@ class SynchronizedCollectionBase(CollectionBase):
             else:
                 return self._manifest_text
 
+    @_synchronized
+    def diff(self, start_collection, prefix="."):
+        """Generate a list of (ADD|DEL|MOD, path, ...) change tuples that turn
+        `start_collection` into `self`; subcollections are compared recursively.
+        """
+        changes = []
+        for k in start_collection:
+            if k not in self:
+                changes.append((DEL, os.path.join(prefix, k), start_collection[k]))
+        for k in self:
+            if k in start_collection:
+                if isinstance(self[k], Subcollection) and isinstance(start_collection[k], Subcollection):
+                    changes.extend(self[k].diff(start_collection[k], os.path.join(prefix, k)))
+                elif self[k] != start_collection[k]:
+                    changes.append((MOD, os.path.join(prefix, k), start_collection[k], self[k]))
+            else:
+                changes.append((ADD, os.path.join(prefix, k), self[k]))
+        return changes
+
     @_must_be_writable
     @_synchronized
-    def merge(self, other):
-        for k in other.keys():
-            if k in self:
-                if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection):
-                    self[k].merge(other[k])
+    def apply(self, changes):
+        """Apply a list of changes produced by `diff` to this collection.
+        An ADD or MOD that conflicts with the local item is saved to a timestamped
+        "<path>~conflict-<UTC time>~" path instead; a conflicting DEL is skipped.
+        """
+        for c in changes:
+            path = c[1]
+            initial = c[2]
+            local = self.find(path)
+            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
+                                                                    time.gmtime()))
+            if c[0] == ADD:
+                if local is None:
+                    # No local file at path, safe to copy over new file
+                    self.copy(initial, path)
+                elif local != initial:
+                    # There is already local file and it is different:
+                    # save change to conflict file.
+                    self.copy(initial, conflictpath)
+            elif c[0] == MOD:
+                if local == initial:
+                    # Local matches the "initial" item so assume it hasn't
+                    # changed locally and is safe to update.
+                    if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
+                        # Replace contents of local file with new contents
+                        local.replace_contents(c[3])
+                    else:
+                        # Overwrite path with new item; this can happen if the
+                        # path was a file and is now a collection or vice versa
+                        self.copy(c[3], path, overwrite=True)
                 else:
-                    if self[k] != other[k]:
-                        name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d_%H:%M:%S",
-                                                                     time.gmtime()))
-                        self._items[name] = other[k].clone(self)
-                        self.notify(self, name, ADD, self[name])
-            else:
-                self._items[k] = other[k].clone(self)
-                self.notify(self, k, ADD, self[k])
+                    # Local is missing (presumably deleted) or local doesn't
+                    # match the "start" value, so save change to conflict file
+                    self.copy(c[3], conflictpath)
+            elif c[0] == DEL:
+                if local == initial:
+                    # Local item matches "initial" value, so it is safe to remove.
+                    self.remove(path, rm_r=True)
+                # else, the file is modified or already removed, in either
+                # case we don't want to try to remove it.
 
     def portable_data_hash(self):
         """Get the portable data hash for this collection's manifest."""
@@ -1016,6 +1073,7 @@ class Collection(SynchronizedCollectionBase):
         self.lock = threading.RLock()
         self.callbacks = []
         self.events = None
+        self._baseline_manifest = None  # manifest text as last synced; update() diffs the server copy against it
 
         if manifest_locator_or_text:
             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
@@ -1052,9 +1110,10 @@ class Collection(SynchronizedCollectionBase):
 
     @_synchronized
     def update(self):
-        n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute()
+        n = self._my_api().collections().get(uuid=self._manifest_locator).execute()
         other = import_collection(n["manifest_text"])
-        self.merge(other)
+        self.apply(other.diff(import_collection(self._baseline_manifest)))
+        self._baseline_manifest = n["manifest_text"]  # server copy is the common ancestor for the next update()
 
     @_synchronized
     def _my_api(self):
@@ -1136,6 +1195,7 @@ class Collection(SynchronizedCollectionBase):
                     error_via_api,
                     error_via_keep))
         # populate
+        self._baseline_manifest = self._manifest_text  # loaded state that update() diffs the server copy against
         import_manifest(self._manifest_text, self)
 
         if self._sync == SYNC_READONLY:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list