[ARVADOS] created: 536f5fa8d8deb3c04dfc296a15a179d5c08c1757
git at public.curoverse.com
git at public.curoverse.com
Mon May 26 08:32:56 EDT 2014
at 536f5fa8d8deb3c04dfc296a15a179d5c08c1757 (commit)
commit 536f5fa8d8deb3c04dfc296a15a179d5c08c1757
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 26 07:31:07 2014 -0400
2752: arv-put shouldn't resume from expired Keep locators.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 814bd75..7b38ccd 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -396,6 +396,10 @@ class ResumableCollectionWriter(CollectionWriter):
attr_value = attr_class(attr_value)
setattr(writer, attr_name, attr_value)
# Check dependencies before we try to resume anything.
+ if any(KeepLocator(ls).permission_expired()
+ for ls in writer._current_stream_locators):
+ raise errors.StaleWriterStateError(
+ "locators include expired permission hint")
writer.check_dependencies()
if state['_current_file'] is not None:
path, pos = state['_current_file']
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index bb64c77..4d1e150 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -585,6 +585,20 @@ class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
TestResumableWriter.from_state,
cwriter.last_state())
+ def test_resume_fails_with_expired_locator(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.finish_current_stream()
+ state = cwriter.last_state()
+ # Get the last locator, remove any permission hint, and add
+ # an expired one.
+ new_loc = state['_current_stream_locators'][-1].split('+A', 1)[0]
+ state['_current_stream_locators'][-1] = "{}+A{}@10000000".format(
+ new_loc, 'a' * 40)
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state, state)
+
def test_successful_resumes(self):
# FIXME: This is more of an integration test than a unit test.
cwriter = TestResumableWriter()
commit e9dc92eb773d669a3f22a2b6fdc9c2a42670dd36
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 26 07:15:37 2014 -0400
2752: Add KeepLocator class to Python SDK.
I hope this can be one place to parse and manipulate locator strings.
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index e414d26..bd98466 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -18,6 +18,7 @@ import fcntl
import time
import threading
import timer
+import datetime
global_client_object = None
@@ -25,6 +26,89 @@ from api import *
import config
import arvados.errors
+class KeepLocator(object):
+ EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
+ HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
+
+ def __init__(self, locator_str):
+ self.size = None
+ self.loc_hint = None
+ self._perm_sig = None
+ self._perm_expiry = None
+ pieces = iter(locator_str.split('+'))
+ self.md5sum = next(pieces)
+ for hint in pieces:
+ if hint.startswith('A'):
+ self.parse_permission_hint(hint)
+ elif hint.startswith('K'):
+ self.loc_hint = hint # FIXME
+ elif hint.isdigit():
+ self.size = int(hint)
+ else:
+ raise ValueError("unrecognized hint data {}".format(hint))
+
+ def __str__(self):
+ return '+'.join(
+ str(s) for s in [self.md5sum, self.size, self.loc_hint,
+ self.permission_hint()]
+ if s is not None)
+
+ def _is_hex_length(self, s, *size_spec):
+ if len(size_spec) == 1:
+ good_len = (len(s) == size_spec[0])
+ else:
+ good_len = (size_spec[0] <= len(s) <= size_spec[1])
+ return good_len and self.HEX_RE.match(s)
+
+ def _make_hex_prop(name, length):
+ # Build and return a new property with the given name that
+ # must be a hex string of the given length.
+ data_name = '_{}'.format(name)
+ def getter(self):
+ return getattr(self, data_name)
+ def setter(self, hex_str):
+ if not self._is_hex_length(hex_str, length):
+ raise ValueError("{} must be a {}-digit hex string: {}".
+ format(name, length, hex_str))
+ setattr(self, data_name, hex_str)
+ return property(getter, setter)
+
+ md5sum = _make_hex_prop('md5sum', 32)
+ perm_sig = _make_hex_prop('perm_sig', 40)
+
+ @property
+ def perm_expiry(self):
+ return self._perm_expiry
+
+ @perm_expiry.setter
+ def perm_expiry(self, value):
+ if not self._is_hex_length(value, 1, 8):
+ raise ValueError(
+ "permission timestamp must be a hex Unix timestamp: {}".
+ format(value))
+ self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
+
+ def permission_hint(self):
+ data = [self.perm_sig, self.perm_expiry]
+ if None in data:
+ return None
+ data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
+ return "A{}@{:08x}".format(*data)
+
+ def parse_permission_hint(self, s):
+ try:
+ self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
+ except IndexError:
+ raise ValueError("bad permission hint {}".format(s))
+
+ def permission_expired(self, as_of_dt=None):
+ if self.perm_expiry is None:
+ return False
+ elif as_of_dt is None:
+ as_of_dt = datetime.datetime.now()
+ return self.perm_expiry <= as_of_dt
+
+
class Keep:
@staticmethod
def global_client_object():
diff --git a/sdk/python/tests/test_keep_locator.py b/sdk/python/tests/test_keep_locator.py
new file mode 100644
index 0000000..e9d6356
--- /dev/null
+++ b/sdk/python/tests/test_keep_locator.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import datetime
+import itertools
+import random
+import unittest
+
+from arvados.keep import KeepLocator
+
+class ArvadosPutResumeCacheTest(unittest.TestCase):
+ DEFAULT_TEST_COUNT = 10
+
+ def numstrs(fmtstr, base, exponent):
+ def genstrs(self, count=None):
+ return (fmtstr.format(random.randint(0, base ** exponent))
+ for c in xrange(count or self.DEFAULT_TEST_COUNT))
+ return genstrs
+
+ checksums = numstrs('{:032x}', 16, 32)
+ sizes = numstrs('{:d}', 2, 26)
+ signatures = numstrs('{:040x}', 16, 40)
+ timestamps = numstrs('{:08x}', 16, 8)
+
+ def perm_hints(self, count=DEFAULT_TEST_COUNT):
+ for sig, ts in itertools.izip(self.signatures(count),
+ self.timestamps(count)):
+ yield 'A{}@{}'.format(sig, ts)
+
+ def test_good_locators_returned(self):
+ for hint_gens in [(), (self.sizes(),), (self.perm_hints(),),
+ (self.sizes(), self.perm_hints())]:
+ for loc_data in itertools.izip(self.checksums(), *hint_gens):
+ locator = '+'.join(loc_data)
+ self.assertEquals(locator, str(KeepLocator(locator)))
+
+ def test_nonchecksum_rejected(self):
+ for badstr in ['', 'badbadbad', '8f9e68d957b504a29ba76c526c3145dj',
+ '+8f9e68d957b504a29ba76c526c3145d9',
+ '3+8f9e68d957b504a29ba76c526c3145d9']:
+ self.assertRaises(ValueError, KeepLocator, badstr)
+
+ def test_bad_hints_rejected(self):
+ checksum = next(self.checksums(1))
+ for badhint in ['', 'nonsense', '+32', checksum]:
+ self.assertRaises(ValueError, KeepLocator,
+ '+'.join([checksum, badhint]))
+
+ def test_expiry_passed(self):
+ checksum = next(self.checksums(1))
+ signature = next(self.signatures(1))
+ dt1980 = datetime.datetime(1980, 1, 1)
+ dt2000 = datetime.datetime(2000, 2, 2)
+ dt2080 = datetime.datetime(2080, 3, 3)
+ locator = KeepLocator(checksum)
+ self.assertFalse(locator.permission_expired())
+ self.assertFalse(locator.permission_expired(dt1980))
+ self.assertFalse(locator.permission_expired(dt2080))
+ # Timestamped to 1987-01-05 18:48:32.
+ locator = KeepLocator('{}+A{}@20000000'.format(checksum, signature))
+ self.assertTrue(locator.permission_expired())
+ self.assertTrue(locator.permission_expired(dt2000))
+ self.assertFalse(locator.permission_expired(dt1980))
+
+
+if __name__ == '__main__':
+ unittest.main()
commit 6c56a891a36f43df89234add46958e0e95c2d1ae
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 26 08:12:25 2014 -0400
2752: arv-put resumes interrupted uploads from cache.
This commit simply uses all the state-saving work in arv-put's main()
function.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 4395047..e113bbd 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -108,6 +108,16 @@ def parse_arguments(arguments):
total data size).
""")
+ group = arg_parser.add_mutually_exclusive_group()
+ group.add_argument('--resume', action='store_true', default=True,
+ help="""
+ Continue interrupted uploads from cached state (default).
+ """)
+ group.add_argument('--no-resume', action='store_false', dest='resume',
+ help="""
+ Do not continue interrupted uploads from cached state.
+ """)
+
args = parser.parse_args(arguments)
if len(args.paths) == 0:
@@ -139,6 +149,16 @@ class ResumeCacheConflict(Exception):
class ResumeCache(object):
CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
+ @classmethod
+ def setup_user_cache(cls):
+ try:
+ os.makedirs(cls.CACHE_DIR)
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ else:
+ os.chmod(cls.CACHE_DIR, 0o700)
+
def __init__(self, file_spec):
try:
self.cache_file = open(file_spec, 'a+')
@@ -281,6 +301,7 @@ def progress_writer(progress_func, outfile=sys.stderr):
return write_progress
def main(arguments=None):
+ ResumeCache.setup_user_cache()
args = parse_arguments(arguments)
if args.progress:
@@ -290,8 +311,16 @@ def main(arguments=None):
else:
reporter = None
- writer = ArvPutCollectionWriter(
- reporter=reporter, bytes_expected=expected_bytes_for(args.paths))
+ try:
+ resume_cache = ResumeCache(args)
+ if not args.resume:
+ resume_cache.restart()
+ except ResumeCacheConflict:
+ print "arv-put: Another process is already uploading this data."
+ sys.exit(1)
+
+ writer = ArvPutCollectionWriter.from_cache(
+ resume_cache, reporter, expected_bytes_for(args.paths))
# Copy file data to Keep.
for path in args.paths:
@@ -318,6 +347,7 @@ def main(arguments=None):
# Print the locator (uuid) of the new collection.
print writer.finish()
+ resume_cache.destroy()
if __name__ == '__main__':
main()
commit fd2809f4c0c8d20f76e8f360673990879fbe3ad9
Author: Brett Smith <brett at curoverse.com>
Date: Fri May 23 16:46:49 2014 -0400
2752: Add ResumeCache.restart().
This will make it easier for the user to bypass the cache.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 37974c0..4395047 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -198,6 +198,10 @@ class ResumeCache(object):
raise
self.close()
+ def restart(self):
+ self.destroy()
+ self.__init__(self.filename)
+
class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
def __init__(self, cache=None, reporter=None, bytes_expected=None):
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index e765482..a98eaa6 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -170,6 +170,15 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
if os.path.exists(cachefile.name):
os.unlink(cachefile.name)
+ def test_restart_cache(self):
+ path = os.path.join(self.make_tmpdir(), 'cache')
+ cache = arv_put.ResumeCache(path)
+ cache.save('test')
+ cache.restart()
+ self.assertRaises(ValueError, cache.load)
+ self.assertRaises(arv_put.ResumeCacheConflict,
+ arv_put.ResumeCache, path)
+
class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
def setUp(self):
commit ca72f9044c8c39b89c4058f648e4bc1a97c1bc1f
Author: Brett Smith <brett at curoverse.com>
Date: Fri May 23 16:22:39 2014 -0400
2752: Refactor progress reporting in arv-put's CollectionWriter.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 705dcfd..37974c0 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -199,22 +199,28 @@ class ResumeCache(object):
self.close()
-class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
- def __init__(self, cache=None):
+class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
+ def __init__(self, cache=None, reporter=None, bytes_expected=None):
+ self.__init_locals__(cache, reporter, bytes_expected)
+ super(ArvPutCollectionWriter, self).__init__()
+
+ def __init_locals__(self, cache, reporter, bytes_expected):
self.cache = cache
- super(ResumeCacheCollectionWriter, self).__init__()
+ self.report_func = reporter
+ self.bytes_written = 0
+ self.bytes_expected = bytes_expected
@classmethod
- def from_cache(cls, cache):
+ def from_cache(cls, cache, reporter=None, bytes_expected=None):
try:
state = cache.load()
state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
writer = cls.from_state(state)
except (TypeError, ValueError,
arvados.errors.StaleWriterStateError) as error:
- return cls(cache)
+ return cls(cache, reporter, bytes_expected)
else:
- writer.cache = cache
+ writer.__init_locals__(cache, reporter, bytes_expected)
return writer
def checkpoint_state(self):
@@ -229,41 +235,17 @@ class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
state[attr] = list(value)
self.cache.save(state)
-
-class CollectionWriterWithProgress(arvados.CollectionWriter):
- def flush_data(self, *args, **kwargs):
- if not getattr(self, 'display_type', None):
- return
- if not hasattr(self, 'bytes_flushed'):
- self.bytes_flushed = 0
- self.bytes_flushed += self._data_buffer_len
- super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
- self.bytes_flushed -= self._data_buffer_len
- if self.display_type == 'machine':
- sys.stderr.write('%s %d: %d written %d total\n' %
- (sys.argv[0],
- os.getpid(),
- self.bytes_flushed,
- getattr(self, 'bytes_expected', -1)))
- elif getattr(self, 'bytes_expected', 0) > 0:
- pct = 100.0 * self.bytes_flushed / self.bytes_expected
- sys.stderr.write('\r%dM / %dM %.1f%% ' %
- (self.bytes_flushed >> 20,
- self.bytes_expected >> 20, pct))
- else:
- sys.stderr.write('\r%d ' % self.bytes_flushed)
-
- def manifest_text(self, *args, **kwargs):
- manifest_text = (super(CollectionWriterWithProgress, self)
- .manifest_text(*args, **kwargs))
- if getattr(self, 'display_type', None):
- if self.display_type == 'human':
- sys.stderr.write('\n')
- self.display_type = None
- return manifest_text
+ def flush_data(self):
+ bytes_buffered = self._data_buffer_len
+ super(ArvPutCollectionWriter, self).flush_data()
+ self.bytes_written += (bytes_buffered - self._data_buffer_len)
+ if self.report_func is not None:
+ self.report_func(self.bytes_written, self.bytes_expected)
def expected_bytes_for(pathlist):
+ # Walk the given directory trees and stat files, adding up file sizes,
+ # so we can display progress as percent
bytesum = 0
for path in pathlist:
if os.path.isdir(path):
@@ -289,23 +271,23 @@ def human_progress(bytes_written, bytes_expected):
else:
return "\r{} ".format(bytes_written)
+def progress_writer(progress_func, outfile=sys.stderr):
+ def write_progress(bytes_written, bytes_expected):
+ outfile.write(progress_func(bytes_written, bytes_expected))
+ return write_progress
+
def main(arguments=None):
args = parse_arguments(arguments)
if args.progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'human'
+ reporter = progress_writer(human_progress)
elif args.batch_progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'machine'
+ reporter = progress_writer(machine_progress)
else:
- writer = arvados.CollectionWriter()
+ reporter = None
- # Walk the given directory trees and stat files, adding up file sizes,
- # so we can display progress as percent
- writer.bytes_expected = expected_bytes_for(args.paths)
- if writer.bytes_expected is None:
- del writer.bytes_expected
+ writer = ArvPutCollectionWriter(
+ reporter=reporter, bytes_expected=expected_bytes_for(args.paths))
# Copy file data to Keep.
for path in args.paths:
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index db03eca..e765482 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -171,62 +171,73 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
os.unlink(cachefile.name)
-class ArvadosPutResumeCacheCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
+class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
def setUp(self):
- super(ArvadosPutResumeCacheCollectionWriterTest, self).setUp()
+ super(ArvadosPutCollectionWriterTest, self).setUp()
with tempfile.NamedTemporaryFile(delete=False) as cachefile:
self.cache = arv_put.ResumeCache(cachefile.name)
self.cache_filename = cachefile.name
def tearDown(self):
- super(ArvadosPutResumeCacheCollectionWriterTest, self).tearDown()
+ super(ArvadosPutCollectionWriterTest, self).tearDown()
if os.path.exists(self.cache_filename):
self.cache.destroy()
self.cache.close()
def test_writer_caches(self):
- cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
cwriter.write_file('/dev/null')
self.assertTrue(self.cache.load())
self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
def test_writer_works_without_cache(self):
- cwriter = arv_put.ResumeCacheCollectionWriter()
+ cwriter = arv_put.ArvPutCollectionWriter()
cwriter.write_file('/dev/null')
self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
def test_writer_resumes_from_cache(self):
- cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
with self.make_test_file() as testfile:
cwriter.write_file(testfile.name, 'test')
- new_writer = arv_put.ResumeCacheCollectionWriter.from_cache(
+ new_writer = arv_put.ArvPutCollectionWriter.from_cache(
self.cache)
self.assertEquals(
". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
new_writer.manifest_text())
def test_new_writer_from_stale_cache(self):
- cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
with self.make_test_file() as testfile:
cwriter.write_file(testfile.name, 'test')
- new_writer = arv_put.ResumeCacheCollectionWriter.from_cache(self.cache)
+ new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
new_writer.write_file('/dev/null')
self.assertEquals(". 0:0:null\n", new_writer.manifest_text())
def test_new_writer_from_empty_cache(self):
- cwriter = arv_put.ResumeCacheCollectionWriter.from_cache(self.cache)
+ cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
cwriter.write_file('/dev/null')
self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
def test_writer_resumable_after_arbitrary_bytes(self):
- cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ cwriter = arv_put.ArvPutCollectionWriter(self.cache)
# These bytes are intentionally not valid UTF-8.
with self.make_test_file('\x00\x07\xe2') as testfile:
cwriter.write_file(testfile.name, 'test')
- new_writer = arv_put.ResumeCacheCollectionWriter.from_cache(
+ new_writer = arv_put.ArvPutCollectionWriter.from_cache(
self.cache)
self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
+ def test_progress_reporting(self):
+ for expect_count in (None, 8):
+ progression = []
+ cwriter = arv_put.ArvPutCollectionWriter(
+ reporter=lambda *args: progression.append(args),
+ bytes_expected=expect_count)
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ cwriter.finish_current_stream()
+ self.assertIn((4, expect_count), progression)
+
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
commit 114f6b556d1d38bb4614befd5e5107465161fc99
Author: Brett Smith <brett at curoverse.com>
Date: Fri May 23 15:52:50 2014 -0400
2752: Separate and test progress reporting functions.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 5de8616..705dcfd 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -275,6 +275,20 @@ def expected_bytes_for(pathlist):
bytesum += os.path.getsize(path)
return bytesum
+_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
+ os.getpid())
+def machine_progress(bytes_written, bytes_expected):
+ return _machine_format.format(
+ bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
+def human_progress(bytes_written, bytes_expected):
+ if bytes_expected:
+ return "\r{}M / {}M {:.1f}% ".format(
+ bytes_written >> 20, bytes_expected >> 20,
+ bytes_written / bytes_expected)
+ else:
+ return "\r{} ".format(bytes_written)
+
def main(arguments=None):
args = parse_arguments(arguments)
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index 50060ef..db03eca 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -249,6 +249,27 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
+class ArvadosPutReportTest(ArvadosBaseTestCase):
+ def test_machine_progress(self):
+ for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
+ expect = ": {} written {} total\n".format(
+ count, -1 if (total is None) else total)
+ self.assertTrue(
+ arv_put.machine_progress(count, total).endswith(expect))
+
+ def test_known_human_progress(self):
+ for count, total in [(0, 1), (2, 4), (45, 60)]:
+ expect = '{:.1f}%'.format(count / total)
+ actual = arv_put.human_progress(count, total)
+ self.assertTrue(actual.startswith('\r'))
+ self.assertIn(expect, actual)
+
+ def test_unknown_human_progress(self):
+ for count in [1, 20, 300, 4000, 50000]:
+ self.assertTrue(re.search(r'\b{}\b'.format(count),
+ arv_put.human_progress(count, None)))
+
+
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
def test_simple_file_put(self):
with self.make_test_file() as testfile:
commit c2214d31507297b8b9049b8e7704b49080cb0795
Author: Brett Smith <brett at curoverse.com>
Date: Fri May 23 14:55:56 2014 -0400
2752: Factor out arv-put byte tally, and test.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index d3da8cf..5de8616 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -263,6 +263,18 @@ class CollectionWriterWithProgress(arvados.CollectionWriter):
return manifest_text
+def expected_bytes_for(pathlist):
+ bytesum = 0
+ for path in pathlist:
+ if os.path.isdir(path):
+ for filename in arvados.util.listdir_recursive(path):
+ bytesum += os.path.getsize(os.path.join(path, filename))
+ elif not os.path.isfile(path):
+ return None
+ else:
+ bytesum += os.path.getsize(path)
+ return bytesum
+
def main(arguments=None):
args = parse_arguments(arguments)
@@ -277,17 +289,9 @@ def main(arguments=None):
# Walk the given directory trees and stat files, adding up file sizes,
# so we can display progress as percent
- writer.bytes_expected = 0
- for path in args.paths:
- if os.path.isdir(path):
- for filename in arvados.util.listdir_recursive(path):
- writer.bytes_expected += os.path.getsize(
- os.path.join(path, filename))
- elif not os.path.isfile(path):
- del writer.bytes_expected
- break
- else:
- writer.bytes_expected += os.path.getsize(path)
+ writer.bytes_expected = expected_bytes_for(args.paths)
+ if writer.bytes_expected is None:
+ del writer.bytes_expected
# Copy file data to Keep.
for path in args.paths:
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index 8412c6a..50060ef 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -3,6 +3,7 @@
import os
import re
+import shutil
import tempfile
import unittest
@@ -227,6 +228,27 @@ class ArvadosPutResumeCacheCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
+class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
+ TEST_SIZE = os.path.getsize(__file__)
+
+ def test_expected_bytes_for_file(self):
+ self.assertEquals(self.TEST_SIZE,
+ arv_put.expected_bytes_for([__file__]))
+
+ def test_expected_bytes_for_tree(self):
+ tree = self.make_tmpdir()
+ shutil.copyfile(__file__, os.path.join(tree, 'one'))
+ shutil.copyfile(__file__, os.path.join(tree, 'two'))
+ self.assertEquals(self.TEST_SIZE * 2,
+ arv_put.expected_bytes_for([tree]))
+ self.assertEquals(self.TEST_SIZE * 3,
+ arv_put.expected_bytes_for([tree, __file__]))
+
+ def test_expected_bytes_for_device(self):
+ self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
+ self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
+
+
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
def test_simple_file_put(self):
with self.make_test_file() as testfile:
commit 88092c27bf020aa1d08d754af7a4c3549ae166dc
Author: Brett Smith <brett at curoverse.com>
Date: Fri May 23 13:16:25 2014 -0400
2752: Add ResumableCollectionWriter serialization to arv-put.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index a0dec2b..d3da8cf 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -5,6 +5,7 @@
import argparse
import arvados
+import base64
import errno
import fcntl
import hashlib
@@ -198,6 +199,37 @@ class ResumeCache(object):
self.close()
+class ResumeCacheCollectionWriter(arvados.ResumableCollectionWriter):
+ def __init__(self, cache=None):
+ self.cache = cache
+ super(ResumeCacheCollectionWriter, self).__init__()
+
+ @classmethod
+ def from_cache(cls, cache):
+ try:
+ state = cache.load()
+ state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
+ writer = cls.from_state(state)
+ except (TypeError, ValueError,
+ arvados.errors.StaleWriterStateError) as error:
+ return cls(cache)
+ else:
+ writer.cache = cache
+ return writer
+
+ def checkpoint_state(self):
+ if self.cache is None:
+ return
+ state = self.dump_state()
+ # Transform attributes for serialization.
+ for attr, value in state.items():
+ if attr == '_data_buffer':
+ state[attr] = base64.encodestring(''.join(value))
+ elif hasattr(value, 'popleft'):
+ state[attr] = list(value)
+ self.cache.save(state)
+
+
class CollectionWriterWithProgress(arvados.CollectionWriter):
def flush_data(self, *args, **kwargs):
if not getattr(self, 'display_type', None):
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index d2aaf42..8412c6a 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python
+# -*- coding: utf-8 -*-
import os
import re
@@ -169,6 +170,63 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
os.unlink(cachefile.name)
+class ArvadosPutResumeCacheCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
+ def setUp(self):
+ super(ArvadosPutResumeCacheCollectionWriterTest, self).setUp()
+ with tempfile.NamedTemporaryFile(delete=False) as cachefile:
+ self.cache = arv_put.ResumeCache(cachefile.name)
+ self.cache_filename = cachefile.name
+
+ def tearDown(self):
+ super(ArvadosPutResumeCacheCollectionWriterTest, self).tearDown()
+ if os.path.exists(self.cache_filename):
+ self.cache.destroy()
+ self.cache.close()
+
+ def test_writer_caches(self):
+ cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ cwriter.write_file('/dev/null')
+ self.assertTrue(self.cache.load())
+ self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
+
+ def test_writer_works_without_cache(self):
+ cwriter = arv_put.ResumeCacheCollectionWriter()
+ cwriter.write_file('/dev/null')
+ self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
+
+ def test_writer_resumes_from_cache(self):
+ cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ new_writer = arv_put.ResumeCacheCollectionWriter.from_cache(
+ self.cache)
+ self.assertEquals(
+ ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
+ new_writer.manifest_text())
+
+ def test_new_writer_from_stale_cache(self):
+ cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ new_writer = arv_put.ResumeCacheCollectionWriter.from_cache(self.cache)
+ new_writer.write_file('/dev/null')
+ self.assertEquals(". 0:0:null\n", new_writer.manifest_text())
+
+ def test_new_writer_from_empty_cache(self):
+ cwriter = arv_put.ResumeCacheCollectionWriter.from_cache(self.cache)
+ cwriter.write_file('/dev/null')
+ self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
+
+ def test_writer_resumable_after_arbitrary_bytes(self):
+ cwriter = arv_put.ResumeCacheCollectionWriter(self.cache)
+ # These bytes are intentionally not valid UTF-8.
+ with self.make_test_file('\x00\x07\xe2') as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ new_writer = arv_put.ResumeCacheCollectionWriter.from_cache(
+ self.cache)
+ self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
+
+
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
def test_simple_file_put(self):
with self.make_test_file() as testfile:
commit 4d84bf681a8a197e6e900dcfc9d82e7fe13bac5f
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 26 08:02:15 2014 -0400
2752: Add ResumeCache to arv-put.
This class encapsulates all the functionality necessary to
de/serialize ResumableWriter state.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 9324835..a0dec2b 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -5,9 +5,13 @@
import argparse
import arvados
+import errno
+import fcntl
import hashlib
+import json
import os
import sys
+import tempfile
def parse_arguments(arguments):
parser = argparse.ArgumentParser(
@@ -127,9 +131,22 @@ def parse_arguments(arguments):
return args
+class ResumeCacheConflict(Exception):
+ pass
+
+
class ResumeCache(object):
CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
+ def __init__(self, file_spec):
+ try:
+ self.cache_file = open(file_spec, 'a+')
+ except TypeError:
+ file_spec = self.make_path(file_spec)
+ self.cache_file = open(file_spec, 'a+')
+ self._lock_file(self.cache_file)
+ self.filename = self.cache_file.name
+
@classmethod
def make_path(cls, args):
md5 = hashlib.md5()
@@ -142,6 +159,44 @@ class ResumeCache(object):
md5.update(args.filename)
return os.path.join(cls.CACHE_DIR, md5.hexdigest())
+ def _lock_file(self, fileobj):
+ try:
+ fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+ def load(self):
+ self.cache_file.seek(0)
+ return json.load(self.cache_file)
+
+ def save(self, data):
+ try:
+ new_cache_fd, new_cache_name = tempfile.mkstemp(
+ dir=os.path.dirname(self.filename))
+ self._lock_file(new_cache_fd)
+ new_cache = os.fdopen(new_cache_fd, 'r+')
+ json.dump(data, new_cache)
+ os.rename(new_cache_name, self.filename)
+ except (IOError, OSError, ResumeCacheConflict) as error:
+ try:
+ os.unlink(new_cache_name)
+ except NameError: # mkstemp failed.
+ pass
+ else:
+ self.cache_file.close()
+ self.cache_file = new_cache
+
+ def close(self):
+ self.cache_file.close()
+
+ def destroy(self):
+ try:
+ os.unlink(self.filename)
+ except OSError as error:
+ if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ raise
+ self.close()
+
class CollectionWriterWithProgress(arvados.CollectionWriter):
def flush_data(self, *args, **kwargs):
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index 306f5f4..d2aaf42 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -19,6 +19,13 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
['/tmp', '--max-manifest-depth', '1']
]
+ def tearDown(self):
+ super(ArvadosPutResumeCacheTest, self).tearDown()
+ try:
+ self.last_cache.destroy()
+ except AttributeError:
+ pass
+
def cache_path_from_arglist(self, arglist):
return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
@@ -101,6 +108,66 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
else:
config['ARVADOS_API_HOST'] = orig_host
+ def test_basic_cache_storage(self):
+ thing = ['test', 'list']
+ with tempfile.NamedTemporaryFile() as cachefile:
+ self.last_cache = arv_put.ResumeCache(cachefile.name)
+ self.last_cache.save(thing)
+ self.assertEquals(thing, self.last_cache.load())
+
+ def test_empty_cache(self):
+ with tempfile.NamedTemporaryFile() as cachefile:
+ cache = arv_put.ResumeCache(cachefile.name)
+ self.assertRaises(ValueError, cache.load)
+
+ def test_cache_persistent(self):
+ thing = ['test', 'list']
+ path = os.path.join(self.make_tmpdir(), 'cache')
+ cache = arv_put.ResumeCache(path)
+ cache.save(thing)
+ cache.close()
+ self.last_cache = arv_put.ResumeCache(path)
+ self.assertEquals(thing, self.last_cache.load())
+
+ def test_multiple_cache_writes(self):
+ thing = ['short', 'list']
+ with tempfile.NamedTemporaryFile() as cachefile:
+ self.last_cache = arv_put.ResumeCache(cachefile.name)
+ # Start writing an object longer than the one we test, to make
+ # sure the cache file gets truncated.
+ self.last_cache.save(['long', 'long', 'list'])
+ self.last_cache.save(thing)
+ self.assertEquals(thing, self.last_cache.load())
+
+ def test_cache_is_locked(self):
+ with tempfile.NamedTemporaryFile() as cachefile:
+ cache = arv_put.ResumeCache(cachefile.name)
+ self.assertRaises(arv_put.ResumeCacheConflict,
+ arv_put.ResumeCache, cachefile.name)
+
+ def test_cache_stays_locked(self):
+ with tempfile.NamedTemporaryFile() as cachefile:
+ self.last_cache = arv_put.ResumeCache(cachefile.name)
+ path = cachefile.name
+ self.last_cache.save('test')
+ self.assertRaises(arv_put.ResumeCacheConflict,
+ arv_put.ResumeCache, path)
+
+ def test_destroy_cache(self):
+ cachefile = tempfile.NamedTemporaryFile(delete=False)
+ try:
+ cache = arv_put.ResumeCache(cachefile.name)
+ cache.save('test')
+ cache.destroy()
+ try:
+ arv_put.ResumeCache(cachefile.name)
+ except arv_put.ResumeCacheConflict:
+ self.fail("could not load cache after destroying it")
+ self.assertRaises(ValueError, cache.load)
+ finally:
+ if os.path.exists(cachefile.name):
+ os.unlink(cachefile.name)
+
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
def test_simple_file_put(self):
commit 461c910330846018f0f545e8055f6304c63fedd2
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 26 08:01:27 2014 -0400
2752: Generate cache filenames from arv-put arguments.
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 8082752..9324835 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -5,6 +5,7 @@
import argparse
import arvados
+import hashlib
import os
import sys
@@ -126,6 +127,22 @@ def parse_arguments(arguments):
return args
+class ResumeCache(object):
+ CACHE_DIR = os.path.expanduser('~/.cache/arvados/arv-put')
+
+ @classmethod
+ def make_path(cls, args):
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ realpaths = sorted(os.path.realpath(path) for path in args.paths)
+ md5.update(''.join(realpaths))
+ if any(os.path.isdir(path) for path in realpaths):
+ md5.update(str(max(args.max_manifest_depth, -1)))
+ elif args.filename:
+ md5.update(args.filename)
+ return os.path.join(cls.CACHE_DIR, md5.hexdigest())
+
+
class CollectionWriterWithProgress(arvados.CollectionWriter):
def flush_data(self, *args, **kwargs):
if not getattr(self, 'display_type', None):
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index c5d446e..306f5f4 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -1,12 +1,106 @@
#!/usr/bin/env python
import os
+import re
import tempfile
import unittest
import arvados
import arvados.commands.put as arv_put
-from arvados_testutil import ArvadosKeepLocalStoreTestCase
+from arvados_testutil import ArvadosBaseTestCase, ArvadosKeepLocalStoreTestCase
+
+class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
+ CACHE_ARGSET = [
+ [],
+ ['/dev/null'],
+ ['/dev/null', '--filename', 'empty'],
+ ['/tmp'],
+ ['/tmp', '--max-manifest-depth', '0'],
+ ['/tmp', '--max-manifest-depth', '1']
+ ]
+
+ def cache_path_from_arglist(self, arglist):
+ return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
+
+ def test_cache_names_stable(self):
+ for argset in self.CACHE_ARGSET:
+ self.assertEquals(self.cache_path_from_arglist(argset),
+ self.cache_path_from_arglist(argset),
+ "cache name changed for {}".format(argset))
+
+ def test_cache_names_unique(self):
+ results = []
+ for argset in self.CACHE_ARGSET:
+ path = self.cache_path_from_arglist(argset)
+ self.assertNotIn(path, results)
+ results.append(path)
+
+ def test_cache_names_simple(self):
+ # The goal here is to make sure the filename doesn't use characters
+ # reserved by the filesystem. Feel free to adjust this regexp as
+ # long as it still does that.
+ bad_chars = re.compile(r'[^-\.\w]')
+ for argset in self.CACHE_ARGSET:
+ path = self.cache_path_from_arglist(argset)
+ self.assertFalse(bad_chars.search(os.path.basename(path)),
+ "path too exotic: {}".format(path))
+
+ def test_cache_names_ignore_argument_order(self):
+ self.assertEquals(
+ self.cache_path_from_arglist(['a', 'b', 'c']),
+ self.cache_path_from_arglist(['c', 'a', 'b']))
+ self.assertEquals(
+ self.cache_path_from_arglist(['-', '--filename', 'stdin']),
+ self.cache_path_from_arglist(['--filename', 'stdin', '-']))
+
+ def test_cache_names_ignore_irrelevant_arguments(self):
+ # Workaround: parse_arguments bails on --filename with a directory.
+ args1 = arv_put.parse_arguments(['/tmp'])
+ args2 = arv_put.parse_arguments(['/tmp'])
+ args2.filename = 'tmp'
+ self.assertEquals(arv_put.ResumeCache.make_path(args1),
+ arv_put.ResumeCache.make_path(args2),
+ "cache path considered --filename for directory")
+ self.assertEquals(
+ self.cache_path_from_arglist(['-']),
+ self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
+ "cache path considered --max-manifest-depth for file")
+
+ def test_cache_names_treat_negative_manifest_depths_identically(self):
+ base_args = ['/tmp', '--max-manifest-depth']
+ self.assertEquals(
+ self.cache_path_from_arglist(base_args + ['-1']),
+ self.cache_path_from_arglist(base_args + ['-2']))
+
+ def test_cache_names_treat_stdin_consistently(self):
+ self.assertEquals(
+ self.cache_path_from_arglist(['-', '--filename', 'test']),
+ self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
+
+ def test_cache_names_identical_for_synonymous_names(self):
+ self.assertEquals(
+ self.cache_path_from_arglist(['.']),
+ self.cache_path_from_arglist([os.path.realpath('.')]))
+ testdir = self.make_tmpdir()
+ looplink = os.path.join(testdir, 'loop')
+ os.symlink(testdir, looplink)
+ self.assertEquals(
+ self.cache_path_from_arglist([testdir]),
+ self.cache_path_from_arglist([looplink]))
+
+ def test_cache_names_different_by_api_host(self):
+ config = arvados.config.settings()
+ orig_host = config.get('ARVADOS_API_HOST')
+ try:
+ name1 = self.cache_path_from_arglist(['.'])
+ config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
+ self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
+ finally:
+ if orig_host is None:
+ del config['ARVADOS_API_HOST']
+ else:
+ config['ARVADOS_API_HOST'] = orig_host
+
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
def test_simple_file_put(self):
commit 253cb42ed3f59230500329ee807be4ad868512c3
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 22 17:10:40 2014 -0400
2752: Add basic test for arv-put.
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
new file mode 100644
index 0000000..c5d446e
--- /dev/null
+++ b/sdk/python/tests/test_arv-put.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+
+import os
+import tempfile
+import unittest
+
+import arvados
+import arvados.commands.put as arv_put
+from arvados_testutil import ArvadosKeepLocalStoreTestCase
+
+class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
+ def test_simple_file_put(self):
+ with self.make_test_file() as testfile:
+ path = testfile.name
+ arv_put.main(['--stream', '--no-progress', path])
+ self.assertTrue(
+ os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
+ '098f6bcd4621d373cade4e832627b4f6')),
+ "did not find file stream in Keep store")
+
+
+if __name__ == '__main__':
+ unittest.main()
commit ef2497ff77afb5bf36f7eb93019cb52797b681af
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 22 16:46:15 2014 -0400
2752: Extract common Python test methods.
I'm going to reuse these to test arv-put.
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
new file mode 100644
index 0000000..cd86d80
--- /dev/null
+++ b/sdk/python/tests/arvados_testutil.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+
+import errno
+import os
+import shutil
+import tempfile
+import unittest
+
+class ArvadosBaseTestCase(unittest.TestCase):
+ # This class provides common utility functions for our tests.
+
+ def setUp(self):
+ self._tempdirs = []
+
+ def tearDown(self):
+ for workdir in self._tempdirs:
+ shutil.rmtree(workdir, ignore_errors=True)
+
+ def make_tmpdir(self):
+ self._tempdirs.append(tempfile.mkdtemp())
+ return self._tempdirs[-1]
+
+ def data_file(self, filename):
+ try:
+ basedir = os.path.dirname(__file__)
+ except NameError:
+ basedir = '.'
+ return open(os.path.join(basedir, 'data', filename))
+
+
+class ArvadosKeepLocalStoreTestCase(ArvadosBaseTestCase):
+ def setUp(self):
+ super(ArvadosKeepLocalStoreTestCase, self).setUp()
+ self._orig_keep_local_store = os.environ.get('KEEP_LOCAL_STORE')
+ os.environ['KEEP_LOCAL_STORE'] = self.make_tmpdir()
+
+ def tearDown(self):
+ if self._orig_keep_local_store is None:
+ del os.environ['KEEP_LOCAL_STORE']
+ else:
+ os.environ['KEEP_LOCAL_STORE'] = self._orig_keep_local_store
+ super(ArvadosKeepLocalStoreTestCase, self).tearDown()
+
+ def build_directory_tree(self, tree):
+ tree_root = self.make_tmpdir()
+ for leaf in tree:
+ path = os.path.join(tree_root, leaf)
+ try:
+ os.makedirs(os.path.dirname(path))
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ with open(path, 'w') as tmpfile:
+ tmpfile.write(leaf)
+ return tree_root
+
+ def make_test_file(self, text="test"):
+ testfile = tempfile.NamedTemporaryFile()
+ testfile.write(text)
+ testfile.flush()
+ return testfile
diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py
index f260b4e..bb64c77 100644
--- a/sdk/python/tests/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -5,15 +5,14 @@
import arvados
import bz2
import copy
-import errno
import os
import pprint
-import shutil
import subprocess
-import sys
import tempfile
import unittest
+from arvados_testutil import ArvadosKeepLocalStoreTestCase
+
class TestResumableWriter(arvados.ResumableCollectionWriter):
KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
@@ -29,24 +28,7 @@ class TestResumableWriter(arvados.ResumableCollectionWriter):
return self.saved_states[-1]
-class ArvadosCollectionsTest(unittest.TestCase):
- def _make_tmpdir(self):
- self._tempdirs.append(tempfile.mkdtemp())
- return self._tempdirs[-1]
-
- def setUp(self):
- self._orig_keep_local_store = os.environ.get('KEEP_LOCAL_STORE')
- self._tempdirs = []
- os.environ['KEEP_LOCAL_STORE'] = self._make_tmpdir()
-
- def tearDown(self):
- for workdir in self._tempdirs:
- shutil.rmtree(workdir, ignore_errors=True)
- if self._orig_keep_local_store is None:
- del os.environ['KEEP_LOCAL_STORE']
- else:
- os.environ['KEEP_LOCAL_STORE'] = self._orig_keep_local_store
-
+class ArvadosCollectionsTest(ArvadosKeepLocalStoreTestCase):
def write_foo_bar_baz(self):
cw = arvados.CollectionWriter()
self.assertEqual(cw.current_stream_name(), '.',
@@ -219,13 +201,6 @@ class ArvadosCollectionsTest(unittest.TestCase):
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
- def data_file(self, filename):
- try:
- basedir = os.path.dirname(__file__)
- except NameError:
- basedir = '.'
- return open(os.path.join(basedir, 'data', filename))
-
def test_normalized_collection(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
@@ -490,29 +465,18 @@ class ArvadosCollectionsTest(unittest.TestCase):
self.assertEqual(arvados.CollectionReader(m1).all_streams()[0].files()['md9sum.txt'].as_manifest(),
". 085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 40:80:md9sum.txt\n")
- def build_directory_tree(self, tree=['basefile', 'subdir/subfile']):
- tree_root = self._make_tmpdir()
- for leaf in tree:
- path = os.path.join(tree_root, leaf)
- try:
- os.makedirs(os.path.dirname(path))
- except OSError as error:
- if error.errno != errno.EEXIST:
- raise
- with open(path, 'w') as tmpfile:
- tmpfile.write(leaf)
- return tree_root
-
def test_write_directory_tree(self):
cwriter = arvados.CollectionWriter()
- cwriter.write_directory_tree(self.build_directory_tree())
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']))
self.assertEqual(cwriter.manifest_text(),
""". c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
./subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""")
def test_write_named_directory_tree(self):
cwriter = arvados.CollectionWriter()
- cwriter.write_directory_tree(self.build_directory_tree(), 'root')
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']), 'root')
self.assertEqual(
cwriter.manifest_text(),
"""./root c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
@@ -520,8 +484,8 @@ class ArvadosCollectionsTest(unittest.TestCase):
def test_write_directory_tree_in_one_stream(self):
cwriter = arvados.CollectionWriter()
- cwriter.write_directory_tree(self.build_directory_tree(),
- max_manifest_depth=0)
+ cwriter.write_directory_tree(self.build_directory_tree(
+ ['basefile', 'subdir/subfile']), max_manifest_depth=0)
self.assertEqual(cwriter.manifest_text(),
""". 4ace875ffdc6824a04950f06858f4465+22 0:8:basefile
./subdir 4ace875ffdc6824a04950f06858f4465+22 8:14:subfile\n""")
@@ -536,12 +500,6 @@ class ArvadosCollectionsTest(unittest.TestCase):
./d1 50170217e5b04312024aa5cd42934494+13 8:5:f2
./d1/d2 50170217e5b04312024aa5cd42934494+13 0:8:f3\n""")
- def make_test_file(self, text="test"):
- testfile = tempfile.NamedTemporaryFile()
- testfile.write(text)
- testfile.flush()
- return testfile
-
def test_write_one_file(self):
cwriter = arvados.CollectionWriter()
with self.make_test_file() as testfile:
@@ -630,7 +588,7 @@ class ArvadosCollectionsTest(unittest.TestCase):
def test_successful_resumes(self):
# FIXME: This is more of an integration test than a unit test.
cwriter = TestResumableWriter()
- source_tree = self.build_directory_tree()
+ source_tree = self.build_directory_tree(['basefile', 'subdir/subfile'])
with open(os.path.join(source_tree, 'long'), 'w') as longfile:
longfile.write('t' * (cwriter.KEEP_BLOCK_SIZE + 10))
cwriter.write_directory_tree(source_tree)
commit a1e2086ee001f2aaa26244c76f1ebbe1921e5682
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 22 16:11:10 2014 -0400
2752: Move arv-put functionality to arvados.commands.put.
This will make it easier to unit test arv-put, and make it easier for
developers to extend or customize its functionality.
diff --git a/sdk/python/arvados/commands/__init__.py b/sdk/python/arvados/commands/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
new file mode 100644
index 0000000..8082752
--- /dev/null
+++ b/sdk/python/arvados/commands/put.py
@@ -0,0 +1,215 @@
+#!/usr/bin/env python
+
+# TODO:
+# --md5sum - display md5 of each file as read from disk
+
+import argparse
+import arvados
+import os
+import sys
+
+def parse_arguments(arguments):
+ parser = argparse.ArgumentParser(
+ description='Copy data from the local filesystem to Keep.')
+
+ parser.add_argument('paths', metavar='path', type=str, nargs='*',
+ help="""
+ Local file or directory. Default: read from standard input.
+ """)
+
+ parser.add_argument('--max-manifest-depth', type=int, metavar='N',
+ default=-1, help="""
+ Maximum depth of directory tree to represent in the manifest
+ structure. A directory structure deeper than this will be represented
+ as a single stream in the manifest. If N=0, the manifest will contain
+ a single stream. Default: -1 (unlimited), i.e., exactly one manifest
+ stream per filesystem directory that contains files.
+ """)
+
+ group = parser.add_mutually_exclusive_group()
+
+ group.add_argument('--as-stream', action='store_true', dest='stream',
+ help="""
+ Synonym for --stream.
+ """)
+
+ group.add_argument('--stream', action='store_true',
+ help="""
+ Store the file content and display the resulting manifest on
+ stdout. Do not write the manifest to Keep or save a Collection object
+ in Arvados.
+ """)
+
+ group.add_argument('--as-manifest', action='store_true', dest='manifest',
+ help="""
+ Synonym for --manifest.
+ """)
+
+ group.add_argument('--in-manifest', action='store_true', dest='manifest',
+ help="""
+ Synonym for --manifest.
+ """)
+
+ group.add_argument('--manifest', action='store_true',
+ help="""
+ Store the file data and resulting manifest in Keep, save a Collection
+ object in Arvados, and display the manifest locator (Collection uuid)
+ on stdout. This is the default behavior.
+ """)
+
+ group.add_argument('--as-raw', action='store_true', dest='raw',
+ help="""
+ Synonym for --raw.
+ """)
+
+ group.add_argument('--raw', action='store_true',
+ help="""
+ Store the file content and display the data block locators on stdout,
+ separated by commas, with a trailing newline. Do not store a
+ manifest.
+ """)
+
+ parser.add_argument('--use-filename', type=str, default=None,
+ dest='filename', help="""
+ Synonym for --filename.
+ """)
+
+ parser.add_argument('--filename', type=str, default=None,
+ help="""
+ Use the given filename in the manifest, instead of the name of the
+ local file. This is useful when "-" or "/dev/stdin" is given as an
+ input file. It can be used only if there is exactly one path given and
+ it is not a directory. Implies --manifest.
+ """)
+
+ group = parser.add_mutually_exclusive_group()
+ group.add_argument('--progress', action='store_true',
+ help="""
+ Display human-readable progress on stderr (bytes and, if possible,
+ percentage of total data size). This is the default behavior when
+ stderr is a tty.
+ """)
+
+ group.add_argument('--no-progress', action='store_true',
+ help="""
+ Do not display human-readable progress on stderr, even if stderr is a
+ tty.
+ """)
+
+ group.add_argument('--batch-progress', action='store_true',
+ help="""
+ Display machine-readable progress on stderr (bytes and, if known,
+ total data size).
+ """)
+
+ args = parser.parse_args(arguments)
+
+ if len(args.paths) == 0:
+ args.paths += ['/dev/stdin']
+
+ if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
+ if args.filename:
+ parser.error("""
+ --filename argument cannot be used when storing a directory or
+ multiple files.
+ """)
+
+ # Turn on --progress by default if stderr is a tty.
+ if (not (args.batch_progress or args.no_progress)
+ and os.isatty(sys.stderr.fileno())):
+ args.progress = True
+
+ if args.paths == ['-']:
+ args.paths = ['/dev/stdin']
+ if not args.filename:
+ args.filename = '-'
+
+ return args
+
+class CollectionWriterWithProgress(arvados.CollectionWriter):
+ def flush_data(self, *args, **kwargs):
+ if not getattr(self, 'display_type', None):
+ return
+ if not hasattr(self, 'bytes_flushed'):
+ self.bytes_flushed = 0
+ self.bytes_flushed += self._data_buffer_len
+ super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
+ self.bytes_flushed -= self._data_buffer_len
+ if self.display_type == 'machine':
+ sys.stderr.write('%s %d: %d written %d total\n' %
+ (sys.argv[0],
+ os.getpid(),
+ self.bytes_flushed,
+ getattr(self, 'bytes_expected', -1)))
+ elif getattr(self, 'bytes_expected', 0) > 0:
+ pct = 100.0 * self.bytes_flushed / self.bytes_expected
+ sys.stderr.write('\r%dM / %dM %.1f%% ' %
+ (self.bytes_flushed >> 20,
+ self.bytes_expected >> 20, pct))
+ else:
+ sys.stderr.write('\r%d ' % self.bytes_flushed)
+
+ def manifest_text(self, *args, **kwargs):
+ manifest_text = (super(CollectionWriterWithProgress, self)
+ .manifest_text(*args, **kwargs))
+ if getattr(self, 'display_type', None):
+ if self.display_type == 'human':
+ sys.stderr.write('\n')
+ self.display_type = None
+ return manifest_text
+
+
+def main(arguments=None):
+ args = parse_arguments(arguments)
+
+ if args.progress:
+ writer = CollectionWriterWithProgress()
+ writer.display_type = 'human'
+ elif args.batch_progress:
+ writer = CollectionWriterWithProgress()
+ writer.display_type = 'machine'
+ else:
+ writer = arvados.CollectionWriter()
+
+ # Walk the given directory trees and stat files, adding up file sizes,
+ # so we can display progress as percent
+ writer.bytes_expected = 0
+ for path in args.paths:
+ if os.path.isdir(path):
+ for filename in arvados.util.listdir_recursive(path):
+ writer.bytes_expected += os.path.getsize(
+ os.path.join(path, filename))
+ elif not os.path.isfile(path):
+ del writer.bytes_expected
+ break
+ else:
+ writer.bytes_expected += os.path.getsize(path)
+
+ # Copy file data to Keep.
+ for path in args.paths:
+ if os.path.isdir(path):
+ writer.write_directory_tree(
+ path, max_manifest_depth=args.max_manifest_depth)
+ else:
+ writer.start_new_stream()
+ writer.write_file(path, args.filename or os.path.basename(path))
+
+ if args.stream:
+ print writer.manifest_text(),
+ elif args.raw:
+ writer.finish_current_stream()
+ print ','.join(writer.data_locators())
+ else:
+ # Register the resulting collection in Arvados.
+ arvados.api().collections().create(
+ body={
+ 'uuid': writer.finish(),
+ 'manifest_text': writer.manifest_text(),
+ },
+ ).execute()
+
+ # Print the locator (uuid) of the new collection.
+ print writer.finish()
+
+if __name__ == '__main__':
+ main()
diff --git a/sdk/python/bin/arv-put b/sdk/python/bin/arv-put
index 428689a..cdb831b 100755
--- a/sdk/python/bin/arv-put
+++ b/sdk/python/bin/arv-put
@@ -1,205 +1,4 @@
#!/usr/bin/env python
-# TODO:
-# --md5sum - display md5 of each file as read from disk
-
-import argparse
-import arvados
-import os
-import sys
-
-parser = argparse.ArgumentParser(
- description='Copy data from the local filesystem to Keep.')
-
-parser.add_argument('paths', metavar='path', type=str, nargs='*',
- help="""
-Local file or directory. Default: read from standard input.
-""")
-
-parser.add_argument('--max-manifest-depth', type=int, metavar='N', default=-1,
- help="""
-Maximum depth of directory tree to represent in the manifest
-structure. A directory structure deeper than this will be represented
-as a single stream in the manifest. If N=0, the manifest will contain
-a single stream. Default: -1 (unlimited), i.e., exactly one manifest
-stream per filesystem directory that contains files.
-""")
-
-group = parser.add_mutually_exclusive_group()
-
-group.add_argument('--as-stream', action='store_true', dest='stream',
- help="""
-Synonym for --stream.
-""")
-
-group.add_argument('--stream', action='store_true',
- help="""
-Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
-""")
-
-group.add_argument('--as-manifest', action='store_true', dest='manifest',
- help="""
-Synonym for --manifest.
-""")
-
-group.add_argument('--in-manifest', action='store_true', dest='manifest',
- help="""
-Synonym for --manifest.
-""")
-
-group.add_argument('--manifest', action='store_true',
- help="""
-Store the file data and resulting manifest in Keep, save a Collection
-object in Arvados, and display the manifest locator (Collection uuid)
-on stdout. This is the default behavior.
-""")
-
-group.add_argument('--as-raw', action='store_true', dest='raw',
- help="""
-Synonym for --raw.
-""")
-
-group.add_argument('--raw', action='store_true',
- help="""
-Store the file content and display the data block locators on stdout,
-separated by commas, with a trailing newline. Do not store a
-manifest.
-""")
-
-parser.add_argument('--use-filename', type=str, default=None, dest='filename',
- help="""
-Synonym for --filename.
-""")
-
-parser.add_argument('--filename', type=str, default=None,
- help="""
-Use the given filename in the manifest, instead of the name of the
-local file. This is useful when "-" or "/dev/stdin" is given as an
-input file. It can be used only if there is exactly one path given and
-it is not a directory. Implies --manifest.
-""")
-
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--progress', action='store_true',
- help="""
-Display human-readable progress on stderr (bytes and, if possible,
-percentage of total data size). This is the default behavior when
-stderr is a tty.
-""")
-
-group.add_argument('--no-progress', action='store_true',
- help="""
-Do not display human-readable progress on stderr, even if stderr is a
-tty.
-""")
-
-group.add_argument('--batch-progress', action='store_true',
- help="""
-Display machine-readable progress on stderr (bytes and, if known,
-total data size).
-""")
-
-args = parser.parse_args()
-
-if len(args.paths) == 0:
- args.paths += ['/dev/stdin']
-
-if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
- if args.filename:
- parser.error("""
---filename argument cannot be used when storing a directory or
-multiple files.
-""")
-
-# Turn on --progress by default if stderr is a tty.
-if (not (args.batch_progress or args.no_progress)
- and os.isatty(sys.stderr.fileno())):
- args.progress = True
-
-class CollectionWriterWithProgress(arvados.CollectionWriter):
- def flush_data(self, *args, **kwargs):
- if not getattr(self, 'display_type', None):
- return
- if not hasattr(self, 'bytes_flushed'):
- self.bytes_flushed = 0
- self.bytes_flushed += self._data_buffer_len
- super(CollectionWriterWithProgress, self).flush_data(*args, **kwargs)
- self.bytes_flushed -= self._data_buffer_len
- if self.display_type == 'machine':
- sys.stderr.write('%s %d: %d written %d total\n' %
- (sys.argv[0],
- os.getpid(),
- self.bytes_flushed,
- getattr(self, 'bytes_expected', -1)))
- elif getattr(self, 'bytes_expected', 0) > 0:
- pct = 100.0 * self.bytes_flushed / self.bytes_expected
- sys.stderr.write('\r%dM / %dM %.1f%% ' %
- (self.bytes_flushed >> 20,
- self.bytes_expected >> 20, pct))
- else:
- sys.stderr.write('\r%d ' % self.bytes_flushed)
-
- def manifest_text(self, *args, **kwargs):
- manifest_text = (super(CollectionWriterWithProgress, self)
- .manifest_text(*args, **kwargs))
- if getattr(self, 'display_type', None):
- if self.display_type == 'human':
- sys.stderr.write('\n')
- self.display_type = None
- return manifest_text
-
-if args.progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'human'
-elif args.batch_progress:
- writer = CollectionWriterWithProgress()
- writer.display_type = 'machine'
-else:
- writer = arvados.CollectionWriter()
-
-if args.paths == ['-']:
- args.paths = ['/dev/stdin']
- if not args.filename:
- args.filename = '-'
-
-# Walk the given directory trees and stat files, adding up file sizes,
-# so we can display progress as percent
-writer.bytes_expected = 0
-for path in args.paths:
- if os.path.isdir(path):
- for filename in arvados.util.listdir_recursive(path):
- writer.bytes_expected += os.path.getsize(
- os.path.join(path, filename))
- elif not os.path.isfile(path):
- del writer.bytes_expected
- break
- else:
- writer.bytes_expected += os.path.getsize(path)
-
-# Copy file data to Keep.
-for path in args.paths:
- if os.path.isdir(path):
- writer.write_directory_tree(path,
- max_manifest_depth=args.max_manifest_depth)
- else:
- writer.start_new_stream()
- writer.write_file(path, args.filename or os.path.basename(path))
-
-if args.stream:
- print writer.manifest_text(),
-elif args.raw:
- writer.finish_current_stream()
- print ','.join(writer.data_locators())
-else:
- # Register the resulting collection in Arvados.
- arvados.api().collections().create(
- body={
- 'uuid': writer.finish(),
- 'manifest_text': writer.manifest_text(),
- },
- ).execute()
-
- # Print the locator (uuid) of the new collection.
- print writer.finish()
+from arvados.commands.put import main
+main()
commit 2c1f18ce28b3cc765ccafaddae36baf2ca56e247
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 22 15:49:31 2014 -0400
2752: Move Python tests to dedicated subdirectory.
This better follows Python convention, and will let us build out our
test frameworks without cluttering the root source directories.
This change brings a small change to the way we run tests. Pass the
directory name to the discover command:
$ python -m unittest discover tests
diff --git a/sdk/python/testdata/1000G_ref_manifest b/sdk/python/tests/data/1000G_ref_manifest
similarity index 100%
rename from sdk/python/testdata/1000G_ref_manifest
rename to sdk/python/tests/data/1000G_ref_manifest
diff --git a/sdk/python/testdata/jlake_manifest b/sdk/python/tests/data/jlake_manifest
similarity index 100%
rename from sdk/python/testdata/jlake_manifest
rename to sdk/python/tests/data/jlake_manifest
diff --git a/sdk/python/run_test_server.py b/sdk/python/tests/run_test_server.py
similarity index 98%
rename from sdk/python/run_test_server.py
rename to sdk/python/tests/run_test_server.py
index dbb4ff0..ffa0f41 100644
--- a/sdk/python/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -10,8 +10,8 @@ import arvados.api
import shutil
import tempfile
-ARV_API_SERVER_DIR = '../../services/api'
-KEEP_SERVER_DIR = '../../services/keep'
+ARV_API_SERVER_DIR = '../../../services/api'
+KEEP_SERVER_DIR = '../../../services/keep'
SERVER_PID_PATH = 'tmp/pids/webrick-test.pid'
WEBSOCKETS_SERVER_PID_PATH = 'tmp/pids/passenger-test.pid'
diff --git a/sdk/python/test_collections.py b/sdk/python/tests/test_collections.py
similarity index 98%
rename from sdk/python/test_collections.py
rename to sdk/python/tests/test_collections.py
index d8cf8e9..f260b4e 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/tests/test_collections.py
@@ -219,6 +219,13 @@ class ArvadosCollectionsTest(unittest.TestCase):
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
+ def data_file(self, filename):
+ try:
+ basedir = os.path.dirname(__file__)
+ except NameError:
+ basedir = '.'
+ return open(os.path.join(basedir, 'data', filename))
+
def test_normalized_collection(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
@@ -254,11 +261,11 @@ class ArvadosCollectionsTest(unittest.TestCase):
./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
""")
- with open('testdata/1000G_ref_manifest') as f6:
+ with self.data_file('1000G_ref_manifest') as f6:
m6 = f6.read()
self.assertEqual(arvados.CollectionReader(m6).manifest_text(), m6)
- with open('testdata/jlake_manifest') as f7:
+ with self.data_file('jlake_manifest') as f7:
m7 = f7.read()
self.assertEqual(arvados.CollectionReader(m7).manifest_text(), m7)
diff --git a/sdk/python/test_keep_client.py b/sdk/python/tests/test_keep_client.py
similarity index 100%
rename from sdk/python/test_keep_client.py
rename to sdk/python/tests/test_keep_client.py
diff --git a/sdk/python/test_pipeline_template.py b/sdk/python/tests/test_pipeline_template.py
similarity index 100%
rename from sdk/python/test_pipeline_template.py
rename to sdk/python/tests/test_pipeline_template.py
diff --git a/sdk/python/test_util.py b/sdk/python/tests/test_util.py
similarity index 100%
rename from sdk/python/test_util.py
rename to sdk/python/tests/test_util.py
diff --git a/sdk/python/test_websockets.py b/sdk/python/tests/test_websockets.py
similarity index 100%
rename from sdk/python/test_websockets.py
rename to sdk/python/tests/test_websockets.py
diff --git a/services/fuse/run_test_server.py b/services/fuse/run_test_server.py
deleted file mode 120000
index 8d0a3b1..0000000
--- a/services/fuse/run_test_server.py
+++ /dev/null
@@ -1 +0,0 @@
-../../sdk/python/run_test_server.py
\ No newline at end of file
diff --git a/services/fuse/tests/run_test_server.py b/services/fuse/tests/run_test_server.py
new file mode 120000
index 0000000..76bcc16
--- /dev/null
+++ b/services/fuse/tests/run_test_server.py
@@ -0,0 +1 @@
+../../../sdk/python/tests/run_test_server.py
\ No newline at end of file
diff --git a/services/fuse/test_mount.py b/services/fuse/tests/test_mount.py
similarity index 100%
rename from services/fuse/test_mount.py
rename to services/fuse/tests/test_mount.py
commit 5ebc4a294ef91579fb8873e5cf76383831db1a3e
Author: Brett Smith <brett at curoverse.com>
Date: Thu May 22 13:19:27 2014 -0400
2752: Add arvados.collections.ResumableCollectionWriter.
This is a subclass of CollectionWriter that only accepts data from the
filesystem. In exchange, it can record its own state, and resume
writing from one of those states. arv-put will use this to make the
user experience nicer if a long upload is interrupted.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index cbf8a85..814bd75 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -19,6 +19,7 @@ import time
import threading
from collections import deque
+from stat import *
from keep import *
from stream import *
@@ -193,6 +194,11 @@ class CollectionWriter(object):
self._work_trees()
else:
break
+ self.checkpoint_state()
+
+ def checkpoint_state(self):
+ # Subclasses can implement this method to, e.g., report or record state.
+ pass
def _work_file(self):
while True:
@@ -208,6 +214,8 @@ class CollectionWriter(object):
def _work_dirents(self):
path, stream_name, max_manifest_depth = self._queued_trees[0]
+ if stream_name != self.current_stream_name():
+ self.start_new_stream(stream_name)
while self._queued_dirents:
dirent = self._queued_dirents.popleft()
target = os.path.join(path, dirent)
@@ -242,7 +250,6 @@ class CollectionWriter(object):
def _queue_dirents(self, stream_name, dirents):
assert (not self._queued_dirents), "tried to queue more than one tree"
self._queued_dirents = deque(sorted(dirents))
- self.start_new_stream(stream_name)
def _queue_tree(self, path, stream_name, max_manifest_depth):
self._queued_trees.append((path, stream_name, max_manifest_depth))
@@ -273,6 +280,7 @@ class CollectionWriter(object):
self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
+ self.checkpoint_state()
def start_new_file(self, newfilename=None):
self.finish_current_file()
@@ -363,3 +371,86 @@ class CollectionWriter(object):
for name, locators, files in self._finished_streams:
ret += locators
return ret
+
+
+class ResumableCollectionWriter(CollectionWriter):
+ STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+ '_current_stream_locators', '_current_stream_name',
+ '_current_file_name', '_current_file_pos', '_close_file',
+ '_data_buffer', '_dependencies', '_finished_streams',
+ '_queued_dirents', '_queued_trees']
+
+ def __init__(self):
+ self._dependencies = {}
+ super(ResumableCollectionWriter, self).__init__()
+
+ @classmethod
+ def from_state(cls, state):
+ writer = cls()
+ for attr_name in cls.STATE_PROPS:
+ attr_value = state[attr_name]
+ attr_class = getattr(writer, attr_name).__class__
+ # Coerce the value into the same type as the initial value, if
+ # needed.
+ if attr_class not in (type(None), attr_value.__class__):
+ attr_value = attr_class(attr_value)
+ setattr(writer, attr_name, attr_value)
+ # Check dependencies before we try to resume anything.
+ writer.check_dependencies()
+ if state['_current_file'] is not None:
+ path, pos = state['_current_file']
+ try:
+ writer._queued_file = open(path, 'rb')
+ writer._queued_file.seek(pos)
+ except IOError as error:
+ raise errors.StaleWriterStateError(
+ "failed to reopen active file {}: {}".format(path, error))
+ writer._do_queued_work()
+ return writer
+
+ def check_dependencies(self):
+ for path, orig_stat in self._dependencies.items():
+ if not S_ISREG(orig_stat[ST_MODE]):
+ raise errors.StaleWriterStateError("{} not file".format(path))
+ try:
+ now_stat = tuple(os.stat(path))
+ except OSError as error:
+ raise errors.StaleWriterStateError(
+ "failed to stat {}: {}".format(path, error))
+ if ((not S_ISREG(now_stat[ST_MODE])) or
+ (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+ (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+ raise errors.StaleWriterStateError("{} changed".format(path))
+
+ def dump_state(self, copy_func=lambda x: x):
+ state = {attr: copy_func(getattr(self, attr))
+ for attr in self.STATE_PROPS}
+ if self._queued_file is None:
+ state['_current_file'] = None
+ else:
+ state['_current_file'] = (os.path.realpath(self._queued_file.name),
+ self._queued_file.tell())
+ return state
+
+ def _queue_file(self, source, filename=None):
+ try:
+ src_path = os.path.realpath(source)
+ except Exception:
+ raise errors.AssertionError("{} not a file path".format(source))
+ try:
+ path_stat = os.stat(src_path)
+ except OSError as error:
+ raise errors.AssertionError(
+ "could not stat {}: {}".format(source, error))
+ super(ResumableCollectionWriter, self)._queue_file(source, filename)
+ fd_stat = os.fstat(self._queued_file.fileno())
+ if path_stat.st_ino != fd_stat.st_ino:
+ raise errors.AssertionError(
+ "{} changed between open and stat calls".format(source))
+ self._dependencies[src_path] = tuple(fd_stat)
+
+ def write(self, data):
+ if self._queued_file is None:
+ raise errors.AssertionError(
+ "resumable writer can't accept unsourced data")
+ return super(ResumableCollectionWriter, self).write(data)
diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index e4c69a3..f206c1d 100644
--- a/sdk/python/arvados/errors.py
+++ b/sdk/python/arvados/errors.py
@@ -14,3 +14,5 @@ class KeepWriteError(Exception):
pass
class NotImplementedError(Exception):
pass
+class StaleWriterStateError(Exception):
+ pass
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index f9236ee..d8cf8e9 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -4,14 +4,31 @@
import arvados
import bz2
+import copy
import errno
import os
+import pprint
import shutil
import subprocess
import sys
import tempfile
import unittest
+class TestResumableWriter(arvados.ResumableCollectionWriter):
+ KEEP_BLOCK_SIZE = 1024 # PUT to Keep every 1K.
+
+ def __init__(self):
+ self.saved_states = []
+ return super(TestResumableWriter, self).__init__()
+
+ def checkpoint_state(self):
+ self.saved_states.append(self.dump_state(copy.deepcopy))
+
+ def last_state(self):
+ assert self.saved_states, "resumable writer did not save any state"
+ return self.saved_states[-1]
+
+
class ArvadosCollectionsTest(unittest.TestCase):
def _make_tmpdir(self):
self._tempdirs.append(tempfile.mkdtemp())
@@ -543,6 +560,97 @@ class ArvadosCollectionsTest(unittest.TestCase):
cwriter.manifest_text(),
". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n")
+ def test_checkpoint_after_put(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file(
+ 't' * (cwriter.KEEP_BLOCK_SIZE + 10)) as testfile:
+ testpath = os.path.realpath(testfile.name)
+ cwriter.write_file(testpath, 'test')
+ for state in cwriter.saved_states:
+ if state.get('_current_file') == (testpath,
+ cwriter.KEEP_BLOCK_SIZE):
+ break
+ else:
+ self.fail("can't find state immediately after PUT to Keep")
+ self.assertIn('d45107e93f9052fa88a82fc08bb1d316+1024', # 't' * 1024
+ state['_current_stream_locators'])
+
+ def test_basic_resume(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ last_state = cwriter.last_state()
+ resumed = TestResumableWriter.from_state(last_state)
+ self.assertEquals(cwriter.manifest_text(), resumed.manifest_text(),
+ "resumed CollectionWriter had different manifest")
+
+ def test_resume_fails_when_missing_dependency(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_resume_fails_when_dependency_mtime_changed(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ os.utime(testfile.name, (0, 0))
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_resume_fails_when_dependency_is_nonfile(self):
+ cwriter = TestResumableWriter()
+ cwriter.write_file('/dev/null', 'empty')
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_resume_fails_when_dependency_size_changed(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ orig_mtime = os.fstat(testfile.fileno()).st_mtime
+ testfile.write('extra')
+ testfile.flush()
+ os.utime(testfile.name, (orig_mtime, orig_mtime))
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_successful_resumes(self):
+ # FIXME: This is more of an integration test than a unit test.
+ cwriter = TestResumableWriter()
+ source_tree = self.build_directory_tree()
+ with open(os.path.join(source_tree, 'long'), 'w') as longfile:
+ longfile.write('t' * (cwriter.KEEP_BLOCK_SIZE + 10))
+ cwriter.write_directory_tree(source_tree)
+ # A state for each file, plus a fourth for mid-longfile.
+ self.assertGreater(len(cwriter.saved_states), 3,
+ "CollectionWriter didn't save enough states to test")
+
+ for state in cwriter.saved_states:
+ new_writer = TestResumableWriter.from_state(state)
+ manifests = [writer.manifest_text()
+ for writer in (cwriter, new_writer)]
+ self.assertEquals(
+ manifests[0], manifests[1],
+ "\n".join(["manifest mismatch after resuming from state:",
+ pprint.pformat(state), ""] + manifests))
+
+ def test_arbitrary_objects_not_resumable(self):
+ cwriter = TestResumableWriter()
+ with open('/dev/null') as badfile:
+ self.assertRaises(arvados.errors.AssertionError,
+ cwriter.write_file, badfile)
+
+ def test_arbitrary_writes_not_resumable(self):
+ cwriter = TestResumableWriter()
+ self.assertRaises(arvados.errors.AssertionError,
+ cwriter.write, "badtext")
+
if __name__ == '__main__':
unittest.main()
commit 4f925f51d97964e4b697609d4b3fbfbaaeb13979
Author: Brett Smith <brett at curoverse.com>
Date: Tue May 20 12:14:07 2014 -0400
2752: Implement CollectionWriter with a work queue.
This will make it easier to capture and restore state.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 71f30da..cbf8a85 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -18,6 +18,8 @@ import fcntl
import time
import threading
+from collections import deque
+
from keep import *
from stream import *
import config
@@ -157,6 +159,10 @@ class CollectionWriter(object):
self._current_file_name = None
self._current_file_pos = 0
self._finished_streams = []
+ self._close_file = None
+ self._queued_file = None
+ self._queued_dirents = deque()
+ self._queued_trees = deque()
def __enter__(self):
pass
@@ -164,38 +170,91 @@ class CollectionWriter(object):
def __exit__(self):
self.finish()
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- self.start_new_stream(stream_name)
- todo = []
- if max_manifest_depth == 0:
- dirents = sorted(util.listdir_recursive(path))
- else:
- dirents = sorted(os.listdir(path))
- for dirent in dirents:
- target = os.path.join(path, dirent)
- if os.path.isdir(target):
- todo += [[target,
- os.path.join(stream_name, dirent),
- max_manifest_depth-1]]
+ def _do_queued_work(self):
+ # The work queue consists of three pieces:
+ # * _queued_file: The file object we're currently writing to the
+ # Collection.
+ # * _queued_dirents: Entries under the current directory
+ # (_queued_trees[0]) that we want to write or recurse through.
+ # This may contain files from subdirectories if
+ # max_manifest_depth == 0 for this directory.
+ # * _queued_trees: Directories that should be written as separate
+ # streams to the Collection.
+ # This function handles the smallest piece of work currently queued
+ # (current file, then current directory, then next directory) until
+ # no work remains. The _work_THING methods each do a unit of work on
+ # THING. _queue_THING methods add a THING to the work queue.
+ while True:
+ if self._queued_file:
+ self._work_file()
+ elif self._queued_dirents:
+ self._work_dirents()
+ elif self._queued_trees:
+ self._work_trees()
else:
- self.write_file(target, dirent)
- self.finish_current_stream()
- map(lambda x: self.write_directory_tree(*x), todo)
+ break
- def write_file(self, source, filename=None):
- if not hasattr(source, 'read'):
- with open(source, 'rb') as srcfile:
- return self.write_file(srcfile, filename)
- elif filename is None:
- filename = os.path.basename(source.name)
- self.start_new_file(filename)
+ def _work_file(self):
while True:
- buf = source.read(self.KEEP_BLOCK_SIZE)
+ buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
if not buf:
break
self.write(buf)
self.finish_current_file()
+ if self._close_file:
+ self._queued_file.close()
+ self._close_file = None
+ self._queued_file = None
+
+ def _work_dirents(self):
+ path, stream_name, max_manifest_depth = self._queued_trees[0]
+ while self._queued_dirents:
+ dirent = self._queued_dirents.popleft()
+ target = os.path.join(path, dirent)
+ if os.path.isdir(target):
+ self._queue_tree(target,
+ os.path.join(stream_name, dirent),
+ max_manifest_depth - 1)
+ else:
+ self._queue_file(target, dirent)
+ break
+ if not self._queued_dirents:
+ self._queued_trees.popleft()
+
+ def _work_trees(self):
+ path, stream_name, max_manifest_depth = self._queued_trees[0]
+ make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
+ else os.listdir)
+ self._queue_dirents(stream_name, make_dirents(path))
+
+ def _queue_file(self, source, filename=None):
+ assert (self._queued_file is None), "tried to queue more than one file"
+ if not hasattr(source, 'read'):
+ source = open(source, 'rb')
+ self._close_file = True
+ else:
+ self._close_file = False
+ if filename is None:
+ filename = os.path.basename(source.name)
+ self.start_new_file(filename)
+ self._queued_file = source
+
+ def _queue_dirents(self, stream_name, dirents):
+ assert (not self._queued_dirents), "tried to queue more than one tree"
+ self._queued_dirents = deque(sorted(dirents))
+ self.start_new_stream(stream_name)
+
+ def _queue_tree(self, path, stream_name, max_manifest_depth):
+ self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+ def write_file(self, source, filename=None):
+ self._queue_file(source, filename)
+ self._do_queued_work()
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ self._queue_tree(path, stream_name, max_manifest_depth)
+ self._do_queued_work()
def write(self, newdata):
if hasattr(newdata, '__iter__'):
commit a9e2ae4879cb766ce2d902842a8c89122f3f87f8
Author: Brett Smith <brett at curoverse.com>
Date: Tue May 20 11:29:44 2014 -0400
2752: Reuse CollectionWriter.write_file() where possible.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index ef9626e..71f30da 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -179,13 +179,7 @@ class CollectionWriter(object):
os.path.join(stream_name, dirent),
max_manifest_depth-1]]
else:
- self.start_new_file(dirent)
- with open(target, 'rb') as f:
- while True:
- buf = f.read(2**26)
- if len(buf) == 0:
- break
- self.write(buf)
+ self.write_file(target, dirent)
self.finish_current_stream()
map(lambda x: self.write_directory_tree(*x), todo)
diff --git a/sdk/python/bin/arv-put b/sdk/python/bin/arv-put
index 1f7ad60..428689a 100755
--- a/sdk/python/bin/arv-put
+++ b/sdk/python/bin/arv-put
@@ -185,13 +185,7 @@ for path in args.paths:
max_manifest_depth=args.max_manifest_depth)
else:
writer.start_new_stream()
- writer.start_new_file(args.filename or os.path.split(path)[1])
- with open(path, 'rb') as f:
- while True:
- buf = f.read(2**26)
- if len(buf) == 0:
- break
- writer.write(buf)
+ writer.write_file(path, args.filename or os.path.basename(path))
if args.stream:
print writer.manifest_text(),
commit dca048f0ea4dd6f4aedbd6dce774d6bd7956e780
Author: Brett Smith <brett at curoverse.com>
Date: Tue May 20 11:15:00 2014 -0400
2752: Add method CollectionWriter.write_file().
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index f736475..ef9626e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -189,6 +189,20 @@ class CollectionWriter(object):
self.finish_current_stream()
map(lambda x: self.write_directory_tree(*x), todo)
+ def write_file(self, source, filename=None):
+ if not hasattr(source, 'read'):
+ with open(source, 'rb') as srcfile:
+ return self.write_file(srcfile, filename)
+ elif filename is None:
+ filename = os.path.basename(source.name)
+ self.start_new_file(filename)
+ while True:
+ buf = source.read(self.KEEP_BLOCK_SIZE)
+ if not buf:
+ break
+ self.write(buf)
+ self.finish_current_file()
+
def write(self, newdata):
if hasattr(newdata, '__iter__'):
for s in newdata:
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index feaf93b..f9236ee 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -512,6 +512,37 @@ class ArvadosCollectionsTest(unittest.TestCase):
./d1 50170217e5b04312024aa5cd42934494+13 8:5:f2
./d1/d2 50170217e5b04312024aa5cd42934494+13 0:8:f3\n""")
+ def make_test_file(self, text="test"):
+ testfile = tempfile.NamedTemporaryFile()
+ testfile.write(text)
+ testfile.flush()
+ return testfile
+
+ def test_write_one_file(self):
+ cwriter = arvados.CollectionWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name)
+ self.assertEqual(
+ cwriter.manifest_text(),
+ ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:{}\n".format(
+ os.path.basename(testfile.name)))
+
+ def test_write_named_file(self):
+ cwriter = arvados.CollectionWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'foo')
+ self.assertEqual(cwriter.manifest_text(),
+ ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:foo\n")
+
+ def test_write_multiple_files(self):
+ cwriter = arvados.CollectionWriter()
+ for letter in 'ABC':
+ with self.make_test_file(letter) as testfile:
+ cwriter.write_file(testfile.name, letter)
+ self.assertEqual(
+ cwriter.manifest_text(),
+ ". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n")
+
if __name__ == '__main__':
unittest.main()
commit c5f68686b97b90de5698a813add655045f02a62e
Author: Brett Smith <brett at curoverse.com>
Date: Tue May 20 11:12:56 2014 -0400
2752: Remove trailing whitespace in arv-put.
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index fb3dea4..f736475 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -56,9 +56,9 @@ def normalize_stream(s, stream):
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
if len(stream[f]) == 0:
- stream_tokens.append("0:0:{0}".format(fout))
+ stream_tokens.append("0:0:{0}".format(fout))
- return stream_tokens
+ return stream_tokens
def normalize(collection):
streams = {}
@@ -285,7 +285,7 @@ class CollectionWriter(object):
manifest += ' ' + ' '.join(stream[1])
manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
-
+
#print 'writer',manifest
#print 'after reader',CollectionReader(manifest).manifest_text()
commit de46e684ce822099055a185d3fbaa7d7e04d890e
Author: Brett Smith <brett at curoverse.com>
Date: Tue May 20 10:25:18 2014 -0400
2752: Add tests for CollectionWriter.write_directory_tree().
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index 6b46fc9..feaf93b 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -4,6 +4,7 @@
import arvados
import bz2
+import errno
import os
import shutil
import subprocess
@@ -12,12 +13,18 @@ import tempfile
import unittest
class ArvadosCollectionsTest(unittest.TestCase):
+ def _make_tmpdir(self):
+ self._tempdirs.append(tempfile.mkdtemp())
+ return self._tempdirs[-1]
+
def setUp(self):
self._orig_keep_local_store = os.environ.get('KEEP_LOCAL_STORE')
- os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()
+ self._tempdirs = []
+ os.environ['KEEP_LOCAL_STORE'] = self._make_tmpdir()
def tearDown(self):
- shutil.rmtree(os.environ['KEEP_LOCAL_STORE'], ignore_errors=True)
+ for workdir in self._tempdirs:
+ shutil.rmtree(workdir, ignore_errors=True)
if self._orig_keep_local_store is None:
del os.environ['KEEP_LOCAL_STORE']
else:
@@ -459,6 +466,52 @@ class ArvadosCollectionsTest(unittest.TestCase):
self.assertEqual(arvados.CollectionReader(m1).all_streams()[0].files()['md9sum.txt'].as_manifest(),
". 085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 40:80:md9sum.txt\n")
+ def build_directory_tree(self, tree=['basefile', 'subdir/subfile']):
+ tree_root = self._make_tmpdir()
+ for leaf in tree:
+ path = os.path.join(tree_root, leaf)
+ try:
+ os.makedirs(os.path.dirname(path))
+ except OSError as error:
+ if error.errno != errno.EEXIST:
+ raise
+ with open(path, 'w') as tmpfile:
+ tmpfile.write(leaf)
+ return tree_root
+
+ def test_write_directory_tree(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(self.build_directory_tree())
+ self.assertEqual(cwriter.manifest_text(),
+ """. c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
+./subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""")
+
+ def test_write_named_directory_tree(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(self.build_directory_tree(), 'root')
+ self.assertEqual(
+ cwriter.manifest_text(),
+ """./root c5110c5ac93202d8e0f9e381f22bac0f+8 0:8:basefile
+./root/subdir 1ca4dec89403084bf282ad31e6cf7972+14 0:14:subfile\n""")
+
+ def test_write_directory_tree_in_one_stream(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(self.build_directory_tree(),
+ max_manifest_depth=0)
+ self.assertEqual(cwriter.manifest_text(),
+ """. 4ace875ffdc6824a04950f06858f4465+22 0:8:basefile
+./subdir 4ace875ffdc6824a04950f06858f4465+22 8:14:subfile\n""")
+
+ def test_write_directory_tree_with_limited_recursion(self):
+ cwriter = arvados.CollectionWriter()
+ cwriter.write_directory_tree(
+ self.build_directory_tree(['f1', 'd1/f2', 'd1/d2/f3']),
+ max_manifest_depth=1)
+ self.assertEqual(cwriter.manifest_text(),
+ """. bd19836ddb62c11c55ab251ccaca5645+2 0:2:f1
+./d1 50170217e5b04312024aa5cd42934494+13 8:5:f2
+./d1/d2 50170217e5b04312024aa5cd42934494+13 0:8:f3\n""")
+
if __name__ == '__main__':
unittest.main()
commit 89ae22bf5f939a8f2132addab60ba29df58c2152
Author: Brett Smith <brett at curoverse.com>
Date: Tue May 20 09:47:27 2014 -0400
2752: Clean up after Python SDK Collections tests.
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index 5c31f12..6b46fc9 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -2,16 +2,26 @@
#
# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
-import unittest
import arvados
-import os
import bz2
-import sys
+import os
+import shutil
import subprocess
+import sys
+import tempfile
+import unittest
class ArvadosCollectionsTest(unittest.TestCase):
def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
+ self._orig_keep_local_store = os.environ.get('KEEP_LOCAL_STORE')
+ os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(os.environ['KEEP_LOCAL_STORE'], ignore_errors=True)
+ if self._orig_keep_local_store is None:
+ del os.environ['KEEP_LOCAL_STORE']
+ else:
+ os.environ['KEEP_LOCAL_STORE'] = self._orig_keep_local_store
def write_foo_bar_baz(self):
cw = arvados.CollectionWriter()
commit 3da7843d8ca43e1e3ef71b33f37f43c33add7b3e
Author: Brett Smith <brett at curoverse.com>
Date: Tue May 20 09:40:53 2014 -0400
2752: Refactor Python SDK test_collections.
This eliminates duplication in setup and teardown code, and clarifies
dependencies between tests a bit.
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index 7df620d..5c31f12 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -9,17 +9,11 @@ import bz2
import sys
import subprocess
-class KeepLocalStoreTest(unittest.TestCase):
+class ArvadosCollectionsTest(unittest.TestCase):
def setUp(self):
os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
- self.assertEqual(arvados.Keep.put('foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
- self.assertEqual(arvados.Keep.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), 'foo', 'wrong data from Keep.get')
-class LocalCollectionWriterTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+ def write_foo_bar_baz(self):
cw = arvados.CollectionWriter()
self.assertEqual(cw.current_stream_name(), '.',
'current_stream_name() should be "." now')
@@ -32,16 +26,19 @@ class LocalCollectionWriterTest(unittest.TestCase):
cw.start_new_stream('baz')
cw.write('baz')
cw.set_current_file_name('baz.txt')
- hash = cw.finish()
- self.assertEqual(hash,
+ return cw.finish()
+
+ def test_keep_local_store(self):
+ self.assertEqual(arvados.Keep.put('foo'), 'acbd18db4cc2f85cedef654fccc4a4d8+3', 'wrong md5 hash from Keep.put')
+ self.assertEqual(arvados.Keep.get('acbd18db4cc2f85cedef654fccc4a4d8+3'), 'foo', 'wrong data from Keep.get')
+
+ def test_local_collection_writer(self):
+ self.assertEqual(self.write_foo_bar_baz(),
'd6c3b8e571f1b81ebb150a45ed06c884+114',
- "resulting manifest hash was {0}, expecting d6c3b8e571f1b81ebb150a45ed06c884+114".format(hash))
+ "wrong locator hash for files foo, bar, baz")
-class LocalCollectionReaderTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- LocalCollectionWriterTest().runTest()
- def runTest(self):
+ def test_local_collection_reader(self):
+ self.write_foo_bar_baz()
cr = arvados.CollectionReader('d6c3b8e571f1b81ebb150a45ed06c884+114+Xzizzle')
got = []
for s in cr.all_streams():
@@ -63,34 +60,7 @@ class LocalCollectionReaderTest(unittest.TestCase):
'',
'reading zero bytes should have returned empty string')
-class LocalCollectionManifestSubsetTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- LocalCollectionWriterTest().runTest()
- def runTest(self):
- self._runTest('d6c3b8e571f1b81ebb150a45ed06c884+114',
- [[3, '.', 'bar.txt', 'bar'],
- [3, '.', 'foo.txt', 'foo'],
- [3, './baz', 'baz.txt', 'baz']])
- self._runTest((". %s %s 0:3:foo.txt 3:3:bar.txt\n" %
- (arvados.Keep.put("foo"),
- arvados.Keep.put("bar"))),
- [[3, '.', 'bar.txt', 'bar'],
- [3, '.', 'foo.txt', 'foo']])
- self._runTest((". %s %s 0:2:fo.txt 2:4:obar.txt\n" %
- (arvados.Keep.put("foo"),
- arvados.Keep.put("bar"))),
- [[2, '.', 'fo.txt', 'fo'],
- [4, '.', 'obar.txt', 'obar']])
- self._runTest((". %s %s 0:2:fo.txt 2:0:zero.txt 2:2:ob.txt 4:2:ar.txt\n" %
- (arvados.Keep.put("foo"),
- arvados.Keep.put("bar"))),
- [[2, '.', 'ar.txt', 'ar'],
- [2, '.', 'fo.txt', 'fo'],
- [2, '.', 'ob.txt', 'ob'],
- [0, '.', 'zero.txt', '']])
-
- def _runTest(self, collection, expected):
+ def _test_subset(self, collection, expected):
cr = arvados.CollectionReader(collection)
for s in cr.all_streams():
for ex in expected:
@@ -101,10 +71,31 @@ class LocalCollectionManifestSubsetTest(unittest.TestCase):
ex,
'all_files|as_manifest did not preserve manifest contents: got %s expected %s' % (got, ex))
-class LocalCollectionReadlineTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def _runTest(self, what_in, what_out):
+ def test_collection_manifest_subset(self):
+ self.write_foo_bar_baz()
+ self._test_subset('d6c3b8e571f1b81ebb150a45ed06c884+114',
+ [[3, '.', 'bar.txt', 'bar'],
+ [3, '.', 'foo.txt', 'foo'],
+ [3, './baz', 'baz.txt', 'baz']])
+ self._test_subset((". %s %s 0:3:foo.txt 3:3:bar.txt\n" %
+ (arvados.Keep.put("foo"),
+ arvados.Keep.put("bar"))),
+ [[3, '.', 'bar.txt', 'bar'],
+ [3, '.', 'foo.txt', 'foo']])
+ self._test_subset((". %s %s 0:2:fo.txt 2:4:obar.txt\n" %
+ (arvados.Keep.put("foo"),
+ arvados.Keep.put("bar"))),
+ [[2, '.', 'fo.txt', 'fo'],
+ [4, '.', 'obar.txt', 'obar']])
+ self._test_subset((". %s %s 0:2:fo.txt 2:0:zero.txt 2:2:ob.txt 4:2:ar.txt\n" %
+ (arvados.Keep.put("foo"),
+ arvados.Keep.put("bar"))),
+ [[2, '.', 'ar.txt', 'ar'],
+ [2, '.', 'fo.txt', 'fo'],
+ [2, '.', 'ob.txt', 'ob'],
+ [0, '.', 'zero.txt', '']])
+
+ def _test_readline(self, what_in, what_out):
cw = arvados.CollectionWriter()
cw.start_new_file('test.txt')
cw.write(what_in)
@@ -116,16 +107,14 @@ class LocalCollectionReadlineTest(unittest.TestCase):
self.assertEqual(got,
what_out,
"readlines did not split lines correctly: %s" % got)
- def runTest(self):
- self._runTest("\na\nbcd\n\nefg\nz",
- ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
- self._runTest("ab\ncd\n",
- ["ab\n", "cd\n"])
-class LocalCollectionEmptyFileTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+ def test_collection_readline(self):
+ self._test_readline("\na\nbcd\n\nefg\nz",
+ ["\n", "a\n", "bcd\n", "\n", "efg\n", "z"])
+ self._test_readline("ab\ncd\n",
+ ["ab\n", "cd\n"])
+
+ def test_collection_empty_file(self):
cw = arvados.CollectionWriter()
cw.start_new_file('zero.txt')
cw.write('')
@@ -149,10 +138,7 @@ class LocalCollectionEmptyFileTest(unittest.TestCase):
got_sizes += [f.size()]
self.assertEqual(got_sizes, expect_sizes, "got wrong file sizes %s, expected %s" % (got_sizes, expect_sizes))
-class LocalCollectionBZ2DecompressionTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+ def test_collection_bz2_decompression(self):
n_lines_in = 2**18
data_in = "abc\n"
for x in xrange(0, 18):
@@ -173,10 +159,7 @@ class LocalCollectionBZ2DecompressionTest(unittest.TestCase):
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
-class LocalCollectionGzipDecompressionTest(unittest.TestCase):
- def setUp(self):
- os.environ['KEEP_LOCAL_STORE'] = '/tmp'
- def runTest(self):
+ def test_collection_gzip_decompression(self):
n_lines_in = 2**18
data_in = "abc\n"
for x in xrange(0, 18):
@@ -202,8 +185,7 @@ class LocalCollectionGzipDecompressionTest(unittest.TestCase):
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
-class NormalizedCollectionTest(unittest.TestCase):
- def runTest(self):
+ def test_normalized_collection(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt"""
@@ -250,8 +232,7 @@ class NormalizedCollectionTest(unittest.TestCase):
"""
self.assertEqual(arvados.CollectionReader(m8).manifest_text(), m8)
-class LocatorsAndRangesTest(unittest.TestCase):
- def runTest(self):
+ def test_locators_and_ranges(self):
blocks2 = [['a', 10, 0],
['b', 10, 10],
['c', 10, 20],
@@ -284,7 +265,7 @@ class LocatorsAndRangesTest(unittest.TestCase):
self.assertEqual(arvados.locators_and_ranges(blocks2, 49, 2), [['e', 10, 9, 1], ['f', 10, 0, 1]])
self.assertEqual(arvados.locators_and_ranges(blocks2, 59, 2), [['f', 10, 9, 1]])
-
+
blocks3 = [['a', 10, 0],
['b', 10, 10],
['c', 10, 20],
@@ -309,14 +290,14 @@ class LocatorsAndRangesTest(unittest.TestCase):
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 5), [['a', 10, 0, 5]])
self.assertEqual(arvados.locators_and_ranges(blocks, 3, 5), [['a', 10, 3, 5]])
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 10), [['a', 10, 0, 10]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 11), [['a', 10, 0, 10],
['b', 15, 0, 1]])
self.assertEqual(arvados.locators_and_ranges(blocks, 1, 11), [['a', 10, 1, 9],
['b', 15, 0, 2]])
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 25), [['a', 10, 0, 10],
['b', 15, 0, 15]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 30), [['a', 10, 0, 10],
['b', 15, 0, 15],
['c', 5, 0, 5]])
@@ -326,25 +307,24 @@ class LocatorsAndRangesTest(unittest.TestCase):
self.assertEqual(arvados.locators_and_ranges(blocks, 0, 31), [['a', 10, 0, 10],
['b', 15, 0, 15],
['c', 5, 0, 5]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 15, 5), [['b', 15, 5, 5]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 8, 17), [['a', 10, 8, 2],
['b', 15, 0, 15]])
self.assertEqual(arvados.locators_and_ranges(blocks, 8, 20), [['a', 10, 8, 2],
['b', 15, 0, 15],
['c', 5, 0, 3]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 26, 2), [['c', 5, 1, 2]])
-
+
self.assertEqual(arvados.locators_and_ranges(blocks, 9, 15), [['a', 10, 9, 1],
- ['b', 15, 0, 14]])
+ ['b', 15, 0, 14]])
self.assertEqual(arvados.locators_and_ranges(blocks, 10, 15), [['b', 15, 0, 15]])
self.assertEqual(arvados.locators_and_ranges(blocks, 11, 15), [['b', 15, 1, 14],
['c', 5, 0, 1]])
-class FileStreamTest(unittest.TestCase):
class MockStreamReader(object):
def __init__(self, content):
self.content = content
@@ -352,13 +332,13 @@ class FileStreamTest(unittest.TestCase):
def readfrom(self, start, size):
return self.content[start:start+size]
- def runTest(self):
+ def test_file_stream(self):
content = 'abcdefghijklmnopqrstuvwxyz0123456789'
- msr = FileStreamTest.MockStreamReader(content)
+ msr = self.MockStreamReader(content)
segments = [[0, 10, 0],
[10, 15, 10],
[25, 5, 25]]
-
+
sfr = arvados.StreamFileReader(msr, segments, "test")
self.assertEqual(sfr.name(), "test")
@@ -389,7 +369,7 @@ class FileStreamTest(unittest.TestCase):
segments = [[26, 10, 0],
[0, 15, 10],
[15, 5, 25]]
-
+
sfr = arvados.StreamFileReader(msr, segments, "test")
self.assertEqual(sfr.size(), 30)
@@ -417,8 +397,6 @@ class FileStreamTest(unittest.TestCase):
self.assertEqual(sfr.tell(), 30)
-class StreamReaderTest(unittest.TestCase):
-
class MockKeep(object):
def __init__(self, content):
self.content = content
@@ -426,11 +404,11 @@ class StreamReaderTest(unittest.TestCase):
def get(self, locator):
return self.content[locator]
- def runTest(self):
- keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
- 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
+ def test_stream_reader(self):
+ keepblocks = {'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10': 'abcdefghij',
+ 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15': 'klmnopqrstuvwxy',
'cccccccccccccccccccccccccccccccc+5': 'z0123'}
- mk = StreamReaderTest.MockKeep(keepblocks)
+ mk = self.MockKeep(keepblocks)
sr = arvados.StreamReader([".", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+10", "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15", "cccccccccccccccccccccccccccccccc+5", "0:30:foo"], mk)
@@ -450,8 +428,7 @@ class StreamReaderTest(unittest.TestCase):
self.assertEqual(sr.readfrom(25, 5), content[25:30])
self.assertEqual(sr.readfrom(30, 5), '')
-class ExtractFileTest(unittest.TestCase):
- def runTest(self):
+ def test_extract_file(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md6sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md7sum.txt
@@ -471,3 +448,7 @@ class ExtractFileTest(unittest.TestCase):
". 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md7sum.txt\n")
self.assertEqual(arvados.CollectionReader(m1).all_streams()[0].files()['md9sum.txt'].as_manifest(),
". 085c37f02916da1cad16f93c54d899b7+41 5348b82a029fd9e971a811ce1f71360b+43 8b22da26f9f433dea0a10e5ec66d73ba+43 40:80:md9sum.txt\n")
+
+
+if __name__ == '__main__':
+ unittest.main()
commit 9f184376420682b762d312995bf3f40886a5818e
Author: Brett Smith <brett at curoverse.com>
Date: Mon May 19 14:10:41 2014 -0400
2752: Clean up arv-put imports.
diff --git a/sdk/python/bin/arv-put b/sdk/python/bin/arv-put
index 08b3f3d..1f7ad60 100755
--- a/sdk/python/bin/arv-put
+++ b/sdk/python/bin/arv-put
@@ -4,6 +4,7 @@
# --md5sum - display md5 of each file as read from disk
import argparse
+import arvados
import os
import sys
@@ -117,11 +118,6 @@ if (not (args.batch_progress or args.no_progress)
and os.isatty(sys.stderr.fileno())):
args.progress = True
-
-import arvados
-import re
-import string
-
class CollectionWriterWithProgress(arvados.CollectionWriter):
def flush_data(self, *args, **kwargs):
if not getattr(self, 'display_type', None):
@@ -201,7 +197,7 @@ if args.stream:
print writer.manifest_text(),
elif args.raw:
writer.finish_current_stream()
- print string.join(writer.data_locators(), ',')
+ print ','.join(writer.data_locators())
else:
# Register the resulting collection in Arvados.
arvados.api().collections().create(
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list