[ARVADOS] updated: c30d3a7cc1c647ed5339ac15c8d8df07a76c712d
git at public.curoverse.com
git at public.curoverse.com
Wed Feb 4 13:20:25 EST 2015
Summary of changes:
sdk/python/arvados/arvfile.py | 7 +++
sdk/python/arvados/collection.py | 112 ++++++++++++++++++++++++++++++---------
2 files changed, 93 insertions(+), 26 deletions(-)
via c30d3a7cc1c647ed5339ac15c8d8df07a76c712d (commit)
from 130b8d3147af5240ba3118c56b0aa1f19d2d5c59 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c30d3a7cc1c647ed5339ac15c8d8df07a76c712d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Feb 4 13:22:34 2015 -0500
4823: Implemented diff/apply/update methods for Collection synchronization. Needs tests.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 2edd2e9..7ad763a 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -596,6 +596,13 @@ class ArvadosFile(object):
return cp
+ @_must_be_writable
+ @_synchronized
+ def replace_contents(self, other):
+ """Replace this file's segments with those returned by `other.segments()` (another `ArvadosFile`) and mark this file as modified."""
+ self._segments = other.segments()
+ self._modified = True
+
@_synchronized
def __eq__(self, other):
if type(other) != ArvadosFile:
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 1f2974b..292d421 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -644,6 +644,7 @@ class ResumableCollectionWriter(CollectionWriter):
ADD = "add"
DEL = "del"
+MOD = "mod"
class SynchronizedCollectionBase(CollectionBase):
def __init__(self, parent=None):
@@ -716,7 +717,10 @@ class SynchronizedCollectionBase(CollectionBase):
self._items[p[0]] = item
self.notify(self, ADD, p[0], item)
del p[0]
- return item.find("/".join(p), create=create)
+ if isinstance(item, SynchronizedCollectionBase):
+ return item.find("/".join(p), create=create)
+ else:
+ raise errors.ArgumentError("Interior path components must be subcollection")
else:
return self
@@ -866,11 +870,11 @@ class SynchronizedCollectionBase(CollectionBase):
@_must_be_writable
@_synchronized
- def copy(self, source_path, target_path, source_collection=None, overwrite=False):
+ def copy(self, source, target_path, source_collection=None, overwrite=False):
"""Copy a file or subcollection to a new path in this collection.
- :source_path:
- Source file or subcollection
+ :source:
+ An ArvadosFile, Subcollection, or string with a path to source file or subcollection
:target_path:
Destination file or path. If the target path already exists and is a
@@ -888,20 +892,28 @@ class SynchronizedCollectionBase(CollectionBase):
source_collection = self
# Find the object to copy
- sp = source_path.split("/")
- source_obj = source_collection.find(source_path)
- if source_obj is None:
- raise IOError((errno.ENOENT, "File not found"))
+ if isinstance(source, basestring):
+ source_obj = source_collection.find(source)
+ if source_obj is None:
+ raise IOError((errno.ENOENT, "File not found"))
+ sp = source.split("/")
+ else:
+ source_obj = source
+ sp = None
# Find parent collection of the target path
tp = target_path.split("/")
- target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
# Determine the name to use.
- target_name = tp[-1] if tp[-1] else sp[-1]
+ target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
+
+ if not target_name:
+ raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
+
+ target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
if target_name in target_dir:
- if isinstance(target_dir[target_name], SynchronizedCollectionBase):
+ if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
target_dir = target_dir[target_name]
target_name = sp[-1]
elif not overwrite:
@@ -938,22 +950,67 @@ class SynchronizedCollectionBase(CollectionBase):
else:
return self._manifest_text
+ @_synchronized
+ def diff(self, start_collection, prefix="."):
+ """
+ Generate the list of change tuples which turn `start_collection` into `self`: (DEL, path, old_item), (ADD, path, new_item) or (MOD, path, old_item, new_item). Entries that are a `Subcollection` on both sides are diffed recursively; each path is `prefix` joined with the entry name.
+ """
+ changes = []
+ for k in start_collection:
+ if k not in self:
+ changes.append((DEL, os.path.join(prefix, k), start_collection[k]))
+ for k in self:
+ if k in start_collection:
+ if isinstance(self[k], Subcollection) and isinstance(start_collection[k], Subcollection):
+ changes.extend(self[k].diff(start_collection[k], os.path.join(prefix, k)))
+ elif self[k] != start_collection[k]:
+ changes.append((MOD, os.path.join(prefix, k), start_collection[k], self[k]))
+ else:
+ changes.append((ADD, os.path.join(prefix, k), self[k]))
+ return changes
+
@_must_be_writable
@_synchronized
- def merge(self, other):
- for k in other.keys():
- if k in self:
- if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection):
- self[k].merge(other[k])
+ def apply(self, changes):
+ """
+ Apply changes from `diff`, dispatching on the action in c[0] (ADD, MOD or DEL). If a change
+ conflicts with a local change, it will be saved to an alternate path indicating the conflict.
+ """
+ for c in changes:
+ path = c[1]
+ initial = c[2]
+ local = self.find(path)
+ conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
+ time.gmtime()))
+ if c[0] == ADD:
+ if local is None:
+ # No local file at path, safe to copy over new file
+ self.copy(initial, path)
+ elif local != initial:
+ # There is already local file and it is different:
+ # save change to conflict file.
+ self.copy(initial, conflictpath)
+ elif c[0] == MOD:
+ if local == initial:
+ # Local matches the "initial" item so assume it hasn't
+ # changed locally and is safe to update.
+ if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
+ # Replace contents of local file with new contents
+ local.replace_contents(c[3])
+ else:
+ # Overwrite path with new item; this can happen if
+ # path was a file and is now a collection or vice versa
+ self.copy(c[3], path, overwrite=True)
else:
- if self[k] != other[k]:
- name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d_%H:%M:%S",
- time.gmtime()))
- self._items[name] = other[k].clone(self)
- self.notify(self, name, ADD, self[name])
- else:
- self._items[k] = other[k].clone(self)
- self.notify(self, k, ADD, self[k])
+ # Local is missing (presumably deleted) or local doesn't
+ # match the "start" value, so save change to conflict file
+ self.copy(c[3], conflictpath)
+ elif c[0] == DEL:
+ if local == initial:
+ # Local item matches "initial" value, so it is safe to remove.
+ self.remove(path, rm_r=True)
+ # else, the file is modified or already removed, in either
+ # case we don't want to try to remove it.
def portable_data_hash(self):
"""Get the portable data hash for this collection's manifest."""
@@ -1016,6 +1073,7 @@ class Collection(SynchronizedCollectionBase):
self.lock = threading.RLock()
self.callbacks = []
self.events = None
+ self._baseline_manifest
if manifest_locator_or_text:
if re.match(util.keep_locator_pattern, manifest_locator_or_text):
@@ -1052,9 +1110,10 @@ class Collection(SynchronizedCollectionBase):
@_synchronized
def update(self):
- n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute()
+ n = self._my_api().collections().get(uuid=self._manifest_locator).execute()
other = import_collection(n["manifest_text"])
- self.merge(other)
+ baseline = import_collection(self._baseline_manifest)
+ self.apply(other.diff(baseline))
@_synchronized
def _my_api(self):
@@ -1136,6 +1195,7 @@ class Collection(SynchronizedCollectionBase):
error_via_api,
error_via_keep))
# populate
+ self._baseline_manifest = self._manifest_text
import_manifest(self._manifest_text, self)
if self._sync == SYNC_READONLY:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list