[ARVADOS] created: c19503ef39f3588e3fe2113816e84c0e8dce4074
git@public.curoverse.com
Tue Sep 16 18:04:24 EDT 2014
at c19503ef39f3588e3fe2113816e84c0e8dce4074 (commit)
commit c19503ef39f3588e3fe2113816e84c0e8dce4074
Author: Brett Smith <brett@curoverse.com>
Date: Mon Sep 15 18:13:40 2014 -0400
3147: crunch-job retries data uploads more for longer-running jobs.
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 83931f2..56a3637 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1206,7 +1206,8 @@ sub collate_output
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
+ my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
+ '--retries', put_retry_count());
my $joboutput;
for (@jobstep)
{
@@ -1346,8 +1347,9 @@ sub save_meta
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
$local_logfile->flush;
- my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
- . quotemeta($local_logfile->filename);
+ my $retry_count = put_retry_count();
+ my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
+ "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
my $loglocator = `$cmd`;
die "system $cmd failed: $?" if $?;
chomp($loglocator);
@@ -1493,6 +1495,20 @@ sub find_docker_image {
}
}
+sub put_retry_count {
+ # Calculate a --retries argument for arv-put that will have it try
+ # approximately as long as this Job has been running.
+ my $stoptime = shift || time;
+ my $starttime = $jobstep[0]->{starttime};
+ my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
+ my $retries = 0;
+ while ($timediff >= 2) {
+ $retries++;
+ $timediff /= 2;
+ }
+ return ($retries > 3) ? $retries : 3;
+}
+
__DATA__
#!/usr/bin/perl
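The helper added above halves the job's elapsed time until it drops
below 2, so the retry count grows roughly as log2 of the runtime, with
a floor of 3 retries. A rough Python re-expression of the Perl logic
(illustrative only, not part of this commit):

    import time

    def put_retry_count(starttime, stoptime=None):
        # Retry about log2(elapsed seconds) times, but at least 3 times.
        stoptime = stoptime or time.time()
        timediff = (stoptime - starttime) if starttime else 1
        retries = 0
        while timediff >= 2:
            retries += 1
            timediff /= 2
        return max(retries, 3)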
commit df8ac6c19c7c1d0ce4f988fec4969df5e5f4909d
Author: Brett Smith <brett@curoverse.com>
Date: Tue Sep 16 17:29:38 2014 -0400
3147: Add retry support to FUSE driver.
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index d5523e8..e40b88c 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -289,10 +289,11 @@ class Directory(FreshBase):
class CollectionDirectory(Directory):
'''Represents the root of a directory tree holding a collection.'''
- def __init__(self, parent_inode, inodes, api, collection):
+ def __init__(self, parent_inode, inodes, api, num_retries, collection):
super(CollectionDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.collection_object_file = None
self.collection_object = None
if isinstance(collection, dict):
@@ -310,7 +311,9 @@ class CollectionDirectory(Directory):
self.collection_object_file.update(self.collection_object)
self.clear()
- collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
+ collection = arvados.CollectionReader(
+ self.collection_object["manifest_text"], self.api,
+ self.api.localkeep(), num_retries=self.num_retries)
for s in collection.all_streams():
cwd = self
for part in s.name().split('/'):
@@ -328,7 +331,9 @@ class CollectionDirectory(Directory):
return True
with llfuse.lock_released:
- new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
+ new_collection_object = self.api.collections().get(
+ uuid=self.collection_locator
+ ).execute(num_retries=self.num_retries)
if "portable_data_hash" not in new_collection_object:
new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
# end with llfuse.lock_released, re-acquire lock
@@ -386,10 +391,11 @@ class MagicDirectory(Directory):
to readdir().
'''
- def __init__(self, parent_inode, inodes, api):
+ def __init__(self, parent_inode, inodes, api, num_retries):
super(MagicDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
# Have to defer creating readme_file because at this point we don't
# yet have an inode assigned.
self.readme_file = None
@@ -418,7 +424,8 @@ will appear if it exists.
return False
try:
- e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
+ e = self.inodes.add_entry(CollectionDirectory(
+ self.inode, self.inodes, self.api, self.num_retries, k))
if e.update():
self._entries[k] = e
return True
@@ -457,21 +464,25 @@ class RecursiveInvalidateDirectory(Directory):
class TagsDirectory(RecursiveInvalidateDirectory):
'''A special directory that contains as subdirectories all tags visible to the user.'''
- def __init__(self, parent_inode, inodes, api, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
super(TagsDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self._poll = True
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
+ tags = self.api.links().list(
+ filters=[['link_class', '=', 'tag']],
+ select=['name'], distinct=True
+ ).execute(num_retries=self.num_retries)
if "items" in tags:
self.merge(tags['items'],
lambda i: i['name'] if 'name' in i else i['uuid'],
lambda a, i: a.tag == i,
- lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
+ lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
@@ -479,45 +490,51 @@ class TagDirectory(Directory):
to the user that are tagged with a particular tag.
'''
- def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, tag,
+ poll=False, poll_time=60):
super(TagDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.tag = tag
self._poll = poll
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
- ['name', '=', self.tag],
- ['head_uuid', 'is_a', 'arvados#collection']],
- select=['head_uuid']).execute()
+ taggedcollections = self.api.links().list(
+ filters=[['link_class', '=', 'tag'],
+ ['name', '=', self.tag],
+ ['head_uuid', 'is_a', 'arvados#collection']],
+ select=['head_uuid']
+ ).execute(num_retries=self.num_retries)
self.merge(taggedcollections['items'],
lambda i: i['head_uuid'],
lambda a, i: a.collection_locator == i['head_uuid'],
- lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
+ lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
class ProjectDirectory(Directory):
'''A special directory that contains the contents of a project.'''
- def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, project_object,
+ poll=False, poll_time=60):
super(ProjectDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
self.project_object = project_object
self.project_object_file = None
self.uuid = project_object['uuid']
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, i)
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
+ return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
else:
return None
elif uuid_pattern.match(i['uuid']):
@@ -557,13 +574,19 @@ class ProjectDirectory(Directory):
with llfuse.lock_released:
if group_uuid_pattern.match(self.uuid):
- self.project_object = self.api.groups().get(uuid=self.uuid).execute()
+ self.project_object = self.api.groups().get(
+ uuid=self.uuid).execute(num_retries=self.num_retries)
elif user_uuid_pattern.match(self.uuid):
- self.project_object = self.api.users().get(uuid=self.uuid).execute()
+ self.project_object = self.api.users().get(
+ uuid=self.uuid).execute(num_retries=self.num_retries)
- contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
+ contents = arvados.util.list_all(self.api.groups().contents,
+ self.num_retries, uuid=self.uuid)
# Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
- contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
+ contents += arvados.util.list_all(
+ self.api.links().list, self.num_retries,
+ filters=[['tail_uuid', '=', self.uuid],
+ ['link_class', '=', 'name']])
# end with llfuse.lock_released, re-acquire lock
@@ -589,17 +612,21 @@ class ProjectDirectory(Directory):
class SharedDirectory(Directory):
'''A special directory that represents users or groups who have shared projects with me.'''
- def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
+ def __init__(self, parent_inode, inodes, api, num_retries, exclude,
+ poll=False, poll_time=60):
super(SharedDirectory, self).__init__(parent_inode)
- self.current_user = api.users().current().execute()
self.inodes = inodes
self.api = api
+ self.num_retries = num_retries
+ self.current_user = api.users().current().execute(num_retries=num_retries)
self._poll = True
self._poll_time = poll_time
def update(self):
with llfuse.lock_released:
- all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
+ all_projects = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['group_class','=','project']])
objects = {}
for ob in all_projects:
objects[ob['uuid']] = ob
@@ -611,8 +638,12 @@ class SharedDirectory(Directory):
roots.append(ob)
root_owners[ob['owner_uuid']] = True
- lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
- lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
+ lusers = arvados.util.list_all(
+ self.api.users().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
+ lgroups = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
users = {}
groups = {}
@@ -641,7 +672,7 @@ class SharedDirectory(Directory):
self.merge(contents.items(),
lambda i: i[0],
lambda a, i: a.uuid == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
+ lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception as e:
_logger.exception(e)
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index e07860d..e92b1b4 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -9,6 +9,7 @@ import signal
import subprocess
import time
+import arvados.commands._util as arv_cmd
from arvados_fuse import *
logger = logging.getLogger('arvados.arv-mount')
@@ -16,6 +17,7 @@ logger = logging.getLogger('arvados.arv-mount')
if __name__ == '__main__':
# Handle command line parameters
parser = argparse.ArgumentParser(
+ parents=[arv_cmd.retry_opt],
description='''Mount Keep data under the local filesystem. Default mode is --home''',
epilog="""
Note: When using the --exec feature, you must either specify the
@@ -80,29 +82,42 @@ with "--".
operations = Operations(os.getuid(), os.getgid())
api = SafeApi(arvados.config)
- usr = api.users().current().execute()
+ usr = api.users().current().execute(num_retries=args.retries)
now = time.time()
+ dir_class = None
+ dir_args = [llfuse.ROOT_INODE, operations.inodes, api, args.retries]
if args.by_id:
# Set up the request handler with the 'magic directory' at the root
- operations.inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+ dir_class = MagicDirectory
elif args.by_tag:
- operations.inodes.add_entry(TagsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+ dir_class = TagsDirectory
elif args.shared:
- operations.inodes.add_entry(SharedDirectory(llfuse.ROOT_INODE, operations.inodes, api, usr))
+ dir_class = SharedDirectory
+ dir_args.append(usr)
elif args.home:
- operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, usr))
- elif args.collection != None:
+ dir_class = ProjectDirectory
+ dir_args.append(usr)
+ elif args.collection is not None:
# Set up the request handler with the collection at the root
- operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, args.collection))
- elif args.project != None:
- operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, api.groups().get(uuid=args.project).execute()))
+ dir_class = CollectionDirectory
+ dir_args.append(args.collection)
+ elif args.project is not None:
+ dir_class = ProjectDirectory
+ dir_args.append(api.groups().get(uuid=args.project).execute(
+ num_retries=args.retries))
+
+ if dir_class is not None:
+ operations.inodes.add_entry(dir_class(*dir_args))
else:
e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
+ dir_args[0] = e.inode
- e._entries['home'] = operations.inodes.add_entry(ProjectDirectory(e.inode, operations.inodes, api, usr))
- e._entries['shared'] = operations.inodes.add_entry(SharedDirectory(e.inode, operations.inodes, api, usr))
- e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(e.inode, operations.inodes, api))
- e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(e.inode, operations.inodes, api))
+ e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
+ e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(*dir_args))
+
+ dir_args.append(usr)
+ e._entries['home'] = operations.inodes.add_entry(ProjectDirectory(*dir_args))
+ e._entries['shared'] = operations.inodes.add_entry(SharedDirectory(*dir_args))
text = '''
Welcome to Arvados! This directory provides file system access to files and objects
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 55f565b..789bfe3 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -73,7 +73,7 @@ class FuseMountTest(MountTestBase):
def runTest(self):
# Create the request handler
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.testcollection))
+ e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.testcollection))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -128,7 +128,7 @@ class FuseMagicTest(MountTestBase):
def runTest(self):
# Create the request handler
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api))
+ e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
self.mounttmp = tempfile.mkdtemp()
@@ -163,7 +163,7 @@ class FuseMagicTest(MountTestBase):
class FuseTagsTest(MountTestBase):
def runTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api))
+ e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -188,7 +188,7 @@ class FuseTagsTest(MountTestBase):
class FuseTagsUpdateTest(MountTestBase):
def runRealTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, poll_time=1))
+ e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -241,7 +241,7 @@ class FuseTagsUpdateTest(MountTestBase):
class FuseSharedTest(MountTestBase):
def runTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.api.users().current().execute()['uuid']))
+ e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()['uuid']))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
@@ -280,7 +280,7 @@ class FuseSharedTest(MountTestBase):
class FuseHomeTest(MountTestBase):
def runTest(self):
operations = fuse.Operations(os.getuid(), os.getgid())
- e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.api.users().current().execute()))
+ e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()))
llfuse.init(operations, self.mounttmp, [])
t = threading.Thread(None, lambda: llfuse.main())
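The FUSE changes above repeat one mechanical pattern: every directory
class takes a num_retries argument, stores it, and forwards it to each
API request's execute() call. A minimal sketch of that pattern (class
and method names are illustrative, not the actual driver classes):

    class RetryingDirectory(object):
        def __init__(self, api, num_retries):
            self.api = api
            self.num_retries = num_retries

        def update(self):
            # Request objects built by the Arvados API client accept
            # num_retries at execute() time; that is where the retrying
            # actually happens.
            return self.api.collections().list().execute(
                num_retries=self.num_retries)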
commit 2ffac087fc1e34fcc0e580765f724b6a42b9ad34
Author: Brett Smith <brett@curoverse.com>
Date: Mon Sep 15 10:58:11 2014 -0400
3147: FUSE driver requires a Python SDK with retry support.
I also took out some of the older log-handling code while I was at it,
since the stricter versioning makes it unnecessary.
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 76d026e..e07860d 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -59,29 +59,20 @@ with "--".
else:
daemon_ctx = None
- # Configure a logger based on command-line switches.
- # If we're using a contemporary Python SDK (mid-August 2014),
- # configure the arvados hierarchy logger.
- # Otherwise, configure the program root logger.
- base_logger = getattr(arvados, 'logger', None)
-
+ # Configure a log handler based on command-line switches.
if args.logfile:
log_handler = logging.FileHandler(args.logfile)
elif daemon_ctx:
log_handler = logging.NullHandler()
- elif base_logger:
- log_handler = arvados.log_handler
else:
- log_handler = logging.StreamHandler()
+ log_handler = None
- if base_logger is None:
- base_logger = logging.getLogger()
- else:
- base_logger.removeHandler(arvados.log_handler)
- base_logger.addHandler(log_handler)
+ if log_handler is not None:
+ arvados.logger.removeHandler(arvados.log_handler)
+ arvados.logger.addHandler(log_handler)
if args.debug:
- base_logger.setLevel(logging.DEBUG)
+ arvados.logger.setLevel(logging.DEBUG)
logger.debug("arv-mount debugging enabled")
try:
@@ -103,7 +94,7 @@ with "--".
elif args.collection != None:
# Set up the request handler with the collection at the root
operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, args.collection))
- elif args.project != None:
+ elif args.project != None:
operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, api.groups().get(uuid=args.project).execute()))
else:
e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
@@ -114,8 +105,8 @@ with "--".
e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(e.inode, operations.inodes, api))
text = '''
-Welcome to Arvados! This directory provides file system access to files and objects
-available on the Arvados installation located at '{}'
+Welcome to Arvados! This directory provides file system access to files and objects
+available on the Arvados installation located at '{}'
using credentials for user '{}'.
From here, the following directories are available:
diff --git a/services/fuse/setup.py b/services/fuse/setup.py
index d9fe797..ba728e6 100644
--- a/services/fuse/setup.py
+++ b/services/fuse/setup.py
@@ -20,7 +20,7 @@ setup(name='arvados_fuse',
'bin/arv-mount'
],
install_requires=[
- 'arvados-python-client',
+ 'arvados-python-client>=0.1.1410902880', # 2014-09-16
'llfuse',
'python-daemon'
],
commit 8d3efbe312f8bb5f42cec903da6a882a484a1d0f
Author: Brett Smith <brett@curoverse.com>
Date: Mon Sep 15 11:21:31 2014 -0400
3147: Add retry support to PySDK list_all utility.
diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index ada1aec..2609f11 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -338,12 +338,12 @@ def is_hex(s, *length_args):
good_len = True
return bool(good_len and HEX_RE.match(s))
-def list_all(fn, **kwargs):
+def list_all(fn, num_retries=0, **kwargs):
items = []
offset = 0
items_available = sys.maxint
while len(items) < items_available:
- c = fn(offset=offset, **kwargs).execute()
+ c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
items += c['items']
items_available = c['items_available']
offset = c['offset'] + len(c['items'])
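Callers now pass the retry count as the second argument, and it is
applied to every page fetch in the loop. A hypothetical call (the
filter is made up for illustration):

    import arvados
    import arvados.util

    api = arvados.api('v1')
    projects = arvados.util.list_all(
        api.groups().list, 3,
        filters=[['group_class', '=', 'project']])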
commit f6c14d2b99e646f04a819a4a5df8b87ad55567e9
Author: Brett Smith <brett@curoverse.com>
Date: Mon Sep 15 10:37:32 2014 -0400
3147: Add retry support to Python CLI tools.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a06cad0..aa4973a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -147,8 +147,10 @@ class CollectionReader(CollectionBase):
if not util.portable_data_hash_pattern.match(
self._manifest_locator):
raise
- _logger.warning("API lookup failed for collection %s (%s: %s)",
- self._manifest_locator, type(e), str(e))
+ _logger.warning(
+ "API server did not return Collection '%s'. " +
+ "Trying to fetch directly from Keep (deprecated).",
+ self._manifest_locator)
self._manifest_text = self._my_keep().get(
self._manifest_locator, num_retries=self.num_retries)
self._streams = [sline.split()
diff --git a/sdk/python/arvados/commands/_util.py b/sdk/python/arvados/commands/_util.py
index f7cb80d..c42ee7a 100644
--- a/sdk/python/arvados/commands/_util.py
+++ b/sdk/python/arvados/commands/_util.py
@@ -1,8 +1,20 @@
#!/usr/bin/env python
+import argparse
import errno
import os
+def _pos_int(s):
+ num = int(s)
+ if num < 0:
+ raise ValueError("can't accept negative value: %s" % (num,))
+ return num
+
+retry_opt = argparse.ArgumentParser(add_help=False)
+retry_opt.add_argument('--retries', type=_pos_int, default=3, help="""
+Maximum number of times to retry server requests that encounter temporary
+failures (e.g., server down). Default 3.""")
+
def _ignore_error(error):
return None
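retry_opt relies on argparse's parents mechanism: a parser created with
add_help=False holds just the shared --retries option, and each CLI
tool mixes it into its own parser. A self-contained sketch of the
mechanism (the tool description is hypothetical):

    import argparse

    retry_opt = argparse.ArgumentParser(add_help=False)
    retry_opt.add_argument('--retries', type=int, default=3)

    parser = argparse.ArgumentParser(description='example tool',
                                     parents=[retry_opt])
    args = parser.parse_args(['--retries', '5'])
    assert args.retries == 5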
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index c96e2cb..d0f60bf 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -47,7 +47,7 @@ keepdocker_parser.add_argument(
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
description="Upload or list Docker images in Arvados",
- parents=[keepdocker_parser, arv_put.run_opts])
+ parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
class DockerError(Exception):
pass
@@ -152,9 +152,10 @@ def prep_image_file(filename):
image_file = open(file_path, 'w+b' if need_save else 'rb')
return image_file, need_save
-def make_link(link_class, link_name, **link_attrs):
+def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
link_attrs.update({'link_class': link_class, 'name': link_name})
- return arvados.api('v1').links().create(body=link_attrs).execute()
+ return api_client.links().create(body=link_attrs).execute(
+ num_retries=num_retries)
def ptimestamp(t):
s = t.split(".")
@@ -162,8 +163,10 @@ def ptimestamp(t):
t = s[0] + s[1][-1:]
return datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%SZ")
-def list_images_in_arv():
- existing_links = arvados.api('v1').links().list(filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']]]).execute()['items']
+def list_images_in_arv(api_client, num_retries):
+ existing_links = api_client.links().list(
+ filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']]]
+ ).execute(num_retries=num_retries)['items']
images = {}
for link in existing_links:
collection_uuid = link["head_uuid"]
@@ -196,9 +199,10 @@ def list_images_in_arv():
def main(arguments=None):
args = arg_parser.parse_args(arguments)
+ api = arvados.api('v1')
if args.image is None or args.image == 'images':
- list_images_in_arv()
+ list_images_in_arv(api, args.retries)
sys.exit(0)
# Pull the image if requested, unless the image is specified as a hash
@@ -222,30 +226,35 @@ def main(arguments=None):
else:
collection_name = args.name
- api = arvados.api('v1')
-
if not args.force:
# Check if this image is already in Arvados.
# Project where everything should be owned
- parent_project_uuid = args.project_uuid if args.project_uuid else api.users().current().execute()['uuid']
+ if args.project_uuid:
+ parent_project_uuid = args.project_uuid
+ else:
+ parent_project_uuid = api.users().current().execute(
+ num_retries=args.retries)['uuid']
# Find image hash tags
existing_links = api.links().list(
filters=[['link_class', '=', 'docker_image_hash'],
- ['name', '=', image_hash]]).execute()['items']
+ ['name', '=', image_hash]]
+ ).execute(num_retries=args.retries)['items']
if existing_links:
# get readable collections
collections = api.collections().list(
filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
- select=["uuid", "owner_uuid", "name", "manifest_text"]).execute()['items']
+ select=["uuid", "owner_uuid", "name", "manifest_text"]
+ ).execute(num_retries=args.retries)['items']
if collections:
# check for repo+tag links on these collections
existing_repo_tag = (api.links().list(
filters=[['link_class', '=', 'docker_image_repo+tag'],
['name', '=', image_repo_tag],
- ['head_uuid', 'in', collections]]).execute()['items']) if image_repo_tag else []
+ ['head_uuid', 'in', collections]]
+ ).execute(num_retries=args.retries)['items']) if image_repo_tag else []
# Filter on elements owned by the parent project
owned_col = [c for c in collections if c['owner_uuid'] == parent_project_uuid]
@@ -257,21 +266,25 @@ def main(arguments=None):
coll_uuid = owned_col[0]['uuid']
else:
# create new collection owned by the project
- coll_uuid = api.collections().create(body={"manifest_text": collections[0]['manifest_text'],
- "name": collection_name,
- "owner_uuid": parent_project_uuid},
- ensure_unique_name=True).execute()['uuid']
+ coll_uuid = api.collections().create(
+ body={"manifest_text": collections[0]['manifest_text'],
+ "name": collection_name,
+ "owner_uuid": parent_project_uuid},
+ ensure_unique_name=True
+ ).execute(num_retries=args.retries)['uuid']
link_base = {'owner_uuid': parent_project_uuid,
'head_uuid': coll_uuid }
if not owned_img:
# create image link owned by the project
- make_link('docker_image_hash', image_hash, **link_base)
+ make_link(api, args.retries,
+ 'docker_image_hash', image_hash, **link_base)
if not owned_rep and image_repo_tag:
# create repo+tag link owned by the project
- make_link('docker_image_repo+tag', image_repo_tag, **link_base)
+ make_link(api, args.retries, 'docker_image_repo+tag',
+ image_repo_tag, **link_base)
print(coll_uuid)
@@ -306,10 +319,10 @@ def main(arguments=None):
if args.project_uuid is not None:
link_base['owner_uuid'] = args.project_uuid
- make_link('docker_image_hash', image_hash, **link_base)
+ make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
if image_repo_tag:
- make_link('docker_image_repo+tag', image_repo_tag,
- **link_base)
+ make_link(api, args.retries,
+ 'docker_image_repo+tag', image_repo_tag, **link_base)
# Clean up.
image_file.close()
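make_link() now receives the API client and retry count explicitly
instead of constructing a fresh client on every call, so all link
creations share one client and one retry policy. A hypothetical call
under the new signature (variable bindings as in main() above):

    link = make_link(api, args.retries, 'docker_image_hash', image_hash,
                     owner_uuid=parent_project_uuid, head_uuid=coll_uuid)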
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 15551fa..0099487 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -145,7 +145,7 @@ Do not continue interrupted uploads from cached state.
arg_parser = argparse.ArgumentParser(
description='Copy data from the local filesystem to Keep.',
- parents=[upload_opts, run_opts])
+ parents=[upload_opts, run_opts, arv_cmd.retry_opt])
def parse_arguments(arguments):
args = arg_parser.parse_args(arguments)
@@ -246,23 +246,26 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
['bytes_written', '_seen_inputs'])
def __init__(self, cache=None, reporter=None, bytes_expected=None,
- api_client=None):
+ api_client=None, num_retries=0):
self.bytes_written = 0
self._seen_inputs = []
self.cache = cache
self.reporter = reporter
self.bytes_expected = bytes_expected
- super(ArvPutCollectionWriter, self).__init__(api_client)
+ super(ArvPutCollectionWriter, self).__init__(
+ api_client, num_retries=num_retries)
@classmethod
- def from_cache(cls, cache, reporter=None, bytes_expected=None):
+ def from_cache(cls, cache, reporter=None, bytes_expected=None,
+ num_retries=0):
try:
state = cache.load()
state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state, cache, reporter, bytes_expected)
+ writer = cls.from_state(state, cache, reporter, bytes_expected,
+ num_retries=num_retries)
except (TypeError, ValueError,
arvados.errors.StaleWriterStateError) as error:
- return cls(cache, reporter, bytes_expected)
+ return cls(cache, reporter, bytes_expected, num_retries=num_retries)
else:
return writer
@@ -348,18 +351,16 @@ def progress_writer(progress_func, outfile=sys.stderr):
def exit_signal_handler(sigcode, frame):
sys.exit(-sigcode)
-def desired_project_uuid(api_client, project_uuid):
- if project_uuid:
- if arvados.util.user_uuid_pattern.match(project_uuid):
- api_client.users().get(uuid=project_uuid).execute()
- return project_uuid
- elif arvados.util.group_uuid_pattern.match(project_uuid):
- api_client.groups().get(uuid=project_uuid).execute()
- return project_uuid
- else:
- raise ValueError("Not a valid project uuid: {}".format(project_uuid))
+def desired_project_uuid(api_client, project_uuid, num_retries):
+ if not project_uuid:
+ query = api_client.users().current()
+ elif arvados.util.user_uuid_pattern.match(project_uuid):
+ query = api_client.users().get(uuid=project_uuid)
+ elif arvados.util.group_uuid_pattern.match(project_uuid):
+ query = api_client.groups().get(uuid=project_uuid)
else:
- return api_client.users().current().execute()['uuid']
+ raise ValueError("Not a valid project UUID: {}".format(project_uuid))
+ return query.execute(num_retries=num_retries)['uuid']
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
global api_client
@@ -387,7 +388,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
# Determine the parent project
try:
- project_uuid = desired_project_uuid(api_client, args.project_uuid)
+ project_uuid = desired_project_uuid(api_client, args.project_uuid,
+ args.retries)
except (apiclient.errors.Error, ValueError) as error:
print >>stderr, error
sys.exit(1)
@@ -413,10 +415,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
sys.exit(1)
if resume_cache is None:
- writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+ writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
+ num_retries=args.retries)
else:
writer = ArvPutCollectionWriter.from_cache(
- resume_cache, reporter, bytes_expected)
+ resume_cache, reporter, bytes_expected, num_retries=args.retries)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
@@ -456,7 +459,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
'manifest_text': writer.manifest_text()
},
ensure_unique_name=True
- ).execute()
+ ).execute(num_retries=args.retries)
print >>stderr, "Collection saved as '%s'" % collection['name']
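Note the convention running through this commit: library-level entry
points default to num_retries=0, and only the CLI layer supplies the
user's --retries value (default 3). A side-by-side sketch of the two
kinds of call sites (illustrative):

    # Library caller: no retries unless explicitly requested.
    writer = ArvPutCollectionWriter(cache, reporter, bytes_expected)

    # CLI layer: forward the user's --retries choice.
    writer = ArvPutCollectionWriter(cache, reporter, bytes_expected,
                                    num_retries=args.retries)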
diff --git a/sdk/python/bin/arv-get b/sdk/python/bin/arv-get
index 0d403d1..ccf8442 100755
--- a/sdk/python/bin/arv-get
+++ b/sdk/python/bin/arv-get
@@ -9,6 +9,7 @@ import sys
import logging
import arvados
+import arvados.commands._util as arv_cmd
logger = logging.getLogger('arvados.arv-get')
@@ -17,7 +18,8 @@ def abort(msg, code=1):
exit(code)
parser = argparse.ArgumentParser(
- description='Copy data from Keep to a local file or pipe.')
+ description='Copy data from Keep to a local file or pipe.',
+ parents=[arv_cmd.retry_opt])
parser.add_argument('locator', type=str,
help="""
Collection locator, optionally with a file path or prefix.
@@ -123,36 +125,25 @@ collection = r.group(1)
get_prefix = r.group(2)
if args.r and not get_prefix:
get_prefix = os.sep
-
-todo = []
-todo_bytes = 0
api_client = arvados.api('v1')
+reader = arvados.CollectionReader(collection, num_retries=args.retries)
+
if not get_prefix:
- try:
- if not args.n:
- if not args.f and os.path.exists(args.destination):
- abort('Local file %s already exists.' % (args.destination,))
- with open(args.destination, 'wb') as f:
- try:
- c = api_client.collections().get(uuid=collection).execute()
- manifest = c['manifest_text']
- except Exception as e:
- logger.warning(
- "Collection %s not found. " +
- "Trying to fetch directly from Keep (deprecated).",
- collection)
- manifest = arvados.KeepClient(
- api_client=api_client).get(collection)
- f.write(manifest)
- sys.exit(0)
- except arvados.errors.NotFoundError as e:
- abort(e)
-
-reader = arvados.CollectionReader(collection)
+ if not args.n:
+ try:
+ manifest = reader.manifest_text()
+ except (arvados.errors.ApiError, arvados.errors.KeepReadError) as e:
+ abort(e)
+ if not args.f and os.path.exists(args.destination):
+ abort('Local file %s already exists.' % (args.destination,))
+ with open(args.destination, 'wb') as f:
+ f.write(manifest)
+ sys.exit(0)
# Scan the collection. Make an array of (stream, file, local
# destination filename) tuples, and add up total size to extract.
-
+todo = []
+todo_bytes = 0
try:
for s in reader.all_streams():
for f in s.all_files():
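Past this hunk, arv-get walks reader.all_streams()/all_files(); each
read inherits the reader's retry count through the stream classes
patched in commit 3b5c8cf8 below. A minimal read loop (destination
handling omitted, sketch only):

    for s in reader.all_streams():
        for f in s.all_files():
            while True:
                data = f.read(2 ** 20)  # retried per reader's num_retries
                if not data:
                    break
                # ... write data to the local destination ...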
diff --git a/sdk/python/bin/arv-ls b/sdk/python/bin/arv-ls
index ee74bb8..382bfe8 100755
--- a/sdk/python/bin/arv-ls
+++ b/sdk/python/bin/arv-ls
@@ -1,27 +1,20 @@
#!/usr/bin/env python
import argparse
-import hashlib
-import os
-import re
-import string
-import sys
+
+import arvados
+import arvados.commands._util as arv_cmd
parser = argparse.ArgumentParser(
- description='List contents of a manifest')
+ description='List contents of a manifest',
+ parents=[arv_cmd.retry_opt])
parser.add_argument('locator', type=str,
- help="""
-Collection locator
-""")
-
+ help="Collection UUID or locator")
parser.add_argument('-s', action='store_true', help="""List file sizes, in KiB.""")
args = parser.parse_args()
-
-import arvados
-
-cr = arvados.CollectionReader(args.locator)
+cr = arvados.CollectionReader(args.locator, num_retries=args.retries)
for f in cr.all_files():
if args.s:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index c7b01bc..001add3 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -440,14 +440,15 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
def test_check_real_project_found(self):
self.authorize_with('active')
- self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID),
+ self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
"did not correctly find test fixture project")
def test_check_error_finding_nonexistent_uuid(self):
BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
self.authorize_with('active')
try:
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
except ValueError as error:
self.assertIn(BAD_UUID, error.message)
else:
@@ -457,7 +458,8 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
self.authorize_with('active')
with self.assertRaises(apiclient.errors.HttpError):
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+ result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ 0)
def test_short_put_from_stdin(self):
# Have to run this as an integration test since arv-put can't
commit 434a44692c8027849f7194d08559ba7df665da5a
Author: Brett Smith <brett@curoverse.com>
Date: Tue Sep 16 17:27:51 2014 -0400
3147: Add retry support to PySDK Collection objects.
This required updating the FUSE driver's SafeApi object to better
imitate a real API object, now that CollectionReader will pass it down
to the underlying KeepClient.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 496136e..a06cad0 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -91,10 +91,26 @@ def normalize(collection):
return normalized_streams
-class CollectionReader(object):
- def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
+class CollectionBase(object):
+ def __enter__(self):
+ pass
+
+ def __exit__(self):
+ pass
+
+ def _my_keep(self):
+ if self._keep_client is None:
+ self._keep_client = KeepClient(api_client=self._api_client,
+ num_retries=self.num_retries)
+ return self._keep_client
+
+
+class CollectionReader(CollectionBase):
+ def __init__(self, manifest_locator_or_text, api_client=None,
+ keep_client=None, num_retries=0):
self._api_client = api_client
self._keep_client = keep_client
+ self.num_retries = num_retries
if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
@@ -109,12 +125,6 @@ class CollectionReader(object):
"Argument to CollectionReader must be a manifest or a collection UUID")
self._streams = None
- def __enter__(self):
- pass
-
- def __exit__(self):
- pass
-
def _populate(self):
if self._streams is not None:
return
@@ -128,11 +138,10 @@ class CollectionReader(object):
# just like any other Collection lookup failure.
if self._api_client is None:
self._api_client = arvados.api('v1')
- self._keep_client = KeepClient(api_client=self._api_client)
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
+ self._keep_client = None # Make a new one with the new api.
c = self._api_client.collections().get(
- uuid=self._manifest_locator).execute()
+ uuid=self._manifest_locator).execute(
+ num_retries=self.num_retries)
self._manifest_text = c['manifest_text']
except Exception as e:
if not util.portable_data_hash_pattern.match(
@@ -140,29 +149,24 @@ class CollectionReader(object):
raise
_logger.warning("API lookup failed for collection %s (%s: %s)",
self._manifest_locator, type(e), str(e))
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
- self._manifest_text = self._keep_client.get(self._manifest_locator)
- self._streams = []
- for stream_line in self._manifest_text.split("\n"):
- if stream_line != '':
- stream_tokens = stream_line.split()
- self._streams += [stream_tokens]
+ self._manifest_text = self._my_keep().get(
+ self._manifest_locator, num_retries=self.num_retries)
+ self._streams = [sline.split()
+ for sline in self._manifest_text.split("\n")
+ if sline]
self._streams = normalize(self)
# now regenerate the manifest text based on the normalized stream
#print "normalizing", self._manifest_text
- self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
+ self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
#print "result", self._manifest_text
def all_streams(self):
self._populate()
- resp = []
- for s in self._streams:
- resp.append(StreamReader(s, keep=self._keep_client))
- return resp
+ return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+ for s in self._streams]
def all_files(self):
for s in self.all_streams():
@@ -172,16 +176,18 @@ class CollectionReader(object):
def manifest_text(self, strip=False):
self._populate()
if strip:
- m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
+ m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
return m
else:
return self._manifest_text
-class CollectionWriter(object):
+
+class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
- def __init__(self, api_client=None):
+ def __init__(self, api_client=None, num_retries=0):
self._api_client = api_client
+ self.num_retries = num_retries
self._keep_client = None
self._data_buffer = []
self._data_buffer_len = 0
@@ -197,16 +203,9 @@ class CollectionWriter(object):
self._queued_dirents = deque()
self._queued_trees = deque()
- def __enter__(self):
- pass
-
def __exit__(self):
self.finish()
- def _prep_keep_client(self):
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
-
def do_queued_work(self):
# The work queue consists of three pieces:
# * _queued_file: The file object we're currently writing to the
@@ -303,7 +302,7 @@ class CollectionWriter(object):
for s in newdata:
self.write(s)
return
- self._data_buffer += [newdata]
+ self._data_buffer.append(newdata)
self._data_buffer_len += len(newdata)
self._current_stream_length += len(newdata)
while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
@@ -311,10 +310,9 @@ class CollectionWriter(object):
def flush_data(self):
data_buffer = ''.join(self._data_buffer)
- if data_buffer != '':
- self._prep_keep_client()
+ if data_buffer:
self._current_stream_locators.append(
- self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+ self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
@@ -342,9 +340,10 @@ class CollectionWriter(object):
(self._current_stream_length - self._current_file_pos,
self._current_file_pos,
self._current_stream_name))
- self._current_stream_files += [[self._current_file_pos,
- self._current_stream_length - self._current_file_pos,
- self._current_file_name]]
+ self._current_stream_files.append([
+ self._current_file_pos,
+ self._current_stream_length - self._current_file_pos,
+ self._current_file_name])
self._current_file_pos = self._current_stream_length
def start_new_stream(self, newstreamname='.'):
@@ -363,18 +362,18 @@ class CollectionWriter(object):
def finish_current_stream(self):
self.finish_current_file()
self.flush_data()
- if len(self._current_stream_files) == 0:
+ if not self._current_stream_files:
pass
- elif self._current_stream_name == None:
+ elif self._current_stream_name is None:
raise errors.AssertionError(
"Cannot finish an unnamed stream (%d bytes in %d files)" %
(self._current_stream_length, len(self._current_stream_files)))
else:
- if len(self._current_stream_locators) == 0:
- self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
- self._finished_streams += [[self._current_stream_name,
- self._current_stream_locators,
- self._current_stream_files]]
+ if not self._current_stream_locators:
+ self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+ self._finished_streams.append([self._current_stream_name,
+ self._current_stream_locators,
+ self._current_stream_files])
self._current_stream_files = []
self._current_stream_length = 0
self._current_stream_locators = []
@@ -384,8 +383,7 @@ class CollectionWriter(object):
def finish(self):
# Store the manifest in Keep and return its locator.
- self._prep_keep_client()
- return self._keep_client.put(self.manifest_text())
+ return self._my_keep().put(self.manifest_text())
def stripped_manifest(self):
"""
@@ -414,8 +412,8 @@ class CollectionWriter(object):
manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
- if len(manifest) > 0:
- return CollectionReader(manifest).manifest_text()
+ if manifest:
+ return CollectionReader(manifest, self._api_client).manifest_text()
else:
return ""
@@ -433,9 +431,10 @@ class ResumableCollectionWriter(CollectionWriter):
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
- def __init__(self, api_client=None):
+ def __init__(self, api_client=None, num_retries=0):
self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(api_client)
+ super(ResumableCollectionWriter, self).__init__(
+ api_client, num_retries=num_retries)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
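With these changes any SDK caller can opt into retries directly, for
example (a hypothetical snippet, assuming a configured client
environment and a real collection UUID):

    import arvados

    # Reading: the API lookup and Keep fetches both honor num_retries.
    reader = arvados.CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz',
                                      num_retries=3)
    text = reader.manifest_text()

    # Writing: blocks PUT to Keep are retried the same way.
    writer = arvados.CollectionWriter(num_retries=3)
    writer.start_new_file('hello.txt')
    writer.write('hello world\n')
    locator = writer.finish()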
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 48fb013..0dbf9bc 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -20,9 +20,9 @@ def fake_httplib2_response(code, **headers):
reason=httplib.responses.get(code, "Unknown Response"))
return httplib2.Response(headers)
-def mock_responses(body, *codes):
+def mock_responses(body, *codes, **headers):
return mock.patch('httplib2.Http.request', side_effect=(
- (fake_httplib2_response(code), body) for code in codes))
+ (fake_httplib2_response(code, **headers), body) for code in codes))
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index f066c5d..98a72f6 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -5,6 +5,7 @@
import arvados
import bz2
import copy
+import mock
import os
import pprint
import subprocess
@@ -12,7 +13,7 @@ import tempfile
import unittest
import run_test_server
-from arvados_testutil import ArvadosBaseTestCase
+import arvados_testutil as tutil
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
@@ -22,7 +23,7 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
- ArvadosBaseTestCase):
+ tutil.ArvadosBaseTestCase):
MAIN_SERVER = {}
@classmethod
@@ -620,5 +621,136 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
cwriter.write, "badtext")
+class CollectionTestMixin(object):
+ PROXY_RESPONSE = {
+ 'items_available': 1,
+ 'items': [{
+ 'uuid': 'zzzzz-bi6l4-mockproxy012345',
+ 'owner_uuid': 'zzzzz-tpzed-mockowner012345',
+ 'service_host': tutil.TEST_HOST,
+ 'service_port': 65535,
+ 'service_ssl_flag': True,
+ 'service_type': 'proxy',
+ }]}
+ API_COLLECTIONS = run_test_server.fixture('collections')
+ DEFAULT_COLLECTION = API_COLLECTIONS['foo_file']
+ DEFAULT_DATA_HASH = DEFAULT_COLLECTION['portable_data_hash']
+ DEFAULT_MANIFEST = DEFAULT_COLLECTION['manifest_text']
+ DEFAULT_UUID = DEFAULT_COLLECTION['uuid']
+
+ def _mock_api_call(self, mock_method, code, body):
+ mock_method = mock_method().execute
+ if code == 200:
+ mock_method.return_value = body
+ else:
+ mock_method.side_effect = arvados.errors.ApiError(
+ tutil.fake_httplib2_response(code), "{}")
+
+ def mock_keep_services(self, api_mock, code, body):
+ self._mock_api_call(api_mock.keep_services().accessible, code, body)
+
+ def api_client_mock(self, code=200):
+ client = mock.MagicMock(name='api_client')
+ self.mock_keep_services(client, code, self.PROXY_RESPONSE)
+ return client
+
+
+@tutil.skip_sleep
+class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
+ def mock_get_collection(self, api_mock, code, body):
+ body = self.API_COLLECTIONS.get(body)
+ self._mock_api_call(api_mock.collections().get, code, body)
+
+ def api_client_mock(self, code=200):
+ client = super(CollectionReaderTestCase, self).api_client_mock(code)
+ self.mock_get_collection(client, code, 'foo_file')
+ return client
+
+ def test_init_no_default_retries(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ reader.manifest_text()
+ client.collections().get().execute.assert_called_with(num_retries=0)
+
+ def test_uuid_init_success(self):
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+ num_retries=3)
+ self.assertEqual(self.DEFAULT_COLLECTION['manifest_text'],
+ reader.manifest_text())
+ client.collections().get().execute.assert_called_with(num_retries=3)
+
+ def test_uuid_init_failure_raises_api_error(self):
+ client = self.api_client_mock(500)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+ with self.assertRaises(arvados.errors.ApiError):
+ reader.manifest_text()
+
+ def test_locator_init(self):
+ client = self.api_client_mock(200)
+ # Ensure Keep will not return anything if asked.
+ with tutil.mock_responses(None, 404):
+ reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+ api_client=client)
+ self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+ def test_locator_init_falls_back_to_keep(self):
+ # Reading manifests from Keep is deprecated. Feel free to
+ # remove this test when we remove the fallback.
+ client = self.api_client_mock(200)
+ self.mock_get_collection(client, 404, None)
+ with tutil.mock_responses(self.DEFAULT_MANIFEST, 200):
+ reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+ api_client=client, num_retries=3)
+ self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+ def test_init_num_retries_propagated(self):
+ # More of an integration test...
+ client = self.api_client_mock(200)
+ reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+ num_retries=3)
+ with tutil.mock_responses('foo', 500, 500, 200):
+ self.assertEqual('foo',
+ ''.join(f.read(9) for f in reader.all_files()))
+
+
+@tutil.skip_sleep
+class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
+ def mock_keep(self, body, *codes, **headers):
+ headers.setdefault('x-keep-replicas-stored', 2)
+ return tutil.mock_responses(body, *codes, **headers)
+
+ def foo_writer(self, **kwargs):
+ api_client = self.api_client_mock()
+ writer = arvados.CollectionWriter(api_client, **kwargs)
+ writer.start_new_file('foo')
+ writer.write('foo')
+ return writer
+
+ def test_write_whole_collection(self):
+ writer = self.foo_writer()
+ with self.mock_keep(self.DEFAULT_DATA_HASH, 200, 200):
+ self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+ def test_write_no_default(self):
+ writer = self.foo_writer()
+ with self.mock_keep(None, 500):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ writer.finish()
+
+ def test_write_whole_collection_through_retries(self):
+ writer = self.foo_writer(num_retries=2)
+ with self.mock_keep(self.DEFAULT_DATA_HASH,
+ 500, 500, 200, 500, 500, 200):
+ self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+ def test_flush_data_retries(self):
+ writer = self.foo_writer(num_retries=2)
+ foo_hash = self.DEFAULT_MANIFEST.split()[1]
+ with self.mock_keep(foo_hash, 500, 200):
+ writer.flush_data()
+ self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index f49b947..d5523e8 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -31,14 +31,15 @@ class SafeApi(object):
def __init__(self, config):
self.host = config.get('ARVADOS_API_HOST')
- self.token = config.get('ARVADOS_API_TOKEN')
+ self.api_token = config.get('ARVADOS_API_TOKEN')
self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
self.local = threading.local()
self.block_cache = arvados.KeepBlockCache()
def localapi(self):
if 'api' not in self.local.__dict__:
- self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
+ self.local.api = arvados.api('v1', False, self.host,
+ self.api_token, self.insecure)
return self.local.api
def localkeep(self):
@@ -46,17 +47,13 @@ class SafeApi(object):
self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
return self.local.keep
- def collections(self):
- return self.localapi().collections()
-
- def links(self):
- return self.localapi().links()
-
- def groups(self):
- return self.localapi().groups()
+ def __getattr__(self, name):
+ # Proxy nonexistent attributes to the local API client.
+ try:
+ return getattr(self.localapi(), name)
+ except AttributeError:
+ return super(SafeApi, self).__getattr__(name)
- def users(self):
- return self.localapi().users()
def convertTime(t):
'''Parse Arvados timestamp to unix time.'''
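The SafeApi rewrite above replaces the hand-written collections()/
links()/groups()/users() wrappers with a __getattr__ that proxies any
attribute to a per-thread API client. A minimal sketch of that
thread-local proxy pattern (names illustrative):

    import threading

    class ThreadLocalProxy(object):
        def __init__(self, factory):
            self._factory = factory  # builds one client per thread
            self._local = threading.local()

        def __getattr__(self, name):
            if not hasattr(self._local, 'client'):
                self._local.client = self._factory()
            return getattr(self._local.client, name)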
commit 3b5c8cf8a3cf5ab3ccd33566c3c96c415bd0d3d6
Author: Brett Smith <brett@curoverse.com>
Date: Fri Sep 12 09:28:47 2014 -0400
3147: Add retry support to PySDK StreamReader classes.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 9522976..323251d 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -707,7 +707,10 @@ class KeepClient(object):
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, copies, thread_limiter.done()))
- def local_store_put(self, data):
+ # Local storage methods need no-op num_retries arguments to keep
+ # integration tests happy. With better isolation they could
+ # probably be removed again.
+ def local_store_put(self, data, num_retries=0):
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -716,7 +719,7 @@ class KeepClient(object):
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s):
+ def local_store_get(self, loc_s, num_retries=0):
try:
locator = KeepLocator(loc_s)
except ValueError:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index b98937f..04b6b81 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -18,6 +18,7 @@ import time
import threading
import collections
+from arvados.retry import retry_method
from keep import *
import config
import errors
@@ -108,6 +109,7 @@ class StreamFileReader(object):
self.segments = segments
self._name = name
self._filepos = 0L
+ self.num_retries = stream.num_retries
def name(self):
return self._name
@@ -128,7 +130,8 @@ class StreamFileReader(object):
n = self.segments[-1]
return n[OFFSET] + n[BLOCKSIZE]
- def read(self, size):
+ @retry_method
+ def read(self, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at the current file position"""
if size == 0:
return ''
@@ -137,52 +140,58 @@ class StreamFileReader(object):
available_chunks = locators_and_ranges(self.segments, self._filepos, size)
if available_chunks:
locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
- data = self._stream.readfrom(locator+segmentoffset, segmentsize)
+ data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries)
self._filepos += len(data)
return data
- def readfrom(self, start, size):
+ @retry_method
+ def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
- data = ''
+ data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
- data += self._stream.readfrom(locator+segmentoffset, segmentsize)
- return data
+ data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+ num_retries=num_retries))
+ return ''.join(data)
- def readall(self, size=2**20):
+ @retry_method
+ def readall(self, size=2**20, num_retries=None):
while True:
- data = self.read(size)
+ data = self.read(size, num_retries=num_retries)
if data == '':
break
yield data
- def decompress(self, decompress, size):
- for segment in self.readall(size):
+ @retry_method
+ def decompress(self, decompress, size, num_retries=None):
+ for segment in self.readall(size, num_retries=num_retries):
data = decompress(segment)
if data and data != '':
yield data
- def readall_decompressed(self, size=2**20):
+ @retry_method
+ def readall_decompressed(self, size=2**20, num_retries=None):
self.seek(0)
if re.search('\.bz2$', self._name):
dc = bz2.BZ2Decompressor()
- return self.decompress(lambda segment: dc.decompress(segment), size)
+ return self.decompress(dc.decompress, size,
+ num_retries=num_retries)
elif re.search('\.gz$', self._name):
dc = zlib.decompressobj(16+zlib.MAX_WBITS)
- return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
+ return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+ size, num_retries=num_retries)
else:
- return self.readall(size)
+ return self.readall(size, num_retries=num_retries)
- def readlines(self, decompress=True):
- if decompress:
- datasource = self.readall_decompressed()
- else:
- datasource = self.readall()
+ @retry_method
+ def readlines(self, decompress=True, num_retries=None):
+ read_func = self.readall_decompressed if decompress else self.readall
data = ''
- for newdata in datasource:
+ for newdata in read_func(num_retries=num_retries):
data += newdata
sol = 0
while True:
@@ -201,12 +210,15 @@ class StreamFileReader(object):
manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
+
class StreamReader(object):
- def __init__(self, tokens, keep=None, debug=False, _empty=False):
+ def __init__(self, tokens, keep=None, debug=False, _empty=False,
+ num_retries=0):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
self._keep = keep
+ self.num_retries = num_retries
streamoffset = 0L
@@ -254,16 +266,17 @@ class StreamReader(object):
def locators_and_ranges(self, range_start, range_size):
return locators_and_ranges(self._data_locators, range_start, range_size)
- def readfrom(self, start, size):
+ @retry_method
+ def readfrom(self, start, size, num_retries=None):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
if self._keep is None:
- self._keep = KeepClient()
- data = ''
+ self._keep = KeepClient(num_retries=self.num_retries)
+ data = []
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
- data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
- return data
+ data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
+ return ''.join(data)
def manifest_text(self, strip=False):
manifest_text = [self.name().replace(' ', '\\040')]
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 284854b..f066c5d 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -349,8 +349,9 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
class MockStreamReader(object):
def __init__(self, content):
self.content = content
+ self.num_retries = 0
- def readfrom(self, start, size):
+ def readfrom(self, start, size, num_retries=0):
return self.content[start:start+size]
def test_file_stream(self):
@@ -422,7 +423,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
def __init__(self, content, num_retries=0):
self.content = content
- def get(self, locator):
+ def get(self, locator, num_retries=0):
return self.content[locator]
def test_stream_reader(self):
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 060a26a..3970d67 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,18 +1,82 @@
#!/usr/bin/env python
+import mock
import unittest
import arvados
from arvados import StreamReader, StreamFileReader
+import arvados_testutil as tutil
import run_test_server
-class StreamReaderTestCase(unittest.TestCase):
+class StreamRetryTestMixin(object):
+ # Define reader_for(coll_name, **kwargs)
+ # and read_for_test(reader, size, **kwargs).
API_COLLECTIONS = run_test_server.fixture('collections')
+ def keep_client(self):
+ return arvados.KeepClient(proxy='http://[%s]:1' % (tutil.TEST_HOST,),
+ local_store='')
+
def manifest_for(self, coll_name):
return self.API_COLLECTIONS[coll_name]['manifest_text']
+ @tutil.skip_sleep
+ def test_success_without_retries(self):
+ reader = self.reader_for('bar_file')
+ with tutil.mock_responses('bar', 200):
+ self.assertEqual('bar', self.read_for_test(reader, 3))
+
+ @tutil.skip_sleep
+ def test_read_no_default_retry(self):
+ reader = self.reader_for('user_agreement')
+ with tutil.mock_responses('', 500):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 10)
+
+ @tutil.skip_sleep
+ def test_read_with_instance_retries(self):
+ reader = self.reader_for('foo_file', num_retries=3)
+ with tutil.mock_responses('foo', 500, 200):
+ self.assertEqual('foo', self.read_for_test(reader, 3))
+
+ @tutil.skip_sleep
+ def test_read_with_method_retries(self):
+ reader = self.reader_for('foo_file')
+ with tutil.mock_responses('foo', 500, 200):
+ self.assertEqual('foo',
+ self.read_for_test(reader, 3, num_retries=3))
+
+ @tutil.skip_sleep
+ def test_read_instance_retries_exhausted(self):
+ reader = self.reader_for('bar_file', num_retries=3)
+ with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 3)
+
+ @tutil.skip_sleep
+ def test_read_method_retries_exhausted(self):
+ reader = self.reader_for('bar_file')
+ with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 3, num_retries=3)
+
+ @tutil.skip_sleep
+ def test_method_retries_take_precedence(self):
+ reader = self.reader_for('user_agreement', num_retries=10)
+ with tutil.mock_responses('', 500, 500, 500, 200):
+ with self.assertRaises(arvados.errors.KeepReadError):
+ self.read_for_test(reader, 10, num_retries=1)
+
+
+class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
+ def reader_for(self, coll_name, **kwargs):
+ return StreamReader(self.manifest_for(coll_name).split(),
+ self.keep_client(), **kwargs)
+
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return reader.readfrom(0, byte_count, **kwargs)
+
def test_manifest_text_without_keep_client(self):
mtext = self.manifest_for('multilevel_collection_1')
for line in mtext.rstrip('\n').split('\n'):
@@ -20,5 +84,34 @@ class StreamReaderTestCase(unittest.TestCase):
self.assertEqual(line + '\n', reader.manifest_text())
+class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
+ def reader_for(self, coll_name, **kwargs):
+ return StreamReader(self.manifest_for(coll_name).split(),
+ self.keep_client(), **kwargs).all_files()[0]
+
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return reader.read(byte_count, **kwargs)
+
+
+class StreamFileReadFromTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return reader.readfrom(0, byte_count, **kwargs)
+
+
+class StreamFileReadAllTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return ''.join(reader.readall(**kwargs))
+
+
+class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return ''.join(reader.readall_decompressed(**kwargs))
+
+
+class StreamFileReadlinesTestCase(StreamFileReadTestCase):
+ def read_for_test(self, reader, byte_count, **kwargs):
+ return ''.join(reader.readlines(**kwargs))
+
+
if __name__ == '__main__':
unittest.main()
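Taken together, the stream changes above let a retry preference set on a
StreamReader flow down to every file reader it produces, while still allowing
per-call overrides. A minimal usage sketch, assuming manifest_line holds one
valid manifest stream and keep_client is an already-configured KeepClient
(both names are illustrative):

    from arvados import StreamReader

    reader = StreamReader(manifest_line.split(), keep_client,
                          num_retries=3)
    f = reader.all_files()[0]
    head = f.read(128)                    # retries up to 3 times per block
    rest = f.readfrom(128, 1024,
                      num_retries=0)      # call-site override: no retries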
commit ceceb839425dac060ab9baad66a5dba4af78e5e8
Author: Brett Smith <brett at curoverse.com>
Date: Tue Sep 16 17:24:31 2014 -0400
3147: Make PySDK KeepClient.get and put retry_methods.
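With this change a retry budget can be set once at construction time and
overridden per call. A minimal sketch of the intended usage, based on the
signatures in the diff below (the data and copy count are arbitrary):

    import arvados

    keep = arvados.KeepClient(num_retries=3)   # instance-wide default
    locator = keep.put('testdata', copies=2)   # retries up to 3 times
    data = keep.get(locator, num_retries=0)    # per-call override wins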
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 64d82ba..9522976 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -392,7 +392,8 @@ class KeepClient(object):
def __init__(self, api_client=None, proxy=None, timeout=300,
- api_token=None, local_store=None, block_cache=None):
+ api_token=None, local_store=None, block_cache=None,
+ num_retries=0):
"""Initialize a new KeepClient.
Arguments:
@@ -416,6 +417,9 @@ class KeepClient(object):
environment variable. If you want to ensure KeepClient does not
use local storage, pass in an empty string. This is primarily
intended to mock a server for testing.
+ * num_retries: The default number of times to retry failed requests.
+ This will be used as the default num_retries value when get() and
+ put() are called. Default 0.
"""
self.lock = threading.Lock()
if proxy is None:
@@ -436,7 +440,7 @@ class KeepClient(object):
self.put = self.local_store_put
else:
self.timeout = timeout
-
+ self.num_retries = num_retries
if proxy:
if not proxy.endswith('/'):
proxy += '/'
@@ -558,7 +562,8 @@ class KeepClient(object):
else:
return None
- def get(self, loc_s, num_retries=0):
+ @retry.retry_method
+ def get(self, loc_s, num_retries=None):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
@@ -575,7 +580,8 @@ class KeepClient(object):
*each* Keep server if it returns temporary failures, with
exponential backoff. Note that, in each loop, the method may try
to fetch data from every available Keep service, along with any
- that are named in location hints in the locator. Default 0.
+ that are named in location hints in the locator. The default value
+ is set when the KeepClient is initialized.
"""
if ',' in loc_s:
return ''.join(self.get(x) for x in loc_s.split(','))
@@ -638,7 +644,8 @@ class KeepClient(object):
else:
raise arvados.errors.KeepReadError(loc_s)
- def put(self, data, copies=2, num_retries=0):
+ @retry.retry_method
+ def put(self, data, copies=2, num_retries=None):
"""Save data in Keep.
This method will get a list of Keep services from the API server, and
@@ -653,7 +660,8 @@ class KeepClient(object):
Default 2.
* num_retries: The number of times to retry PUT requests to
*each* Keep server if it returns temporary failures, with
- exponential backoff. Default 0.
+ exponential backoff. The default value is set when the
+ KeepClient is initialized.
"""
data_hash = hashlib.md5(data).hexdigest()
if copies < 1:
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index f6abe2b..284854b 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -419,7 +419,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
class MockKeep(object):
- def __init__(self, content):
+ def __init__(self, content, num_retries=0):
self.content = content
def get(self, locator):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9a01d86..8cc22a6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -236,8 +236,13 @@ class KeepClientRetryTestMixin(object):
TEST_DATA = 'testdata'
TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
- def new_client(self):
- return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+ def setUp(self):
+ self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
+
+ def new_client(self, **caller_kwargs):
+ kwargs = self.client_kwargs.copy()
+ kwargs.update(caller_kwargs)
+ return arvados.KeepClient(**kwargs)
def run_method(self, *args, **kwargs):
raise NotImplementedError("test subclasses must define run_method")
@@ -272,6 +277,11 @@ class KeepClientRetryTestMixin(object):
with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
self.check_exception(num_retries=1)
+ def test_num_retries_instance_fallback(self):
+ self.client_kwargs['num_retries'] = 3
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ self.check_success()
+
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
commit 17dece4b2026cad47ba526cbad8dfce81f5fdebc
Author: Brett Smith <brett at curoverse.com>
Date: Fri Sep 12 08:49:21 2014 -0400
3147: Add retry_method to the Python SDK.
This gives us a way to make retry support flexible and consistent
across SDK classes. Any class that has retryable operations should
take a num_retries argument at initialization. Then, the specific
methods that implement those operations should also take a num_retries
argument, which will fall back to the instance's setting by default.
This lets SDK users express their retry preferences wherever it's most
convenient for them.
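As a sketch of the pattern (the class and method here are illustrative, not
part of the SDK):

    from arvados.retry import retry_method

    class ExampleClient(object):
        def __init__(self, num_retries=0):
            # Instance-wide default consulted by @retry_method.
            self.num_retries = num_retries

        @retry_method
        def fetch(self, item, num_retries=None):
            # num_retries is never None here: when the caller omits it,
            # the decorator substitutes self.num_retries.
            return (item, num_retries)

    client = ExampleClient(num_retries=2)
    client.fetch('x')                  # -> ('x', 2): instance default
    client.fetch('x', num_retries=5)   # -> ('x', 5): call site wins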
diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 5dc31ae..3d2fc48 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -1,5 +1,7 @@
#!/usr/bin/env python
+import functools
+import inspect
import time
from collections import deque
@@ -138,3 +140,19 @@ def check_http_response_success(result):
return False
else:
return None # Get well soon, server.
+
+def retry_method(orig_func):
+ """Provide a default value for a method's num_retries argument.
+
+ This is a decorator for instance and class methods that accept a
+ num_retries argument, with a None default. When the method is called
+ without a value for num_retries, it will be set from the underlying
+ instance or class' num_retries attribute.
+ """
+ @functools.wraps(orig_func)
+ def num_retries_setter(self, *args, **kwargs):
+ arg_vals = inspect.getcallargs(orig_func, self, *args, **kwargs)
+ if arg_vals['num_retries'] is None:
+ kwargs['num_retries'] = self.num_retries
+ return orig_func(self, *args, **kwargs)
+ return num_retries_setter
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index ed0a406..8c3916b 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -194,5 +194,32 @@ class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
self.check_is(None, 0, 99, 600, -200)
+class RetryMethodTestCase(unittest.TestCase):
+ class Tester(object):
+ def __init__(self):
+ self.num_retries = 1
+
+ @arv_retry.retry_method
+ def check(self, a, num_retries=None, z=0):
+ return (a, num_retries, z)
+
+
+ def test_positional_arg_passed(self):
+ self.assertEqual((3, 2, 0), self.Tester().check(3, 2))
+
+ def test_keyword_arg_passed(self):
+ self.assertEqual((4, 3, 0), self.Tester().check(num_retries=3, a=4))
+
+ def test_not_specified(self):
+ self.assertEqual((0, 1, 0), self.Tester().check(0))
+
+ def test_not_specified_with_other_kwargs(self):
+ self.assertEqual((1, 1, 1), self.Tester().check(1, z=1))
+
+ def test_bad_call(self):
+ with self.assertRaises(TypeError):
+ self.Tester().check(num_retries=2)
+
+
if __name__ == '__main__':
unittest.main()
commit 4c1de84b27923acf65b815dcf649d73a074f3645
Author: Brett Smith <brett at curoverse.com>
Date: Thu Sep 11 14:46:37 2014 -0400
3147: Fix variable name typo.
This variable was renamed in previous branch work, but this last user
of it wasn't caught until just now.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 22bf327..64d82ba 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -316,7 +316,8 @@ class KeepClient(object):
resp_md5 = hashlib.md5(content).hexdigest()
if resp_md5 == locator.md5sum:
return content
- _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
return None
def put(self, http, hash_s, body):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 4ed2912..9a01d86 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -312,6 +312,12 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
side_effect=iter(side_effects)):
self.check_success(locator=self.HINTED_LOCATOR)
+ def test_retry_data_with_wrong_checksum(self):
+ side_effects = ((tutil.fake_httplib2_response(200), s)
+ for s in ['baddata', self.TEST_DATA])
+ with mock.patch('httplib2.Http.request', side_effect=side_effects):
+ self.check_success(locator=self.HINTED_LOCATOR)
+
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
commit f749765cb8701c86c123a657540675d332c220df
Author: Brett Smith <brett at curoverse.com>
Date: Thu Sep 11 14:22:12 2014 -0400
3147: PySDK StreamReader instantiates a KeepClient late.
As with the late API client instantiation in KeepClient, this helps with
testing. Refs #3693.
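The shape of the change, reduced to an illustrative sketch (LazyExample is
hypothetical; the real diff applies this to StreamReader.readfrom):

    from arvados.keep import KeepClient

    class LazyExample(object):
        def __init__(self, keep=None):
            self._keep = keep   # may stay None for metadata-only callers

        def readfrom(self, locator):
            if self._keep is None:
                # Build the client on first read, so callers that only
                # inspect manifests never need a Keep configuration.
                self._keep = KeepClient()
            return self._keep.get(locator)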
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index e13e1a6..b98937f 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -206,9 +206,6 @@ class StreamReader(object):
self._stream_name = None
self._data_locators = []
self._files = collections.OrderedDict()
-
- if keep is None:
- keep = KeepClient()
self._keep = keep
streamoffset = 0L
@@ -261,6 +258,8 @@ class StreamReader(object):
"""Read up to 'size' bytes from the stream, starting at 'start'"""
if size == 0:
return ''
+ if self._keep is None:
+ self._keep = KeepClient()
data = ''
for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
new file mode 100644
index 0000000..060a26a
--- /dev/null
+++ b/sdk/python/tests/test_stream.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+
+import unittest
+
+import arvados
+from arvados import StreamReader, StreamFileReader
+
+import run_test_server
+
+class StreamReaderTestCase(unittest.TestCase):
+ API_COLLECTIONS = run_test_server.fixture('collections')
+
+ def manifest_for(self, coll_name):
+ return self.API_COLLECTIONS[coll_name]['manifest_text']
+
+ def test_manifest_text_without_keep_client(self):
+ mtext = self.manifest_for('multilevel_collection_1')
+ for line in mtext.rstrip('\n').split('\n'):
+ reader = StreamReader(line.split())
+ self.assertEqual(line + '\n', reader.manifest_text())
+
+
+if __name__ == '__main__':
+ unittest.main()
commit 4d0665892191045a2d03f68b2364bb87a278b6ac
Author: Brett Smith <brett at curoverse.com>
Date: Fri Sep 12 16:46:54 2014 -0400
3147: Move PySDK HTTP test infrastructure to testutil.
This makes the helpers available to other test modules.
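For reference, a hedged sketch of how the moved helpers are used in a test
(the locator is the md5 of the body, so the checksum check passes; a 500
followed by a 200 exercises exactly one retry):

    import unittest
    import arvados
    import arvados_testutil as tutil

    class ExampleRetryTest(unittest.TestCase):
        @tutil.skip_sleep       # patch out exponential-backoff sleeps
        def test_one_retry_then_success(self):
            client = arvados.KeepClient(
                proxy='http://[%s]:1' % (tutil.TEST_HOST,), local_store='')
            with tutil.mock_responses('foo', 500, 200):
                self.assertEqual('foo', client.get(
                    'acbd18db4cc2f85cedef654fccc4a4d8+3', num_retries=1))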
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 77146db..48fb013 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -3,16 +3,27 @@
import errno
import httplib
import httplib2
+import mock
import os
import shutil
import tempfile
import unittest
+# Use this hostname when you want to make sure the traffic will be
+# instantly refused. 100::/64 is a dedicated black hole.
+TEST_HOST = '100::'
+
+skip_sleep = mock.patch('time.sleep', lambda n: None) # clown'll eat me
+
def fake_httplib2_response(code, **headers):
headers.update(status=str(code),
reason=httplib.responses.get(code, "Unknown Response"))
return httplib2.Response(headers)
+def mock_responses(body, *codes):
+ return mock.patch('httplib2.Http.request', side_effect=(
+ (fake_httplib2_response(code), body) for code in codes))
+
class ArvadosBaseTestCase(unittest.TestCase):
# This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 900143e..4ed2912 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -5,8 +5,8 @@ import unittest
import arvados
import arvados.retry
+import arvados_testutil as tutil
import run_test_server
-from arvados_testutil import fake_httplib2_response
class KeepTestCase(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
@@ -232,15 +232,10 @@ class KeepClientRetryTestMixin(object):
# supporting servers, and prevents side effects in case something hiccups.
# To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
# run_method().
- PROXY_ADDR = 'http://[100::]/'
+ PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
TEST_DATA = 'testdata'
TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
- @staticmethod
- def mock_responses(body, *codes):
- return mock.patch('httplib2.Http.request', side_effect=(
- (fake_httplib2_response(code), body) for code in codes))
-
def new_client(self):
return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
@@ -258,30 +253,28 @@ class KeepClientRetryTestMixin(object):
self.assertRaises(error_class, self.run_method, *args, **kwargs)
def test_immediate_success(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 200):
self.check_success()
def test_retry_then_success(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
self.check_success(num_retries=3)
def test_no_default_retry(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
self.check_exception()
def test_no_retry_after_permanent_error(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 403, 200):
self.check_exception(num_retries=3)
def test_error_after_retries_exhausted(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
self.check_exception(num_retries=1)
-# Don't delay from HTTPRetryLoop's exponential backoff.
-no_backoff = mock.patch('time.sleep', lambda n: None)
-@no_backoff
-class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+@tutil.skip_sleep
+class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
DEFAULT_EXCEPTION = arvados.errors.KeepReadError
HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
@@ -291,7 +284,7 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
return self.new_client().get(locator, *args, **kwargs)
def test_specific_exception_when_not_found(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 200):
self.check_exception(arvados.errors.NotFoundError, num_retries=3)
def test_general_exception_with_mixed_errors(self):
@@ -300,7 +293,7 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
# This test rigs up 50/50 disagreement between two servers, and
# checks that it does not become a NotFoundError.
client = self.new_client()
- with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 500):
with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
client.get(self.HINTED_LOCATOR)
self.assertNotIsInstance(
@@ -308,19 +301,20 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
"mixed errors raised NotFoundError")
def test_hint_server_can_succeed_without_retries(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
- side_effects = [socket.timeout("timed out"),
- (fake_httplib2_response(200), self.DEFAULT_EXPECT)]
+ side_effects = [
+ socket.timeout("timed out"),
+ (tutil.fake_httplib2_response(200), self.DEFAULT_EXPECT)]
with mock.patch('httplib2.Http.request',
side_effect=iter(side_effects)):
self.check_success(locator=self.HINTED_LOCATOR)
-@no_backoff
-class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+@tutil.skip_sleep
+class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
@@ -329,5 +323,5 @@ class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
return self.new_client().put(data, copies, *args, **kwargs)
def test_do_not_send_multiple_copies_to_same_server(self):
- with self.mock_responses(self.DEFAULT_EXPECT, 200):
+ with tutil.mock_responses(self.DEFAULT_EXPECT, 200):
self.check_exception(copies=2, num_retries=3)
commit cd051b592013c18445d837aed1c8dc3354c1cf82
Author: Brett Smith <brett at curoverse.com>
Date: Wed Sep 10 16:44:25 2014 -0400
3147: PySDK tests use mock>=1.0 and easier mock side_effect.
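The easier idiom works because mock>=1.0 accepts an iterable side_effect:
each call consumes the next item, and exception instances are raised rather
than returned. A self-contained sketch:

    import socket
    import mock

    fake = mock.Mock(side_effect=iter([socket.timeout("timed out"), "ok"]))
    try:
        fake()                 # first call raises the exception instance
    except socket.timeout:
        pass
    assert fake() == "ok"      # second call returns the next item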
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 30cc779..e0afe89 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -32,5 +32,5 @@ setup(name='arvados-python-client',
'ws4py'
],
test_suite='tests',
- tests_require=['mock', 'PyYAML'],
+ tests_require=['mock>=1.0', 'PyYAML'],
zip_safe=False)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 04bcab8..900143e 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -312,15 +312,10 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
self.check_success(locator=self.HINTED_LOCATOR)
def test_try_next_server_after_timeout(self):
- responses = iter([None, (fake_httplib2_response(200),
- self.DEFAULT_EXPECT)])
- def side_effect(*args, **kwargs):
- response = next(responses)
- if response is None:
- raise socket.timeout("timed out")
- else:
- return response
- with mock.patch('httplib2.Http.request', side_effect=side_effect):
+ side_effects = [socket.timeout("timed out"),
+ (fake_httplib2_response(200), self.DEFAULT_EXPECT)]
+ with mock.patch('httplib2.Http.request',
+ side_effect=iter(side_effects)):
self.check_success(locator=self.HINTED_LOCATOR)