[ARVADOS] created: c19503ef39f3588e3fe2113816e84c0e8dce4074

git at public.curoverse.com git at public.curoverse.com
Tue Sep 16 18:04:24 EDT 2014


        at  c19503ef39f3588e3fe2113816e84c0e8dce4074 (commit)


commit c19503ef39f3588e3fe2113816e84c0e8dce4074
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Sep 15 18:13:40 2014 -0400

    3147: crunch-job retries data uploads more for longer-running jobs.

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 83931f2..56a3637 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1206,7 +1206,8 @@ sub collate_output
   Log (undef, "collate");
 
   my ($child_out, $child_in);
-  my $pid = open2($child_out, $child_in, 'arv-put', '--raw');
+  my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
+                  '--retries', put_retry_count());
   my $joboutput;
   for (@jobstep)
   {
@@ -1346,8 +1347,9 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
 
   $local_logfile->flush;
-  my $cmd = "arv-put --portable-data-hash --filename ''\Q$keep_logfile\E "
-      . quotemeta($local_logfile->filename);
+  my $retry_count = put_retry_count();
+  my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
+      "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
   my $loglocator = `$cmd`;
   die "system $cmd failed: $?" if $?;
   chomp($loglocator);
@@ -1493,6 +1495,20 @@ sub find_docker_image {
   }
 }
 
+sub put_retry_count {
+  # Calculate a --retries argument for arv-put that will have it try
+  # approximately as long as this Job has been running.
+  my $stoptime = shift || time;
+  my $starttime = $jobstep[0]->{starttime};
+  my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
+  my $retries = 0;
+  while ($timediff >= 2) {
+    $retries++;
+    $timediff /= 2;
+  }
+  return ($retries > 3) ? $retries : 3;
+}
+
 __DATA__
 #!/usr/bin/perl
 

commit df8ac6c19c7c1d0ce4f988fec4969df5e5f4909d
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Sep 16 17:29:38 2014 -0400

    3147: Add retry support to FUSE driver.

diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index d5523e8..e40b88c 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -289,10 +289,11 @@ class Directory(FreshBase):
 class CollectionDirectory(Directory):
     '''Represents the root of a directory tree holding a collection.'''
 
-    def __init__(self, parent_inode, inodes, api, collection):
+    def __init__(self, parent_inode, inodes, api, num_retries, collection):
         super(CollectionDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self.collection_object_file = None
         self.collection_object = None
         if isinstance(collection, dict):
@@ -310,7 +311,9 @@ class CollectionDirectory(Directory):
             self.collection_object_file.update(self.collection_object)
 
         self.clear()
-        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
+        collection = arvados.CollectionReader(
+            self.collection_object["manifest_text"], self.api,
+            self.api.localkeep(), num_retries=self.num_retries)
         for s in collection.all_streams():
             cwd = self
             for part in s.name().split('/'):
@@ -328,7 +331,9 @@ class CollectionDirectory(Directory):
                 return True
 
             with llfuse.lock_released:
-                new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
+                new_collection_object = self.api.collections().get(
+                    uuid=self.collection_locator
+                    ).execute(num_retries=self.num_retries)
                 if "portable_data_hash" not in new_collection_object:
                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
             # end with llfuse.lock_released, re-acquire lock
@@ -386,10 +391,11 @@ class MagicDirectory(Directory):
     to readdir().
     '''
 
-    def __init__(self, parent_inode, inodes, api):
+    def __init__(self, parent_inode, inodes, api, num_retries):
         super(MagicDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         # Have to defer creating readme_file because at this point we don't
         # yet have an inode assigned.
         self.readme_file = None
@@ -418,7 +424,8 @@ will appear if it exists.
             return False
 
         try:
-            e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
+            e = self.inodes.add_entry(CollectionDirectory(
+                    self.inode, self.inodes, self.api, self.num_retries, k))
             if e.update():
                 self._entries[k] = e
                 return True
@@ -457,21 +464,25 @@ class RecursiveInvalidateDirectory(Directory):
 class TagsDirectory(RecursiveInvalidateDirectory):
     '''A special directory that contains as subdirectories all tags visible to the user.'''
 
-    def __init__(self, parent_inode, inodes, api, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
         super(TagsDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self._poll = True
         self._poll_time = poll_time
 
     def update(self):
         with llfuse.lock_released:
-            tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
+            tags = self.api.links().list(
+                filters=[['link_class', '=', 'tag']],
+                select=['name'], distinct=True
+                ).execute(num_retries=self.num_retries)
         if "items" in tags:
             self.merge(tags['items'],
                        lambda i: i['name'] if 'name' in i else i['uuid'],
                        lambda a, i: a.tag == i,
-                       lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
+                       lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
 
 
 class TagDirectory(Directory):
@@ -479,45 +490,51 @@ class TagDirectory(Directory):
     to the user that are tagged with a particular tag.
     '''
 
-    def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, tag,
+                 poll=False, poll_time=60):
         super(TagDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self.tag = tag
         self._poll = poll
         self._poll_time = poll_time
 
     def update(self):
         with llfuse.lock_released:
-            taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
-                                                   ['name', '=', self.tag],
-                                                   ['head_uuid', 'is_a', 'arvados#collection']],
-                                          select=['head_uuid']).execute()
+            taggedcollections = self.api.links().list(
+                filters=[['link_class', '=', 'tag'],
+                         ['name', '=', self.tag],
+                         ['head_uuid', 'is_a', 'arvados#collection']],
+                select=['head_uuid']
+                ).execute(num_retries=self.num_retries)
         self.merge(taggedcollections['items'],
                    lambda i: i['head_uuid'],
                    lambda a, i: a.collection_locator == i['head_uuid'],
-                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
+                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
 
 
 class ProjectDirectory(Directory):
     '''A special directory that contains the contents of a project.'''
 
-    def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
+                 poll=False, poll_time=60):
         super(ProjectDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
         self.project_object = project_object
         self.project_object_file = None
         self.uuid = project_object['uuid']
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
-            return CollectionDirectory(self.inode, self.inodes, self.api, i)
+            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
         elif group_uuid_pattern.match(i['uuid']):
-            return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
+            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
         elif link_uuid_pattern.match(i['uuid']):
             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
-                return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
+                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
             else:
                 return None
         elif uuid_pattern.match(i['uuid']):
@@ -557,13 +574,19 @@ class ProjectDirectory(Directory):
 
         with llfuse.lock_released:
             if group_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.groups().get(uuid=self.uuid).execute()
+                self.project_object = self.api.groups().get(
+                    uuid=self.uuid).execute(num_retries=self.num_retries)
             elif user_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.users().get(uuid=self.uuid).execute()
+                self.project_object = self.api.users().get(
+                    uuid=self.uuid).execute(num_retries=self.num_retries)
 
-            contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
+            contents = arvados.util.list_all(self.api.groups().contents,
+                                             self.num_retries, uuid=self.uuid)
             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
+            contents += arvados.util.list_all(
+                self.api.links().list, self.num_retries,
+                filters=[['tail_uuid', '=', self.uuid],
+                         ['link_class', '=', 'name']])
 
         # end with llfuse.lock_released, re-acquire lock
 
@@ -589,17 +612,21 @@ class ProjectDirectory(Directory):
 class SharedDirectory(Directory):
     '''A special directory that represents users or groups who have shared projects with me.'''
 
-    def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
+    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
+                 poll=False, poll_time=60):
         super(SharedDirectory, self).__init__(parent_inode)
-        self.current_user = api.users().current().execute()
         self.inodes = inodes
         self.api = api
+        self.num_retries = num_retries
+        self.current_user = api.users().current().execute(num_retries=num_retries)
         self._poll = True
         self._poll_time = poll_time
 
     def update(self):
         with llfuse.lock_released:
-            all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
+            all_projects = arvados.util.list_all(
+                self.api.groups().list, self.num_retries,
+                filters=[['group_class','=','project']])
             objects = {}
             for ob in all_projects:
                 objects[ob['uuid']] = ob
@@ -611,8 +638,12 @@ class SharedDirectory(Directory):
                     roots.append(ob)
                     root_owners[ob['owner_uuid']] = True
 
-            lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
-            lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
+            lusers = arvados.util.list_all(
+                self.api.users().list, self.num_retries,
+                filters=[['uuid','in', list(root_owners)]])
+            lgroups = arvados.util.list_all(
+                self.api.groups().list, self.num_retries,
+                filters=[['uuid','in', list(root_owners)]])
 
             users = {}
             groups = {}
@@ -641,7 +672,7 @@ class SharedDirectory(Directory):
             self.merge(contents.items(),
                        lambda i: i[0],
                        lambda a, i: a.uuid == i[1]['uuid'],
-                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
+                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception as e:
             _logger.exception(e)
 
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index e07860d..e92b1b4 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -9,6 +9,7 @@ import signal
 import subprocess
 import time
 
+import arvados.commands._util as arv_cmd
 from arvados_fuse import *
 
 logger = logging.getLogger('arvados.arv-mount')
@@ -16,6 +17,7 @@ logger = logging.getLogger('arvados.arv-mount')
 if __name__ == '__main__':
     # Handle command line parameters
     parser = argparse.ArgumentParser(
+        parents=[arv_cmd.retry_opt],
         description='''Mount Keep data under the local filesystem.  Default mode is --home''',
         epilog="""
 Note: When using the --exec feature, you must either specify the
@@ -80,29 +82,42 @@ with "--".
         operations = Operations(os.getuid(), os.getgid())
         api = SafeApi(arvados.config)
 
-        usr = api.users().current().execute()
+        usr = api.users().current().execute(num_retries=args.retries)
         now = time.time()
+        dir_class = None
+        dir_args = [llfuse.ROOT_INODE, operations.inodes, api, args.retries]
         if args.by_id:
             # Set up the request handler with the 'magic directory' at the root
-            operations.inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+            dir_class = MagicDirectory
         elif args.by_tag:
-            operations.inodes.add_entry(TagsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+            dir_class = TagsDirectory
         elif args.shared:
-            operations.inodes.add_entry(SharedDirectory(llfuse.ROOT_INODE, operations.inodes, api, usr))
+            dir_class = SharedDirectory
+            dir_args.append(usr)
         elif args.home:
-            operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, usr))
-        elif args.collection != None:
+            dir_class = ProjectDirectory
+            dir_args.append(usr)
+        elif args.collection is not None:
             # Set up the request handler with the collection at the root
-            operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, args.collection))
-        elif args.project != None:
-            operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, api.groups().get(uuid=args.project).execute()))
+            dir_class = CollectionDirectory
+            dir_args.append(args.collection)
+        elif args.project is not None:
+            dir_class = ProjectDirectory
+            dir_args.append(api.groups().get(uuid=args.project).execute(
+                    num_retries=args.retries))
+
+        if dir_class is not None:
+            operations.inodes.add_entry(dir_class(*dir_args))
         else:
             e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
+            dir_args[0] = e.inode
 
-            e._entries['home'] = operations.inodes.add_entry(ProjectDirectory(e.inode, operations.inodes, api, usr))
-            e._entries['shared'] = operations.inodes.add_entry(SharedDirectory(e.inode, operations.inodes, api, usr))
-            e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(e.inode, operations.inodes, api))
-            e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(e.inode, operations.inodes, api))
+            e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
+            e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(*dir_args))
+
+            dir_args.append(usr)
+            e._entries['home'] = operations.inodes.add_entry(ProjectDirectory(*dir_args))
+            e._entries['shared'] = operations.inodes.add_entry(SharedDirectory(*dir_args))
 
             text = '''
 Welcome to Arvados!  This directory provides file system access to files and objects
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index 55f565b..789bfe3 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -73,7 +73,7 @@ class FuseMountTest(MountTestBase):
     def runTest(self):
         # Create the request handler
         operations = fuse.Operations(os.getuid(), os.getgid())
-        e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.testcollection))
+        e = operations.inodes.add_entry(fuse.CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.testcollection))
 
         llfuse.init(operations, self.mounttmp, [])
         t = threading.Thread(None, lambda: llfuse.main())
@@ -128,7 +128,7 @@ class FuseMagicTest(MountTestBase):
     def runTest(self):
         # Create the request handler
         operations = fuse.Operations(os.getuid(), os.getgid())
-        e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api))
+        e = operations.inodes.add_entry(fuse.MagicDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
 
         self.mounttmp = tempfile.mkdtemp()
 
@@ -163,7 +163,7 @@ class FuseMagicTest(MountTestBase):
 class FuseTagsTest(MountTestBase):
     def runTest(self):
         operations = fuse.Operations(os.getuid(), os.getgid())
-        e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api))
+        e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0))
 
         llfuse.init(operations, self.mounttmp, [])
         t = threading.Thread(None, lambda: llfuse.main())
@@ -188,7 +188,7 @@ class FuseTagsTest(MountTestBase):
 class FuseTagsUpdateTest(MountTestBase):
     def runRealTest(self):
         operations = fuse.Operations(os.getuid(), os.getgid())
-        e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, poll_time=1))
+        e = operations.inodes.add_entry(fuse.TagsDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, poll_time=1))
 
         llfuse.init(operations, self.mounttmp, [])
         t = threading.Thread(None, lambda: llfuse.main())
@@ -241,7 +241,7 @@ class FuseTagsUpdateTest(MountTestBase):
 class FuseSharedTest(MountTestBase):
     def runTest(self):
         operations = fuse.Operations(os.getuid(), os.getgid())
-        e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.api.users().current().execute()['uuid']))
+        e = operations.inodes.add_entry(fuse.SharedDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()['uuid']))
 
         llfuse.init(operations, self.mounttmp, [])
         t = threading.Thread(None, lambda: llfuse.main())
@@ -280,7 +280,7 @@ class FuseSharedTest(MountTestBase):
 class FuseHomeTest(MountTestBase):
     def runTest(self):
         operations = fuse.Operations(os.getuid(), os.getgid())
-        e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, self.api.users().current().execute()))
+        e = operations.inodes.add_entry(fuse.ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, self.api, 0, self.api.users().current().execute()))
 
         llfuse.init(operations, self.mounttmp, [])
         t = threading.Thread(None, lambda: llfuse.main())

commit 2ffac087fc1e34fcc0e580765f724b6a42b9ad34
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Sep 15 10:58:11 2014 -0400

    3147: FUSE driver requires a Python SDK with retry support.
    
    I also took out some of the older log-handling code while I was at it,
    since the stricter versioning makes it unnecessary.

diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 76d026e..e07860d 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -59,29 +59,20 @@ with "--".
     else:
         daemon_ctx = None
 
-    # Configure a logger based on command-line switches.
-    # If we're using a contemporary Python SDK (mid-August 2014),
-    # configure the arvados hierarchy logger.
-    # Otherwise, configure the program root logger.
-    base_logger = getattr(arvados, 'logger', None)
-
+    # Configure a log handler based on command-line switches.
     if args.logfile:
         log_handler = logging.FileHandler(args.logfile)
     elif daemon_ctx:
         log_handler = logging.NullHandler()
-    elif base_logger:
-        log_handler = arvados.log_handler
     else:
-        log_handler = logging.StreamHandler()
+        log_handler = None
 
-    if base_logger is None:
-        base_logger = logging.getLogger()
-    else:
-        base_logger.removeHandler(arvados.log_handler)
-    base_logger.addHandler(log_handler)
+    if log_handler is not None:
+        arvados.logger.removeHandler(arvados.log_handler)
+        arvados.logger.addHandler(log_handler)
 
     if args.debug:
-        base_logger.setLevel(logging.DEBUG)
+        arvados.logger.setLevel(logging.DEBUG)
         logger.debug("arv-mount debugging enabled")
 
     try:
@@ -103,7 +94,7 @@ with "--".
         elif args.collection != None:
             # Set up the request handler with the collection at the root
             operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, args.collection))
-        elif args.project != None:            
+        elif args.project != None:
             operations.inodes.add_entry(ProjectDirectory(llfuse.ROOT_INODE, operations.inodes, api, api.groups().get(uuid=args.project).execute()))
         else:
             e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
@@ -114,8 +105,8 @@ with "--".
             e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(e.inode, operations.inodes, api))
 
             text = '''
-Welcome to Arvados!  This directory provides file system access to files and objects 
-available on the Arvados installation located at '{}' 
+Welcome to Arvados!  This directory provides file system access to files and objects
+available on the Arvados installation located at '{}'
 using credentials for user '{}'.
 
 From here, the following directories are available:
diff --git a/services/fuse/setup.py b/services/fuse/setup.py
index d9fe797..ba728e6 100644
--- a/services/fuse/setup.py
+++ b/services/fuse/setup.py
@@ -20,7 +20,7 @@ setup(name='arvados_fuse',
         'bin/arv-mount'
         ],
       install_requires=[
-        'arvados-python-client',
+        'arvados-python-client>=0.1.1410902880',  # 2014-09-16
         'llfuse',
         'python-daemon'
         ],

commit 8d3efbe312f8bb5f42cec903da6a882a484a1d0f
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Sep 15 11:21:31 2014 -0400

    3147: Add retry support to PySDK list_all utility.

diff --git a/sdk/python/arvados/util.py b/sdk/python/arvados/util.py
index ada1aec..2609f11 100644
--- a/sdk/python/arvados/util.py
+++ b/sdk/python/arvados/util.py
@@ -338,12 +338,12 @@ def is_hex(s, *length_args):
         good_len = True
     return bool(good_len and HEX_RE.match(s))
 
-def list_all(fn, **kwargs):
+def list_all(fn, num_retries=0, **kwargs):
     items = []
     offset = 0
     items_available = sys.maxint
     while len(items) < items_available:
-        c = fn(offset=offset, **kwargs).execute()
+        c = fn(offset=offset, **kwargs).execute(num_retries=num_retries)
         items += c['items']
         items_available = c['items_available']
         offset = c['offset'] + len(c['items'])

commit f6c14d2b99e646f04a819a4a5df8b87ad55567e9
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Sep 15 10:37:32 2014 -0400

    3147: Add retry support to Python CLI tools.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index a06cad0..aa4973a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -147,8 +147,10 @@ class CollectionReader(CollectionBase):
                 if not util.portable_data_hash_pattern.match(
                       self._manifest_locator):
                     raise
-                _logger.warning("API lookup failed for collection %s (%s: %s)",
-                                self._manifest_locator, type(e), str(e))
+                _logger.warning(
+                    "API server did not return Collection '%s'. " +
+                    "Trying to fetch directly from Keep (deprecated).",
+                    self._manifest_locator)
                 self._manifest_text = self._my_keep().get(
                     self._manifest_locator, num_retries=self.num_retries)
         self._streams = [sline.split()
diff --git a/sdk/python/arvados/commands/_util.py b/sdk/python/arvados/commands/_util.py
index f7cb80d..c42ee7a 100644
--- a/sdk/python/arvados/commands/_util.py
+++ b/sdk/python/arvados/commands/_util.py
@@ -1,8 +1,20 @@
 #!/usr/bin/env python
 
+import argparse
 import errno
 import os
 
+def _pos_int(s):
+    num = int(s)
+    if num < 0:
+        raise ValueError("can't accept negative value: %s" % (num,))
+    return num
+
+retry_opt = argparse.ArgumentParser(add_help=False)
+retry_opt.add_argument('--retries', type=_pos_int, default=3, help="""
+Maximum number of times to retry server requests that encounter temporary
+failures (e.g., server down).  Default 3.""")
+
 def _ignore_error(error):
     return None
 
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index c96e2cb..d0f60bf 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -47,7 +47,7 @@ keepdocker_parser.add_argument(
 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
 arg_parser = argparse.ArgumentParser(
         description="Upload or list Docker images in Arvados",
-        parents=[keepdocker_parser, arv_put.run_opts])
+        parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
 
 class DockerError(Exception):
     pass
@@ -152,9 +152,10 @@ def prep_image_file(filename):
         image_file = open(file_path, 'w+b' if need_save else 'rb')
     return image_file, need_save
 
-def make_link(link_class, link_name, **link_attrs):
+def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
     link_attrs.update({'link_class': link_class, 'name': link_name})
-    return arvados.api('v1').links().create(body=link_attrs).execute()
+    return api_client.links().create(body=link_attrs).execute(
+        num_retries=num_retries)
 
 def ptimestamp(t):
     s = t.split(".")
@@ -162,8 +163,10 @@ def ptimestamp(t):
         t = s[0] + s[1][-1:]
     return datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%SZ")
 
-def list_images_in_arv():
-    existing_links = arvados.api('v1').links().list(filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']]]).execute()['items']
+def list_images_in_arv(api_client, num_retries):
+    existing_links = api_client.links().list(
+        filters=[['link_class', 'in', ['docker_image_hash', 'docker_image_repo+tag']]]
+        ).execute(num_retries=num_retries)['items']
     images = {}
     for link in existing_links:
         collection_uuid = link["head_uuid"]
@@ -196,9 +199,10 @@ def list_images_in_arv():
 
 def main(arguments=None):
     args = arg_parser.parse_args(arguments)
+    api = arvados.api('v1')
 
     if args.image is None or args.image == 'images':
-        list_images_in_arv()
+        list_images_in_arv(api, args.retries)
         sys.exit(0)
 
     # Pull the image if requested, unless the image is specified as a hash
@@ -222,30 +226,35 @@ def main(arguments=None):
     else:
         collection_name = args.name
 
-    api = arvados.api('v1')
-
     if not args.force:
         # Check if this image is already in Arvados.
 
         # Project where everything should be owned
-        parent_project_uuid = args.project_uuid if args.project_uuid else api.users().current().execute()['uuid']
+        if args.project_uuid:
+            parent_project_uuid = args.project_uuid
+        else:
+            parent_project_uuid = api.users().current().execute(
+                num_retries=args.retries)['uuid']
 
         # Find image hash tags
         existing_links = api.links().list(
             filters=[['link_class', '=', 'docker_image_hash'],
-                     ['name', '=', image_hash]]).execute()['items']
+                     ['name', '=', image_hash]]
+            ).execute(num_retries=args.retries)['items']
         if existing_links:
             # get readable collections
             collections = api.collections().list(
                 filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
-                select=["uuid", "owner_uuid", "name", "manifest_text"]).execute()['items']
+                select=["uuid", "owner_uuid", "name", "manifest_text"]
+                ).execute(num_retries=args.retries)['items']
 
             if collections:
                 # check for repo+tag links on these collections
                 existing_repo_tag = (api.links().list(
                     filters=[['link_class', '=', 'docker_image_repo+tag'],
                              ['name', '=', image_repo_tag],
-                             ['head_uuid', 'in', collections]]).execute()['items']) if image_repo_tag else []
+                             ['head_uuid', 'in', collections]]
+                    ).execute(num_retries=args.retries)['items']) if image_repo_tag else []
 
                 # Filter on elements owned by the parent project
                 owned_col = [c for c in collections if c['owner_uuid'] == parent_project_uuid]
@@ -257,21 +266,25 @@ def main(arguments=None):
                     coll_uuid = owned_col[0]['uuid']
                 else:
                     # create new collection owned by the project
-                    coll_uuid = api.collections().create(body={"manifest_text": collections[0]['manifest_text'],
-                                                               "name": collection_name,
-                                                               "owner_uuid": parent_project_uuid},
-                                                         ensure_unique_name=True).execute()['uuid']
+                    coll_uuid = api.collections().create(
+                        body={"manifest_text": collections[0]['manifest_text'],
+                              "name": collection_name,
+                              "owner_uuid": parent_project_uuid},
+                        ensure_unique_name=True
+                        ).execute(num_retries=args.retries)['uuid']
 
                 link_base = {'owner_uuid': parent_project_uuid,
                              'head_uuid':  coll_uuid }
 
                 if not owned_img:
                     # create image link owned by the project
-                    make_link('docker_image_hash', image_hash, **link_base)
+                    make_link(api, args.retries,
+                              'docker_image_hash', image_hash, **link_base)
 
                 if not owned_rep and image_repo_tag:
                     # create repo+tag link owned by the project
-                    make_link('docker_image_repo+tag', image_repo_tag, **link_base)
+                    make_link(api, args.retries, 'docker_image_repo+tag',
+                              image_repo_tag, **link_base)
 
                 print(coll_uuid)
 
@@ -306,10 +319,10 @@ def main(arguments=None):
     if args.project_uuid is not None:
         link_base['owner_uuid'] = args.project_uuid
 
-    make_link('docker_image_hash', image_hash, **link_base)
+    make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
     if image_repo_tag:
-        make_link('docker_image_repo+tag', image_repo_tag,
-                  **link_base)
+        make_link(api, args.retries,
+                  'docker_image_repo+tag', image_repo_tag, **link_base)
 
     # Clean up.
     image_file.close()
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 15551fa..0099487 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -145,7 +145,7 @@ Do not continue interrupted uploads from cached state.
 
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
-    parents=[upload_opts, run_opts])
+    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
 
 def parse_arguments(arguments):
     args = arg_parser.parse_args(arguments)
@@ -246,23 +246,26 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
                    ['bytes_written', '_seen_inputs'])
 
     def __init__(self, cache=None, reporter=None, bytes_expected=None,
-                 api_client=None):
+                 api_client=None, num_retries=0):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(api_client)
+        super(ArvPutCollectionWriter, self).__init__(
+            api_client, num_retries=num_retries)
 
     @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None):
+    def from_cache(cls, cache, reporter=None, bytes_expected=None,
+                   num_retries=0):
         try:
             state = cache.load()
             state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected)
+            writer = cls.from_state(state, cache, reporter, bytes_expected,
+                                    num_retries=num_retries)
         except (TypeError, ValueError,
                 arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected)
+            return cls(cache, reporter, bytes_expected, num_retries=num_retries)
         else:
             return writer
 
@@ -348,18 +351,16 @@ def progress_writer(progress_func, outfile=sys.stderr):
 def exit_signal_handler(sigcode, frame):
     sys.exit(-sigcode)
 
-def desired_project_uuid(api_client, project_uuid):
-    if project_uuid:
-        if arvados.util.user_uuid_pattern.match(project_uuid):
-            api_client.users().get(uuid=project_uuid).execute()
-            return project_uuid
-        elif arvados.util.group_uuid_pattern.match(project_uuid):
-            api_client.groups().get(uuid=project_uuid).execute()
-            return project_uuid
-        else:
-            raise ValueError("Not a valid project uuid: {}".format(project_uuid))
+def desired_project_uuid(api_client, project_uuid, num_retries):
+    if not project_uuid:
+        query = api_client.users().current()
+    elif arvados.util.user_uuid_pattern.match(project_uuid):
+        query = api_client.users().get(uuid=project_uuid)
+    elif arvados.util.group_uuid_pattern.match(project_uuid):
+        query = api_client.groups().get(uuid=project_uuid)
     else:
-        return api_client.users().current().execute()['uuid']
+        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
+    return query.execute(num_retries=num_retries)['uuid']
 
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
@@ -387,7 +388,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     # Determine the parent project
     try:
-        project_uuid = desired_project_uuid(api_client, args.project_uuid)
+        project_uuid = desired_project_uuid(api_client, args.project_uuid,
+                                            args.retries)
     except (apiclient.errors.Error, ValueError) as error:
         print >>stderr, error
         sys.exit(1)
@@ -413,10 +415,11 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
             sys.exit(1)
 
     if resume_cache is None:
-        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+        writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected,
+                                        num_retries=args.retries)
     else:
         writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, bytes_expected)
+            resume_cache, reporter, bytes_expected, num_retries=args.retries)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -456,7 +459,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                     'manifest_text': writer.manifest_text()
                     },
                 ensure_unique_name=True
-                ).execute()
+                ).execute(num_retries=args.retries)
 
             print >>stderr, "Collection saved as '%s'" % collection['name']
 
diff --git a/sdk/python/bin/arv-get b/sdk/python/bin/arv-get
index 0d403d1..ccf8442 100755
--- a/sdk/python/bin/arv-get
+++ b/sdk/python/bin/arv-get
@@ -9,6 +9,7 @@ import sys
 import logging
 
 import arvados
+import arvados.commands._util as arv_cmd
 
 logger = logging.getLogger('arvados.arv-get')
 
@@ -17,7 +18,8 @@ def abort(msg, code=1):
     exit(code)
 
 parser = argparse.ArgumentParser(
-    description='Copy data from Keep to a local file or pipe.')
+    description='Copy data from Keep to a local file or pipe.',
+    parents=[arv_cmd.retry_opt])
 parser.add_argument('locator', type=str,
                     help="""
 Collection locator, optionally with a file path or prefix.
@@ -123,36 +125,25 @@ collection = r.group(1)
 get_prefix = r.group(2)
 if args.r and not get_prefix:
     get_prefix = os.sep
-
-todo = []
-todo_bytes = 0
 api_client = arvados.api('v1')
+reader = arvados.CollectionReader(collection, num_retries=args.retries)
+
 if not get_prefix:
-    try:
-        if not args.n:
-            if not args.f and os.path.exists(args.destination):
-                abort('Local file %s already exists.' % (args.destination,))
-            with open(args.destination, 'wb') as f:
-                try:
-                    c = api_client.collections().get(uuid=collection).execute()
-                    manifest = c['manifest_text']
-                except Exception as e:
-                    logger.warning(
-                        "Collection %s not found. " +
-                        "Trying to fetch directly from Keep (deprecated).",
-                        collection)
-                    manifest = arvados.KeepClient(
-                        api_client=api_client).get(collection)
-                f.write(manifest)
-        sys.exit(0)
-    except arvados.errors.NotFoundError as e:
-        abort(e)
-
-reader = arvados.CollectionReader(collection)
+    if not args.n:
+        try:
+            manifest = reader.manifest_text()
+        except (arvados.errors.ApiError, arvados.errors.KeepReadError) as e:
+            abort(e)
+        if not args.f and os.path.exists(args.destination):
+            abort('Local file %s already exists.' % (args.destination,))
+        with open(args.destination, 'wb') as f:
+            f.write(manifest)
+    sys.exit(0)
 
 # Scan the collection. Make an array of (stream, file, local
 # destination filename) tuples, and add up total size to extract.
-
+todo = []
+todo_bytes = 0
 try:
     for s in reader.all_streams():
         for f in s.all_files():
diff --git a/sdk/python/bin/arv-ls b/sdk/python/bin/arv-ls
index ee74bb8..382bfe8 100755
--- a/sdk/python/bin/arv-ls
+++ b/sdk/python/bin/arv-ls
@@ -1,27 +1,20 @@
 #!/usr/bin/env python
 
 import argparse
-import hashlib
-import os
-import re
-import string
-import sys
+
+import arvados
+import arvados.commands._util as arv_cmd
 
 parser = argparse.ArgumentParser(
-    description='List contents of a manifest')
+    description='List contents of a manifest',
+    parents=[arv_cmd.retry_opt])
 
 parser.add_argument('locator', type=str,
-                    help="""
-Collection locator
-""")
-
+                    help="Collection UUID or locator")
 parser.add_argument('-s', action='store_true', help="""List file sizes, in KiB.""")
 
 args = parser.parse_args()
-
-import arvados
-
-cr = arvados.CollectionReader(args.locator)
+cr = arvados.CollectionReader(args.locator, num_retries=args.retries)
 
 for f in cr.all_files():
     if args.s:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index c7b01bc..001add3 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -440,14 +440,15 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
 
     def test_check_real_project_found(self):
         self.authorize_with('active')
-        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID),
+        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                         "did not correctly find test fixture project")
 
     def test_check_error_finding_nonexistent_uuid(self):
         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
         self.authorize_with('active')
         try:
-            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+                                                  0)
         except ValueError as error:
             self.assertIn(BAD_UUID, error.message)
         else:
@@ -457,7 +458,8 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
         self.authorize_with('active')
         with self.assertRaises(apiclient.errors.HttpError):
-            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID)
+            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+                                                  0)
 
     def test_short_put_from_stdin(self):
         # Have to run this as an integration test since arv-put can't

commit 434a44692c8027849f7194d08559ba7df665da5a
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Sep 16 17:27:51 2014 -0400

    3147: Add retry support to PySDK Collection objects.
    
    This required updating the FUSE driver's SafeApi object to better
    imitate a real API object, now that CollectionReader will pass it down
    to the underlying KeepClient.

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 496136e..a06cad0 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -91,10 +91,26 @@ def normalize(collection):
     return normalized_streams
 
 
-class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
+class CollectionBase(object):
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        pass
+
+    def _my_keep(self):
+        if self._keep_client is None:
+            self._keep_client = KeepClient(api_client=self._api_client,
+                                           num_retries=self.num_retries)
+        return self._keep_client
+
+
+class CollectionReader(CollectionBase):
+    def __init__(self, manifest_locator_or_text, api_client=None,
+                 keep_client=None, num_retries=0):
         self._api_client = api_client
         self._keep_client = keep_client
+        self.num_retries = num_retries
         if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
@@ -109,12 +125,6 @@ class CollectionReader(object):
                 "Argument to CollectionReader must be a manifest or a collection UUID")
         self._streams = None
 
-    def __enter__(self):
-        pass
-
-    def __exit__(self):
-        pass
-
     def _populate(self):
         if self._streams is not None:
             return
@@ -128,11 +138,10 @@ class CollectionReader(object):
                 # just like any other Collection lookup failure.
                 if self._api_client is None:
                     self._api_client = arvados.api('v1')
-                    self._keep_client = KeepClient(api_client=self._api_client)
-                if self._keep_client is None:
-                    self._keep_client = KeepClient(api_client=self._api_client)
+                    self._keep_client = None  # Make a new one with the new api.
                 c = self._api_client.collections().get(
-                    uuid=self._manifest_locator).execute()
+                    uuid=self._manifest_locator).execute(
+                    num_retries=self.num_retries)
                 self._manifest_text = c['manifest_text']
             except Exception as e:
                 if not util.portable_data_hash_pattern.match(
@@ -140,29 +149,24 @@ class CollectionReader(object):
                     raise
                 _logger.warning("API lookup failed for collection %s (%s: %s)",
                                 self._manifest_locator, type(e), str(e))
-                if self._keep_client is None:
-                    self._keep_client = KeepClient(api_client=self._api_client)
-                self._manifest_text = self._keep_client.get(self._manifest_locator)
-        self._streams = []
-        for stream_line in self._manifest_text.split("\n"):
-            if stream_line != '':
-                stream_tokens = stream_line.split()
-                self._streams += [stream_tokens]
+                self._manifest_text = self._my_keep().get(
+                    self._manifest_locator, num_retries=self.num_retries)
+        self._streams = [sline.split()
+                         for sline in self._manifest_text.split("\n")
+                         if sline]
         self._streams = normalize(self)
 
         # now regenerate the manifest text based on the normalized stream
 
         #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
+        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
         #print "result", self._manifest_text
 
 
     def all_streams(self):
         self._populate()
-        resp = []
-        for s in self._streams:
-            resp.append(StreamReader(s, keep=self._keep_client))
-        return resp
+        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+                for s in self._streams]
 
     def all_files(self):
         for s in self.all_streams():
@@ -172,16 +176,18 @@ class CollectionReader(object):
     def manifest_text(self, strip=False):
         self._populate()
         if strip:
-            m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
+            m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
             return m
         else:
             return self._manifest_text
 
-class CollectionWriter(object):
+
+class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self, api_client=None):
+    def __init__(self, api_client=None, num_retries=0):
         self._api_client = api_client
+        self.num_retries = num_retries
         self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
@@ -197,16 +203,9 @@ class CollectionWriter(object):
         self._queued_dirents = deque()
         self._queued_trees = deque()
 
-    def __enter__(self):
-        pass
-
     def __exit__(self):
         self.finish()
 
-    def _prep_keep_client(self):
-        if self._keep_client is None:
-            self._keep_client = KeepClient(api_client=self._api_client)
-
     def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
@@ -303,7 +302,7 @@ class CollectionWriter(object):
             for s in newdata:
                 self.write(s)
             return
-        self._data_buffer += [newdata]
+        self._data_buffer.append(newdata)
         self._data_buffer_len += len(newdata)
         self._current_stream_length += len(newdata)
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
@@ -311,10 +310,9 @@ class CollectionWriter(object):
 
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
-        if data_buffer != '':
-            self._prep_keep_client()
+        if data_buffer:
             self._current_stream_locators.append(
-                self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
@@ -342,9 +340,10 @@ class CollectionWriter(object):
                 (self._current_stream_length - self._current_file_pos,
                  self._current_file_pos,
                  self._current_stream_name))
-        self._current_stream_files += [[self._current_file_pos,
-                                        self._current_stream_length - self._current_file_pos,
-                                        self._current_file_name]]
+        self._current_stream_files.append([
+                self._current_file_pos,
+                self._current_stream_length - self._current_file_pos,
+                self._current_file_name])
         self._current_file_pos = self._current_stream_length
 
     def start_new_stream(self, newstreamname='.'):
@@ -363,18 +362,18 @@ class CollectionWriter(object):
     def finish_current_stream(self):
         self.finish_current_file()
         self.flush_data()
-        if len(self._current_stream_files) == 0:
+        if not self._current_stream_files:
             pass
-        elif self._current_stream_name == None:
+        elif self._current_stream_name is None:
             raise errors.AssertionError(
                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
                 (self._current_stream_length, len(self._current_stream_files)))
         else:
-            if len(self._current_stream_locators) == 0:
-                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
-            self._finished_streams += [[self._current_stream_name,
-                                        self._current_stream_locators,
-                                        self._current_stream_files]]
+            if not self._current_stream_locators:
+                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+            self._finished_streams.append([self._current_stream_name,
+                                           self._current_stream_locators,
+                                           self._current_stream_files])
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -384,8 +383,7 @@ class CollectionWriter(object):
 
     def finish(self):
         # Store the manifest in Keep and return its locator.
-        self._prep_keep_client()
-        return self._keep_client.put(self.manifest_text())
+        return self._my_keep().put(self.manifest_text())
 
     def stripped_manifest(self):
         """
@@ -414,8 +412,8 @@ class CollectionWriter(object):
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-        if len(manifest) > 0:
-            return CollectionReader(manifest).manifest_text()
+        if manifest:
+            return CollectionReader(manifest, self._api_client).manifest_text()
         else:
             return ""
 
@@ -433,9 +431,10 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self, api_client=None):
+    def __init__(self, api_client=None, num_retries=0):
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__(api_client)
+        super(ResumableCollectionWriter, self).__init__(
+            api_client, num_retries=num_retries)
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 48fb013..0dbf9bc 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -20,9 +20,9 @@ def fake_httplib2_response(code, **headers):
                    reason=httplib.responses.get(code, "Unknown Response"))
     return httplib2.Response(headers)
 
-def mock_responses(body, *codes):
+def mock_responses(body, *codes, **headers):
     return mock.patch('httplib2.Http.request', side_effect=(
-            (fake_httplib2_response(code), body) for code in codes))
+            (fake_httplib2_response(code, **headers), body) for code in codes))
 
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index f066c5d..98a72f6 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -5,6 +5,7 @@
 import arvados
 import bz2
 import copy
+import mock
 import os
 import pprint
 import subprocess
@@ -12,7 +13,7 @@ import tempfile
 import unittest
 
 import run_test_server
-from arvados_testutil import ArvadosBaseTestCase
+import arvados_testutil as tutil
 
 class TestResumableWriter(arvados.ResumableCollectionWriter):
     KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
@@ -22,7 +23,7 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
 
 
 class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
-                             ArvadosBaseTestCase):
+                             tutil.ArvadosBaseTestCase):
     MAIN_SERVER = {}
 
     @classmethod
@@ -620,5 +621,136 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
                           cwriter.write, "badtext")
 
 
+class CollectionTestMixin(object):
+    PROXY_RESPONSE = {
+        'items_available': 1,
+        'items': [{
+                'uuid': 'zzzzz-bi6l4-mockproxy012345',
+                'owner_uuid': 'zzzzz-tpzed-mockowner012345',
+                'service_host': tutil.TEST_HOST,
+                'service_port': 65535,
+                'service_ssl_flag': True,
+                'service_type': 'proxy',
+                }]}
+    API_COLLECTIONS = run_test_server.fixture('collections')
+    DEFAULT_COLLECTION = API_COLLECTIONS['foo_file']
+    DEFAULT_DATA_HASH = DEFAULT_COLLECTION['portable_data_hash']
+    DEFAULT_MANIFEST = DEFAULT_COLLECTION['manifest_text']
+    DEFAULT_UUID = DEFAULT_COLLECTION['uuid']
+
+    def _mock_api_call(self, mock_method, code, body):
+        mock_method = mock_method().execute
+        if code == 200:
+            mock_method.return_value = body
+        else:
+            mock_method.side_effect = arvados.errors.ApiError(
+                tutil.fake_httplib2_response(code), "{}")
+
+    def mock_keep_services(self, api_mock, code, body):
+        self._mock_api_call(api_mock.keep_services().accessible, code, body)
+
+    def api_client_mock(self, code=200):
+        client = mock.MagicMock(name='api_client')
+        self.mock_keep_services(client, code, self.PROXY_RESPONSE)
+        return client
+
+
+ at tutil.skip_sleep
+class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
+    def mock_get_collection(self, api_mock, code, body):
+        body = self.API_COLLECTIONS.get(body)
+        self._mock_api_call(api_mock.collections().get, code, body)
+
+    def api_client_mock(self, code=200):
+        client = super(CollectionReaderTestCase, self).api_client_mock(code)
+        self.mock_get_collection(client, code, 'foo_file')
+        return client
+
+    def test_init_no_default_retries(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        reader.manifest_text()
+        client.collections().get().execute.assert_called_with(num_retries=0)
+
+    def test_uuid_init_success(self):
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+                                          num_retries=3)
+        self.assertEqual(self.DEFAULT_COLLECTION['manifest_text'],
+                         reader.manifest_text())
+        client.collections().get().execute.assert_called_with(num_retries=3)
+
+    def test_uuid_init_failure_raises_api_error(self):
+        client = self.api_client_mock(500)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client)
+        with self.assertRaises(arvados.errors.ApiError):
+            reader.manifest_text()
+
+    def test_locator_init(self):
+        client = self.api_client_mock(200)
+        # Ensure Keep will not return anything if asked.
+        with tutil.mock_responses(None, 404):
+            reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+                                              api_client=client)
+            self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+    def test_locator_init_falls_back_to_keep(self):
+        # Reading manifests from Keep is deprecated.  Feel free to
+        # remove this test when we remove the fallback.
+        client = self.api_client_mock(200)
+        self.mock_get_collection(client, 404, None)
+        with tutil.mock_responses(self.DEFAULT_MANIFEST, 200):
+            reader = arvados.CollectionReader(self.DEFAULT_DATA_HASH,
+                                              api_client=client, num_retries=3)
+            self.assertEqual(self.DEFAULT_MANIFEST, reader.manifest_text())
+
+    def test_init_num_retries_propagated(self):
+        # More of an integration test...
+        client = self.api_client_mock(200)
+        reader = arvados.CollectionReader(self.DEFAULT_UUID, api_client=client,
+                                          num_retries=3)
+        with tutil.mock_responses('foo', 500, 500, 200):
+            self.assertEqual('foo',
+                             ''.join(f.read(9) for f in reader.all_files()))
+
+
+ at tutil.skip_sleep
+class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
+    def mock_keep(self, body, *codes, **headers):
+        headers.setdefault('x-keep-replicas-stored', 2)
+        return tutil.mock_responses(body, *codes, **headers)
+
+    def foo_writer(self, **kwargs):
+        api_client = self.api_client_mock()
+        writer = arvados.CollectionWriter(api_client, **kwargs)
+        writer.start_new_file('foo')
+        writer.write('foo')
+        return writer
+
+    def test_write_whole_collection(self):
+        writer = self.foo_writer()
+        with self.mock_keep(self.DEFAULT_DATA_HASH, 200, 200):
+            self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+    def test_write_no_default(self):
+        writer = self.foo_writer()
+        with self.mock_keep(None, 500):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                writer.finish()
+
+    def test_write_whole_collection_through_retries(self):
+        writer = self.foo_writer(num_retries=2)
+        with self.mock_keep(self.DEFAULT_DATA_HASH,
+                            500, 500, 200, 500, 500, 200):
+            self.assertEqual(self.DEFAULT_DATA_HASH, writer.finish())
+
+    def test_flush_data_retries(self):
+        writer = self.foo_writer(num_retries=2)
+        foo_hash = self.DEFAULT_MANIFEST.split()[1]
+        with self.mock_keep(foo_hash, 500, 200):
+            writer.flush_data()
+        self.assertEqual(self.DEFAULT_MANIFEST, writer.manifest_text())
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index f49b947..d5523e8 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -31,14 +31,15 @@ class SafeApi(object):
 
     def __init__(self, config):
         self.host = config.get('ARVADOS_API_HOST')
-        self.token = config.get('ARVADOS_API_TOKEN')
+        self.api_token = config.get('ARVADOS_API_TOKEN')
         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
         self.local = threading.local()
         self.block_cache = arvados.KeepBlockCache()
 
     def localapi(self):
         if 'api' not in self.local.__dict__:
-            self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
+            self.local.api = arvados.api('v1', False, self.host,
+                                         self.api_token, self.insecure)
         return self.local.api
 
     def localkeep(self):
@@ -46,17 +47,13 @@ class SafeApi(object):
             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
         return self.local.keep
 
-    def collections(self):
-        return self.localapi().collections()
-
-    def links(self):
-        return self.localapi().links()
-
-    def groups(self):
-        return self.localapi().groups()
+    def __getattr__(self, name):
+        # Proxy nonexistent attributes to the local API client.
+        try:
+            return getattr(self.localapi(), name)
+        except AttributeError:
+            return super(SafeApi, self).__getattr__(name)
 
-    def users(self):
-        return self.localapi().users()
 
 def convertTime(t):
     '''Parse Arvados timestamp to unix time.'''

commit 3b5c8cf8a3cf5ab3ccd33566c3c96c415bd0d3d6
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Sep 12 09:28:47 2014 -0400

    3147: Add retry support to PySDK StreamReader classes.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 9522976..323251d 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -707,7 +707,10 @@ class KeepClient(object):
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, copies, thread_limiter.done()))
 
-    def local_store_put(self, data):
+    # Local storage methods need no-op num_retries arguments to keep
+    # integration tests happy.  With better isolation they could
+    # probably be removed again.
+    def local_store_put(self, data, num_retries=0):
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -716,7 +719,7 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s):
+    def local_store_get(self, loc_s, num_retries=0):
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index b98937f..04b6b81 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -18,6 +18,7 @@ import time
 import threading
 import collections
 
+from arvados.retry import retry_method
 from keep import *
 import config
 import errors
@@ -108,6 +109,7 @@ class StreamFileReader(object):
         self.segments = segments
         self._name = name
         self._filepos = 0L
+        self.num_retries = stream.num_retries
 
     def name(self):
         return self._name
@@ -128,7 +130,8 @@ class StreamFileReader(object):
         n = self.segments[-1]
         return n[OFFSET] + n[BLOCKSIZE]
 
-    def read(self, size):
+    @retry_method
+    def read(self, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at the current file position"""
         if size == 0:
             return ''
@@ -137,52 +140,58 @@ class StreamFileReader(object):
         available_chunks = locators_and_ranges(self.segments, self._filepos, size)
         if available_chunks:
             locator, blocksize, segmentoffset, segmentsize = available_chunks[0]
-            data = self._stream.readfrom(locator+segmentoffset, segmentsize)
+            data = self._stream.readfrom(locator+segmentoffset, segmentsize,
+                                         num_retries=num_retries)
 
         self._filepos += len(data)
         return data
 
-    def readfrom(self, start, size):
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
             return ''
 
-        data = ''
+        data = []
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self.segments, start, size):
-            data += self._stream.readfrom(locator+segmentoffset, segmentsize)
-        return data
+            data.append(self._stream.readfrom(locator+segmentoffset, segmentsize,
+                                              num_retries=num_retries))
+        return ''.join(data)
 
-    def readall(self, size=2**20):
+    @retry_method
+    def readall(self, size=2**20, num_retries=None):
         while True:
-            data = self.read(size)
+            data = self.read(size, num_retries=num_retries)
             if data == '':
                 break
             yield data
 
-    def decompress(self, decompress, size):
-        for segment in self.readall(size):
+    @retry_method
+    def decompress(self, decompress, size, num_retries=None):
+        for segment in self.readall(size, num_retries):
             data = decompress(segment)
             if data and data != '':
                 yield data
 
-    def readall_decompressed(self, size=2**20):
+    @retry_method
+    def readall_decompressed(self, size=2**20, num_retries=None):
         self.seek(0)
         if re.search('\.bz2$', self._name):
             dc = bz2.BZ2Decompressor()
-            return self.decompress(lambda segment: dc.decompress(segment), size)
+            return self.decompress(dc.decompress, size,
+                                   num_retries=num_retries)
         elif re.search('\.gz$', self._name):
             dc = zlib.decompressobj(16+zlib.MAX_WBITS)
-            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment), size)
+            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
+                                   size, num_retries=num_retries)
         else:
-            return self.readall(size)
+            return self.readall(size, num_retries=num_retries)
 
-    def readlines(self, decompress=True):
-        if decompress:
-            datasource = self.readall_decompressed()
-        else:
-            datasource = self.readall()
+    @retry_method
+    def readlines(self, decompress=True, num_retries=None):
+        read_func = self.readall_decompressed if decompress else self.readall
         data = ''
-        for newdata in datasource:
+        for newdata in read_func(num_retries=num_retries):
             data += newdata
             sol = 0
             while True:
@@ -201,12 +210,15 @@ class StreamFileReader(object):
         manifest_text.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in self.segments])
         return arvados.CollectionReader(' '.join(manifest_text) + '\n').manifest_text()
 
+
 class StreamReader(object):
-    def __init__(self, tokens, keep=None, debug=False, _empty=False):
+    def __init__(self, tokens, keep=None, debug=False, _empty=False,
+                 num_retries=0):
         self._stream_name = None
         self._data_locators = []
         self._files = collections.OrderedDict()
         self._keep = keep
+        self.num_retries = num_retries
 
         streamoffset = 0L
 
@@ -254,16 +266,17 @@ class StreamReader(object):
     def locators_and_ranges(self, range_start, range_size):
         return locators_and_ranges(self._data_locators, range_start, range_size)
 
-    def readfrom(self, start, size):
+    @retry_method
+    def readfrom(self, start, size, num_retries=None):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
             return ''
         if self._keep is None:
-            self._keep = KeepClient()
-        data = ''
+            self._keep = KeepClient(num_retries=self.num_retries)
+        data = []
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
-            data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
-        return data
+            data.append(self._keep.get(locator, num_retries=num_retries)[segmentoffset:segmentoffset+segmentsize])
+        return ''.join(data)
 
     def manifest_text(self, strip=False):
         manifest_text = [self.name().replace(' ', '\\040')]
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index 284854b..f066c5d 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -349,8 +349,9 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
     class MockStreamReader(object):
         def __init__(self, content):
             self.content = content
+            self.num_retries = 0
 
-        def readfrom(self, start, size):
+        def readfrom(self, start, size, num_retries=0):
             return self.content[start:start+size]
 
     def test_file_stream(self):
@@ -422,7 +423,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
         def __init__(self, content, num_retries=0):
             self.content = content
 
-        def get(self, locator):
+        def get(self, locator, num_retries=0):
             return self.content[locator]
 
     def test_stream_reader(self):
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
index 060a26a..3970d67 100644
--- a/sdk/python/tests/test_stream.py
+++ b/sdk/python/tests/test_stream.py
@@ -1,18 +1,82 @@
 #!/usr/bin/env python
 
+import mock
 import unittest
 
 import arvados
 from arvados import StreamReader, StreamFileReader
 
+import arvados_testutil as tutil
 import run_test_server
 
-class StreamReaderTestCase(unittest.TestCase):
+class StreamRetryTestMixin(object):
+    # Define reader_for(coll_name, **kwargs)
+    # and read_for_test(reader, size, **kwargs).
     API_COLLECTIONS = run_test_server.fixture('collections')
 
+    def keep_client(self):
+        return arvados.KeepClient(proxy='http://[%s]:1' % (tutil.TEST_HOST,),
+                                  local_store='')
+
     def manifest_for(self, coll_name):
         return self.API_COLLECTIONS[coll_name]['manifest_text']
 
+    @tutil.skip_sleep
+    def test_success_without_retries(self):
+        reader = self.reader_for('bar_file')
+        with tutil.mock_responses('bar', 200):
+            self.assertEqual('bar', self.read_for_test(reader, 3))
+
+    @tutil.skip_sleep
+    def test_read_no_default_retry(self):
+        reader = self.reader_for('user_agreement')
+        with tutil.mock_responses('', 500):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 10)
+
+    @tutil.skip_sleep
+    def test_read_with_instance_retries(self):
+        reader = self.reader_for('foo_file', num_retries=3)
+        with tutil.mock_responses('foo', 500, 200):
+            self.assertEqual('foo', self.read_for_test(reader, 3))
+
+    @tutil.skip_sleep
+    def test_read_with_method_retries(self):
+        reader = self.reader_for('foo_file')
+        with tutil.mock_responses('foo', 500, 200):
+            self.assertEqual('foo',
+                             self.read_for_test(reader, 3, num_retries=3))
+
+    @tutil.skip_sleep
+    def test_read_instance_retries_exhausted(self):
+        reader = self.reader_for('bar_file', num_retries=3)
+        with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 3)
+
+    @tutil.skip_sleep
+    def test_read_method_retries_exhausted(self):
+        reader = self.reader_for('bar_file')
+        with tutil.mock_responses('bar', 500, 500, 500, 500, 200):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 3, num_retries=3)
+
+    @tutil.skip_sleep
+    def test_method_retries_take_precedence(self):
+        reader = self.reader_for('user_agreement', num_retries=10)
+        with tutil.mock_responses('', 500, 500, 500, 200):
+            with self.assertRaises(arvados.errors.KeepReadError):
+                self.read_for_test(reader, 10, num_retries=1)
+
+
+class StreamReaderTestCase(unittest.TestCase, StreamRetryTestMixin):
+    def reader_for(self, coll_name, **kwargs):
+        return StreamReader(self.manifest_for(coll_name).split(),
+                            self.keep_client(), **kwargs)
+
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return reader.readfrom(0, byte_count, **kwargs)
+
     def test_manifest_text_without_keep_client(self):
         mtext = self.manifest_for('multilevel_collection_1')
         for line in mtext.rstrip('\n').split('\n'):
@@ -20,5 +84,34 @@ class StreamReaderTestCase(unittest.TestCase):
             self.assertEqual(line + '\n', reader.manifest_text())
 
 
+class StreamFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
+    def reader_for(self, coll_name, **kwargs):
+        return StreamReader(self.manifest_for(coll_name).split(),
+                            self.keep_client(), **kwargs).all_files()[0]
+
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return reader.read(byte_count, **kwargs)
+
+
+class StreamFileReadFromTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return reader.readfrom(0, byte_count, **kwargs)
+
+
+class StreamFileReadAllTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return ''.join(reader.readall(**kwargs))
+
+
+class StreamFileReadAllDecompressedTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return ''.join(reader.readall_decompressed(**kwargs))
+
+
+class StreamFileReadlinesTestCase(StreamFileReadTestCase):
+    def read_for_test(self, reader, byte_count, **kwargs):
+        return ''.join(reader.readlines(**kwargs))
+
+
 if __name__ == '__main__':
     unittest.main()

commit ceceb839425dac060ab9baad66a5dba4af78e5e8
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Sep 16 17:24:31 2014 -0400

    3147: Make PySDK KeepClient.get and put retry_methods.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 64d82ba..9522976 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -392,7 +392,8 @@ class KeepClient(object):
 
 
     def __init__(self, api_client=None, proxy=None, timeout=300,
-                 api_token=None, local_store=None, block_cache=None):
+                 api_token=None, local_store=None, block_cache=None,
+                 num_retries=0):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -416,6 +417,9 @@ class KeepClient(object):
           environment variable.  If you want to ensure KeepClient does not
           use local storage, pass in an empty string.  This is primarily
           intended to mock a server for testing.
+        * num_retries: The default number of times to retry failed requests.
+          This will be used as the default num_retries value when get() and
+          put() are called.  Default 0.
         """
         self.lock = threading.Lock()
         if proxy is None:
@@ -436,7 +440,7 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.timeout = timeout
-
+            self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -558,7 +562,8 @@ class KeepClient(object):
         else:
             return None
 
-    def get(self, loc_s, num_retries=0):
+    @retry.retry_method
+    def get(self, loc_s, num_retries=None):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -575,7 +580,8 @@ class KeepClient(object):
           *each* Keep server if it returns temporary failures, with
           exponential backoff.  Note that, in each loop, the method may try
           to fetch data from every available Keep service, along with any
-          that are named in location hints in the locator.  Default 0.
+          that are named in location hints in the locator.  The default value
+          is set when the KeepClient is initialized.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
@@ -638,7 +644,8 @@ class KeepClient(object):
         else:
             raise arvados.errors.KeepReadError(loc_s)
 
-    def put(self, data, copies=2, num_retries=0):
+    @retry.retry_method
+    def put(self, data, copies=2, num_retries=None):
         """Save data in Keep.
 
         This method will get a list of Keep services from the API server, and
@@ -653,7 +660,8 @@ class KeepClient(object):
           Default 2.
         * num_retries: The number of times to retry PUT requests to
           *each* Keep server if it returns temporary failures, with
-          exponential backoff.  Default 0.
+          exponential backoff.  The default value is set when the
+          KeepClient is initialized.
         """
         data_hash = hashlib.md5(data).hexdigest()
         if copies < 1:
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index f6abe2b..284854b 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -419,7 +419,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
 
 
     class MockKeep(object):
-        def __init__(self, content):
+        def __init__(self, content, num_retries=0):
             self.content = content
 
         def get(self, locator):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9a01d86..8cc22a6 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -236,8 +236,13 @@ class KeepClientRetryTestMixin(object):
     TEST_DATA = 'testdata'
     TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
 
-    def new_client(self):
-        return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+    def setUp(self):
+        self.client_kwargs = {'proxy': self.PROXY_ADDR, 'local_store': ''}
+
+    def new_client(self, **caller_kwargs):
+        kwargs = self.client_kwargs.copy()
+        kwargs.update(caller_kwargs)
+        return arvados.KeepClient(**kwargs)
 
     def run_method(self, *args, **kwargs):
         raise NotImplementedError("test subclasses must define run_method")
@@ -272,6 +277,11 @@ class KeepClientRetryTestMixin(object):
         with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
             self.check_exception(num_retries=1)
 
+    def test_num_retries_instance_fallback(self):
+        self.client_kwargs['num_retries'] = 3
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_success()
+
 
 @tutil.skip_sleep
 class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):

commit 17dece4b2026cad47ba526cbad8dfce81f5fdebc
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Sep 12 08:49:21 2014 -0400

    3147: Add retry_method to the Python SDK.
    
    This gives us a way to make retry support flexible and consistent
    across SDK classes.  Any class that has retryable operations should
    take a num_retries argument at initialization.  Then, the specific
    methods that implement those operations should also take a num_retries
    argument, which will fall back to the instance's setting by default.
    This lets SDK users express their retry preferences wherever it's most
    convenient for them.

diff --git a/sdk/python/arvados/retry.py b/sdk/python/arvados/retry.py
index 5dc31ae..3d2fc48 100644
--- a/sdk/python/arvados/retry.py
+++ b/sdk/python/arvados/retry.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python
 
+import functools
+import inspect
 import time
 
 from collections import deque
@@ -138,3 +140,19 @@ def check_http_response_success(result):
         return False
     else:
         return None  # Get well soon, server.
+
+def retry_method(orig_func):
+    """Provide a default value for a method's num_retries argument.
+
+    This is a decorator for instance and class methods that accept a
+    num_retries argument, with a None default.  When the method is called
+    without a value for num_retries, it will be set from the underlying
+    instance or class' num_retries attribute.
+    """
+    @functools.wraps(orig_func)
+    def num_retries_setter(self, *args, **kwargs):
+        arg_vals = inspect.getcallargs(orig_func, self, *args, **kwargs)
+        if arg_vals['num_retries'] is None:
+            kwargs['num_retries'] = self.num_retries
+        return orig_func(self, *args, **kwargs)
+    return num_retries_setter
diff --git a/sdk/python/tests/test_retry.py b/sdk/python/tests/test_retry.py
index ed0a406..8c3916b 100644
--- a/sdk/python/tests/test_retry.py
+++ b/sdk/python/tests/test_retry.py
@@ -194,5 +194,32 @@ class CheckHTTPResponseSuccessTestCase(unittest.TestCase):
         self.check_is(None, 0, 99, 600, -200)
 
 
+class RetryMethodTestCase(unittest.TestCase):
+    class Tester(object):
+        def __init__(self):
+            self.num_retries = 1
+
+        @arv_retry.retry_method
+        def check(self, a, num_retries=None, z=0):
+            return (a, num_retries, z)
+
+
+    def test_positional_arg_passed(self):
+        self.assertEqual((3, 2, 0), self.Tester().check(3, 2))
+
+    def test_keyword_arg_passed(self):
+        self.assertEqual((4, 3, 0), self.Tester().check(num_retries=3, a=4))
+
+    def test_not_specified(self):
+        self.assertEqual((0, 1, 0), self.Tester().check(0))
+
+    def test_not_specified_with_other_kwargs(self):
+        self.assertEqual((1, 1, 1), self.Tester().check(1, z=1))
+
+    def test_bad_call(self):
+        with self.assertRaises(TypeError):
+            self.Tester().check(num_retries=2)
+
+
 if __name__ == '__main__':
     unittest.main()

commit 4c1de84b27923acf65b815dcf649d73a074f3645
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Sep 11 14:46:37 2014 -0400

    3147: Fix variable name typo.
    
    This variable was renamed in previous branch work, but this last user
    of it wasn't caught until just now.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 22bf327..64d82ba 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -316,7 +316,8 @@ class KeepClient(object):
                     resp_md5 = hashlib.md5(content).hexdigest()
                     if resp_md5 == locator.md5sum:
                         return content
-                    _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
             return None
 
         def put(self, http, hash_s, body):
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 4ed2912..9a01d86 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -312,6 +312,12 @@ class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
                         side_effect=iter(side_effects)):
             self.check_success(locator=self.HINTED_LOCATOR)
 
+    def test_retry_data_with_wrong_checksum(self):
+        side_effects = ((tutil.fake_httplib2_response(200), s)
+                        for s in ['baddata', self.TEST_DATA])
+        with mock.patch('httplib2.Http.request', side_effect=side_effects):
+            self.check_success(locator=self.HINTED_LOCATOR)
+
 
 @tutil.skip_sleep
 class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):

commit f749765cb8701c86c123a657540675d332c220df
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Sep 11 14:22:12 2014 -0400

    3147: PySDK StreamReader instantiates a KeepClient late.
    
    Similarly with API client instantiation in KeepClient, this helps with
    testing.  Refs #3693.

diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index e13e1a6..b98937f 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -206,9 +206,6 @@ class StreamReader(object):
         self._stream_name = None
         self._data_locators = []
         self._files = collections.OrderedDict()
-
-        if keep is None:
-            keep = KeepClient()
         self._keep = keep
 
         streamoffset = 0L
@@ -261,6 +258,8 @@ class StreamReader(object):
         """Read up to 'size' bytes from the stream, starting at 'start'"""
         if size == 0:
             return ''
+        if self._keep is None:
+            self._keep = KeepClient()
         data = ''
         for locator, blocksize, segmentoffset, segmentsize in locators_and_ranges(self._data_locators, start, size):
             data += self._keep.get(locator)[segmentoffset:segmentoffset+segmentsize]
diff --git a/sdk/python/tests/test_stream.py b/sdk/python/tests/test_stream.py
new file mode 100644
index 0000000..060a26a
--- /dev/null
+++ b/sdk/python/tests/test_stream.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+
+import unittest
+
+import arvados
+from arvados import StreamReader, StreamFileReader
+
+import run_test_server
+
+class StreamReaderTestCase(unittest.TestCase):
+    API_COLLECTIONS = run_test_server.fixture('collections')
+
+    def manifest_for(self, coll_name):
+        return self.API_COLLECTIONS[coll_name]['manifest_text']
+
+    def test_manifest_text_without_keep_client(self):
+        mtext = self.manifest_for('multilevel_collection_1')
+        for line in mtext.rstrip('\n').split('\n'):
+            reader = StreamReader(line.split())
+            self.assertEqual(line + '\n', reader.manifest_text())
+
+
+if __name__ == '__main__':
+    unittest.main()

commit 4d0665892191045a2d03f68b2364bb87a278b6ac
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Sep 12 16:46:54 2014 -0400

    3147: Move PySDK HTTP test infrastructure to testutil.
    
    Making this available to other test collections.

diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 77146db..48fb013 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -3,16 +3,27 @@
 import errno
 import httplib
 import httplib2
+import mock
 import os
 import shutil
 import tempfile
 import unittest
 
+# Use this hostname when you want to make sure the traffic will be
+# instantly refused.  100::/64 is a dedicated black hole.
+TEST_HOST = '100::'
+
+skip_sleep = mock.patch('time.sleep', lambda n: None)  # clown'll eat me
+
 def fake_httplib2_response(code, **headers):
     headers.update(status=str(code),
                    reason=httplib.responses.get(code, "Unknown Response"))
     return httplib2.Response(headers)
 
+def mock_responses(body, *codes):
+    return mock.patch('httplib2.Http.request', side_effect=(
+            (fake_httplib2_response(code), body) for code in codes))
+
 class ArvadosBaseTestCase(unittest.TestCase):
     # This class provides common utility functions for our tests.
 
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 900143e..4ed2912 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -5,8 +5,8 @@ import unittest
 
 import arvados
 import arvados.retry
+import arvados_testutil as tutil
 import run_test_server
-from arvados_testutil import fake_httplib2_response
 
 class KeepTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
@@ -232,15 +232,10 @@ class KeepClientRetryTestMixin(object):
     # supporting servers, and prevents side effects in case something hiccups.
     # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
     # run_method().
-    PROXY_ADDR = 'http://[100::]/'
+    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
     TEST_DATA = 'testdata'
     TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
 
-    @staticmethod
-    def mock_responses(body, *codes):
-        return mock.patch('httplib2.Http.request', side_effect=(
-                (fake_httplib2_response(code), body) for code in codes))
-
     def new_client(self):
         return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
 
@@ -258,30 +253,28 @@ class KeepClientRetryTestMixin(object):
         self.assertRaises(error_class, self.run_method, *args, **kwargs)
 
     def test_immediate_success(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 200):
             self.check_success()
 
     def test_retry_then_success(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
             self.check_success(num_retries=3)
 
     def test_no_default_retry(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 200):
             self.check_exception()
 
     def test_no_retry_after_permanent_error(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 403, 200):
             self.check_exception(num_retries=3)
 
     def test_error_after_retries_exhausted(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
             self.check_exception(num_retries=1)
 
 
-# Don't delay from HTTPRetryLoop's exponential backoff.
-no_backoff = mock.patch('time.sleep', lambda n: None)
-@no_backoff
-class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+@tutil.skip_sleep
+class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
     DEFAULT_EXCEPTION = arvados.errors.KeepReadError
     HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
@@ -291,7 +284,7 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
         return self.new_client().get(locator, *args, **kwargs)
 
     def test_specific_exception_when_not_found(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 200):
             self.check_exception(arvados.errors.NotFoundError, num_retries=3)
 
     def test_general_exception_with_mixed_errors(self):
@@ -300,7 +293,7 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
         # This test rigs up 50/50 disagreement between two servers, and
         # checks that it does not become a NotFoundError.
         client = self.new_client()
-        with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 500):
             with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                 client.get(self.HINTED_LOCATOR)
             self.assertNotIsInstance(
@@ -308,19 +301,20 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
                 "mixed errors raised NotFoundError")
 
     def test_hint_server_can_succeed_without_retries(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
             self.check_success(locator=self.HINTED_LOCATOR)
 
     def test_try_next_server_after_timeout(self):
-        side_effects = [socket.timeout("timed out"),
-                        (fake_httplib2_response(200), self.DEFAULT_EXPECT)]
+        side_effects = [
+            socket.timeout("timed out"),
+            (tutil.fake_httplib2_response(200), self.DEFAULT_EXPECT)]
         with mock.patch('httplib2.Http.request',
                         side_effect=iter(side_effects)):
             self.check_success(locator=self.HINTED_LOCATOR)
 
 
-@no_backoff
-class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+@tutil.skip_sleep
+class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
     DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
     DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
 
@@ -329,5 +323,5 @@ class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
         return self.new_client().put(data, copies, *args, **kwargs)
 
     def test_do_not_send_multiple_copies_to_same_server(self):
-        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+        with tutil.mock_responses(self.DEFAULT_EXPECT, 200):
             self.check_exception(copies=2, num_retries=3)

commit cd051b592013c18445d837aed1c8dc3354c1cf82
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Sep 10 16:44:25 2014 -0400

    3147: PySDK tests use mock>=1.0 and easier mock side_effect.

diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 30cc779..e0afe89 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -32,5 +32,5 @@ setup(name='arvados-python-client',
         'ws4py'
         ],
       test_suite='tests',
-      tests_require=['mock', 'PyYAML'],
+      tests_require=['mock>=1.0', 'PyYAML'],
       zip_safe=False)
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 04bcab8..900143e 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -312,15 +312,10 @@ class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
             self.check_success(locator=self.HINTED_LOCATOR)
 
     def test_try_next_server_after_timeout(self):
-        responses = iter([None, (fake_httplib2_response(200),
-                                 self.DEFAULT_EXPECT)])
-        def side_effect(*args, **kwargs):
-            response = next(responses)
-            if response is None:
-                raise socket.timeout("timed out")
-            else:
-                return response
-        with mock.patch('httplib2.Http.request', side_effect=side_effect):
+        side_effects = [socket.timeout("timed out"),
+                        (fake_httplib2_response(200), self.DEFAULT_EXPECT)]
+        with mock.patch('httplib2.Http.request',
+                        side_effect=iter(side_effects)):
             self.check_success(locator=self.HINTED_LOCATOR)
 
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list