[ARVADOS] updated: a830b5b560251c3143a7b1fd60db3f50a7021b34

git at public.curoverse.com
Mon Jun 15 10:48:48 EDT 2015


Summary of changes:
 sdk/python/arvados/_ranges.py          | 15 +++++++++++----
 sdk/python/arvados/arvfile.py          | 21 +++++++++++++++------
 sdk/python/arvados/collection.py       | 28 +++++++++++++++++++++-------
 services/fuse/README.rst               |  4 ++++
 services/fuse/arvados_fuse/__init__.py | 26 +++++++++++++-------------
 services/fuse/arvados_fuse/fusedir.py  | 21 +++++++++++----------
 services/fuse/arvados_fuse/fusefile.py |  3 ++-
 7 files changed, 77 insertions(+), 41 deletions(-)

       via  a830b5b560251c3143a7b1fd60db3f50a7021b34 (commit)
       via  05ff1098f0e9eda5d642a1249f8b3a236656320c (commit)
      from  27dc00515f48ed69b4d5e26ff64805b8cda4ccd3 (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.


commit a830b5b560251c3143a7b1fd60db3f50a7021b34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 15 10:49:00 2015 -0400

    3198: Performance tuning based on benchmarking.  Limit the number of
    segments to look ahead for prefetch, and avoid prefetching the same
    blocks over and over again.
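
In rough terms, the tuned read path this commit introduces boils down to the
sketch below.  It is a simplified, stand-alone rendering of the readfrom()
change further down, not the SDK code itself: get_block and block_prefetch are
hypothetical stand-ins for the block manager calls, and keep_block_size stands
in for config.KEEP_BLOCK_SIZE (the default here is only for illustration).

    from arvados._ranges import locators_and_ranges

    def bounded_readfrom(segments, offset, size, get_block, block_prefetch,
                         keep_block_size=64 * 1024 * 1024, lookahead=32):
        # Segments needed for the read itself.
        readsegs = locators_and_ranges(segments, offset, size)
        # Only look one Keep block past the end of the read, and stop after
        # roughly `lookahead` segments instead of scanning arbitrarily far.
        prefetch = locators_and_ranges(segments, offset + size,
                                       keep_block_size, limit=lookahead)
        data = []
        locs = set()
        for lr in readsegs:
            block = get_block(lr.locator)
            if block is None:
                break
            data.append(block[lr.segment_offset:lr.segment_offset + lr.segment_size])
            locs.add(lr.locator)          # blocks the read already touched
        for lr in prefetch:
            if lr.locator not in locs:    # don't queue what we just fetched
                block_prefetch(lr.locator)
                locs.add(lr.locator)
        return ''.join(data)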

diff --git a/sdk/python/arvados/_ranges.py b/sdk/python/arvados/_ranges.py
index d5ff6ed..371d003 100644
--- a/sdk/python/arvados/_ranges.py
+++ b/sdk/python/arvados/_ranges.py
@@ -21,7 +21,7 @@ class Range(object):
                 self.range_size == other.range_size and
                 self.segment_offset == other.segment_offset)
 
-def first_block(data_locators, range_start, range_size):
+def first_block(data_locators, range_start):
     block_start = 0L
 
     # range_start/block_start is the inclusive lower bound
@@ -68,7 +68,7 @@ class LocatorAndRange(object):
     def __repr__(self):
         return "LocatorAndRange(%r, %r, %r, %r)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
 
-def locators_and_ranges(data_locators, range_start, range_size):
+def locators_and_ranges(data_locators, range_start, range_size, limit=None):
     """Get blocks that are covered by a range.
 
     Returns a list of LocatorAndRange objects.
@@ -82,19 +82,26 @@ def locators_and_ranges(data_locators, range_start, range_size):
     :range_size:
       size of range
 
+    :limit:
+      Maximum segments to return, default None (unlimited).  Will truncate the
+      result if there are more segments needed to cover the range than the
+      limit.
+
     """
     if range_size == 0:
         return []
     resp = []
     range_end = range_start + range_size
 
-    i = first_block(data_locators, range_start, range_size)
+    i = first_block(data_locators, range_start)
     if i is None:
         return []
 
     # We should always start at the first segment due to the binary
     # search.
     while i < len(data_locators):
+        if limit and len(resp) > limit:
+            break
         dl = data_locators[i]
         block_start = dl.range_start
         block_size = dl.range_size
@@ -163,7 +170,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
             data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
         return
 
-    i = first_block(data_locators, new_range_start, new_range_size)
+    i = first_block(data_locators, new_range_start)
     if i is None:
         return
 
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 83ef76b..7d6d676 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -639,7 +639,6 @@ class _BlockManager(object):
             if v.owner:
                 v.owner.flush(sync=True)
 
-
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
 
@@ -653,9 +652,13 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
+        if self._keep.get_from_cache(locator) is not None:
+            return
+
         with self.lock:
             if locator in self._bufferblocks:
                 return
+
         self.start_get_threads()
         self._prefetch_queue.put(locator)
 
@@ -811,20 +814,25 @@ class ArvadosFile(object):
         with self.lock:
             if size == 0 or offset >= self.size():
                 return ''
-            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
             readsegs = locators_and_ranges(self._segments, offset, size)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
 
-        for lr in prefetch:
-            self.parent._my_block_manager().block_prefetch(lr.locator)
-
+        locs = set()
         data = []
         for lr in readsegs:
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 blockview = memoryview(block)
                 data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                locs.add(lr.locator)
             else:
                 break
+
+        for lr in prefetch:
+            if lr.locator not in locs:
+                self.parent._my_block_manager().block_prefetch(lr.locator)
+                locs.add(lr.locator)
+
         return ''.join(data)
 
     def _repack_writes(self, num_retries):
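
The block_prefetch() change above is the other half of avoiding repeated
prefetches of the same block: before queueing a download it now asks the Keep
client whether the block is already cached locally.  Reduced to its essentials
(hypothetical names: cache.get stands in for KeepClient.get_from_cache,
pending for the _bufferblocks map):

    import Queue                          # Python 2 standard library

    def maybe_prefetch(locator, cache, pending, prefetch_queue):
        if cache.get(locator) is not None:
            return                        # already in the local Keep cache
        if locator in pending:
            return                        # a buffer block already exists for it
        prefetch_queue.put(locator)       # let the prefetch threads fetch it

    q = Queue.Queue()
    maybe_prefetch("0cc175b9c0f1b6a831c399e269772661+1", {}, set(), q)  # made-up locator
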
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 81be529..38e794c 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1211,11 +1211,11 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
-            if self.known_past_version((response["modified_at"], response["portable_data_hash"])):
+            if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))):
                 # We've merged this record before.  Don't do anything.
                 return
             else:
-                self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
@@ -1245,7 +1245,7 @@ class Collection(RichCollectionBase):
 
     def _remember_api_response(self, response):
         self._api_response = response
-        self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
 
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
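
The move from response["..."] to response.get("...") in the two hunks above is
defensive: if the API response is missing either field, bracket indexing raises
KeyError inside version tracking, while .get() simply records None for the
missing piece.  A small illustration with a made-up response dict:

    response = {"portable_data_hash": "d41d8cd98f00b204e9800998ecf8427e+0"}
    # response["modified_at"] would raise KeyError here.
    version = (response.get("modified_at"), response.get("portable_data_hash"))
    assert version == (None, "d41d8cd98f00b204e9800998ecf8427e+0")
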
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 4f91de9..91e1907 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -434,7 +434,6 @@ class Operations(llfuse.Operations):
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read %i %i %i", fh, off, size)
-
         if fh in self._filehandles:
             handle = self._filehandles[fh]
         else:

commit 05ff1098f0e9eda5d642a1249f8b3a236656320c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 12 15:54:51 2015 -0400

    3198: Fix frontrunning (updates received after a commit causing the
    collection to conflict with itself).  Also fix filename character encoding.
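
Condensed, the frontrunning guard added in this commit is a set of
(modified_at, portable_data_hash) pairs the client has already produced or
merged; an update notification that merely echoes one of those versions is
ignored instead of being re-applied.  A stand-alone sketch of the idea (not
the SDK's Collection class; the timestamp and hash below are made up):

    class VersionTracker(object):
        """Remember record versions this client has already seen, so an update
        notification that echoes our own commit does not conflict with it."""

        def __init__(self):
            self._past_versions = set()

        def remember(self, response):
            # Call this after every create/update/get of the collection record.
            self._past_versions.add((response.get("modified_at"),
                                     response.get("portable_data_hash")))

        def known_past_version(self, version):
            return version in self._past_versions

    tracker = VersionTracker()
    tracker.remember({"modified_at": "2015-06-12T19:54:51Z",
                      "portable_data_hash": "0cc175b9c0f1b6a831c399e269772661+57"})
    # A later update() carrying the same version can now be skipped.
    assert tracker.known_past_version(("2015-06-12T19:54:51Z",
                                       "0cc175b9c0f1b6a831c399e269772661+57"))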

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ffe3d35..83ef76b 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -821,7 +821,8 @@ class ArvadosFile(object):
         for lr in readsegs:
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
-                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
+                blockview = memoryview(block)
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
             else:
                 break
         return ''.join(data)
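
The behaviour relied on in this hunk is plain Python: wrapping a block in
memoryview() costs no copy, slicing the view is also copy-free, and only
.tobytes() materialises the bytes of the requested segment.  In isolation:

    block = b"0123456789" * 1024          # stand-in for a downloaded Keep block
    view = memoryview(block)              # wraps the existing buffer, no copy
    segment = view[3:7].tobytes()         # copies only the four selected bytes
    assert segment == b"3456"
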
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 70341d8..81be529 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1168,6 +1168,7 @@ class Collection(RichCollectionBase):
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
+        self._past_versions = set()
 
         self.lock = threading.RLock()
         self.events = None
@@ -1198,6 +1199,10 @@ class Collection(RichCollectionBase):
         return True
 
     @synchronized
+    def known_past_version(self, modified_at_and_portable_data_hash):
+        return modified_at_and_portable_data_hash in self._past_versions
+
+    @synchronized
     @retry_method
     def update(self, other=None, num_retries=None):
         """Merge the latest collection on the API server with the current collection."""
@@ -1206,6 +1211,11 @@ class Collection(RichCollectionBase):
             if self._manifest_locator is None:
                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+            if self.known_past_version((response["modified_at"], response["portable_data_hash"])):
+                # We've merged this record before.  Don't do anything.
+                return
+            else:
+                self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
@@ -1233,6 +1243,10 @@ class Collection(RichCollectionBase):
             self._block_manager = _BlockManager(self._my_keep())
         return self._block_manager
 
+    def _remember_api_response(self, response):
+        self._api_response = response
+        self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -1242,9 +1256,9 @@ class Collection(RichCollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            self._api_response = self._my_api().collections().get(
+            self._remember_api_response(self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries)
+                    num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
             return None
         except Exception as e:
@@ -1401,11 +1415,11 @@ class Collection(RichCollectionBase):
                 self.update()
 
             text = self.manifest_text(strip=False)
-            self._api_response = self._my_api().collections().update(
+            self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
                 body={'manifest_text': text}
                 ).execute(
-                    num_retries=num_retries)
+                    num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
             self.set_committed()
 
@@ -1452,7 +1466,7 @@ class Collection(RichCollectionBase):
 
         if create_collection_record:
             if name is None:
-                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                name = "New collection"
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
@@ -1460,7 +1474,7 @@ class Collection(RichCollectionBase):
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
-            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]
@@ -1543,7 +1557,7 @@ class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
     server record.
 
-    It falls under the umbrella of the root collection.
+    Subcollection locking falls under the umbrella lock of its root collection.
 
     """
 
diff --git a/services/fuse/README.rst b/services/fuse/README.rst
index d9a9a07..f0b2677 100644
--- a/services/fuse/README.rst
+++ b/services/fuse/README.rst
@@ -55,6 +55,10 @@ on your system.
 Testing and Development
 -----------------------
 
+Debian packages you need to build llfuse:
+
+$ apt-get install python-dev pkg-config libfuse-dev libattr1-dev
+
 This package is one part of the Arvados source package, and it has
 integration tests to check interoperability with other Arvados
 components.  Our `hacking guide
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index abe9821..4f91de9 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -183,10 +183,11 @@ class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self, inode_cache):
+    def __init__(self, inode_cache, encoding="utf-8"):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self.inode_cache = inode_cache
+        self.encoding = encoding
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -263,10 +264,9 @@ class Operations(llfuse.Operations):
 
         if not inode_cache:
             inode_cache = InodeCache(cap=256*1024*1024)
-        self.inodes = Inodes(inode_cache)
+        self.inodes = Inodes(inode_cache, encoding=encoding)
         self.uid = uid
         self.gid = gid
-        self.encoding = encoding
 
         # dict of inode to filehandle
         self._filehandles = {}
@@ -303,6 +303,7 @@ class Operations(llfuse.Operations):
                                  [["event_type", "in", ["create", "update", "delete"]]],
                                  self.on_event)
 
+    @catch_exceptions
     def on_event(self, ev):
         if 'event_type' in ev:
             with llfuse.lock:
@@ -310,11 +311,13 @@ class Operations(llfuse.Operations):
                 if item is not None:
                     item.invalidate()
                     if ev["object_kind"] == "arvados#collection":
-                        item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
+                        new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
+                        record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+                        item.update(to_record_version=record_version)
                     else:
                         item.update()
 
-                oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
+                oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
                 olditemparent = self.inodes.inode_cache.find(oldowner)
                 if olditemparent is not None:
                     olditemparent.invalidate()
@@ -335,8 +338,8 @@ class Operations(llfuse.Operations):
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 300
-        entry.attr_timeout = 300
+        entry.entry_timeout = 60
+        entry.attr_timeout = 60
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
@@ -357,7 +360,7 @@ class Operations(llfuse.Operations):
         entry.st_size = e.size()
 
         entry.st_blksize = 512
-        entry.st_blocks = (e.size()/512)+1
+        entry.st_blocks = (entry.st_size/512)+1
         entry.st_atime = int(e.atime())
         entry.st_mtime = int(e.mtime())
         entry.st_ctime = int(e.mtime())
@@ -379,7 +382,7 @@ class Operations(llfuse.Operations):
 
     @catch_exceptions
     def lookup(self, parent_inode, name):
-        name = unicode(name, self.encoding)
+        name = unicode(name, self.inodes.encoding)
         inode = None
 
         if name == '.':
@@ -431,6 +434,7 @@ class Operations(llfuse.Operations):
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read %i %i %i", fh, off, size)
+
         if fh in self._filehandles:
             handle = self._filehandles[fh]
         else:
@@ -513,10 +517,7 @@ class Operations(llfuse.Operations):
         e = off
         while e < len(handle.entries):
             if handle.entries[e][1].inode in self.inodes:
-                try:
-                    yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
-                except UnicodeEncodeError:
-                    pass
+                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
             e += 1
 
     @catch_exceptions
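
With the encoding stored on Inodes, every place that crosses the FUSE boundary
now agrees on one codec: lookup() decodes the byte string the kernel passes in,
while readdir() and the invalidate_entry() calls in fusedir.py below encode
unicode names back to bytes.  The round trip, in isolation (Python 2, as
arv-mount was at the time; the filename is made up):

    encoding = "utf-8"                       # the Inodes(..., encoding=...) default
    raw = b"r\xc3\xa9sultats.txt"            # bytes as handed to lookup() by the kernel
    name = unicode(raw, encoding)            # unicode key used for directory entries
    assert name.encode(encoding) == raw      # bytes emitted by readdir()/invalidate_entry()
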
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 26e7fc1..f661e41 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -143,8 +143,8 @@ class Directory(FreshBase):
 
         # delete any other directory entries that were not found in 'items'
         for i in oldentries:
-            _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
-            llfuse.invalidate_entry(self.inode, str(i))
+            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
+            llfuse.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
             self.inodes.del_entry(oldentries[i])
             changed = True
 
@@ -165,7 +165,7 @@ class Directory(FreshBase):
                     self._entries = oldentries
                     return False
             for n in oldentries:
-                llfuse.invalidate_entry(self.inode, str(n))
+                llfuse.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                 self.inodes.del_entry(oldentries[n])
             llfuse.invalidate_inode(self.inode)
             self.invalidate()
@@ -235,14 +235,15 @@ class CollectionDirectoryBase(Directory):
 
     def on_event(self, event, collection, name, item):
         if collection == self.collection:
-            _logger.debug("%s %s %s %s", event, collection, name, item)
+            name = sanitize_filename(name)
+            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
             with llfuse.lock:
                 if event == arvados.collection.ADD:
                     self.new_entry(name, item, self.mtime())
                 elif event == arvados.collection.DEL:
                     ent = self._entries[name]
                     del self._entries[name]
-                    llfuse.invalidate_entry(self.inode, name)
+                    llfuse.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                     self.inodes.del_entry(ent)
                 elif event == arvados.collection.MOD:
                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
@@ -371,7 +372,7 @@ class CollectionDirectory(CollectionDirectoryBase):
         return self.collection_locator
 
     @use_counter
-    def update(self, to_pdh=None):
+    def update(self, to_record_version=None):
         try:
             if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
@@ -387,9 +388,9 @@ class CollectionDirectory(CollectionDirectoryBase):
                         return
 
                     _logger.debug("Updating %s", self.collection_locator)
-                    if self.collection:
-                        if self.collection.portable_data_hash() == to_pdh:
-                            _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
+                    if self.collection is not None:
+                        if self.collection.known_past_version(to_record_version):
+                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                         else:
                             self.collection.update()
                     else:
@@ -768,7 +769,7 @@ class ProjectDirectory(Directory):
         # Actually move the entry from source directory to this directory.
         del src._entries[name_old]
         self._entries[name_new] = ent
-        llfuse.invalidate_entry(src.inode, name_old)
+        llfuse.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
 
 
 class SharedDirectory(Directory):
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index d33f9f9..4d472cf 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -46,7 +46,8 @@ class FuseArvadosFile(File):
         self.arvfile = arvfile
 
     def size(self):
-        return self.arvfile.size()
+        with llfuse.lock_released:
+            return self.arvfile.size()
 
     def readfrom(self, off, size, num_retries=0):
         with llfuse.lock_released:
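
size() now follows the same pattern readfrom() already used: drop the single
global llfuse callback lock while calling into the SDK, since ArvadosFile
methods take their own locks and may do non-trivial work, and holding the FUSE
lock across that would stall every other filesystem request.  The idiom, in a
hypothetical helper (not arv-mount code):

    import llfuse

    def file_size(arvfile):
        # Release the global FUSE callback lock around a call that may block
        # on the SDK's own locks.
        with llfuse.lock_released:
            return arvfile.size()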

-----------------------------------------------------------------------


hooks/post-receive
-- 

More information about the arvados-commits mailing list