[ARVADOS] updated: 673a10d2aabd1d9ed73fb6aeabb632f08f39a415

git at public.curoverse.com git at public.curoverse.com
Sat Jul 26 15:02:55 EDT 2014


Summary of changes:
 sdk/python/arvados/keep.py | 135 ++++++++++++++++++++++++---------------------
 sdk/python/bin/arv-ls      |   2 +-
 2 files changed, 73 insertions(+), 64 deletions(-)

  discards  0309cb97394e841fd4bb7b828f200f0b14c96f6e (commit)
       via  673a10d2aabd1d9ed73fb6aeabb632f08f39a415 (commit)
       via  63ea4e55dcffca4e4cbcc7fc2070451dfc718dae (commit)
       via  5e1c7a2841d2c02f2dd0259a32011bc563957a4d (commit)
       via  537b461c3668c788b17f2d6eb72743056cdad3b1 (commit)
       via  4f3642a6629bcdd8c8be30f3181fae1c57e3a1db (commit)

This update added new revisions after undoing existing revisions. That is
to say, the history of the old revision is not a subset of the history of
the new revision. This situation occurs when you --force push a change and
produce a repository history like this:

 * -- * -- B -- O -- O -- O (0309cb97394e841fd4bb7b828f200f0b14c96f6e)
            \
             N -- N -- N (673a10d2aabd1d9ed73fb6aeabb632f08f39a415)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the N
branch since the common base, B.

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.


commit 673a10d2aabd1d9ed73fb6aeabb632f08f39a415
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Jul 26 14:53:17 2014 -0400

    3351: Fix CollectionReader usage in arv-ls.

diff --git a/sdk/python/bin/arv-ls b/sdk/python/bin/arv-ls
index cccce59..1432274 100755
--- a/sdk/python/bin/arv-ls
+++ b/sdk/python/bin/arv-ls
@@ -24,7 +24,7 @@ args = parser.parse_args()
 
 import arvados
 
-cr = arvados.CollectionReader(arvados.Keep.get(args.locator))
+cr = arvados.CollectionReader(args.locator)
 
 for f in cr.all_files():
     if args.s:

commit 63ea4e55dcffca4e4cbcc7fc2070451dfc718dae
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Jul 26 14:51:19 2014 -0400

    3351: Add comment about SSL timeouts.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 0a74f74..2844242 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -251,6 +251,8 @@ class KeepClient(object):
             except (httplib2.HttpLib2Error,
                     httplib.HTTPException,
                     ssl.SSLError) as e:
+                # When using https, timeouts look like ssl.SSLError from here.
+                # "SSLError: The write operation timed out"
                 logging.warning("Request fail: PUT %s => %s: %s" %
                                 (url, type(e), str(e)))
 

commit 5e1c7a2841d2c02f2dd0259a32011bc563957a4d
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Jul 26 14:49:02 2014 -0400

    3351: Clean up some excess indentation.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index a388a16..0a74f74 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -197,57 +197,62 @@ class KeepClient(object):
                     # My turn arrived, but the job has been done without
                     # me.
                     return
-                logging.debug("KeepWriterThread %s proceeding %s %s" %
-                              (str(threading.current_thread()),
-                               self.args['data_hash'],
-                               self.args['service_root']))
-                h = httplib2.Http(timeout=60)
-                url = self.args['service_root'] + self.args['data_hash']
-                api_token = config.get('ARVADOS_API_TOKEN')
-                headers = {'Authorization': "OAuth2 %s" % api_token}
-
-                if self.args['using_proxy']:
-                    # We're using a proxy, so tell the proxy how many copies we
-                    # want it to store
-                    headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
-                try:
-                    logging.debug("Uploading to {}".format(url))
+                self.run_with_limiter(limiter)
+
+        def run_with_limiter(self, limiter):
+            logging.debug("KeepWriterThread %s proceeding %s %s" %
+                          (str(threading.current_thread()),
+                           self.args['data_hash'],
+                           self.args['service_root']))
+            h = httplib2.Http(timeout=60)
+            url = self.args['service_root'] + self.args['data_hash']
+            api_token = config.get('ARVADOS_API_TOKEN')
+            headers = {'Authorization': "OAuth2 %s" % api_token}
+
+            if self.args['using_proxy']:
+                # We're using a proxy, so tell the proxy how many copies we
+                # want it to store
+                headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
+
+            try:
+                logging.debug("Uploading to {}".format(url))
+                resp, content = h.request(url.encode('utf-8'), 'PUT',
+                                          headers=headers,
+                                          body=self.args['data'])
+                if (resp['status'] == '401' and
+                    re.match(r'Timestamp verification failed', content)):
+                    body = KeepClient.sign_for_old_server(
+                        self.args['data_hash'],
+                        self.args['data'])
+                    h = httplib2.Http()
                     resp, content = h.request(url.encode('utf-8'), 'PUT',
                                               headers=headers,
-                                              body=self.args['data'])
-                    if (resp['status'] == '401' and
-                        re.match(r'Timestamp verification failed', content)):
-                        body = KeepClient.sign_for_old_server(
-                            self.args['data_hash'],
-                            self.args['data'])
-                        h = httplib2.Http()
-                        resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                                  headers=headers,
-                                                  body=body)
-                    if re.match(r'^2\d\d$', resp['status']):
-                        self._success = True
-                        logging.debug("KeepWriterThread %s succeeded %s %s" %
-                                      (str(threading.current_thread()),
-                                       self.args['data_hash'],
-                                       self.args['service_root']))
-                        replicas_stored = 1
-                        if 'x-keep-replicas-stored' in resp:
-                            # Tick the 'done' counter for the number of replica
-                            # reported stored by the server, for the case that
-                            # we're talking to a proxy or other backend that
-                            # stores to multiple copies for us.
-                            try:
-                                replicas_stored = int(resp['x-keep-replicas-stored'])
-                            except ValueError:
-                                pass
-                        return limiter.save_response(content.strip(), replicas_stored)
-
+                                              body=body)
+                if re.match(r'^2\d\d$', resp['status']):
+                    self._success = True
+                    logging.debug("KeepWriterThread %s succeeded %s %s" %
+                                  (str(threading.current_thread()),
+                                   self.args['data_hash'],
+                                   self.args['service_root']))
+                    replicas_stored = 1
+                    if 'x-keep-replicas-stored' in resp:
+                        # Tick the 'done' counter for the number of replica
+                        # reported stored by the server, for the case that
+                        # we're talking to a proxy or other backend that
+                        # stores to multiple copies for us.
+                        try:
+                            replicas_stored = int(resp['x-keep-replicas-stored'])
+                        except ValueError:
+                            pass
+                    limiter.save_response(content.strip(), replicas_stored)
+                else:
                     logging.warning("Request fail: PUT %s => %s %s" %
                                     (url, resp['status'], content))
-                except (httplib2.HttpLib2Error, httplib.HTTPException, ssl.SSLError) as e:
-                    logging.warning("Request fail: PUT %s => %s: %s" %
-                                    (url, type(e), str(e)))
+            except (httplib2.HttpLib2Error,
+                    httplib.HTTPException,
+                    ssl.SSLError) as e:
+                logging.warning("Request fail: PUT %s => %s: %s" %
+                                (url, type(e), str(e)))
 
     def __init__(self, **kwargs):
         self.lock = threading.Lock()

commit 537b461c3668c788b17f2d6eb72743056cdad3b1
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Jul 26 14:48:13 2014 -0400

    3351: Catch SSLError and show warning instead of stack trace.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 2f0051c..a388a16 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -19,6 +19,7 @@ import time
 import threading
 import timer
 import datetime
+import ssl
 
 global_client_object = None
 
@@ -244,7 +245,7 @@ class KeepClient(object):
 
                     logging.warning("Request fail: PUT %s => %s %s" %
                                     (url, resp['status'], content))
-                except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
+                except (httplib2.HttpLib2Error, httplib.HTTPException, ssl.SSLError) as e:
                     logging.warning("Request fail: PUT %s => %s: %s" %
                                     (url, type(e), str(e)))
 

commit 4f3642a6629bcdd8c8be30f3181fae1c57e3a1db
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Jul 26 14:17:43 2014 -0400

    3351: Retry failed threads (servers) if replication is too low after one pass.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index edcb04b..2f0051c 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -185,6 +185,10 @@ class KeepClient(object):
         def __init__(self, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
             self.args = kwargs
+            self._success = False
+
+        def success(self):
+            return self._success
 
         def run(self):
             with self.args['thread_limiter'] as limiter:
@@ -221,6 +225,7 @@ class KeepClient(object):
                                                   headers=headers,
                                                   body=body)
                     if re.match(r'^2\d\d$', resp['status']):
+                        self._success = True
                         logging.debug("KeepWriterThread %s succeeded %s %s" %
                                       (str(threading.current_thread()),
                                        self.args['data_hash'],
@@ -243,7 +248,7 @@ class KeepClient(object):
                     logging.warning("Request fail: PUT %s => %s: %s" %
                                     (url, type(e), str(e)))
 
-    def __init__(self):
+    def __init__(self, **kwargs):
         self.lock = threading.Lock()
         self.service_roots = None
         self._cache_lock = threading.Lock()
@@ -251,6 +256,7 @@ class KeepClient(object):
         # default 256 megabyte cache
         self.cache_max = 256 * 1024 * 1024
         self.using_proxy = False
+        self.timeout = kwargs.get('timeout', 60)
 
     def shuffled_service_roots(self, hash):
         if self.service_roots == None:
@@ -466,16 +472,33 @@ class KeepClient(object):
         threads = []
         thread_limiter = KeepClient.ThreadLimiter(want_copies)
         for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(data=data,
-                                            data_hash=data_hash,
-                                            service_root=service_root,
-                                            thread_limiter=thread_limiter,
-                                            using_proxy=self.using_proxy,
-                                            want_copies=(want_copies if self.using_proxy else 1))
+            t = KeepClient.KeepWriterThread(
+                data=data,
+                data_hash=data_hash,
+                service_root=service_root,
+                thread_limiter=thread_limiter,
+                timeout=self.timeout,
+                using_proxy=self.using_proxy,
+                want_copies=(want_copies if self.using_proxy else 1))
             t.start()
             threads += [t]
         for t in threads:
             t.join()
+        if thread_limiter.done() < want_copies:
+            # Retry the threads (i.e., services) that failed the first
+            # time around.
+            threads_retry = []
+            for t in threads:
+                if not t.success():
+                    logging.warning("Retrying: PUT %s %s" % (
+                        t.args['service_root'],
+                        t.args['data_hash']))
+                    retry_with_args = t.args.copy()
+                    t_retry = KeepClient.KeepWriterThread(**retry_with_args)
+                    t_retry.start()
+                    threads_retry += [t_retry]
+            for t in threads_retry:
+                t.join()
         have_copies = thread_limiter.done()
         # If we're done, return the response from Keep
         if have_copies >= want_copies:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list