[ARVADOS] updated: be2d7e55af1036699282d06f629805b6508b6ceb
git at public.curoverse.com
git at public.curoverse.com
Fri Nov 13 09:32:47 EST 2015
Summary of changes:
sdk/cli/bin/crunch-job | 12 ++--
sdk/python/arvados/keep.py | 112 +++++++++++++++-------------------
sdk/python/tests/arvados_testutil.py | 5 +-
sdk/python/tests/manifest_examples.py | 4 +-
sdk/python/tests/test_arv_ls.py | 6 +-
sdk/python/tests/test_arvfile.py | 5 +-
sdk/python/tests/test_collections.py | 17 +++---
sdk/python/tests/test_keep_client.py | 36 ++++++++++-
8 files changed, 108 insertions(+), 89 deletions(-)
via be2d7e55af1036699282d06f629805b6508b6ceb (commit)
via e704e66bc35a0f0990620313ae9e2a630fa6821b (commit)
via c33d036cfef0b0784d16593e66e3f4fce018f783 (commit)
via 7563fb986662a066f0aa3a9c4c1dd35159fb69cc (commit)
via f4c3100bad26dff3c99ff4bb9fa19b0d9f7995c7 (commit)
via 31e10ca041ac1cfd296b24d4878264f6af50d64b (commit)
via 6de60c7db7a98405ef7ae4ac5eb20498f095416c (commit)
via a1dc811844d2dc76bea5ebfdc2f571a12cb41b49 (commit)
from d3171a9d75eac20b68cee6729dddc28cd7e6c612 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit be2d7e55af1036699282d06f629805b6508b6ceb
Merge: 6de60c7 e704e66
Author: Brett Smith <brett at curoverse.com>
Date: Fri Nov 13 09:29:40 2015 -0500
Merge branch '7696-pysdk-all-keep-service-types-wip'
Closes #7696, #7758.
commit e704e66bc35a0f0990620313ae9e2a630fa6821b
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 17:08:39 2015 -0500
7696: Improve PySDK KeepClient.ThreadLimiter.
* Move the calculation of how many threads to allow into the class.
* Teach it to handle cases where max_replicas_per_service is known and
greater than 1. This will never happen today, but is an anticipated
improvement.
* Update docstrings to reflect current reality.
These are all changes I made while debugging the previous race
condition.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 2b718d7..ec9f6f6 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -2,6 +2,7 @@ import cStringIO
import datetime
import hashlib
import logging
+import math
import os
import pycurl
import Queue
@@ -219,21 +220,27 @@ class KeepClient(object):
DEFAULT_PROXY_TIMEOUT = (20, 300)
class ThreadLimiter(object):
- """
- Limit the number of threads running at a given time to
- {desired successes} minus {successes reported}. When successes
- reported == desired, wake up the remaining threads and tell
- them to quit.
+ """Limit the number of threads writing to Keep at once.
+
+ This ensures that only a number of writer threads that could
+ potentially achieve the desired replication level run at once.
+ Once the desired replication level is achieved, queued threads
+ are instructed not to run.
Should be used in a "with" block.
"""
- def __init__(self, todo):
+ def __init__(self, want_copies, max_service_replicas):
self._started = 0
- self._todo = todo
+ self._want_copies = want_copies
self._done = 0
self._response = None
self._start_lock = threading.Condition()
- self._todo_lock = threading.Semaphore(todo)
+ if (not max_service_replicas) or (max_service_replicas >= want_copies):
+ max_threads = 1
+ else:
+ max_threads = math.ceil(float(want_copies) / max_service_replicas)
+ _logger.debug("Limiter max threads is %d", max_threads)
+ self._todo_lock = threading.Semaphore(max_threads)
self._done_lock = threading.Lock()
self._local = threading.local()
@@ -258,34 +265,28 @@ class KeepClient(object):
def shall_i_proceed(self):
"""
- Return true if the current thread should do stuff. Return
- false if the current thread should just stop.
+ Return true if the current thread should write to Keep.
+ Return false otherwise.
"""
with self._done_lock:
- return (self._done < self._todo)
+ return (self._done < self._want_copies)
def save_response(self, response_body, replicas_stored):
"""
Records a response body (a locator, possibly signed) returned by
- the Keep server. It is not necessary to save more than
- one response, since we presume that any locator returned
- in response to a successful request is valid.
+ the Keep server, and the number of replicas it stored.
"""
with self._done_lock:
self._done += replicas_stored
self._response = response_body
def response(self):
- """
- Returns the body from the response to a PUT request.
- """
+ """Return the body from the response to a PUT request."""
with self._done_lock:
return self._response
def done(self):
- """
- Return how many successes were reported.
- """
+ """Return the total number of replicas successfully stored."""
with self._done_lock:
return self._done
@@ -965,7 +966,8 @@ class KeepClient(object):
loop.save_result(error)
continue
- thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
+ thread_limiter = KeepClient.ThreadLimiter(
+ copies, self.max_replicas_per_service)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
commit c33d036cfef0b0784d16593e66e3f4fce018f783
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 16:50:18 2015 -0500
7696: PySDK determines max_replicas_per_service after querying services.
Because max_replicas_per_service was set to 1 in the case where
KeepClient was instantiated with no direct information about available
Keep services, and because ThreadLimiter was being instantiated before
querying available Keep services (via map_new_services), the first
Keep request to talk to non-disk services would let multiple threads
run at once. This fixes that race condition, and adds a test that was
triggering it semi-reliably.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3d64a4..2b718d7 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -645,6 +645,7 @@ class KeepClient(object):
self.put = self.local_store_put
else:
self.num_retries = num_retries
+ self.max_replicas_per_service = None
if proxy:
if not proxy.endswith('/'):
proxy += '/'
@@ -658,7 +659,6 @@ class KeepClient(object):
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
- self.max_replicas_per_service = None
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
@@ -671,7 +671,6 @@ class KeepClient(object):
self._writable_services = None
self.using_proxy = None
self._static_services_list = False
- self.max_replicas_per_service = 1
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
@@ -955,10 +954,8 @@ class KeepClient(object):
# Tell the proxy how many copies we want it to store
headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
- thread_sequence = 0
for tries_left in loop:
try:
sorted_roots = self.map_new_services(
@@ -968,6 +965,7 @@ class KeepClient(object):
loop.save_result(error)
continue
+ thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
threads = []
for service_root, ks in [(root, roots_map[root])
for root in sorted_roots]:
@@ -980,10 +978,9 @@ class KeepClient(object):
service_root=service_root,
thread_limiter=thread_limiter,
timeout=self.current_timeout(num_retries-tries_left),
- thread_sequence=thread_sequence)
+ thread_sequence=len(threads))
t.start()
threads.append(t)
- thread_sequence += 1
for t in threads:
t.join()
loop.save_result((thread_limiter.done() >= copies, len(threads)))
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index a5d9925..ada0dac 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -404,6 +404,19 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
actual = keep_client.put(body, copies=1)
self.assertEqual(pdh, actual)
+ def test_oddball_service_writer_count(self):
+ body = 'oddball service writer count'
+ pdh = tutil.str_keep_locator(body)
+ api_client = self.mock_keep_services(service_type='fancynewblobstore',
+ count=4)
+ headers = {'x-keep-replicas-stored': 3}
+ with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
+ **headers) as req_mock:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.put(body, copies=2)
+ self.assertEqual(pdh, actual)
+ self.assertEqual(1, req_mock.call_count)
+
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
commit 7563fb986662a066f0aa3a9c4c1dd35159fb69cc
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 12:17:46 2015 -0500
7696: PySDK KeepClient uses all service types.
Filter out gateway services from the list of usable services, rather
than selecting only disk and proxy types.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index b3ccc3e..b3d64a4 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -652,12 +652,13 @@ class KeepClient(object):
self._gateway_services = {}
self._keep_services = [{
'uuid': 'proxy',
+ 'service_type': 'proxy',
'_service_root': proxy,
}]
self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
- self.max_replicas_per_service = 1
+ self.max_replicas_per_service = None
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
@@ -687,6 +688,10 @@ class KeepClient(object):
t = self.proxy_timeout if self.using_proxy else self.timeout
return (t[0] * (1 << attempt_number), t[1])
+ def _any_nondisk_services(self, service_list):
+ return any(ks.get('service_type', 'disk') != 'disk'
+ for ks in service_list)
+
def build_services_list(self, force_rebuild=False):
if (self._static_services_list or
(self._keep_services and not force_rebuild)):
@@ -697,12 +702,16 @@ class KeepClient(object):
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- accessible = keep_services.execute().get('items')
- if not accessible:
+ # Gateway services are only used when specified by UUID,
+ # so there's nothing to gain by filtering them by
+ # service_type.
+ self._gateway_services = {ks['uuid']: ks for ks in
+ keep_services.execute()['items']}
+ if not self._gateway_services:
raise arvados.errors.NoKeepServersError()
# Precompute the base URI for each service.
- for r in accessible:
+ for r in self._gateway_services.itervalues():
host = r['service_host']
if not host.startswith('[') and host.find(':') >= 0:
# IPv6 URIs must be formatted like http://[::1]:80/...
@@ -712,27 +721,19 @@ class KeepClient(object):
host,
r['service_port'])
- # Gateway services are only used when specified by UUID,
- # so there's nothing to gain by filtering them by
- # service_type.
- self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
_logger.debug(str(self._gateway_services))
-
self._keep_services = [
- ks for ks in accessible
- if ks.get('service_type') in ['disk', 'proxy']]
- self._writable_services = [
- ks for ks in accessible
- if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
- _logger.debug(str(self._keep_services))
-
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
+ ks for ks in self._gateway_services.itervalues()
+ if not ks.get('service_type', '').startswith('gateway:')]
+ self._writable_services = [ks for ks in self._keep_services
+ if not ks.get('read_only')]
+
# For disk type services, max_replicas_per_service is 1
- # It is unknown or unlimited for non-disk typed services.
- for ks in accessible:
- if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
- self.max_replicas_per_service = None
+ # It is unknown (unlimited) for other service types.
+ if self._any_nondisk_services(self._writable_services):
+ self.max_replicas_per_service = None
+ else:
+ self.max_replicas_per_service = 1
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
@@ -769,7 +770,8 @@ class KeepClient(object):
# in that order.
use_services = self._keep_services
if need_writable:
- use_services = self._writable_services
+ use_services = self._writable_services
+ self.using_proxy = self._any_nondisk_services(use_services)
sorted_roots.extend([
svc['_service_root'] for svc in sorted(
use_services,
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index ab57ed5..a5d9925 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -387,6 +387,23 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
self.assertEqual(0, len(exc_check.exception.request_errors()))
+ def test_oddball_service_get(self):
+ body = 'oddball service get'
+ api_client = self.mock_keep_services(service_type='fancynewblobstore')
+ with tutil.mock_keep_responses(body, 200):
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.get(tutil.str_keep_locator(body))
+ self.assertEqual(body, actual)
+
+ def test_oddball_service_put(self):
+ body = 'oddball service put'
+ pdh = tutil.str_keep_locator(body)
+ api_client = self.mock_keep_services(service_type='fancynewblobstore')
+ with tutil.mock_keep_responses(pdh, 200):
+ keep_client = arvados.KeepClient(api_client=api_client)
+ actual = keep_client.put(body, copies=1)
+ self.assertEqual(pdh, actual)
+
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
commit f4c3100bad26dff3c99ff4bb9fa19b0d9f7995c7
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 12:18:46 2015 -0500
7696: Clean imports in PySDK arvados.keep module.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 8ed86fd..b3ccc3e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -1,28 +1,15 @@
-import bz2
+import cStringIO
import datetime
-import fcntl
-import functools
-import gflags
import hashlib
-import json
import logging
import os
-import pprint
import pycurl
import Queue
import re
import socket
import ssl
-import string
-import cStringIO
-import subprocess
-import sys
import threading
-import time
import timer
-import types
-import UserDict
-import zlib
import arvados
import arvados.config as config
commit 31e10ca041ac1cfd296b24d4878264f6af50d64b
Author: Brett Smith <brett at curoverse.com>
Date: Wed Nov 11 10:06:51 2015 -0500
7696: Refactor locator builder method in PySDK tests.
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index ea318c6..b2cf436 100644
--- a/sdk/python/tests/arvados_testutil.py
+++ b/sdk/python/tests/arvados_testutil.py
@@ -47,6 +47,8 @@ def mock_api_responses(api_client, body, codes, headers={}):
return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
(fake_httplib2_response(code, **headers), body) for code in codes)))
+def str_keep_locator(s):
+ return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
class FakeCurl:
@classmethod
@@ -126,8 +128,7 @@ class MockStreamReader(object):
def __init__(self, name='.', *data):
self._name = name
self._data = ''.join(data)
- self._data_locators = ['{}+{}'.format(hashlib.md5(d).hexdigest(),
- len(d)) for d in data]
+ self._data_locators = [str_keep_locator(d) for d in data]
self.num_retries = 0
def name(self):
diff --git a/sdk/python/tests/manifest_examples.py b/sdk/python/tests/manifest_examples.py
index 2d8e475..8f0abd2 100644
--- a/sdk/python/tests/manifest_examples.py
+++ b/sdk/python/tests/manifest_examples.py
@@ -1,6 +1,5 @@
import arvados
import arvados_testutil as tutil
-import hashlib
class ManifestExamples(object):
def make_manifest(self,
@@ -9,8 +8,7 @@ class ManifestExamples(object):
files_per_stream=1,
streams=1):
datablip = 'x' * bytes_per_block
- data_loc = '{}+{}'.format(hashlib.md5(datablip).hexdigest(),
- bytes_per_block)
+ data_loc = tutil.str_keep_locator(datablip)
with tutil.mock_keep_responses(data_loc, 200):
coll = arvados.CollectionWriter()
for si in range(0, streams):
diff --git a/sdk/python/tests/test_arv_ls.py b/sdk/python/tests/test_arv_ls.py
index 90bbacf..664b57f 100644
--- a/sdk/python/tests/test_arv_ls.py
+++ b/sdk/python/tests/test_arv_ls.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import hashlib
import io
import random
@@ -11,6 +10,8 @@ import arvados.errors as arv_error
import arvados.commands.ls as arv_ls
import run_test_server
+from arvados_testutil import str_keep_locator
+
class ArvLsTestCase(run_test_server.TestCaseWithServers):
FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
@@ -24,8 +25,7 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
def mock_api_for_manifest(self, manifest_lines, uuid=FAKE_UUID):
manifest_text = self.newline_join(manifest_lines)
- pdh = '{}+{}'.format(hashlib.md5(manifest_text).hexdigest(),
- len(manifest_text))
+ pdh = str_keep_locator(manifest_text)
coll_info = {'uuid': uuid,
'portable_data_hash': pdh,
'manifest_text': manifest_text}
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
index 330dd44..ea86614 100644
--- a/sdk/python/tests/test_arvfile.py
+++ b/sdk/python/tests/test_arvfile.py
@@ -6,7 +6,6 @@ import io
import mock
import os
import unittest
-import hashlib
import time
import arvados
@@ -30,7 +29,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
self.requests.append(locator)
return self.blocks.get(locator)
def put(self, data, num_retries=None):
- pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+ pdh = tutil.str_keep_locator(data)
self.blocks[pdh] = str(data)
return pdh
@@ -453,7 +452,7 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
n = 0
blocks = {}
for d in ['01234', '34567', '67890']:
- loc = '{}+{}'.format(hashlib.md5(d).hexdigest(), len(d))
+ loc = tutil.str_keep_locator(d)
blocks[loc] = d
stream.append(Range(loc, n, len(d)))
n += len(d)
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index ac7dd1b..bc3c0bf 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -4,7 +4,6 @@
import arvados
import copy
-import hashlib
import mock
import os
import pprint
@@ -395,7 +394,7 @@ class ArvadosCollectionsTest(run_test_server.TestCaseWithServers,
def test_write_directory_tree_with_zero_recursion(self):
cwriter = arvados.CollectionWriter(self.api_client)
content = 'd1/d2/f3d1/f2f1'
- blockhash = hashlib.md5(content).hexdigest() + '+' + str(len(content))
+ blockhash = tutil.str_keep_locator(content)
cwriter.write_directory_tree(
self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
max_manifest_depth=0)
@@ -739,7 +738,7 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
self.assertEqual('.', writer.current_stream_name())
self.assertEqual('out', writer.current_file_name())
out_file.write('test data')
- data_loc = hashlib.md5('test data').hexdigest() + '+9'
+ data_loc = tutil.str_keep_locator('test data')
self.assertTrue(out_file.closed, "writer file not closed after context")
self.assertRaises(ValueError, out_file.write, 'extra text')
with self.mock_keep(data_loc, 200) as keep_mock:
@@ -751,15 +750,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
writer = arvados.CollectionWriter(client)
with writer.open('six') as out_file:
out_file.writelines(['12', '34', '56'])
- data_loc = hashlib.md5('123456').hexdigest() + '+6'
+ data_loc = tutil.str_keep_locator('123456')
with self.mock_keep(data_loc, 200) as keep_mock:
self.assertEqual(". {} 0:6:six\n".format(data_loc),
writer.manifest_text())
def test_open_flush(self):
client = self.api_client_mock()
- data_loc1 = hashlib.md5('flush1').hexdigest() + '+6'
- data_loc2 = hashlib.md5('flush2').hexdigest() + '+6'
+ data_loc1 = tutil.str_keep_locator('flush1')
+ data_loc2 = tutil.str_keep_locator('flush2')
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('flush_test') as out_file:
@@ -777,15 +776,15 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
out_file.write('1st')
with writer.open('.', '2') as out_file:
out_file.write('2nd')
- data_loc = hashlib.md5('1st2nd').hexdigest() + '+6'
+ data_loc = tutil.str_keep_locator('1st2nd')
with self.mock_keep(data_loc, 200) as keep_mock:
self.assertEqual(". {} 0:3:1 3:3:2\n".format(data_loc),
writer.manifest_text())
def test_two_opens_two_streams(self):
client = self.api_client_mock()
- data_loc1 = hashlib.md5('file').hexdigest() + '+4'
- data_loc2 = hashlib.md5('indir').hexdigest() + '+5'
+ data_loc1 = tutil.str_keep_locator('file')
+ data_loc2 = tutil.str_keep_locator('indir')
with self.mock_keep((data_loc1, 200), (data_loc2, 200)) as keep_mock:
writer = arvados.CollectionWriter(client)
with writer.open('file') as out_file:
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 9046892..ab57ed5 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -368,7 +368,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_put_error_does_not_include_successful_puts(self):
data = 'partial failure test'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ data_loc = tutil.str_keep_locator(data)
api_client = self.mock_keep_services(count=3)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -378,7 +378,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
def test_proxy_put_with_no_writable_services(self):
data = 'test with no writable services'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ data_loc = tutil.str_keep_locator(data)
api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
@@ -518,7 +518,7 @@ class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
def check_64_zeros_error_order(self, verb, exc_class):
data = '0' * 64
if verb == 'get':
- data = hashlib.md5(data).hexdigest() + '+1234'
+ data = tutil.str_keep_locator(data)
# Arbitrary port number:
aport = random.randint(1024,65535)
api_client = self.mock_keep_services(service_port=aport, count=self.services)
commit 6de60c7db7a98405ef7ae4ac5eb20498f095416c
Merge: d3171a9 a1dc811
Author: Brett Smith <brett at curoverse.com>
Date: Fri Nov 13 09:28:12 2015 -0500
Merge branch '7123-crunch-no-record-log-failure-wip'
Closes #7123, #7741.
commit a1dc811844d2dc76bea5ebfdc2f571a12cb41b49
Author: Brett Smith <brett at curoverse.com>
Date: Mon Nov 9 10:28:51 2015 -0500
7123: Crunch doesn't update job log when arv-put fails.
This prevents crunch-job from recording the empty collection as a
job's log. Most other components (Workbench, the log cleaner)
recognize a null log as a special case; less so the empty collection.
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index f78824b..70ba04f 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1698,20 +1698,24 @@ sub log_writer_finish()
close($log_pipe_in);
+ my $logger_failed = 0;
my $read_result = log_writer_read_output(120);
if ($read_result == -1) {
+ $logger_failed = -1;
Log (undef, "timed out reading from 'arv-put'");
} elsif ($read_result != 0) {
+ $logger_failed = -2;
Log(undef, "failed to read arv-put log manifest to EOF");
}
waitpid($log_pipe_pid, 0);
if ($?) {
+ $logger_failed ||= $?;
Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
}
close($log_pipe_out);
- my $arv_put_output = $log_pipe_out_buf;
+ my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
$log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
$log_pipe_out_select = undef;
@@ -1777,13 +1781,13 @@ sub save_meta
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
return unless log_writer_is_active();
+ my $log_manifest = log_writer_finish();
+ return unless defined($log_manifest);
- my $log_manifest = "";
if ($Job->{log}) {
my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
- $log_manifest .= $prev_log_coll->{manifest_text};
+ $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
}
- $log_manifest .= log_writer_finish();
my $log_coll = api_call(
"collections/create", ensure_unique_name => 1, collection => {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list