[ARVADOS] created: e1fd0fa7beda15a0eb23d81ef704a5aa9190f798

git at public.curoverse.com git at public.curoverse.com
Fri Jul 25 19:28:50 EDT 2014


        at  e1fd0fa7beda15a0eb23d81ef704a5aa9190f798 (commit)


commit e1fd0fa7beda15a0eb23d81ef704a5aa9190f798
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Jul 25 19:28:10 2014 -0400

    3351: Retry failed threads (servers) if replication is too low after one pass.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index edcb04b..058ed5e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -185,6 +185,13 @@ class KeepClient(object):
         def __init__(self, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
             self.args = kwargs
+            self._success = False
+
+        def args(self):
+            return self.args
+
+        def success(self):
+            return self._success
 
         def run(self):
             with self.args['thread_limiter'] as limiter:
@@ -221,6 +228,7 @@ class KeepClient(object):
                                                   headers=headers,
                                                   body=body)
                     if re.match(r'^2\d\d$', resp['status']):
+                        self._success = True
                         logging.debug("KeepWriterThread %s succeeded %s %s" %
                                       (str(threading.current_thread()),
                                        self.args['data_hash'],
@@ -476,6 +484,20 @@ class KeepClient(object):
             threads += [t]
         for t in threads:
             t.join()
+        if thread_limiter.done() < want_copies:
+            # Retry the threads (i.e., services) that failed the first
+            # time around.
+            threads_retry = []
+            for t in threads:
+                if not t.success():
+                    logging.warning("Retrying: PUT %s %s" % (
+                        t.args()['service_root'],
+                        t.args()['data_hash']))
+                    t_retry = KeepClient.KeepWriterThread(t.args())
+                    t_retry.start()
+                    threads_retry += [t_retry]
+            for t in threads_retry:
+                t.join()
         have_copies = thread_limiter.done()
         # If we're done, return the response from Keep
         if have_copies >= want_copies:

commit b53f78b45e416b710fd273aee52469e6ff5db7be
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Jul 25 18:42:18 2014 -0400

    3351: Improve "already uploading" message (and send to stderr, not stdout).

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 5fe5be8..55bf7fc 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -404,8 +404,9 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         except (IOError, OSError, ValueError):
             pass  # Couldn't open cache directory/file.  Continue without it.
         except ResumeCacheConflict:
-            stdout.write(
-                "arv-put: Another process is already uploading this data.\n")
+            print >>stderr, "\n".join([
+                "arv-put: Another process is already uploading this data.",
+                "         Use --no-resume if this is really what you want."])
             sys.exit(1)
 
     if resume_cache is None:

commit 0d1a48e90dfbbdf1f8bb6e08d634e70e38223ff6
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Jul 25 18:36:38 2014 -0400

    3351: Do not use resume cache at all if --no-resume flag is given.

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8af5814..5fe5be8 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -398,20 +398,20 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     bytes_expected = expected_bytes_for(args.paths)
 
     resume_cache = None
-    try:
-        resume_cache = ResumeCache(ResumeCache.make_path(args))
-    except (IOError, OSError, ValueError):
-        pass  # Couldn't open cache directory/file.  Continue without it.
-    except ResumeCacheConflict:
-        stdout.write(
-            "arv-put: Another process is already uploading this data.\n")
-        sys.exit(1)
+    if args.resume:
+        try:
+            resume_cache = ResumeCache(ResumeCache.make_path(args))
+        except (IOError, OSError, ValueError):
+            pass  # Couldn't open cache directory/file.  Continue without it.
+        except ResumeCacheConflict:
+            stdout.write(
+                "arv-put: Another process is already uploading this data.\n")
+            sys.exit(1)
 
     if resume_cache is None:
         writer = ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
     else:
-        if not args.resume:
-            resume_cache.restart()
+        resume_cache.restart()
         writer = ArvPutCollectionWriter.from_cache(
             resume_cache, reporter, bytes_expected)
 

commit 575ecb6cb09d65a18492c8ecbc6af0eec01991f6
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Jul 25 17:42:43 2014 -0400

    3351: Set PUT request timeout to 60 seconds.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 82c04ea..edcb04b 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -196,7 +196,7 @@ class KeepClient(object):
                               (str(threading.current_thread()),
                                self.args['data_hash'],
                                self.args['service_root']))
-                h = httplib2.Http()
+                h = httplib2.Http(timeout=60)
                 url = self.args['service_root'] + self.args['data_hash']
                 api_token = config.get('ARVADOS_API_TOKEN')
                 headers = {'Authorization': "OAuth2 %s" % api_token}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list