[ARVADOS] updated: 40e42b7c37293f7482ea75b080dc3742f959e4dd

Git user git at public.curoverse.com
Tue Jul 11 10:59:19 EDT 2017


Summary of changes:
 sdk/python/arvados/keep.py             | 35 ++++++++------------
 sdk/python/arvados/util.py             | 17 ++++------
 services/fuse/arvados_fuse/__init__.py | 30 +++++++++---------
 services/fuse/arvados_fuse/fresh.py    |  3 ++
 services/fuse/arvados_fuse/fusedir.py  | 58 ++++++++++++++++++++++++++++++----
 5 files changed, 90 insertions(+), 53 deletions(-)

       via  40e42b7c37293f7482ea75b080dc3742f959e4dd (commit)
       via  670f4b62dd6861ea170a8d63d6070944f7a4c8e7 (commit)
      from  b25cbdbd1b54a622f4428e29ccffeb165ef29b45 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 40e42b7c37293f7482ea75b080dc3742f959e4dd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 11 10:58:50 2017 -0400

    11158: Support incremental update of project contents based on websocket events.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 1bfd517..49cb8fd 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -390,30 +390,30 @@ class Operations(llfuse.Operations):
 
     @catch_exceptions
     def on_event(self, ev):
-        if 'event_type' not in ev:
+        if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
             return
         with llfuse.lock:
-            new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
-            pdh = new_attrs.get("portable_data_hash")
-            # new_attributes.modified_at currently lacks
-            # subsecond precision (see #6347) so use event_at
-            # which should always be the same.
-            stamp = ev.get("event_at")
+            properties = ev.get("properties") or {}
+            old_attrs = properties.get("old_attributes") or {}
+            new_attrs = properties.get("new_attributes") or {}
 
             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if stamp and pdh and ev.get("object_kind") == "arvados#collection":
-                    item.update(to_record_version=(stamp, pdh))
-                else:
-                    item.update()
-
-            oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+                if ev.get("object_kind") == "arvados#collection":
+                    pdh = new_attrs.get("portable_data_hash")
+                    # new_attributes.modified_at currently lacks
+                    # subsecond precision (see #6347) so use event_at
+                    # which should always be the same.
+                    stamp = ev.get("event_at")
+                    if stamp and pdh and item.writable() and item.collection is not None and item.collection.modified():
+                        item.update(to_record_version=(stamp, pdh))
+
+            oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
                     self.inodes.inode_cache.find_by_uuid(oldowner) +
                     self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.invalidate()
-                parent.update()
+                parent.child_event(ev)
 
     @catch_exceptions
     def getattr(self, inode, ctx=None):
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 34295ef..a51dd90 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -139,3 +139,6 @@ class FreshBase(object):
 
     def finalize(self):
         pass
+
+    def child_event(self, ev):
+        pass
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 2d032bd..6dfb8c4 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -773,7 +773,7 @@ class ProjectDirectory(Directory):
         if 'name' in i:
             if i['name'] is None or len(i['name']) == 0:
                 return None
-            elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
+            elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
                 # collection or subproject
                 return i['name']
             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
@@ -832,6 +832,11 @@ class ProjectDirectory(Directory):
         finally:
             self._updating_lock.release()
 
+    def _add_entry(self, i, name):
+        ent = self.createDirectory(i)
+        self._entries[name] = self.inodes.add_entry(ent)
+        return self._entries[name]
+
     @use_counter
     @check_update
     def __getitem__(self, k):
@@ -849,13 +854,11 @@ class ProjectDirectory(Directory):
                                                                 ["name", "=", k]],
                                                        limit=1).execute(num_retries=self.num_retries)["items"]
         if contents:
-            i = contents[0]
-            name = sanitize_filename(self.namefn(i))
+            name = sanitize_filename(self.namefn(contents[0]))
             if name != k:
                 raise KeyError(k)
-            ent = self.createDirectory(i)
-            self._entries[name] = self.inodes.add_entry(ent)
-            return self._entries[name]
+            return self._add_entry(contents[0], name)
+
         # Didn't find item
         raise KeyError(k)
 
@@ -933,6 +936,49 @@ class ProjectDirectory(Directory):
         self._entries[name_new] = ent
         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
+    @use_counter
+    def child_event(self, ev):
+        properties = ev.get("properties") or {}
+        old_attrs = properties.get("old_attributes") or {}
+        new_attrs = properties.get("new_attributes") or {}
+        old_attrs["uuid"] = ev["object_uuid"]
+        new_attrs["uuid"] = ev["object_uuid"]
+        old_name = sanitize_filename(self.namefn(old_attrs))
+        new_name = sanitize_filename(self.namefn(new_attrs))
+
+        # create events will have a new name, but not an old name
+        # delete events will have an old name, but not a new name
+        # update events will have an old and new name, and they may be same or different
+        # if they are the same, an unrelated field changed and there is nothing to do.
+
+        if old_attrs.get("owner_uuid") != self.project_uuid:
+            # Was moved from somewhere else, so don't try to remove entry.
+            old_name = None
+        if ev.get("object_owner_uuid") != self.project_uuid:
+            # Was moved to somewhere else, so don't try to add entry
+            new_name = None
+
+        if ev.get("object_kind") == "arvados#collection" and old_attrs == new_attrs:
+            with llfuse.lock_released:
+                cr = self.api.collections().list(filters=[["uuid", "=", ev["object_uuid"]]], include_trash=True).execute(num_retries=self.num_retries)
+                if cr['items'] and cr['items'][0]['is_trashed']:
+                    new_name = None
+
+        if new_name != old_name:
+            ent = None
+            if old_name in self._entries:
+                ent = self._entries[old_name]
+                del self._entries[old_name]
+            self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+            if new_name:
+                if ent:
+                    self._entries[new_name] = ent
+                else:
+                    self._add_entry(new_attrs, new_name)
+            elif ent:
+                self.inodes.del_entry(ent)
+
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""

commit 670f4b62dd6861ea170a8d63d6070944f7a4c8e7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jul 10 16:52:41 2017 -0400

    Revert python sdk changes.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4103b30..e6e93f0 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,7 +27,6 @@ import sys
 import threading
 from . import timer
 import urllib.parse
-import errno
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -541,7 +540,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-
+    
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -552,19 +551,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-
+        
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-
+        
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-
+        
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -613,25 +612,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-
+        
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-
+        
         def done(self):
             return self.queue.successful_copies
-
+        
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-
+        
         def response(self):
             return self.queue.response
-
-
+    
+    
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -1131,7 +1130,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1188,16 +1187,8 @@ class KeepClient(object):
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             return b''
-
-        try:
-            with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
-                return f.read()
-        except IOError as e:
-            if e.errno == errno.ENOENT:
-                raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
-            else:
-                raise
-
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+            return f.read()
 
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 1ab9996..97e1d26 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -369,17 +369,14 @@ def is_hex(s, *length_args):
 def list_all(fn, num_retries=0, **kwargs):
     # Default limit to (effectively) api server's MAX_LIMIT
     kwargs.setdefault('limit', sys.maxsize)
-    kwargs.setdefault('order', 'uuid asc')
-    kwargs.setdefault('count', 'none')
-    addfilters = kwargs.get("filters", [])
     items = []
-    while True:
-        c = fn(**kwargs).execute(num_retries=num_retries)
-        items.extend(c['items'])
-        if len(c['items']) < c['limit']:
-            # Didn't return a full page, so we're done.
-            break
-        kwargs["filters"] = addfilters + [["uuid", ">", c['items'][-1]['uuid']]]
+    offset = 0
+    items_available = sys.maxsize
+    while len(items) < items_available:
+        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+        items += c['items']
+        items_available = c['items_available']
+        offset = c['offset'] + len(c['items'])
     return items
 
 def ca_certs_path(fallback=httplib2.CA_CERTS):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list