[ARVADOS] updated: be2d7e55af1036699282d06f629805b6508b6ceb

git at public.curoverse.com
Fri Nov 13 09:32:47 EST 2015


Summary of changes:
 sdk/cli/bin/crunch-job                |  12 ++--
 sdk/python/arvados/keep.py            | 112 +++++++++++++++-------------------
 sdk/python/tests/arvados_testutil.py  |   5 +-
 sdk/python/tests/manifest_examples.py |   4 +-
 sdk/python/tests/test_arv_ls.py       |   6 +-
 sdk/python/tests/test_arvfile.py      |   5 +-
 sdk/python/tests/test_collections.py  |  17 +++---
 sdk/python/tests/test_keep_client.py  |  36 ++++++++++-
 8 files changed, 108 insertions(+), 89 deletions(-)

       via  be2d7e55af1036699282d06f629805b6508b6ceb (commit)
       via  e704e66bc35a0f0990620313ae9e2a630fa6821b (commit)
       via  c33d036cfef0b0784d16593e66e3f4fce018f783 (commit)
       via  7563fb986662a066f0aa3a9c4c1dd35159fb69cc (commit)
       via  f4c3100bad26dff3c99ff4bb9fa19b0d9f7995c7 (commit)
       via  31e10ca041ac1cfd296b24d4878264f6af50d64b (commit)
       via  6de60c7db7a98405ef7ae4ac5eb20498f095416c (commit)
       via  a1dc811844d2dc76bea5ebfdc2f571a12cb41b49 (commit)
      from  d3171a9d75eac20b68cee6729dddc28cd7e6c612 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.


commit be2d7e55af1036699282d06f629805b6508b6ceb
Merge: 6de60c7 e704e66
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Nov 13 09:29:40 2015 -0500

    Merge branch '7696-pysdk-all-keep-service-types-wip'
    
    Closes #7696, #7758.


commit e704e66bc35a0f0990620313ae9e2a630fa6821b
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 17:08:39 2015 -0500

    7696: Improve PySDK KeepClient.ThreadLimiter.
    
    * Move the calculation of how many threads to allow into the class.
    * Teach it to handle cases where max_replicas_per_service is known and
      greater than 1.  This will never happen today, but is an anticipated
      improvement.
    * Update docstrings to reflect current reality.
    
    These are all changes I made while debugging the previous race
    condition.
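
The thread-count rule described above can be illustrated on its own.  This is
a minimal standalone sketch, not the SDK code itself; the real logic lives in
ThreadLimiter.__init__, shown in the diff below.

    import math

    def max_writer_threads(want_copies, max_service_replicas):
        # When each service's replica capacity is unknown (None) or already
        # covers the request, one writer thread per attempt is enough.
        if (not max_service_replicas) or (max_service_replicas >= want_copies):
            return 1
        # Otherwise run only as many threads as needed to reach the desired
        # replication level.
        return int(math.ceil(float(want_copies) / max_service_replicas))

    assert max_writer_threads(2, None) == 1   # capacity unknown: one writer
    assert max_writer_threads(3, 1) == 3      # disk services store one copy each
    assert max_writer_threads(3, 2) == 2      # ceil(3/2) writers suffice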

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 2b718d7..ec9f6f6 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -2,6 +2,7 @@ import cStringIO
 import datetime
 import hashlib
 import logging
+import math
 import os
 import pycurl
 import Queue
@@ -219,21 +220,27 @@ class KeepClient(object):
     DEFAULT_PROXY_TIMEOUT = (20, 300)
 
     class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
+        """Limit the number of threads writing to Keep at once.
+
+        This ensures that only a number of writer threads that could
+        potentially achieve the desired replication level run at once.
+        Once the desired replication level is achieved, queued threads
+        are instructed not to run.
 
         Should be used in a "with" block.
         """
-        def __init__(self, todo):
+        def __init__(self, want_copies, max_service_replicas):
             self._started = 0
-            self._todo = todo
+            self._want_copies = want_copies
             self._done = 0
             self._response = None
             self._start_lock = threading.Condition()
-            self._todo_lock = threading.Semaphore(todo)
+            if (not max_service_replicas) or (max_service_replicas >= want_copies):
+                max_threads = 1
+            else:
+                max_threads = math.ceil(float(want_copies) / max_service_replicas)
+            _logger.debug("Limiter max threads is %d", max_threads)
+            self._todo_lock = threading.Semaphore(max_threads)
             self._done_lock = threading.Lock()
             self._local = threading.local()
 
@@ -258,34 +265,28 @@ class KeepClient(object):
 
         def shall_i_proceed(self):
             """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
+            Return true if the current thread should write to Keep.
+            Return false otherwise.
             """
             with self._done_lock:
-                return (self._done < self._todo)
+                return (self._done < self._want_copies)
 
         def save_response(self, response_body, replicas_stored):
             """
             Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
+            the Keep server, and the number of replicas it stored.
             """
             with self._done_lock:
                 self._done += replicas_stored
                 self._response = response_body
 
         def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
+            """Return the body from the response to a PUT request."""
             with self._done_lock:
                 return self._response
 
         def done(self):
-            """
-            Return how many successes were reported.
-            """
+            """Return the total number of replicas successfully stored."""
             with self._done_lock:
                 return self._done
 
@@ -965,7 +966,8 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
+            thread_limiter = KeepClient.ThreadLimiter(
+                copies, self.max_replicas_per_service)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:

commit c33d036cfef0b0784d16593e66e3f4fce018f783
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 16:50:18 2015 -0500

    7696: PySDK determines max_replicas_per_service after querying services.
    
    Because max_replicas_per_service was set to 1 in the case where
    KeepClient was instantiated with no direct information about available
    Keep services, and because ThreadLimiter was being instantiated before
    querying available Keep services (via map_new_services), the first
    Keep request to talk to non-disk services would let multiple threads
    run at once.  This fixes that race condition, and adds a test that was
    triggering it semi-reliably.
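
The gist of the fix is ordering: the thread limit depends on
max_replicas_per_service, which service discovery may change, so it has to be
computed after discovery rather than at the start of the request.  A
self-contained illustration of that ordering (the names and values here are
illustrative, not the real KeepClient API):

    limits = {'max_replicas_per_service': 1}     # optimistic default at init

    def discover_services(limits):
        # Discovery learns the services are non-disk, so per-service
        # capacity becomes unknown/unlimited.
        limits['max_replicas_per_service'] = None

    def writer_thread_count(limits, copies):
        return 1 if limits['max_replicas_per_service'] is None else copies

    # Buggy order: the limit is fixed before discovery, so too many
    # writers run against services that could each store every copy.
    before = writer_thread_count(limits, copies=3)
    discover_services(limits)
    # Fixed order: the limit is computed after discovery.
    after = writer_thread_count(limits, copies=3)
    assert (before, after) == (3, 1)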

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3d64a4..2b718d7 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -645,6 +645,7 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.max_replicas_per_service = None
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -658,7 +659,6 @@ class KeepClient(object):
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = None
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -671,7 +671,6 @@ class KeepClient(object):
                 self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
-                self.max_replicas_per_service = 1
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -955,10 +954,8 @@ class KeepClient(object):
         # Tell the proxy how many copies we want it to store
         headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        thread_sequence = 0
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -968,6 +965,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
             threads = []
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
@@ -980,10 +978,9 @@ class KeepClient(object):
                     service_root=service_root,
                     thread_limiter=thread_limiter,
                     timeout=self.current_timeout(num_retries-tries_left),
-                    thread_sequence=thread_sequence)
+                    thread_sequence=len(threads))
                 t.start()
                 threads.append(t)
-                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a5d9925..ada0dac 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -404,6 +404,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
             actual = keep_client.put(body, copies=1)
         self.assertEqual(pdh, actual)
 
+    def test_oddball_service_writer_count(self):
+        body = 'oddball service writer count'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore',
+                                             count=4)
+        headers = {'x-keep-replicas-stored': 3}
+        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+                                       **headers) as req_mock:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=2)
+        self.assertEqual(pdh, actual)
+        self.assertEqual(1, req_mock.call_count)
+
 
 @tutil.skip_sleep
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):

commit 7563fb986662a066f0aa3a9c4c1dd35159fb69cc
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 12:17:46 2015 -0500

    7696: PySDK KeepClient uses all service types.
    
    Filter out gateway services from the list of usable services, rather
    than selecting only disk and proxy types.
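
The new selection rule keeps every non-gateway service instead of whitelisting
only the disk and proxy types.  A minimal standalone sketch of that filter
(the sample service dicts are made up; the real code is in build_services_list
in the diff below):

    services = [
        {'uuid': 'a', 'service_type': 'disk'},
        {'uuid': 'b', 'service_type': 'proxy'},
        {'uuid': 'c', 'service_type': 'fancynewblobstore'},
        {'uuid': 'd', 'service_type': 'gateway:remote'},
    ]
    usable = [ks for ks in services
              if not ks.get('service_type', '').startswith('gateway:')]
    # Everything except the gateway survives, including the unfamiliar
    # 'fancynewblobstore' type that the old disk/proxy whitelist dropped.
    assert [ks['uuid'] for ks in usable] == ['a', 'b', 'c']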

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3ccc3e..b3d64a4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -652,12 +652,13 @@ class KeepClient(object):
                 self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
+                    'service_type': 'proxy',
                     '_service_root': proxy,
                     }]
                 self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
-                self.max_replicas_per_service = 1
+                self.max_replicas_per_service = None
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -687,6 +688,10 @@ class KeepClient(object):
         t = self.proxy_timeout if self.using_proxy else self.timeout
         return (t[0] * (1 << attempt_number), t[1])
 
+    def _any_nondisk_services(self, service_list):
+        return any(ks.get('service_type', 'disk') != 'disk'
+                   for ks in service_list)
+
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
               (self._keep_services and not force_rebuild)):
@@ -697,12 +702,16 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            accessible = keep_services.execute().get('items')
-            if not accessible:
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks['uuid']: ks for ks in
+                                      keep_services.execute()['items']}
+            if not self._gateway_services:
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in accessible:
+            for r in self._gateway_services.itervalues():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -712,27 +721,19 @@ class KeepClient(object):
                     host,
                     r['service_port'])
 
-            # Gateway services are only used when specified by UUID,
-            # so there's nothing to gain by filtering them by
-            # service_type.
-            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
             _logger.debug(str(self._gateway_services))
-
             self._keep_services = [
-                ks for ks in accessible
-                if ks.get('service_type') in ['disk', 'proxy']]
-            self._writable_services = [
-                ks for ks in accessible
-                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
-            _logger.debug(str(self._keep_services))
-
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
+                ks for ks in self._gateway_services.itervalues()
+                if not ks.get('service_type', '').startswith('gateway:')]
+            self._writable_services = [ks for ks in self._keep_services
+                                       if not ks.get('read_only')]
+
             # For disk type services, max_replicas_per_service is 1
-            # It is unknown or unlimited for non-disk typed services.
-            for ks in accessible:
-                if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
-                    self.max_replicas_per_service = None
+            # It is unknown (unlimited) for other service types.
+            if self._any_nondisk_services(self._writable_services):
+                self.max_replicas_per_service = None
+            else:
+                self.max_replicas_per_service = 1
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -769,7 +770,8 @@ class KeepClient(object):
         # in that order.
         use_services = self._keep_services
         if need_writable:
-          use_services = self._writable_services
+            use_services = self._writable_services
+        self.using_proxy = self._any_nondisk_services(use_services)
         sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
                 use_services,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index ab57ed5..a5d9925 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -387,6 +387,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
         self.assertEqual(0, len(exc_check.exception.request_errors()))
 
+    def test_oddball_service_get(self):
+        body = 'oddball service get'
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(body, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.get(tutil.str_keep_locator(body))
+        self.assertEqual(body, actual)
+
+    def test_oddball_service_put(self):
+        body = 'oddball service put'
+        pdh = tutil.str_keep_locator(body)
+        api_client = self.mock_keep_services(service_type='fancynewblobstore')
+        with tutil.mock_keep_responses(pdh, 200):
+            keep_client = arvados.KeepClient(api_client=api_client)
+            actual = keep_client.put(body, copies=1)
+        self.assertEqual(pdh, actual)
+
 
 @tutil.skip_sleep
 class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):

commit f4c3100bad26dff3c99ff4bb9fa19b0d9f7995c7
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 12:18:46 2015 -0500

    7696: Clean imports in PySDK arvados.keep module.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 8ed86fd..b3ccc3e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -1,28 +1,15 @@
-import bz2
+import cStringIO
 import datetime
-import fcntl
-import functools
-import gflags
 import hashlib
-import json
 import logging
 import os
-import pprint
 import pycurl
 import Queue
 import re
 import socket
 import ssl
-import string
-import cStringIO
-import subprocess
-import sys
 import threading
-import time
 import timer
-import types
-import UserDict
-import zlib
 
 import arvados
 import arvados.config as config

commit 31e10ca041ac1cfd296b24d4878264f6af50d64b
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 11 10:06:51 2015 -0500

    7696: Refactor locator builder method in PySDK tests.
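
The shared helper simply formats an md5 digest plus the data length, the
standard Keep locator shape these tests previously built by hand.  A quick
usage sketch (Python 2 string semantics, matching the test suite):

    import hashlib

    def str_keep_locator(s):
        return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))

    locator = str_keep_locator('test data')   # '<md5 hex of "test data">+9'
    assert locator.endswith('+9')
    assert len(locator.split('+')[0]) == 32   # 32-character hex digest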

diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index ea318c6..b2cf436 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -47,6 +47,8 @@ def mock_api_responses(api_client, body, codes, headers={}):
     return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
         (fake_httplib2_response(code, **headers), body) for code in codes)))
 
+def str_keep_locator(s):
+    return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
 
 class FakeCurl:
     @classmethod
@@ -126,8 +128,7 @@ class MockStreamReader(object):
     def __init__(self, name='.', *data):
         self._name = name
         self._data = ''.join(data)
-        self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
-                                              len(d)) for d in data]
+        self._data_locators = [str_keep_locator(d) for d in data]
         self.num_retries = 0
 
     def name(self):
diff --git a/sdk/python/tests/manifest_examples.py b/sdk/python/tests/manifest_examples.py
index 2d8e475..8f0abd2 100644
--- a/sdk/python/tests/manifest_examples.py
+++ b/sdk/python/tests/manifest_examples.py
@@ -1,6 +1,5 @@
 import arvados
 import arvados_testutil as tutil
-import hashlib
 
 class ManifestExamples(object):
     def make_manifest(self,
@@ -9,8 +8,7 @@ class ManifestExamples(object):
                       files_per_stream=1,
                       streams=1):
         datablip = 'x' * bytes_per_block
-        data_loc = '{}+{}'.format(hashlib.md5(datablip).hexdigest(),
-                                  bytes_per_block)
+        data_loc = tutil.str_keep_locator(datablip)
         with tutil.mock_keep_responses(data_loc, 200):
             coll = arvados.CollectionWriter()
             for si in range(0, streams):
diff --git a/sdk/python/tests/test_arv_ls.py b/sdk/python/tests/test_arv_ls.py
index 90bbacf..664b57f 100644
--- a/sdk/python/tests/test_arv_ls.py
+++ b/sdk/python/tests/test_arv_ls.py
@@ -1,7 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
-import hashlib
 import io
 import random
 
@@ -11,6 +10,8 @@ import arvados.errors as arv_error
 import arvados.commands.ls as arv_ls
 import run_test_server
 
+from arvados_testutil import str_keep_locator
+
 class ArvLsTestCase(run_test_server.TestCaseWithServers):
     FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
 
@@ -24,8 +25,7 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
 
     def mock_api_for_manifest(self, manifest_lines, uuid=FAKE_UUID):
         manifest_text = self.newline_join(manifest_lines)
-        pdh = '{}+{}'.format(hashlib.md5(manifest_text).hexdigest(),
-                             len(manifest_text))
+        pdh = str_keep_locator(manifest_text)
         coll_info = {'uuid': uuid,
                      'portable_data_hash': pdh,
                      'manifest_text': manifest_text}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 330dd44..ea86614 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -6,7 +6,6 @@ import io
 import mock
 import os
 import unittest
-import hashlib
 import time
 
 import arvados
@@ -30,7 +29,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.requests.append(locator)
             return self.blocks.get(locator)
         def put(self, data, num_retries=None):
-            pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+            pdh = tutil.str_keep_locator(data)
             self.blocks[pdh] = str(data)
             return pdh
 
@@ -453,7 +452,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
         n = 0
         blocks = {}
         for d in ['01234', '34567', '67890']:
-            loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+            loc = tutil.str_keep_locator(d)
             blocks[loc] = d
             stream.append(Range(loc, n, len(d)))
             n += len(d)
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index ac7dd1b..bc3c0bf 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -4,7 +4,6 @@
 
 import arvados
 import copy
-import hashlib
 import mock
 import os
 import pprint
@@ -395,7 +394,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
     def test_write_directory_tree_with_zero_recursion(self):
         cwriter = arvados.CollectionWriter(self.api_client)
         content = 'd1/d2/f3d1/f2f1'
-        blockhash = hashlib.md5(content).hexdigest() + '+' + str(len(content))
+        blockhash = tutil.str_keep_locator(content)
         cwriter.write_directory_tree(
             self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
             max_manifest_depth=0)
@@ -739,7 +738,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             self.assertEqual('.', writer.current_stream_name())
             self.assertEqual('out', writer.current_file_name())
             out_file.write('test data')
-            data_loc = hashlib.md5('test data').hexdigest() + '+9'
+            data_loc = tutil.str_keep_locator('test data')
         self.assertTrue(out_file.closed, "writer file not closed after context")
         self.assertRaises(ValueError, out_file.write, 'extra text')
         with self.mock_keep(data_loc, 200) as keep_mock:
@@ -751,15 +750,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
         writer = arvados.CollectionWriter(client)
         with writer.open('six') as out_file:
             out_file.writelines(['12', '34', '56'])
-            data_loc = hashlib.md5('123456').hexdigest() + '+6'
+            data_loc = tutil.str_keep_locator('123456')
         with self.mock_keep(data_loc, 200) as keep_mock:
             self.assertEqual(". {} 0:6:six\n".format(data_loc),
                              writer.manifest_text())
 
     def test_open_flush(self):
         client = self.api_client_mock()
-        data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
-        data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+        data_loc1 = tutil.str_keep_locator('flush1')
+        data_loc2 = tutil.str_keep_locator('flush2')
         with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
             writer = arvados.CollectionWriter(client)
             with writer.open('flush_test') as out_file:
@@ -777,15 +776,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             out_file.write('1st')
         with writer.open('.', '2') as out_file:
             out_file.write('2nd')
-        data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+        data_loc = tutil.str_keep_locator('1st2nd')
         with self.mock_keep(data_loc, 200) as keep_mock:
             self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
                              writer.manifest_text())
 
     def test_two_opens_two_streams(self):
         client = self.api_client_mock()
-        data_loc1 = hashlib.md5('file').hexdigest() + '+4'
-        data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+        data_loc1 = tutil.str_keep_locator('file')
+        data_loc2 = tutil.str_keep_locator('indir')
         with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
             writer = arvados.CollectionWriter(client)
             with writer.open('file') as out_file:
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9046892..ab57ed5 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -368,7 +368,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_put_error_does_not_include_successful_puts(self):
         data = 'partial failure test'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(count=3)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -378,7 +378,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
 
     def test_proxy_put_with_no_writable_services(self):
         data = 'test with no writable services'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        data_loc = tutil.str_keep_locator(data)
         api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
         with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
                 self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -518,7 +518,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
     def check_64_zeros_error_order(self, verb, exc_class):
         data = '0' * 64
         if verb == 'get':
-            data = hashlib.md5(data).hexdigest() + '+1234'
+            data = tutil.str_keep_locator(data)
         # Arbitrary port number:
         aport = random.randint(1024,65535)
         api_client = self.mock_keep_services(service_port=aport, count=self.services)

commit 6de60c7db7a98405ef7ae4ac5eb20498f095416c
Merge: d3171a9 a1dc811
Author: Brett Smith <brett at curoverse.com>
Date:   Fri Nov 13 09:28:12 2015 -0500

    Merge branch '7123-crunch-no-record-log-failure-wip'
    
    Closes #7123, #7741.


commit a1dc811844d2dc76bea5ebfdc2f571a12cb41b49
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Nov 9 10:28:51 2015 -0500

    7123: Crunch doesn't update job log when arv-put fails.
    
    This prevents crunch-job from recording the empty collection as a
    job's log.  Most other components (Workbench, the log cleaner)
    recognize a null log as a special case, but not the empty collection.
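
In outline, log_writer_finish now reports a failed arv-put by returning an
undefined manifest, and save_meta bails out instead of saving one.  A rough,
self-contained Python rendering of that control flow (the actual change is
the Perl diff below; every name here is illustrative):

    def finish_log_writer(read_result, exit_status, manifest_text):
        # Return the manifest only if arv-put read cleanly and exited 0;
        # otherwise signal failure with None (undef in the real Perl).
        if read_result != 0 or exit_status != 0:
            return None
        return manifest_text

    def save_meta(prev_manifest, read_result, exit_status, manifest_text):
        log_manifest = finish_log_writer(read_result, exit_status, manifest_text)
        if log_manifest is None:
            return None      # don't record the empty collection as the job log
        return prev_manifest + log_manifest

    assert save_meta('', -1, 0, '') is None               # arv-put timed out
    assert save_meta('old\n', 0, 0, 'new\n') == 'old\nnew\n'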

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index f78824b..70ba04f 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1698,20 +1698,24 @@ sub log_writer_finish()
 
   close($log_pipe_in);
 
+  my $logger_failed = 0;
   my $read_result = log_writer_read_output(120);
   if ($read_result == -1) {
+    $logger_failed = -1;
     Log (undef, "timed out reading from 'arv-put'");
   } elsif ($read_result != 0) {
+    $logger_failed = -2;
     Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
   if ($?) {
+    $logger_failed ||= $?;
     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
   close($log_pipe_out);
-  my $arv_put_output = $log_pipe_out_buf;
+  my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
       $log_pipe_out_select = undef;
 
@@ -1777,13 +1781,13 @@ sub save_meta
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
+  my $log_manifest = log_writer_finish();
+  return unless defined($log_manifest);
 
-  my $log_manifest = "";
   if ($Job->{log}) {
     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
-    $log_manifest .= $prev_log_coll->{manifest_text};
+    $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
   }
-  $log_manifest .= log_writer_finish();
 
   my $log_coll = api_call(
     "collections/create", ensure_unique_name => 1, collection => {

-----------------------------------------------------------------------


hooks/post-receive