[ARVADOS] updated: 40e42b7c37293f7482ea75b080dc3742f959e4dd
Git user
git at public.curoverse.com
Tue Jul 11 10:59:19 EDT 2017
Summary of changes:
sdk/python/arvados/keep.py | 35 ++++++++------------
sdk/python/arvados/util.py | 17 ++++------
services/fuse/arvados_fuse/__init__.py | 30 +++++++++---------
services/fuse/arvados_fuse/fresh.py | 3 ++
services/fuse/arvados_fuse/fusedir.py | 58 ++++++++++++++++++++++++++++++----
5 files changed, 90 insertions(+), 53 deletions(-)
via 40e42b7c37293f7482ea75b080dc3742f959e4dd (commit)
via 670f4b62dd6861ea170a8d63d6070944f7a4c8e7 (commit)
from b25cbdbd1b54a622f4428e29ccffeb165ef29b45 (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.
commit 40e42b7c37293f7482ea75b080dc3742f959e4dd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jul 11 10:58:50 2017 -0400
11158: Support incremental update of project contents based on websocket events.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 1bfd517..49cb8fd 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -390,30 +390,30 @@ class Operations(llfuse.Operations):
@catch_exceptions
def on_event(self, ev):
- if 'event_type' not in ev:
+ if 'event_type' not in ev or ev["event_type"] not in ("create", "update", "delete"):
return
with llfuse.lock:
- new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
- pdh = new_attrs.get("portable_data_hash")
- # new_attributes.modified_at currently lacks
- # subsecond precision (see #6347) so use event_at
- # which should always be the same.
- stamp = ev.get("event_at")
+ properties = ev.get("properties") or {}
+ old_attrs = properties.get("old_attributes") or {}
+ new_attrs = properties.get("new_attributes") or {}
for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
item.invalidate()
- if stamp and pdh and ev.get("object_kind") == "arvados#collection":
- item.update(to_record_version=(stamp, pdh))
- else:
- item.update()
-
- oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
+ if ev.get("object_kind") == "arvados#collection":
+ pdh = new_attrs.get("portable_data_hash")
+ # new_attributes.modified_at currently lacks
+ # subsecond precision (see #6347) so use event_at
+ # which should always be the same.
+ stamp = ev.get("event_at")
+ if stamp and pdh and item.writable() and item.collection is not None and item.collection.modified():
+ item.update(to_record_version=(stamp, pdh))
+
+ oldowner = old_attrs.get("owner_uuid")
newowner = ev.get("object_owner_uuid")
for parent in (
self.inodes.inode_cache.find_by_uuid(oldowner) +
self.inodes.inode_cache.find_by_uuid(newowner)):
- parent.invalidate()
- parent.update()
+ parent.child_event(ev)
@catch_exceptions
def getattr(self, inode, ctx=None):
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 34295ef..a51dd90 100644
--- a/services/fuse/arvados_fuse/fresh.py
+++ b/services/fuse/arvados_fuse/fresh.py
@@ -139,3 +139,6 @@ class FreshBase(object):
def finalize(self):
pass
+
+ def child_event(self, ev):
+ pass
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 2d032bd..6dfb8c4 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -773,7 +773,7 @@ class ProjectDirectory(Directory):
if 'name' in i:
if i['name'] is None or len(i['name']) == 0:
return None
- elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
+ elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
# collection or subproject
return i['name']
elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
@@ -832,6 +832,11 @@ class ProjectDirectory(Directory):
finally:
self._updating_lock.release()
+ def _add_entry(self, i, name):
+ ent = self.createDirectory(i)
+ self._entries[name] = self.inodes.add_entry(ent)
+ return self._entries[name]
+
@use_counter
@check_update
def __getitem__(self, k):
@@ -849,13 +854,11 @@ class ProjectDirectory(Directory):
["name", "=", k]],
limit=1).execute(num_retries=self.num_retries)["items"]
if contents:
- i = contents[0]
- name = sanitize_filename(self.namefn(i))
+ name = sanitize_filename(self.namefn(contents[0]))
if name != k:
raise KeyError(k)
- ent = self.createDirectory(i)
- self._entries[name] = self.inodes.add_entry(ent)
- return self._entries[name]
+ return self._add_entry(contents[0], name)
+
# Didn't find item
raise KeyError(k)
@@ -933,6 +936,49 @@ class ProjectDirectory(Directory):
self._entries[name_new] = ent
self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+ @use_counter
+ def child_event(self, ev):
+ properties = ev.get("properties") or {}
+ old_attrs = properties.get("old_attributes") or {}
+ new_attrs = properties.get("new_attributes") or {}
+ old_attrs["uuid"] = ev["object_uuid"]
+ new_attrs["uuid"] = ev["object_uuid"]
+ old_name = sanitize_filename(self.namefn(old_attrs))
+ new_name = sanitize_filename(self.namefn(new_attrs))
+
+ # create events will have a new name, but not an old name
+ # delete events will have an old name, but not a new name
+ # update events will have an old and new name, and they may be same or different
+ # if they are the same, an unrelated field changed and there is nothing to do.
+
+ if old_attrs.get("owner_uuid") != self.project_uuid:
+ # Was moved from somewhere else, so don't try to remove entry.
+ old_name = None
+ if ev.get("object_owner_uuid") != self.project_uuid:
+ # Was moved to somewhere else, so don't try to add entry
+ new_name = None
+
+ if ev.get("object_kind") == "arvados#collection" and old_attrs == new_attrs:
+ with llfuse.lock_released:
+ cr = self.api.collections().list(filters=[["uuid", "=", ev["object_uuid"]]], include_trash=True).execute(num_retries=self.num_retries)
+ if cr['items'] and cr['items'][0]['is_trashed']:
+ new_name = None
+
+ if new_name != old_name:
+ ent = None
+ if old_name in self._entries:
+ ent = self._entries[old_name]
+ del self._entries[old_name]
+ self.inodes.invalidate_entry(self.inode, old_name.encode(self.inodes.encoding))
+
+ if new_name:
+ if ent:
+ self._entries[new_name] = ent
+ else:
+ self._add_entry(new_attrs, new_name)
+ elif ent:
+ self.inodes.del_entry(ent)
+
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
commit 670f4b62dd6861ea170a8d63d6070944f7a4c8e7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jul 10 16:52:41 2017 -0400
Revert python sdk changes.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 4103b30..e6e93f0 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -27,7 +27,6 @@ import sys
import threading
from . import timer
import urllib.parse
-import errno
if sys.version_info >= (3, 0):
from io import BytesIO
@@ -541,7 +540,7 @@ class KeepClient(object):
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
+
class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
@@ -552,19 +551,19 @@ class KeepClient(object):
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
@@ -613,25 +612,25 @@ class KeepClient(object):
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
@@ -1131,7 +1130,7 @@ class KeepClient(object):
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
@@ -1188,16 +1187,8 @@ class KeepClient(object):
"Invalid data locator: '%s'" % loc_s)
if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
return b''
-
- try:
- with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
- return f.read()
- except IOError as e:
- if e.errno == errno.ENOENT:
- raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
- else:
- raise
-
+ with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+ return f.read()
def is_cached(self, locator):
return self.block_cache.reserve_cache(expect_hash)
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index 1ab9996..97e1d26 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -369,17 +369,14 @@ def is_hex(s, *length_args):
def list_all(fn, num_retries=0, **kwargs):
# Default limit to (effectively) api server's MAX_LIMIT
kwargs.setdefault('limit', sys.maxsize)
- kwargs.setdefault('order', 'uuid asc')
- kwargs.setdefault('count', 'none')
- addfilters = kwargs.get("filters", [])
items = []
- while True:
- c = fn(**kwargs).execute(num_retries=num_retries)
- items.extend(c['items'])
- if len(c['items']) < c['limit']:
- # Didn't return a full page, so we're done.
- break
- kwargs["filters"] = addfilters + [["uuid", ">", c['items'][-1]['uuid']]]
+ offset = 0
+ items_available = sys.maxsize
+ while len(items) < items_available:
+ c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
+ items += c['items']
+ items_available = c['items_available']
+ offset = c['offset'] + len(c['items'])
return items
def ca_certs_path(fallback=httplib2.CA_CERTS):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list