[ARVADOS] updated: a830b5b560251c3143a7b1fd60db3f50a7021b34
git at public.curoverse.com
git at public.curoverse.com
Mon Jun 15 10:48:48 EDT 2015
Summary of changes:
sdk/python/arvados/_ranges.py | 15 +++++++++++----
sdk/python/arvados/arvfile.py | 21 +++++++++++++++------
sdk/python/arvados/collection.py | 28 +++++++++++++++++++++-------
services/fuse/README.rst | 4 ++++
services/fuse/arvados_fuse/__init__.py | 26 +++++++++++++-------------
services/fuse/arvados_fuse/fusedir.py | 21 +++++++++++----------
services/fuse/arvados_fuse/fusefile.py | 3 ++-
7 files changed, 77 insertions(+), 41 deletions(-)
via a830b5b560251c3143a7b1fd60db3f50a7021b34 (commit)
via 05ff1098f0e9eda5d642a1249f8b3a236656320c (commit)
from 27dc00515f48ed69b4d5e26ff64805b8cda4ccd3 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a830b5b560251c3143a7b1fd60db3f50a7021b34
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 15 10:49:00 2015 -0400
3198: Performance tuning based on benchmarking. Limit number of segments to
look ahead for prefetch, don't prefetch blocks over and over again.
diff --git a/sdk/python/arvados/_ranges.py b/sdk/python/arvados/_ranges.py
index d5ff6ed..371d003 100644
--- a/sdk/python/arvados/_ranges.py
+++ b/sdk/python/arvados/_ranges.py
@@ -21,7 +21,7 @@ class Range(object):
self.range_size == other.range_size and
self.segment_offset == other.segment_offset)
-def first_block(data_locators, range_start, range_size):
+def first_block(data_locators, range_start):
block_start = 0L
# range_start/block_start is the inclusive lower bound
@@ -68,7 +68,7 @@ class LocatorAndRange(object):
def __repr__(self):
return "LocatorAndRange(%r, %r, %r, %r)" % (self.locator, self.block_size, self.segment_offset, self.segment_size)
-def locators_and_ranges(data_locators, range_start, range_size):
+def locators_and_ranges(data_locators, range_start, range_size, limit=None):
"""Get blocks that are covered by a range.
Returns a list of LocatorAndRange objects.
@@ -82,19 +82,26 @@ def locators_and_ranges(data_locators, range_start, range_size):
:range_size:
size of range
+ :limit:
+ Maximum segments to return, default None (unlimited). Will truncate the
+ result if there are more segments needed to cover the range than the
+ limit.
+
"""
if range_size == 0:
return []
resp = []
range_end = range_start + range_size
- i = first_block(data_locators, range_start, range_size)
+ i = first_block(data_locators, range_start)
if i is None:
return []
# We should always start at the first segment due to the binary
# search.
while i < len(data_locators):
+ if limit and len(resp) > limit:
+ break
dl = data_locators[i]
block_start = dl.range_start
block_size = dl.range_size
@@ -163,7 +170,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
data_locators.append(Range(new_locator, new_range_start, new_range_size, new_segment_offset))
return
- i = first_block(data_locators, new_range_start, new_range_size)
+ i = first_block(data_locators, new_range_start)
if i is None:
return
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 83ef76b..7d6d676 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -639,7 +639,6 @@ class _BlockManager(object):
if v.owner:
v.owner.flush(sync=True)
-
def block_prefetch(self, locator):
"""Initiate a background download of a block.
@@ -653,9 +652,13 @@ class _BlockManager(object):
if not self.prefetch_enabled:
return
+ if self._keep.get_from_cache(locator) is not None:
+ return
+
with self.lock:
if locator in self._bufferblocks:
return
+
self.start_get_threads()
self._prefetch_queue.put(locator)
@@ -811,20 +814,25 @@ class ArvadosFile(object):
with self.lock:
if size == 0 or offset >= self.size():
return ''
- prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
readsegs = locators_and_ranges(self._segments, offset, size)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
- for lr in prefetch:
- self.parent._my_block_manager().block_prefetch(lr.locator)
-
+ locs = set()
data = []
for lr in readsegs:
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
blockview = memoryview(block)
data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ locs.add(lr.locator)
else:
break
+
+ for lr in prefetch:
+ if lr.locator not in locs:
+ self.parent._my_block_manager().block_prefetch(lr.locator)
+ locs.add(lr.locator)
+
return ''.join(data)
def _repack_writes(self, num_retries):
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 81be529..38e794c 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1211,11 +1211,11 @@ class Collection(RichCollectionBase):
if self._manifest_locator is None:
raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
- if self.known_past_version((response["modified_at"], response["portable_data_hash"])):
+ if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))):
# We've merged this record before. Don't do anything.
return
else:
- self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+ self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
self.apply(baseline.diff(other))
@@ -1245,7 +1245,7 @@ class Collection(RichCollectionBase):
def _remember_api_response(self, response):
self._api_response = response
- self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+ self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
def _populate_from_api_server(self):
# As in KeepClient itself, we must wait until the last
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 4f91de9..91e1907 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -434,7 +434,6 @@ class Operations(llfuse.Operations):
@catch_exceptions
def read(self, fh, off, size):
_logger.debug("arv-mount read %i %i %i", fh, off, size)
-
if fh in self._filehandles:
handle = self._filehandles[fh]
else:
commit 05ff1098f0e9eda5d642a1249f8b3a236656320c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jun 12 15:54:51 2015 -0400
3198: Fix frontrunning (subsequent updates after a commit causing the
collection to conflict with itself). Also fix filename character encoding.
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ffe3d35..83ef76b 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -821,7 +821,8 @@ class ArvadosFile(object):
for lr in readsegs:
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
- data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ blockview = memoryview(block)
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
else:
break
return ''.join(data)
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 70341d8..81be529 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -1168,6 +1168,7 @@ class Collection(RichCollectionBase):
self._manifest_locator = None
self._manifest_text = None
self._api_response = None
+ self._past_versions = set()
self.lock = threading.RLock()
self.events = None
@@ -1198,6 +1199,10 @@ class Collection(RichCollectionBase):
return True
@synchronized
+ def known_past_version(self, modified_at_and_portable_data_hash):
+ return modified_at_and_portable_data_hash in self._past_versions
+
+ @synchronized
@retry_method
def update(self, other=None, num_retries=None):
"""Merge the latest collection on the API server with the current collection."""
@@ -1206,6 +1211,11 @@ class Collection(RichCollectionBase):
if self._manifest_locator is None:
raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
+ if self.known_past_version((response["modified_at"], response["portable_data_hash"])):
+            # We've merged this record before. Don't do anything.
+ return
+ else:
+ self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
other = CollectionReader(response["manifest_text"])
baseline = CollectionReader(self._manifest_text)
self.apply(baseline.diff(other))
@@ -1233,6 +1243,10 @@ class Collection(RichCollectionBase):
self._block_manager = _BlockManager(self._my_keep())
return self._block_manager
+ def _remember_api_response(self, response):
+ self._api_response = response
+ self._past_versions.add((response["modified_at"], response["portable_data_hash"]))
+
def _populate_from_api_server(self):
# As in KeepClient itself, we must wait until the last
# possible moment to instantiate an API client, in order to
@@ -1242,9 +1256,9 @@ class Collection(RichCollectionBase):
# clause, just like any other Collection lookup
# failure. Return an exception, or None if successful.
try:
- self._api_response = self._my_api().collections().get(
+ self._remember_api_response(self._my_api().collections().get(
uuid=self._manifest_locator).execute(
- num_retries=self.num_retries)
+ num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
return None
except Exception as e:
@@ -1401,11 +1415,11 @@ class Collection(RichCollectionBase):
self.update()
text = self.manifest_text(strip=False)
- self._api_response = self._my_api().collections().update(
+ self._remember_api_response(self._my_api().collections().update(
uuid=self._manifest_locator,
body={'manifest_text': text}
).execute(
- num_retries=num_retries)
+ num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
self.set_committed()
@@ -1452,7 +1466,7 @@ class Collection(RichCollectionBase):
if create_collection_record:
if name is None:
- name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+ name = "New collection"
ensure_unique_name = True
body = {"manifest_text": text,
@@ -1460,7 +1474,7 @@ class Collection(RichCollectionBase):
if owner_uuid:
body["owner_uuid"] = owner_uuid
- self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
+ self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
self._manifest_locator = self._api_response["uuid"]
@@ -1543,7 +1557,7 @@ class Subcollection(RichCollectionBase):
"""This is a subdirectory within a collection that doesn't have its own API
server record.
- It falls under the umbrella of the root collection.
+ Subcollection locking falls under the umbrella lock of its root collection.
"""
diff --git a/services/fuse/README.rst b/services/fuse/README.rst
index d9a9a07..f0b2677 100644
--- a/services/fuse/README.rst
+++ b/services/fuse/README.rst
@@ -55,6 +55,10 @@ on your system.
Testing and Development
-----------------------
+Debian packages you need to build llfuse:
+
+$ apt-get install python-dev pkg-config libfuse-dev libattr1-dev
+
This package is one part of the Arvados source package, and it has
integration tests to check interoperability with other Arvados
components. Our `hacking guide
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index abe9821..4f91de9 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -183,10 +183,11 @@ class Inodes(object):
"""Manage the set of inodes. This is the mapping from a numeric id
to a concrete File or Directory object"""
- def __init__(self, inode_cache):
+ def __init__(self, inode_cache, encoding="utf-8"):
self._entries = {}
self._counter = itertools.count(llfuse.ROOT_INODE)
self.inode_cache = inode_cache
+ self.encoding = encoding
def __getitem__(self, item):
return self._entries[item]
@@ -263,10 +264,9 @@ class Operations(llfuse.Operations):
if not inode_cache:
inode_cache = InodeCache(cap=256*1024*1024)
- self.inodes = Inodes(inode_cache)
+ self.inodes = Inodes(inode_cache, encoding=encoding)
self.uid = uid
self.gid = gid
- self.encoding = encoding
# dict of inode to filehandle
self._filehandles = {}
@@ -303,6 +303,7 @@ class Operations(llfuse.Operations):
[["event_type", "in", ["create", "update", "delete"]]],
self.on_event)
+ @catch_exceptions
def on_event(self, ev):
if 'event_type' in ev:
with llfuse.lock:
@@ -310,11 +311,13 @@ class Operations(llfuse.Operations):
if item is not None:
item.invalidate()
if ev["object_kind"] == "arvados#collection":
- item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
+ new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
+ record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+ item.update(to_record_version=record_version)
else:
item.update()
- oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
+ oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
olditemparent = self.inodes.inode_cache.find(oldowner)
if olditemparent is not None:
olditemparent.invalidate()
@@ -335,8 +338,8 @@ class Operations(llfuse.Operations):
entry = llfuse.EntryAttributes()
entry.st_ino = inode
entry.generation = 0
- entry.entry_timeout = 300
- entry.attr_timeout = 300
+ entry.entry_timeout = 60
+ entry.attr_timeout = 60
entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
if isinstance(e, Directory):
@@ -357,7 +360,7 @@ class Operations(llfuse.Operations):
entry.st_size = e.size()
entry.st_blksize = 512
- entry.st_blocks = (e.size()/512)+1
+ entry.st_blocks = (entry.st_size/512)+1
entry.st_atime = int(e.atime())
entry.st_mtime = int(e.mtime())
entry.st_ctime = int(e.mtime())
@@ -379,7 +382,7 @@ class Operations(llfuse.Operations):
@catch_exceptions
def lookup(self, parent_inode, name):
- name = unicode(name, self.encoding)
+ name = unicode(name, self.inodes.encoding)
inode = None
if name == '.':
@@ -431,6 +434,7 @@ class Operations(llfuse.Operations):
@catch_exceptions
def read(self, fh, off, size):
_logger.debug("arv-mount read %i %i %i", fh, off, size)
+
if fh in self._filehandles:
handle = self._filehandles[fh]
else:
@@ -513,10 +517,7 @@ class Operations(llfuse.Operations):
e = off
while e < len(handle.entries):
if handle.entries[e][1].inode in self.inodes:
- try:
- yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
- except UnicodeEncodeError:
- pass
+ yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
e += 1
@catch_exceptions
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index 26e7fc1..f661e41 100644
--- a/services/fuse/arvados_fuse/fusedir.py
+++ b/services/fuse/arvados_fuse/fusedir.py
@@ -143,8 +143,8 @@ class Directory(FreshBase):
# delete any other directory entries that were not in found in 'items'
for i in oldentries:
- _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
- llfuse.invalidate_entry(self.inode, str(i))
+ _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
+ llfuse.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
self.inodes.del_entry(oldentries[i])
changed = True
@@ -165,7 +165,7 @@ class Directory(FreshBase):
self._entries = oldentries
return False
for n in oldentries:
- llfuse.invalidate_entry(self.inode, str(n))
+ llfuse.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
self.inodes.del_entry(oldentries[n])
llfuse.invalidate_inode(self.inode)
self.invalidate()
@@ -235,14 +235,15 @@ class CollectionDirectoryBase(Directory):
def on_event(self, event, collection, name, item):
if collection == self.collection:
- _logger.debug("%s %s %s %s", event, collection, name, item)
+ name = sanitize_filename(name)
+ _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
with llfuse.lock:
if event == arvados.collection.ADD:
self.new_entry(name, item, self.mtime())
elif event == arvados.collection.DEL:
ent = self._entries[name]
del self._entries[name]
- llfuse.invalidate_entry(self.inode, name)
+ llfuse.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
self.inodes.del_entry(ent)
elif event == arvados.collection.MOD:
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
@@ -371,7 +372,7 @@ class CollectionDirectory(CollectionDirectoryBase):
return self.collection_locator
@use_counter
- def update(self, to_pdh=None):
+ def update(self, to_record_version=None):
try:
if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
return True
@@ -387,9 +388,9 @@ class CollectionDirectory(CollectionDirectoryBase):
return
_logger.debug("Updating %s", self.collection_locator)
- if self.collection:
- if self.collection.portable_data_hash() == to_pdh:
- _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
+ if self.collection is not None:
+ if self.collection.known_past_version(to_record_version):
+ _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
else:
self.collection.update()
else:
@@ -768,7 +769,7 @@ class ProjectDirectory(Directory):
# Actually move the entry from source directory to this directory.
del src._entries[name_old]
self._entries[name_new] = ent
- llfuse.invalidate_entry(src.inode, name_old)
+ llfuse.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
class SharedDirectory(Directory):
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index d33f9f9..4d472cf 100644
--- a/services/fuse/arvados_fuse/fusefile.py
+++ b/services/fuse/arvados_fuse/fusefile.py
@@ -46,7 +46,8 @@ class FuseArvadosFile(File):
self.arvfile = arvfile
def size(self):
- return self.arvfile.size()
+ with llfuse.lock_released:
+ return self.arvfile.size()
def readfrom(self, off, size, num_retries=0):
with llfuse.lock_released:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list