[ARVADOS] updated: a4ab52b4305023d8c679cc7c3b09361050d167b6
git at public.curoverse.com
git at public.curoverse.com
Thu Sep 18 15:55:02 EDT 2014
Summary of changes:
sdk/cli/bin/crunch-job | 22 ++++-
sdk/python/arvados/collection.py | 150 ++++++++++++++++++------------
sdk/python/arvados/commands/_util.py | 12 +++
sdk/python/arvados/commands/keepdocker.py | 55 ++++++-----
sdk/python/arvados/commands/put.py | 45 ++++-----
sdk/python/arvados/keep.py | 30 ++++--
sdk/python/arvados/retry.py | 18 ++++
sdk/python/arvados/stream.py | 68 ++++++++------
sdk/python/arvados/util.py | 4 +-
sdk/python/bin/arv-get | 46 ++++-----
sdk/python/bin/arv-ls | 21 ++---
sdk/python/setup.py | 2 +-
sdk/python/tests/arvados_testutil.py | 11 +++
sdk/python/tests/test_arv_put.py | 8 +-
sdk/python/tests/test_collections.py | 143 +++++++++++++++++++++++++++-
sdk/python/tests/test_keep_client.py | 69 +++++++-------
sdk/python/tests/test_retry.py | 27 ++++++
sdk/python/tests/test_stream.py | 117 +++++++++++++++++++++++
services/fuse/arvados_fuse/__init__.py | 108 +++++++++++++--------
services/fuse/bin/arv-mount | 66 +++++++------
services/fuse/setup.py | 2 +-
services/fuse/tests/test_mount.py | 12 +--
22 files changed, 737 insertions(+), 299 deletions(-)
create mode 100644 sdk/python/tests/test_stream.py
via a4ab52b4305023d8c679cc7c3b09361050d167b6 (commit)
via 1da127f1e63485ac225cb16511013094fd7e84f6 (commit)
via 876d1f80e0344989f8b68799918c467fa5a52e19 (commit)
via eb3c3c9eeb73529353ee385385865d6d3dc99913 (commit)
via 8ba7f9450ce51aa4ec8b0944991abe5a692ab4d1 (commit)
via 1f45f7666edec7b10cee415f238931744247b762 (commit)
via cf9b56161704477075cda297b44dae4d9342c24a (commit)
via 1f354e0ba1b5b23c2d36c0cb60451260b29e1d3f (commit)
via 80c57e3536f41d8419f580b577776d85209f6111 (commit)
via 5a9aaf3f32fc0414ae16aed17a77388cf4af9f90 (commit)
via af2c75821456e28875dc8006efb03f38ec1a74b8 (commit)
via 9facc2aeba6770ee80730001ca66bdaec8bde738 (commit)
via 8d554a2734be8bbe015d38a48494f7699f2de439 (commit)
via f51c721f97c6e5ef4fda47e6d0fc45cdd2c2b77d (commit)
from 58ccc8f13d66fc3da1aecb392a6698a29958e203 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a4ab52b4305023d8c679cc7c3b09361050d167b6
Merge: 58ccc8f 1da127f
Author: Brett Smith <brett at curoverse.com>
Date: Thu Sep 18 15:56:28 2014 -0400
Merge branch '3147-py-collection-retries-wip2'
Closes #3147.
commit 1da127f1e63485ac225cb16511013094fd7e84f6
Author: Brett Smith <brett at curoverse.com>
Date: Mon Sep 15 18:13:40 2014 -0400
3147: crunch-job retries data uploads more for longer-running jobs.
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 613be70..70f379e 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1207,7 +1207,8 @@ sub collate_output
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
+ my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
+ '--retries', put_retry_count());
my $joboutput;
for (@jobstep)
{
@@ -1347,8 +1348,9 @@ sub save_meta
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
$local_logfile->flush;
- my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
- . quotemeta($local_logfile->filename);
+ my $retry_count = put_retry_count();
+ my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
+ "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
my $loglocator = `$cmd`;
die "system $cmd failed: $?" if $?;
chomp($loglocator);
@@ -1494,6 +1496,20 @@ sub find_docker_image {
}
}
+sub put_retry_count {
+ # Calculate a --retries argument for arv-put that will have it try
+ # approximately as long as this Job has been running.
+ my $stoptime = shift || time;
+ my $starttime = $jobstep[0]->{starttime};
+ my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
+ my $retries = 0;
+ while ($timediff >= 2) {
+ $retries++;
+ $timediff /= 2;
+ }
+ return ($retries > 3) ? $retries : 3;
+}
+
__DATA__
#!/usr/bin/perl
commit 876d1f80e0344989f8b68799918c467fa5a52e19
Author: Brett Smith <brett at curoverse.com>
Date: Tue Sep 16 17:29:38 2014 -0400
3147: Add retry support to FUSE driver.
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index d5523e8..e40b88c 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -289,10 +289,11 @@ class Directory(FreshBase):
class CollectionDirectory(Directory):
'''Represents the root of a directory tree holding a collection.'''
- def __init__(self, parent_inode, inodes, api, collection):
+ def __init__(self, parent_inode, inodes, api, num_retries, collection):
super(CollectionDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.collection_object_file = None
self.collection_object = None
if isinstance(collection, dict):
@@ -310,7 +311,9 @@ class CollectionDirectory(Directory):
self.collection_object_file.update(self.collection_object)
self.clear()
- collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
+ collection = arvados.CollectionReader(
+ self.collection_object["manifest_text"], self.api,
+ self.api.localkeep(), num_retries=self.num_retries)
for s in collection.all_streams():
cwd = self
for part in s.name().split('/'):
@@ -328,7 +331,9 @@ class CollectionDirectory(Directory):
return True
with llfuse.lock_released:
- new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
+ new_collection_object = self.api.collections().get(
+ uuid=self.collection_locator
+ ).execute(num_retries=self.num_retries)
if "portable_data_hash" not in new_collection_object:
new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
# end with llfuse.lock_released, re-acquire lock
@@ -386,10 +391,11 @@ class MagicDirectory(Directory):
to readdir().
'''
- def __init__(self, parent_inode, inodes, api):
+ def __init__(self, parent_inode, inodes, api, num_retries):
super(MagicDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
# Have to defer creating readme_file because at this point we don't
# yet have an inode assigned.
self.readme_file = None
@@ -418,7 +424,8 @@ will appear if it exists.
return False
try:
- e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
+ e = self.inodes.add_entry(CollectionDirectory(
+ self.inode, self.inodes, self.api, self.num_retries, k))
if e.update():
self._entries[k] = e
return True
@@ -457,21 +464,25 @@ class RecursiveInvalidateDirectory(Directory):
class TagsDirectory(RecursiveInvalidateDirectory):
'''A special directory that contains as subdirectories all tags visible to the user.'''
- def __init__(self, parent_inode, inodes, api, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
super(TagsDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self._poll = True
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
+ tags = self.api.links().list(
+ filters=[['link_class', '=', 'tag']],
+ select=['name'], distinct=True
+ ).execute(num_retries=self.num_retries)
if "items" in tags:
self.merge(tags['items'],
lambda i: i['name'] if 'name' in i else i['uuid'],
lambda a, i: a.tag == i,
- lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
+ lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
@@ -479,45 +490,51 @@ class TagDirectory(Directory):
to the user that are tagged with a particular tag.
'''
- def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, tag,
+ poll=False, poll_time=60):
super(TagDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.tag = tag
self._poll = poll
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
- ['name', '=', self.tag],
- ['head_uuid', 'is_a', 'arvados#collection']],
- select=['head_uuid']).execute()
+ taggedcollections = self.api.links().list(
+ filters=[['link_class', '=', 'tag'],
+ ['name', '=', self.tag],
+ ['head_uuid', 'is_a', 'arvados#collection']],
+ select=['head_uuid']
+ ).execute(num_retries=self.num_retries)
self.merge(taggedcollections['items'],
lambda i: i['head_uuid'],
lambda a, i: a.collection_locator == i['head_uuid'],
- lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
+ lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
class ProjectDirectory(Directory):
'''A special directory that contains the contents of a project.'''
- def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, project_object,
+ poll=False, poll_time=60):
super(ProjectDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.project_object = project_object
self.project_object_file = None
self.uuid = project_object['uuid']
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, i)
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
+ return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
else:
return None
elif uuid_pattern.match(i['uuid']):
@@ -557,13 +574,19 @@ class ProjectDirectory(Directory):
with llfuse.lock_released:
if group_uuid_pattern.match(self.uuid):
- self.project_object = self.api.groups().get(uuid=self.uuid).execute()
+ self.project_object = self.api.groups().get(
+ uuid=self.uuid).execute(num_retries=self.num_retries)
elif user_uuid_pattern.match(self.uuid):
- self.project_object = self.api.users().get(uuid=self.uuid).execute()
+ self.project_object = self.api.users().get(
+ uuid=self.uuid).execute(num_retries=self.num_retries)
- contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
+ contents = arvados.util.list_all(self.api.groups().contents,
+ self.num_retries, uuid=self.uuid)
# Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
- contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
+ contents += arvados.util.list_all(
+ self.api.links().list, self.num_retries,
+ filters=[['tail_uuid', '=', self.uuid],
+ ['link_class', '=', 'name']])
# end with llfuse.lock_released, re-acquire lock
@@ -589,17 +612,21 @@ class ProjectDirectory(Directory):
class SharedDirectory(Directory):
'''A special directory that represents users or groups who have shared projects with me.'''
- def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, exclude,
+ poll=False, poll_time=60):
super(SharedDirectory, self).__init__(parent_inode)
- self.current_user = api.users().current().execute()
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
+ self.current_user = api.users().current().execute(num_retries=num_retries)
self._poll = True
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
+ all_projects = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['group_class','=','project']])
objects = {}
for ob in all_projects:
objects[ob['uuid']] = ob
@@ -611,8 +638,12 @@ class SharedDirectory(Directory):
roots.append(ob)
root_owners[ob['owner_uuid']] = True
- lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
- lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
+ lusers = arvados.util.list_all(
+ self.api.users().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
+ lgroups = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
users = {}
groups = {}
@@ -641,7 +672,7 @@ class SharedDirectory(Directory):
self.merge(contents.items(),
lambda i: i[0],
lambda a, i: a.uuid == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
+ lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception as e:
_logger.exception(e)
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index e07860d..e92b1b4 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -9,6 +9,7 @@ import signal
import subprocess
import time
+import arvados.commands._util as arv_cmd
from arvados_fuse import *
logger = logging.getLogger('arvados.arv-mount')
@@ -16,6 +17,7 @@ logger = logging.getLogger('arvados.arv-mount')
if __name__ == '__main__':
# Handle command line parameters
parser = argparse.ArgumentParser(
+ parents=[arv_cmd.retry_opt],
description='''Mount Keep data under the local filesystem. Default mode is --home''',
epilog="""
Note: When using the --exec feature, you must either specify the
@@ -80,29 +82,42 @@ with "--".
operations = Operations(os.getuid(), os.getgid())
api = SafeApi(arvados.config)
- usr = api.users().current().execute()
+ usr = api.users().current().execute(num_retries=args.retries)
now = time.time()
+ dir_class = None
+ dir_args = [llfuse.ROOT_INODE, operations.inodes, api, args.retries]
if args.by_id:
# Set up the request handler with the 'magic directory' at the root
- operations.inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+ dir_class = MagicDirectory
elif args.by_tag:
- operations.inodes.add_entry(TagsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+ dir_class = TagsDirectory
elif args.shared:
- operations.inodes.add_entry(SharedDirectory(llfuse.ROOT_INODE, operations.inodes, api, usr))
+ dir_class = SharedDirectory
+ dir_args.append(usr)
elif args.home:
- operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, usr))
- elif args.collection != None:
+ dir_class = ProjectDirectory
+ dir_args.append(usr)
+ elif args.collection is not None:
# Set up the request handler with the collection at the root
- operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, args.collection))
- elif args.project != None:
- operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, api.groups().get(uuid=args.project).execute()))
+ dir_class = CollectionDirectory
+ dir_args.append(args.collection)
+ elif args.project is not None:
+ dir_class = ProjectDirectory
+ dir_args.append(api.groups().get(uuid=args.project).execute(
+ num_retries=args.retries))
+
+ if dir_class is not None:
+ operations.inodes.add_entry(dir_class(*dir_args))
else:
e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
+ dir_args[0] = e.inode
- e._entries['home'] = operations.inodes.add_entry(ProjectDirectory(e.inode, operations.inodes, api, usr))
- e._entries['shared'] = operations.inodes.add_entry(SharedDirectory(e.inode, operations.inodes, api, usr))
- e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(e.inode, operations.inodes, api))
- e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(e.inode, operations.inodes, api))
+ e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
+ e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(*dir_args))
+
+ dir_args.append(usr)
+ e._entries['home'] = operations.inodes.add_entry(ProjectDirectory(*dir_args))
+ e._entries['shared'] = operations.inodes.add_entry(SharedDirectory(*dir_args))
text = '''
Welcome to Arvados! This directory provides file system access to files and objects
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 9627777..a1f7d4b 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -73,7 +73,7 @@ class FuseMountTest(MountTestBase):
def runTest(self):
# Create the request handler
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.testcollection))
+ e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.testcollection))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -128,7 +128,7 @@ class FuseMagicTest(MountTestBase):
def runTest(self):
# Create the request handler
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api))
+ e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
self.mounttmp = tempfile.mkdtemp()
@@ -163,7 +163,7 @@ class FuseMagicTest(MountTestBase):
class FuseTagsTest(MountTestBase):
def runTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api))
+ e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -188,7 +188,7 @@ class FuseTagsTest(MountTestBase):
class FuseTagsUpdateTest(MountTestBase):
def runRealTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, poll_time=1))
+ e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -241,7 +241,7 @@ class FuseTagsUpdateTest(MountTestBase):
class FuseSharedTest(MountTestBase):
def runTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.api.users().current().execute()['uuid']))
+ e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()['uuid']))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -281,7 +281,7 @@ class FuseSharedTest(MountTestBase):
class FuseHomeTest(MountTestBase):
def runTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.api.users().current().execute()))
+ e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
commit eb3c3c9eeb73529353ee385385865d6d3dc99913
Author: Brett Smith <brett at curoverse.com>
Date: Mon Sep 15 10:58:11 2014 -0400
3147: FUSE driver requires a Python SDK with retry support.
I also took out some of the older log-handling code while I was at it,
since the stricter versioning makes it unnecessary.
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 76d026e..e07860d 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -59,29 +59,20 @@ with "--".
else:
daemon_ctx = None
- # Configure a logger based on command-line switches.
- # If we're using a contemporary Python SDK (mid-August 2014),
- # configure the arvados hierarchy logger.
- # Otherwise, configure the program root logger.
- base_logger = getattr(arvados, 'logger', None)
-
+ # Configure a log handler based on command-line switches.
if args.logfile:
log_handler = logging.FileHandler(args.logfile)
elif daemon_ctx:
log_handler = logging.NullHandler()
- elif base_logger:
- log_handler = arvados.log_handler
else:
- log_handler = logging.StreamHandler()
+ log_handler = None
- if base_logger is None:
- base_logger = logging.getLogger()
- else:
- base_logger.removeHandler(arvados.log_handler)
- base_logger.addHandler(log_handler)
+ if log_handler is not None:
+ arvados.logger.removeHandler(arvados.log_handler)
+ arvados.logger.addHandler(log_handler)
if args.debug:
- base_logger.setLevel(logging.DEBUG)
+ arvados.logger.setLevel(logging.DEBUG)
logger.debug("arv-mount debugging enabled")
try:
@@ -103,7 +94,7 @@ with "--".
elif args.collection != None:
# Set up the request handler with the collection at the root
operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, args.collection))
- elif args.project != None:
+ elif args.project != None:
operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, api.groups().get(uuid=args.project).execute()))
else:
e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
@@ -114,8 +105,8 @@ with "--".
e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(e.inode, operations.inodes, api))
text = '''
-Welcome to Arvados! This directory provides file system access to files and objects
-available on the Arvados installation located at '{}'
+Welcome to Arvados! This directory provides file system access to files and objects
+available on the Arvados installation located at '{}'
using credentials for user '{}'.
From here, the following directories are available:
diff --git a/services/fuse/setup.py b/services/fuse/setup.py
index d9fe797..c200a84 100644
--- a/services/fuse/setup.py
+++ b/services/fuse/setup.py
@@ -20,7 +20,7 @@ setup(name='arvados_fuse',
'bin/arv-mount'
],
install_requires=[
- 'arvados-python-client',
+ 'arvados-python-client>=0.1.1411070090', # 2014-09-18
'llfuse',
'python-daemon'
],
commit 8ba7f9450ce51aa4ec8b0944991abe5a692ab4d1
Author: Brett Smith <brett at curoverse.com>
Date: Mon Sep 15 11:21:31 2014 -0400
3147: Add retry support to PySDK list_all utility.
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index ada1aec..2609f11 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -338,12 +338,12 @@ def is_hex(s, *length_args):
good_len = True
return bool(good_len and HEX_RE.match(s))
-def list_all(fn, **kwargs):
+def list_all(fn, num_retries=0, **kwargs):
items = []
offset = 0
items_available = sys.maxint
while len(items) < items_available:
- c = fn(offset=offset, **kwargs).execute()
+ c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
items += c['items']
items_available = c['items_available']
offset = c['offset'] + len(c['items'])
commit 1f45f7666edec7b10cee415f238931744247b762
Author: Brett Smith <brett at curoverse.com>
Date: Mon Sep 15 10:37:32 2014 -0400
3147: Add retry support to Python CLI tools.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 4f5fd19..782e85c 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -166,8 +166,10 @@ class CollectionReader(CollectionBase):
if not util.portable_data_hash_pattern.match(
self._manifest_locator):
raise
- _logger.warning("API lookup failed for collection %s (%s: %s)",
- self._manifest_locator, type(e), str(e))
+ _logger.warning(
+ "API server did not return Collection '%s'. " +
+ "Trying to fetch directly from Keep (deprecated).",
+ self._manifest_locator)
self._manifest_text = self._my_keep().get(
self._manifest_locator, num_retries=self.num_retries)
self._streams = [sline.split()
diff --git a/sdk/python/arvados/commands/_util.py b/sdk/python/arvados/commands/_util.py
index f7cb80d..c42ee7a 100644
--- a/sdk/python/arvados/commands/_util.py
+++ b/sdk/python/arvados/commands/_util.py
@@ -1,8 +1,20 @@
#!/usr/bin/env python
+import argparse
import errno
import os
+def _pos_int(s):
+ num = int(s)
+ if num < 0:
+ raise ValueError("can't accept negative value: %s" % (num,))
+ return num
+
+retry_opt = argparse.ArgumentParser(add_help=False)
+retry_opt.add_argument('--retries', type=_pos_int, default=3, help="""
+Maximum number of times to retry server requests that encounter temporary
+failures (e.g., server down). Default 3.""")
+
def _ignore_error(error):
return None
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index c96e2cb..d0f60bf 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -47,7 +47,7 @@ keepdocker_parser.add_argument(
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
description="Upload or list Docker images in Arvados",
- parents=[keepdocker_parser, arv_put.run_opts])
+ parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
class DockerError(Exception):
pass
@@ -152,9 +152,10 @@ def prep_image_file(filename):
image_file = open(file_path, 'w+b' if need_save else 'rb')
return image_file, need_save
-def make_link(link_class, link_name, **link_attrs):
+def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
link_attrs.update({'link_class': link_class, 'name': link_name})
- return arvados.api('v1').links().create(body=link_attrs).execute()
+ return api_client.links().create(body=link_attrs).execute(
+ num_retries=num_retries)
def ptimestamp(t):
s = t.split(".")
@@ -162,8 +163,10 @@ def ptimestamp(t):
t = s[0] + s[1][-1:]
return datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%SZ")
-def list_images_in_arv():
- existing_links = arvados.api('v1').links().list(filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']]]).execute()['items']
+def list_images_in_arv(api_client, num_retries):
+ existing_links = api_client.links().list(
+ filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']]]
+ ).execute(num_retries=num_retries)['items']
images = {}
for link in existing_links:
collection_uuid = link["head_uuid"]
@@ -196,9 +199,10 @@ def list_images_in_arv():
def main(arguments=None):
args = arg_parser.parse_args(arguments)
+ api = arvados.api('v1')
if args.image is None or args.image == 'images':
- list_images_in_arv()
+ list_images_in_arv(api, args.retries)
sys.exit(0)
# Pull the image if requested, unless the image is specified as a hash
@@ -222,30 +226,35 @@ def main(arguments=None):
else:
collection_name = args.name
- api = arvados.api('v1')
-
if not args.force:
# Check if this image is already in Arvados.
# Project where everything should be owned
- parent_project_uuid = args.project_uuid if args.project_uuid else api.users().current().execute()['uuid']
+ if args.project_uuid:
+ parent_project_uuid = args.project_uuid
+ else:
+ parent_project_uuid = api.users().current().execute(
+ num_retries=args.retries)['uuid']
# Find image hash tags
existing_links = api.links().list(
filters=[['link_class', '=', 'docker_image_hash'],
- ['name', '=', image_hash]]).execute()['items']
+ ['name', '=', image_hash]]
+ ).execute(num_retries=args.retries)['items']
if existing_links:
# get readable collections
collections = api.collections().list(
filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
- select=["uuid", "owner_uuid", "name", "manifest_text"]).execute()['items']
+ select=["uuid", "owner_uuid", "name", "manifest_text"]
+ ).execute(num_retries=args.retries)['items']
if collections:
# check for repo+tag links on these collections
existing_repo_tag = (api.links().list(
filters=[['link_class', '=', 'docker_image_repo+tag'],
['name', '=', image_repo_tag],
- ['head_uuid', 'in', collections]]).execute()['items']) if image_repo_tag else []
+ ['head_uuid', 'in', collections]]
+ ).execute(num_retries=args.retries)['items']) if image_repo_tag else []
# Filter on elements owned by the parent project
owned_col = [c for c in collections if c['owner_uuid'] == parent_project_uuid]
@@ -257,21 +266,25 @@ def main(arguments=None):
coll_uuid = owned_col[0]['uuid']
else:
# create new collection owned by the project
- coll_uuid = api.collections().create(body={"manifest_text": collections[0]['manifest_text'],
- "name": collection_name,
- "owner_uuid": parent_project_uuid},
- ensure_unique_name=True).execute()['uuid']
+ coll_uuid = api.collections().create(
+ body={"manifest_text": collections[0]['manifest_text'],
+ "name": collection_name,
+ "owner_uuid": parent_project_uuid},
+ ensure_unique_name=True
+ ).execute(num_retries=args.retries)['uuid']
link_base = {'owner_uuid': parent_project_uuid,
'head_uuid': coll_uuid }
if not owned_img:
# create image link owned by the project
- make_link('docker_image_hash', image_hash, **link_base)
+ make_link(api, args.retries,
+ 'docker_image_hash', image_hash, **link_base)
if not owned_rep and image_repo_tag:
# create repo+tag link owned by the project
- make_link('docker_image_repo+tag', image_repo_tag, **link_base)
+ make_link(api, args.retries, 'docker_image_repo+tag',
+ image_repo_tag, **link_base)
print(coll_uuid)
@@ -306,10 +319,10 @@ def main(arguments=None):
if args.project_uuid is not None:
link_base['owner_uuid'] = args.project_uuid
- make_link('docker_image_hash', image_hash, **link_base)
+ make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
if image_repo_tag:
- make_link('docker_image_repo+tag', image_repo_tag,
- **link_base)
+ make_link(api, args.retries,
+ 'docker_image_repo+tag', image_repo_tag, **link_base)
# Clean up.
image_file.close()
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 15551fa..0099487 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -145,7 +145,7 @@ Do not continue interrupted uploads from cached state.
arg_parser = argparse.ArgumentParser(
description='Copy data from the local filesystem to Keep.',
- parents=[upload_opts, run_opts])
+ parents=[upload_opts, run_opts, arv_cmd.retry_opt])
def parse_arguments(arguments):
args = arg_parser.parse_args(arguments)
@@ -246,23 +246,26 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
['bytes_written', '_seen_inputs'])
def __init__(self, cache=None, reporter=None, bytes_expected=None,
- api_client=None):
+ api_client=None, num_retries=0):
self.bytes_written = 0
self._seen_inputs = []
self.cache = cache
self.reporter = reporter
self.bytes_expected = bytes_expected
- super(ArvPutCollectionWriter, self).__init__(api_client)
+ super(ArvPutCollectionWriter, self).__init__(
+ api_client, num_retries=num_retries)
@classmethod
- def from_cache(cls, cache, reporter=None, bytes_expected=None):
+ def from_cache(cls, cache, reporter=None, bytes_expected=None,
+ num_retries=0):
try:
state = cache.load()
state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state, cache, reporter, bytes_expected)
+ writer = cls.from_state(state, cache, reporter, bytes_expected,
+ num_retries=num_retries)
except (TypeError, ValueError,
arvados.errors.StaleWriterStateError) as error:
- return cls(cache, reporter, bytes_expected)
+ return cls(cache, reporter, bytes_expected, num_retries=num_retries)
else:
return writer
@@ -348,18 +351,16 @@ def progress_writer(progress_func, outfile=sys.stderr):
def exit_signal_handler(sigcode, frame):
sys.exit(-sigcode)
-def desired_project_uuid(api_client, project_uuid):
- if project_uuid:
- if arvados.util.user_uuid_pattern.match(project_uuid):
- api_client.users().get(uuid=project_uuid).execute()
- return project_uuid
- elif arvados.util.group_uuid_pattern.match(project_uuid):
- api_client.groups().get(uuid=project_uuid).execute()
- return project_uuid
- else:
- raise ValueError("Not a valid project uuid: {}".format(project_uuid))
+def desired_project_uuid(api_client, project_uuid, num_retries):
+ if not project_uuid:
+ query = api_client.users().current()
+ elif arvados.util.user_uuid_pattern.match(project_uuid):
+ query = api_client.users().get(uuid=project_uuid)
+ elif arvados.util.group_uuid_pattern.match(project_uuid):
+ query = api_client.groups().get(uuid=project_uuid)
else:
- return api_client.users().current().execute()['uuid']
+ raise ValueError("Not a valid project UUID: {}".format(project_uuid))
+ return query.execute(num_retries=num_retries)['uuid']
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
global api_client
@@ -387,7 +388,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
# Determine the parent project
try:
- project_uuid = desired_project_uuid(api_client, args.project_uuid)
+ project_uuid = desired_project_uuid(api_client, args.project_uuid,
+ args.retries)
except (apiclient.errors.Error, ValueError) as error:
print >>stderr, error
sys.exit(1)
@@ -413,10 +415,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
sys.exit(1)
if resume_cache is None:
- writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+ writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
+ num_retries=args.retries)
else:
writer = ArvPutCollectionWriter.from_cache(
- resume_cache, reporter, bytes_expected)
+ resume_cache, reporter, bytes_expected, num_retries=args.retries)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
@@ -456,7 +459,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
'manifest_text': writer.manifest_text()
},
ensure_unique_name=True
- ).execute()
+ ).execute(num_retries=args.retries)
print >>stderr, "Collection saved as '%s'" % collection['name']
diff --git a/sdk/python/bin/arv-get b/sdk/python/bin/arv-get
index 0d403d1..38a9823 100755
--- a/sdk/python/bin/arv-get
+++ b/sdk/python/bin/arv-get
@@ -9,6 +9,7 @@ import sys
import logging
import arvados
+import arvados.commands._util as arv_cmd
logger = logging.getLogger('arvados.arv-get')
@@ -17,7 +18,8 @@ def abort(msg, code=1):
exit(code)
parser = argparse.ArgumentParser(
- description='Copy data from Keep to a local file or pipe.')
+ description='Copy data from Keep to a local file or pipe.',
+ parents=[arv_cmd.retry_opt])
parser.add_argument('locator', type=str,
help="""
Collection locator, optionally with a file path or prefix.
@@ -123,36 +125,28 @@ collection = r.group(1)
get_prefix = r.group(2)
if args.r and not get_prefix:
get_prefix = os.sep
-
-todo = []
-todo_bytes = 0
api_client = arvados.api('v1')
+reader = arvados.CollectionReader(collection, num_retries=args.retries)
+
if not get_prefix:
- try:
- if not args.n:
- if not args.f and os.path.exists(args.destination):
- abort('Local file %s already exists.' % (args.destination,))
- with open(args.destination, 'wb') as f:
- try:
- c = api_client.collections().get(uuid=collection).execute()
- manifest = c['manifest_text']
- except Exception as e:
- logger.warning(
- "Collection %s not found. " +
- "Trying to fetch directly from Keep (deprecated).",
- collection)
- manifest = arvados.KeepClient(
- api_client=api_client).get(collection)
- f.write(manifest)
- sys.exit(0)
- except arvados.errors.NotFoundError as e:
- abort(e)
-
-reader = arvados.CollectionReader(collection)
+ if not args.n:
+ open_flags = os.O_CREAT | os.O_WRONLY
+ if not args.f:
+ open_flags |= os.O_EXCL
+ try:
+ out_fd = os.open(args.destination, open_flags)
+ with os.fdopen(out_fd, 'wb') as out_file:
+ out_file.write(reader.manifest_text())
+ except (IOError, OSError) as error:
+ abort("can't write to '{}': {}".format(args.destination, error))
+ except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
+        abort("failed to download '{}': {}".format(collection, error))
+ sys.exit(0)
# Scan the collection. Make an array of (stream, file, local
# destination filename) tuples, and add up total size to extract.
-
+todo = []
+todo_bytes = 0
try:
for s in reader.all_streams():
for f in s.all_files():
diff --git a/sdk/python/bin/arv-ls b/sdk/python/bin/arv-ls
index ee74bb8..382bfe8 100755
--- a/sdk/python/bin/arv-ls
+++ b/sdk/python/bin/arv-ls
@@ -1,27 +1,20 @@
#!/usr/bin/env python
import argparse
-import hashlib
-import os
-import re
-import string
-import sys
+
+import arvados
+import arvados.commands._util as arv_cmd
parser = argparse.ArgumentParser(
- description='List contents of a manifest')
+ description='List contents of a manifest',
+ parents=[arv_cmd.retry_opt])
parser.add_argument('locator', type=str,
- help="""
-Collection locator
-""")
-
+ help="Collection UUID or locator")
parser.add_argument('-s', action='store_true', help="""List file sizes, in KiB.""")
args = parser.parse_args()
-
-import arvados
-
-cr = arvados.CollectionReader(args.locator)
+cr = arvados.CollectionReader(args.locator, num_retries=args.retries)
for f in cr.all_files():
if args.s:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index c7b01bc..001add3 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -440,14 +440,15 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
def test_check_real_project_found(self):
self.authorize_with('active')
- self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID),
+ self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
"did not correctly find test fixture project")
def test_check_error_finding_nonexistent_uuid(self):
BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
self.authorize_with('active')
try:
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
except ValueError as error:
self.assertIn(BAD_UUID, error.message)
else:
@@ -457,7 +458,8 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
self.authorize_with('active')
with self.assertRaises(apiclient.errors.HttpError):
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
def test_short_put_from_stdin(self):
# Have to run this as an integration test since arv-put can't
commit cf9b56161704477075cda297b44dae4d9342c24a
Author: Brett Smith <brett at curoverse.com>
Date: Tue Sep 16 17:27:51 2014 -0400
3147: Add retry support to PySDK Collection objects.
This required updating the FUSE driver's SafeApi object to better
imitate a real API object, now that CollectionReader will pass it down
to the underlying KeepClient.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 496136e..4f5fd19 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -91,10 +91,45 @@ def normalize(collection):
return normalized_streams
-class CollectionReader(object):
- def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
+class CollectionBase(object):
+ def __enter__(self):
+ pass
+
+ def __exit__(self):
+ pass
+
+ def _my_keep(self):
+ if self._keep_client is None:
+ self._keep_client = KeepClient(api_client=self._api_client,
+ num_retries=self.num_retries)
+ return self._keep_client
+
+
+class CollectionReader(CollectionBase):
+ def __init__(self, manifest_locator_or_text, api_client=None,
+ keep_client=None, num_retries=0):
+ """Instantiate a CollectionReader.
+
+ This class parses Collection manifests to provide a simple interface
+ to read its underlying files.
+
+ Arguments:
+ * manifest_locator_or_text: One of a Collection UUID, portable data
+ hash, or full manifest text.
+ * api_client: The API client to use to look up Collections. If not
+ provided, CollectionReader will build one from available Arvados
+ configuration.
+ * keep_client: The KeepClient to use to download Collection data.
+ If not provided, CollectionReader will build one from available
+ Arvados configuration.
+ * num_retries: The default number of times to retry failed
+ service requests. Default 0. You may change this value
+ after instantiation, but note those changes may not
+ propagate to related objects like the Keep client.
+ """
self._api_client = api_client
self._keep_client = keep_client
+ self.num_retries = num_retries
if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
@@ -109,12 +144,6 @@ class CollectionReader(object):
"Argument to CollectionReader must be a manifest or a collection UUID")
self._streams = None
- def __enter__(self):
- pass
-
- def __exit__(self):
- pass
-
def _populate(self):
if self._streams is not None:
return
@@ -128,11 +157,10 @@ class CollectionReader(object):
# just like any other Collection lookup failure.
if self._api_client is None:
self._api_client = arvados.api('v1')
- self._keep_client = KeepClient(api_client=self._api_client)
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
+ self._keep_client = None # Make a new one with the new api.
c = self._api_client.collections().get(
- uuid=self._manifest_locator).execute()
+ uuid=self._manifest_locator).execute(
+ num_retries=self.num_retries)
self._manifest_text = c['manifest_text']
except Exception as e:
if not util.portable_data_hash_pattern.match(
@@ -140,29 +168,24 @@ class CollectionReader(object):
raise
_logger.warning("API lookup failed for collection %s (%s: %s)",
self._manifest_locator, type(e), str(e))
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
- self._manifest_text = self._keep_client.get(self._manifest_locator)
- self._streams = []
- for stream_line in self._manifest_text.split("\n"):
- if stream_line != '':
- stream_tokens = stream_line.split()
- self._streams += [stream_tokens]
+ self._manifest_text = self._my_keep().get(
+ self._manifest_locator, num_retries=self.num_retries)
+ self._streams = [sline.split()
+ for sline in self._manifest_text.split("\n")
+ if sline]
self._streams = normalize(self)
# now regenerate the manifest text based on the normalized stream
#print "normalizing", self._manifest_text
- self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
+ self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
#print "result", self._manifest_text
def all_streams(self):
self._populate()
- resp = []
- for s in self._streams:
- resp.append(StreamReader(s, keep=self._keep_client))
- return resp
+ return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+ for s in self._streams]
def all_files(self):
for s in self.all_streams():
@@ -172,16 +195,34 @@ class CollectionReader(object):
def manifest_text(self, strip=False):
self._populate()
if strip:
- m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
+ m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
return m
else:
return self._manifest_text
-class CollectionWriter(object):
+
+class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
- def __init__(self, api_client=None):
+ def __init__(self, api_client=None, num_retries=0):
+ """Instantiate a CollectionWriter.
+
+ CollectionWriter lets you build a new Arvados Collection from scratch.
+ Write files to it. The CollectionWriter will upload data to Keep as
+ appropriate, and provide you with the Collection manifest text when
+ you're finished.
+
+ Arguments:
+ * api_client: The API client to use to look up Collections. If not
+ provided, CollectionReader will build one from available Arvados
+ configuration.
+ * num_retries: The default number of times to retry failed
+ service requests. Default 0. You may change this value
+ after instantiation, but note those changes may not
+ propagate to related objects like the Keep client.
+ """
self._api_client = api_client
+ self.num_retries = num_retries
self._keep_client = None
self._data_buffer = []
self._data_buffer_len = 0
@@ -197,16 +238,9 @@ class CollectionWriter(object):
self._queued_dirents = deque()
self._queued_trees = deque()
- def __enter__(self):
- pass
-
def __exit__(self):
self.finish()
- def _prep_keep_client(self):
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
-
def do_queued_work(self):
# The work queue consists of three pieces:
# * _queued_file: The file object we're currently writing to the
@@ -303,7 +337,7 @@ class CollectionWriter(object):
for s in newdata:
self.write(s)
return
- self._data_buffer += [newdata]
+ self._data_buffer.append(newdata)
self._data_buffer_len += len(newdata)
self._current_stream_length += len(newdata)
while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
@@ -311,10 +345,9 @@ class CollectionWriter(object):
def flush_data(self):
data_buffer = ''.join(self._data_buffer)
- if data_buffer != '':
- self._prep_keep_client()
+ if data_buffer:
self._current_stream_locators.append(
- self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+ self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
@@ -342,9 +375,10 @@ class CollectionWriter(object):
(self._current_stream_length - self._current_file_pos,
self._current_file_pos,
self._current_stream_name))
- self._current_stream_files += [[self._current_file_pos,
- self._current_stream_length - self._current_file_pos,
- self._current_file_name]]
+ self._current_stream_files.append([
+ self._current_file_pos,
+ self._current_stream_length - self._current_file_pos,
+ self._current_file_name])
self._current_file_pos = self._current_stream_length
def start_new_stream(self, newstreamname='.'):
@@ -363,18 +397,18 @@ class CollectionWriter(object):
def finish_current_stream(self):
self.finish_current_file()
self.flush_data()
- if len(self._current_stream_files) == 0:
+ if not self._current_stream_files:
pass
- elif self._current_stream_name == None:
+ elif self._current_stream_name is None:
raise errors.AssertionError(
"Cannot finish an unnamed stream (%d bytes in %d files)" %
(self._current_stream_length, len(self._current_stream_files)))
else:
- if len(self._current_stream_locators) == 0:
- self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
- self._finished_streams += [[self._current_stream_name,
- self._current_stream_locators,
- self._current_stream_files]]
+ if not self._current_stream_locators:
+ self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+ self._finished_streams.append([self._current_stream_name,
+ self._current_stream_locators,
+ self._current_stream_files])
self._current_stream_files = []
self._current_stream_length = 0
self._current_stream_locators = []
@@ -384,8 +418,7 @@ class CollectionWriter(object):
def finish(self):
# Store the manifest in Keep and return its locator.
- self._prep_keep_client()
- return self._keep_client.put(self.manifest_text())
+ return self._my_keep().put(self.manifest_text())
def stripped_manifest(self):
"""
@@ -414,8 +447,8 @@ class CollectionWriter(object):
manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
- if len(manifest) > 0:
- return CollectionReader(manifest).manifest_text()
+ if manifest:
+ return CollectionReader(manifest, self._api_client).manifest_text()
else:
return ""
@@ -433,9 +466,10 @@ class ResumableCollectionWriter(CollectionWriter):
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
- def __init__(self, api_client=None):
+ def __init__(self, api_client=None, num_retries=0):
self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(api_client)
+ super(ResumableCollectionWriter, self).__init__(
+ api_client, num_retries=num_retries)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 48fb013..0dbf9bc 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -20,9 +20,9 @@ def fake_httplib2_response(code, **headers):
reason=httplib.responses.get(code, "Unknown Response"))
return httplib2.Response(headers)
-def mock_responses(body, *codes):
+def mock_responses(body, *codes, **headers):
return mock.patch('httplib2.Http.request', side_effect=(
- (fake_httplib2_response(code), body) for code in codes))
+ (fake_httplib2_response(code, **headers), body) for code in codes))
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index f066c5d..98a72f6 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -5,6 +5,7 @@
import arvados
import bz2
import copy
+import mock
import os
import pprint
import subprocess
@@ -12,7 +13,7 @@ import tempfile
import unittest
import run_test_server
-from arvados_testutil import ArvadosBaseTestCase
+import arvados_testutil as tutil
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
@@ -22,7 +23,7 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
- ArvadosBaseTestCase):
+ tutil.ArvadosBaseTestCase):
MAIN_SERVER = {}
@classmethod
@@ -620,5 +621,136 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
cwriter.write, "badtext")
+class CollectionTestMixin(object):
+ PROXY_RESPONSE = {
+ 'items_available': 1,
+ 'items': [{
+ 'uuid': 'zzzzz-bi6l4-mockproxy012345',
+ 'owner_uuid': 'zzzzz-tpzed-mockowner012345',
+ 'service_host': tutil.TEST_HOST,
+ 'service_port': 65535,
+ 'service_ssl_flag': True,
+ 'service_type': 'proxy',
+ }]}
+ API_COLLECTIONS = run_test_server.fixture('collections')
+ DEFAULT_COLLECTION = API_COLLECTIONS['foo_file']
+ DEFAULT_DATA_HASH = DEFAULT_COLLECTION['portable_data_hash']
+ DEFAULT_MANIFEST = DEFAULT_COLLECTION['manifest_text']
+ DEFAULT_UUID = DEFAULT_COLLECTION['uuid']
+
+ def _mock_api_call(self, mock_method, code, body):
+ mock_method = mock_method().execute
+ if code == 200:
+ mock_method.return_value = body
+ else:
+ mock_method.side_effect = arvados.errors.ApiError(
+ tutil.fake_httplib2_response(code), "{}")
+
+ def mock_keep_services(self, api_mock, code, body):
+ self._mock_api_call(api_mock.keep_services().accessible, code, body)
+
+ def api_client_mock(self, code=200):
+ client = mock.MagicMock(name='api_client')
+ self.mock_keep_services(client, code, self.PROXY_RESPONSE)
+ return client
+
+
+ at tutil.skip_sleep
+class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
+ def mock_get_collection(self, api_mock, code, body):
+ body = self.API_COLLECTIONS.get(body)
+ self._mock_api_call(api_mock.collections().get, code, body)
+
+ def api_client_mock(self, code=200):
+ client = super(CollectionReaderTestCase, self).api_client_mock(code)
+ self.mock_get_collection(client, code, 'foo_file')
+ return client
+
+ def test_init_no_default_retries(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ reader.manifest_text()
+ client.collections().get().execute.assert_called_with(num_retries=0)
+
+ def test_uuid_init_success(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+ num_retries=3)
+ self.assertEqual(self.DEFAULT_COLLECTION['manifest_text'],
+ reader.manifest_text())
+ client.collections().get().execute.assert_called_with(num_retries=3)
+
+ def test_uuid_init_failure_raises_api_error(self):
+ client = self.api_client_mock(500)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ with self.assertRaises(arvados.errors.ApiError):
+ reader.manifest_text()
+
+ def test_locator_init(self):
+ client = self.api_client_mock(200)
+ # Ensure Keep will not return anything if asked.
+ with tutil.mock_responses(None, 404):
+ reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+ api_client=client)
+ self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+ def test_locator_init_falls_back_to_keep(self):
+ # Reading manifests from Keep is deprecated. Feel free to
+ # remove this test when we remove the fallback.
+ client = self.api_client_mock(200)
+ self.mock_get_collection(client, 404, None)
+ with tutil.mock_responses(self.DEFAULT_MANIFEST, 200):
+ reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+ api_client=client, num_retries=3)
+ self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+ def test_init_num_retries_propagated(self):
+ # More of an integration test...
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+ num_retries=3)
+ with tutil.mock_responses('foo', 500, 500, 200):
+ self.assertEqual('foo',
+ ''.join(f.read(9) for f in reader.all_files()))
+
+
+ at tutil.skip_sleep
+class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
+ def mock_keep(self, body, *codes, **headers):
+ headers.setdefault('x-keep-replicas-stored', 2)
+ return tutil.mock_responses(body, *codes, **headers)
+
+ def foo_writer(self, **kwargs):
+ api_client = self.api_client_mock()
+ writer = arvados.CollectionWriter(api_client, **kwargs)
+ writer.start_new_file('foo')
+ writer.write('foo')
+ return writer
+
+ def test_write_whole_collection(self):
+ writer = self.foo_writer()
+ with self.mock_keep(self.DEFAULT_DATA_HASH, 200, 200):
+ self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+ def test_write_no_default(self):
+ writer = self.foo_writer()
+ with self.mock_keep(None, 500):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ writer.finish()
+
+ def test_write_whole_collection_through_retries(self):
+ writer = self.foo_writer(num_retries=2)
+ with self.mock_keep(self.DEFAULT_DATA_HASH,
+ 500, 500, 200, 500, 500, 200):
+ self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+ def test_flush_data_retries(self):
+ writer = self.foo_writer(num_retries=2)
+ foo_hash = self.DEFAULT_MANIFEST.split()[1]
+ with self.mock_keep(foo_hash, 500, 200):
+ writer.flush_data()
+ self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index f49b947..d5523e8 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -31,14 +31,15 @@ class SafeApi(object):
def __init__(self, config):
self.host = config.get('ARVADOS_API_HOST')
- self.token = config.get('ARVADOS_API_TOKEN')
+ self.api_token = config.get('ARVADOS_API_TOKEN')
self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
self.local = threading.local()
self.block_cache = arvados.KeepBlockCache()
def localapi(self):
if 'api' not in self.local.__dict__:
- self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
+ self.local.api = arvados.api('v1', False, self.host,
+ self.api_token, self.insecure)
return self.local.api
def localkeep(self):
@@ -46,17 +47,13 @@ class SafeApi(object):
self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
return self.local.keep
- def collections(self):
- return self.localapi().collections()
-
- def links(self):
- return self.localapi().links()
-
- def groups(self):
- return self.localapi().groups()
+ def __getattr__(self, name):
+ # Proxy nonexistent attributes to the local API client.
+        try:
+            return getattr(self.localapi(), name)
+        except AttributeError:
+            return super(SafeApi, self).__getattribute__(name)
- def users(self):
- return self.localapi().users()
def convertTime(t):
'''Parse Arvados timestamp to unix time.'''
commit 1f354e0ba1b5b23c2d36c0cb60451260b29e1d3f
Author: Brett Smith <brett at curoverse.com>
Date: Fri Sep 12 09:28:47 2014 -0400
3147: Add retry support to PySDK StreamReader classes.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 9522976..323251d 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -707,7 +707,10 @@ class KeepClient(object):
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, copies, thread_limiter.done()))
- def local_store_put(self, data):
+ # Local storage methods need no-op num_retries arguments to keep
+ # integration tests happy. With better isolation they could
+ # probably be removed again.
+ def local_store_put(self, data, num_retries=0):
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -716,7 +719,7 @@ class KeepClient(object):
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s):
+ def local_store_get(self, loc_s, num_retries=0):
try:
locator = KeepLocator(loc_s)
except ValueError:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index b98937f..04b6b81 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -18,6 +18,7 @@ import time
import threading
import collections
+from arvados.retry import retry_method
from keep import *
import config
import errors
@@ -108,6 +109,7 @@ class StreamFileReader(object):
self.segments = segments
self._name = name
self._filepos = 0L
+ self.num_retries = stream.num_retries
def name(self):
return self._name
@@ -128,7 +130,8 @@ class StreamFileReader(object):
n = self.segments[-1]
return n[OFFSET] + n[BLOCKSIZE]
- def read(self, size):
+ @retry_method
+ def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
return ''
@@ -137,52 +140,58 @@ class StreamFileReader(object):
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
- data = self._stream.readfrom(locator+segmentoffset, segmentsize)
+ data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries)
self._filepos += len(data)
return data
- def readfrom(self, start, size):
+ @retry_method
+ def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
- data = ''
+ data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- return data
+ data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries))
+ return ''.join(data)
- def readall(self, size=2**20):
+ @retry_method
+ def readall(self, size=2**20, num_retries=None):
while True:
- data = self.read(size)
+ data = self.read(size, num_retries=num_retries)
if data == '':
break
yield data
- def decompress(self, decompress, size):
- for segment in self.readall(size):
+ @retry_method
+ def decompress(self, decompress, size, num_retries=None):
+        for segment in self.readall(size, num_retries=num_retries):
data = decompress(segment)
if data and data != '':
yield data
- def readall_decompressed(self, size=2**20):
+ @retry_method
+ def readall_decompressed(self, size=2**20, num_retries=None):
self.seek(0)
if re.search('\.bz2$', self._name):
dc = bz2.BZ2Decompressor()
- return self.decompress(lambda segment: dc.decompress(segment), size)
+ return self.decompress(dc.decompress, size,
+ num_retries=num_retries)
elif re.search('\.gz$', self._name):
dc = zlib.decompressobj(16+zlib.MAX_WBITS)
- return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
+ return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+ size, num_retries=num_retries)
else:
- return self.readall(size)
+ return self.readall(size, num_retries=num_retries)
- def readlines(self, decompress=True):
- if decompress:
- datasource = self.readall_decompressed()
- else:
- datasource = self.readall()
+ @retry_method
+ def readlines(self, decompress=True, num_retries=None):
+ read_func = self.readall_decompressed if decompress else self.readall
data = ''
- for newdata in datasource:
+ for newdata in read_func(num_retries=num_retries):
data += newdata
sol = 0
while True:
@@ -201,12 +210,15 @@ class StreamFileReader(object):
manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
+
class StreamReader(object):
- def __init__(self, tokens, keep=None, debug=False, _empty=False):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False,
+ num_retries=0):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
self._keep = keep
+ self.num_retries = num_retries
streamoffset = 0L
@@ -254,16 +266,17 @@ class StreamReader(object):
def locators_and_ranges(self, range_start, range_size):
return locators_and_ranges(self._data_locators, range_start, range_size)
- def readfrom(self, start, size):
+ @retry_method
+ def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
if self._keep is None:
- self._keep = KeepClient()
- data = ''
+ self._keep = KeepClient(num_retries=self.num_retries)
+ data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
- data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
- return data
+ data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
+ return ''.join(data)
def manifest_text(self, strip=False):
manifest_text = [self.name().replace(' ', '\\040')]
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 284854b..f066c5d 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -349,8 +349,9 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
class MockStreamReader(object):
def __init__(self, content):
self.content = content
+ self.num_retries = 0
- def readfrom(self, start, size):
+ def readfrom(self, start, size, num_retries=0):
return self.content[start:start+size]
def test_file_stream(self):
@@ -422,7 +423,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
def __init__(self, content, num_retries=0):
self.content = content
- def get(self, locator):
+ def get(self, locator, num_retries=0):
return self.content[locator]
def test_stream_reader(self):
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 060a26a..3970d67 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,18 +1,82 @@
#!/usr/bin/env python
+import mock
import unittest
import arvados
from arvados import StreamReader, StreamFileReader
+import arvados_testutil as tutil
import run_test_server
-class StreamReaderTestCase(unittest.TestCase):
+class StreamRetryTestMixin(object):
+ # Define reader_for(coll_name, **kwargs)
+ # and read_for_test(reader, size, **kwargs).
API_COLLECTIONS = run_test_server.fixture('collections')
+ def keep_client(self):
+ return arvados.KeepClient(proxy='http://[%s]:1' % (tutil.TEST_HOST,),
+ local_store='')
+
def manifest_for(self, coll_name):
return self.API_COLLECTIONS[coll_name]['manifest_text']
+ @tutil.skip_sleep
+ def test_success_without_retries(self):
+ reader = self.reader_for('bar_file')
+ with tutil.mock_responses('bar', 200):
+ self.assertEqual('bar', self.read_for_test(reader, 3))
+
+ @tutil.skip_sleep
+ def test_read_no_default_retry(self):
+ reader = self.reader_for('user_agreement')
+ with tutil.mock_responses('', 500):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 10)
+
+ @tutil.skip_sleep
+ def test_read_with_instance_retries(self):
+ reader = self.reader_for('foo_file', num_retries=3)
+ with tutil.mock_responses('foo', 500, 200):
+ self.assertEqual('foo', self.read_for_test(reader, 3))
+
+ @tutil.skip_sleep
+ def test_read_with_method_retries(self):
+ reader = self.reader_for('foo_file')
+ with tutil.mock_responses('foo', 500, 200):
+ self.assertEqual('foo',
+ self.read_for_test(reader, 3, num_retries=3))
+
+ @tutil.skip_sleep
+ def test_read_instance_retries_exhausted(self):
+ reader = self.reader_for('bar_file', num_retries=3)
+ with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 3)
+
+ @tutil.skip_sleep
+ def test_read_method_retries_exhausted(self):
+ reader = self.reader_for('bar_file')
+ with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 3, num_retries=3)
+
+ @tutil.skip_sleep
+ def test_method_retries_take_precedence(self):
+ reader = self.reader_for('user_agreement', num_retries=10)
+ with tutil.mock_responses('', 500, 500, 500, 200):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 10, num_retries=1)
+
+
+class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
+ def reader_for(self, coll_name, **kwargs):
+ return StreamReader(self.manifest_for(coll_name).split(),
+ self.keep_client(), **kwargs)
+
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return reader.readfrom(0, byte_count, **kwargs)
+
def test_manifest_text_without_keep_client(self):
mtext = self.manifest_for('multilevel_collection_1')
for line in mtext.rstrip('\n').split('\n'):
@@ -20,5 +84,34 @@ class StreamReaderTestCase(unittest.TestCase):
self.assertEqual(line + '\n', reader.manifest_text())
+class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
+ def reader_for(self, coll_name, **kwargs):
+ return StreamReader(self.manifest_for(coll_name).split(),
+ self.keep_client(), **kwargs).all_files()[0]
+
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return reader.read(byte_count, **kwargs)
+
+
+class StreamFileReadFromTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return reader.readfrom(0, byte_count, **kwargs)
+
+
+class StreamFileReadAllTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return ''.join(reader.readall(**kwargs))
+
+
+class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return ''.join(reader.readall_decompressed(**kwargs))
+
+
+class StreamFileReadlinesTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return ''.join(reader.readlines(**kwargs))
+
+
if __name__ == '__main__':
unittest.main()
commit 80c57e3536f41d8419f580b577776d85209f6111
Author: Brett Smith <brett at curoverse.com>
Date: Tue Sep 16 17:24:31 2014 -0400
3147: Make PySDK KeepClient.get and put retry_methods.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 64d82ba..9522976 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -392,7 +392,8 @@ class KeepClient(object):
def __init__(self, api_client=None, proxy=None, timeout=300,
- api_token=None, local_store=None, block_cache=None):
+ api_token=None, local_store=None, block_cache=None,
+ num_retries=0):
"""Initialize a new KeepClient.
Arguments:
@@ -416,6 +417,9 @@ class KeepClient(object):
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
+ * num_retries: The default number of times to retry failed requests.
+ This will be used as the default num_retries value when get() and
+ put() are called. Default 0.
"""
self.lock = threading.Lock()
if proxy is None:
@@ -436,7 +440,7 @@ class KeepClient(object):
self.put = self.local_store_put
else:
self.timeout = timeout
-
+ self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
proxy += '/'
@@ -558,7 +562,8 @@ class KeepClient(object):
else:
return None
- def get(self, loc_s, num_retries=0):
+ @retry.retry_method
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
@@ -575,7 +580,8 @@ class KeepClient(object):
*each* Keep server if it returns temporary failures, with
exponential backoff. Note that, in each loop, the method may try
to fetch data from every available Keep service, along with any
- that are named in location hints in the locator. Default 0.
+ that are named in location hints in the locator. The default value
+ is set when the KeepClient is initialized.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
@@ -638,7 +644,8 @@ class KeepClient(object):
else:
raise arvados.errors.KeepReadError(loc_s)
- def put(self, data, copies=2, num_retries=0):
+ @retry.retry_method
+ def put(self, data, copies=2, num_retries=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
@@ -653,7 +660,8 @@ class KeepClient(object):
Default 2.
* num_retries: The number of times to retry PUT requests to
*each* Keep server if it returns temporary failures, with
- exponential backoff. Default 0.
+ exponential backoff. The default value is set when the
+ KeepClient is initialized.
"""
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index f6abe2b..284854b 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -419,7 +419,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
class MockKeep(object):
- def __init__(self, content):
+ def __init__(self, content, num_retries=0):
self.content = content
def get(self, locator):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9a01d86..8cc22a6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -236,8 +236,13 @@ class KeepClientRetryTestMixin(object):
TEST_DATA = 'testdata'
TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
- def new_client(self):
- return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+ def setUp(self):
+ self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
+
+ def new_client(self, **caller_kwargs):
+ kwargs = self.client_kwargs.copy()
+ kwargs.update(caller_kwargs)
+ return arvados.KeepClient(**kwargs)
def run_method(self, *args, **kwargs):
raise NotImplementedError("test subclasses must define run_method")
@@ -272,6 +277,11 @@ class KeepClientRetryTestMixin(object):
with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
self.check_exception(num_retries=1)
+ def test_num_retries_instance_fallback(self):
+ self.client_kwargs['num_retries'] = 3
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ self.check_success()
+
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
commit 5a9aaf3f32fc0414ae16aed17a77388cf4af9f90
Author: Brett Smith <brett at curoverse.com>
Date: Fri Sep 12 08:49:21 2014 -0400
3147: Add retry_method to the Python SDK.
This gives us a way to make retry support flexible and consistent
across SDK classes. Any class that has retryable operations should
take a num_retries argument at initialization. Then, the specific
methods that implement those operations should also take a num_retries
argument, which will fall back to the instance's setting by default.
This lets SDK users express their retry preferences wherever it's most
convenient for them.
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 5dc31ae..3d2fc48 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python
+import functools
+import inspect
import time
from collections import deque
@@ -138,3 +140,19 @@ def check_http_response_success(result):
return False
else:
return None # Get well soon, server.
+
+def retry_method(orig_func):
+ """Provide a default value for a method's num_retries argument.
+
+ This is a decorator for instance and class methods that accept a
+ num_retries argument, with a None default. When the method is called
+ without a value for num_retries, it will be set from the underlying
+ instance or class' num_retries attribute.
+ """
+ @functools.wraps(orig_func)
+ def num_retries_setter(self, *args, **kwargs):
+ arg_vals = inspect.getcallargs(orig_func, self, *args, **kwargs)
+ if arg_vals['num_retries'] is None:
+ kwargs['num_retries'] = self.num_retries
+ return orig_func(self, *args, **kwargs)
+ return num_retries_setter
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index ed0a406..8c3916b 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -194,5 +194,32 @@ class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
self.check_is(None, 0, 99, 600, -200)
+class RetryMethodTestCase(unittest.TestCase):
+ class Tester(object):
+ def __init__(self):
+ self.num_retries = 1
+
+ @arv_retry.retry_method
+ def check(self, a, num_retries=None, z=0):
+ return (a, num_retries, z)
+
+
+ def test_positional_arg_passed(self):
+ self.assertEqual((3, 2, 0), self.Tester().check(3, 2))
+
+ def test_keyword_arg_passed(self):
+ self.assertEqual((4, 3, 0), self.Tester().check(num_retries=3, a=4))
+
+ def test_not_specified(self):
+ self.assertEqual((0, 1, 0), self.Tester().check(0))
+
+ def test_not_specified_with_other_kwargs(self):
+ self.assertEqual((1, 1, 1), self.Tester().check(1, z=1))
+
+ def test_bad_call(self):
+ with self.assertRaises(TypeError):
+ self.Tester().check(num_retries=2)
+
+
if __name__ == '__main__':
unittest.main()
commit af2c75821456e28875dc8006efb03f38ec1a74b8
Author: Brett Smith <brett at curoverse.com>
Date: Thu Sep 11 14:46:37 2014 -0400
3147: Fix variable name typo.
This variable was renamed in previous branch work, but this last user
of it wasn't caught until just now.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 22bf327..64d82ba 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -316,7 +316,8 @@ class KeepClient(object):
resp_md5 = hashlib.md5(content).hexdigest()
if resp_md5 == locator.md5sum:
return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
return None
def put(self, http, hash_s, body):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 4ed2912..9a01d86 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -312,6 +312,12 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
side_effect=iter(side_effects)):
self.check_success(locator=self.HINTED_LOCATOR)
+ def test_retry_data_with_wrong_checksum(self):
+ side_effects = ((tutil.fake_httplib2_response(200), s)
+ for s in ['baddata', self.TEST_DATA])
+ with mock.patch('httplib2.Http.request', side_effect=side_effects):
+ self.check_success(locator=self.HINTED_LOCATOR)
+
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
commit 9facc2aeba6770ee80730001ca66bdaec8bde738
Author: Brett Smith <brett at curoverse.com>
Date: Thu Sep 11 14:22:12 2014 -0400
3147: PySDK StreamReader instantiates a KeepClient late.
Similarly with API client instantiation in KeepClient, this helps with
testing. Refs #3693.
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index e13e1a6..b98937f 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -206,9 +206,6 @@ class StreamReader(object):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
-
- if keep is None:
- keep = KeepClient()
self._keep = keep
streamoffset = 0L
@@ -261,6 +258,8 @@ class StreamReader(object):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
+ if self._keep is None:
+ self._keep = KeepClient()
data = ''
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
new file mode 100644
index 0000000..060a26a
--- /dev/null
+++ b/sdk/python/tests/test_stream.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+
+import unittest
+
+import arvados
+from arvados import StreamReader, StreamFileReader
+
+import run_test_server
+
+class StreamReaderTestCase(unittest.TestCase):
+ API_COLLECTIONS = run_test_server.fixture('collections')
+
+ def manifest_for(self, coll_name):
+ return self.API_COLLECTIONS[coll_name]['manifest_text']
+
+ def test_manifest_text_without_keep_client(self):
+ mtext = self.manifest_for('multilevel_collection_1')
+ for line in mtext.rstrip('\n').split('\n'):
+ reader = StreamReader(line.split())
+ self.assertEqual(line + '\n', reader.manifest_text())
+
+
+if __name__ == '__main__':
+ unittest.main()
commit 8d554a2734be8bbe015d38a48494f7699f2de439
Author: Brett Smith <brett at curoverse.com>
Date: Fri Sep 12 16:46:54 2014 -0400
3147: Move PySDK HTTP test infrastructure to testutil.
Making this available to other test collections.
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 77146db..48fb013 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -3,16 +3,27 @@
import errno
import httplib
import httplib2
+import mock
import os
import shutil
import tempfile
import unittest
+# Use this hostname when you want to make sure the traffic will be
+# instantly refused. 100::/64 is a dedicated black hole.
+TEST_HOST = '100::'
+
+skip_sleep = mock.patch('time.sleep', lambda n: None) # clown'll eat me
+
def fake_httplib2_response(code, **headers):
headers.update(status=str(code),
reason=httplib.responses.get(code, "Unknown Response"))
return httplib2.Response(headers)
+def mock_responses(body, *codes):
+ return mock.patch('httplib2.Http.request', side_effect=(
+ (fake_httplib2_response(code), body) for code in codes))
+
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 900143e..4ed2912 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -5,8 +5,8 @@ import unittest
import arvados
import arvados.retry
+import arvados_testutil as tutil
import run_test_server
-from arvados_testutil import fake_httplib2_response
class KeepTestCase(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
@@ -232,15 +232,10 @@ class KeepClientRetryTestMixin(object):
# supporting servers, and prevents side effects in case something hiccups.
# To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
# run_method().
- PROXY_ADDR = 'http://[100::]/'
+ PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
TEST_DATA = 'testdata'
TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
- @staticmethod
- def mock_responses(body, *codes):
- return mock.patch('httplib2.Http.request', side_effect=(
- (fake_httplib2_response(code), body) for code in codes))
-
def new_client(self):
return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
@@ -258,30 +253,28 @@ class KeepClientRetryTestMixin(object):
self.assertRaises(error_class, self.run_method, *args, **kwargs)
def test_immediate_success(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 200):
self.check_success()
def test_retry_then_success(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
self.check_success(num_retries=3)
def test_no_default_retry(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
self.check_exception()
def test_no_retry_after_permanent_error(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 403, 200):
self.check_exception(num_retries=3)
def test_error_after_retries_exhausted(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
self.check_exception(num_retries=1)
-# Don't delay from HTTPRetryLoop's exponential backoff.
-no_backoff = mock.patch('time.sleep', lambda n: None)
-@no_backoff
-class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+@tutil.skip_sleep
+class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
+class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
 HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
@@ -291,7 +284,7 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
return self.new_client().get(locator, *args, **kwargs)
def test_specific_exception_when_not_found(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 200):
self.check_exception(arvados.errors.NotFoundError, num_retries=3)
def test_general_exception_with_mixed_errors(self):
@@ -300,7 +293,7 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
# This test rigs up 50/50 disagreement between two servers, and
# checks that it does not become a NotFoundError.
client = self.new_client()
- with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 500):
with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
client.get(self.HINTED_LOCATOR)
self.assertNotIsInstance(
@@ -308,19 +301,20 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
"mixed errors raised NotFoundError")
def test_hint_server_can_succeed_without_retries(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
- side_effects = [socket.timeout("timed out"),
- (fake_httplib2_response(200), self.DEFAULT_EXPECT)]
+ side_effects = [
+ socket.timeout("timed out"),
+ (tutil.fake_httplib2_response(200), self.DEFAULT_EXPECT)]
with mock.patch('httplib2.Http.request',
side_effect=iter(side_effects)):
self.check_success(locator=self.HINTED_LOCATOR)
-@no_backoff
-class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+@tutil.skip_sleep
+class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
@@ -329,5 +323,5 @@ class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
return self.new_client().put(data, copies, *args, **kwargs)
def test_do_not_send_multiple_copies_to_same_server(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 200):
self.check_exception(copies=2, num_retries=3)
commit f51c721f97c6e5ef4fda47e6d0fc45cdd2c2b77d
Author: Brett Smith <brett at curoverse.com>
Date: Wed Sep 10 16:44:25 2014 -0400
3147: PySDK tests use mock>=1.0 and easier mock side_effect.
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 30cc779..e0afe89 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -32,5 +32,5 @@ setup(name='arvados-python-client',
'ws4py'
],
test_suite='tests',
- tests_require=['mock', 'PyYAML'],
+ tests_require=['mock>=1.0', 'PyYAML'],
zip_safe=False)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 04bcab8..900143e 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -312,15 +312,10 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
- responses = iter([None, (fake_httplib2_response(200),
- self.DEFAULT_EXPECT)])
- def side_effect(*args, **kwargs):
- response = next(responses)
- if response is None:
- raise socket.timeout("timed out")
- else:
- return response
- with mock.patch('httplib2.Http.request', side_effect=side_effect):
+ side_effects = [socket.timeout("timed out"),
+ (fake_httplib2_response(200), self.DEFAULT_EXPECT)]
+ with mock.patch('httplib2.Http.request',
+ side_effect=iter(side_effects)):
self.check_success(locator=self.HINTED_LOCATOR)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list