[ARVADOS] updated: dcb4db28681b6949a56a1de579891cb375c423fe
Git user
git at public.curoverse.com
Wed Jul 6 23:21:14 EDT 2016
Summary of changes:
sdk/python/arvados/commands/put.py | 329 ++++++++++++++++++++++++++++++-------
sdk/python/tests/test_arv_put.py | 63 +++++--
2 files changed, 318 insertions(+), 74 deletions(-)
via dcb4db28681b6949a56a1de579891cb375c423fe (commit)
from 3847357436978a97470e0769f3200d354a2bd08e (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit dcb4db28681b6949a56a1de579891cb375c423fe
Author: Lucas Di Pentima <lucas at curoverse.com>
Date: Thu Jul 7 00:20:58 2016 -0300
9463: Polishing the last details to make the integration tests work ok
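For context on the resume-cache changes below: ArvPutCollectionCache._save() keeps the upload state recoverable by writing the JSON cache to a temporary file and then rename()ing it over the old one, so an interrupted write never leaves a truncated cache behind. A minimal standalone sketch of that atomic-save pattern (the helper name and example data here are illustrative, not part of arv-put):

    import json
    import os
    import tempfile

    def atomic_json_save(filename, data):
        # Create the temp file in the same directory so os.rename() stays
        # atomic (a rename across filesystems is not).
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename) or '.')
        try:
            with os.fdopen(fd, 'w') as tmp:
                json.dump(data, tmp)
            os.rename(tmp_name, filename)  # Atomically replace the previous cache
        except Exception:
            os.unlink(tmp_name)            # Clean up the orphaned temp file
            raise

    atomic_json_save('upload-state.json', {'uploaded': {}, 'col_locator': None})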
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 6e216ee..6bb1a0b 100755
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -310,11 +310,10 @@ class ArvPutCollectionCache(object):
def _save(self):
"""
- Atomically save (create temp file & rename() it)
+ Atomically save
"""
# TODO: It would be a good idea to avoid _save() spamming when writing
# lots of small files.
- print "SAVE START"
try:
new_cache_fd, new_cache_name = tempfile.mkstemp(
dir=os.path.dirname(self.filename))
@@ -323,23 +322,24 @@ class ArvPutCollectionCache(object):
json.dump(self.data, new_cache)
os.rename(new_cache_name, self.filename)
except (IOError, OSError, ResumeCacheConflict) as error:
- print "SAVE ERROR: %s" % error
try:
os.unlink(new_cache_name)
except NameError: # mkstemp failed.
pass
else:
- print "SAVE DONE!! %s" % self.filename
self.cache_file.close()
self.cache_file = new_cache
def file_uploaded(self, path):
+ print "About to register an uploaded file: %s" % path
if path in self.files.keys():
self.data['uploaded'][path] = self.files[path]
self._save()
+ print "Already registered the uploaded file!"
- def set_collection(self, uuid):
- self.data['col_locator'] = uuid
+ def set_collection(self, loc):
+ self.data['col_locator'] = loc
+ self._save()
def collection(self):
return self.data['col_locator']
@@ -406,27 +406,41 @@ class ArvPutCollectionCache(object):
self.cache_file.close()
def destroy(self):
- # try:
- # os.unlink(self.filename)
- # except OSError as error:
- # if error.errno != errno.ENOENT: # That's what we wanted anyway.
- # raise
+ try:
+ os.unlink(self.filename)
+ except OSError as error:
+ if error.errno != errno.ENOENT: # That's what we wanted anyway.
+ raise
self.close()
class ArvPutUploader(object):
- def __init__(self, paths):
+ def __init__(self, paths, reporter=None):
+ expected_bytes = expected_bytes_for(paths)
self.cache = ArvPutCollectionCache(paths)
- if self.cache.collection() is not None:
- self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
- else:
- self.collection = ArvPutCollection(cache=self.cache)
- self.cache.set_collection(self.collection.manifest_locator())
- for p in paths:
- if os.path.isdir(p):
- self.collection.write_directory_tree(p)
- elif os.path.isfile(p):
- self.collection.write_file(p)
- self.cache.destroy()
+ self.paths = paths
+ self.already_uploaded = False
+ # if self.cache.collection() is not None:
+ # self.collection = ArvPutCollection(
+ # locator=self.cache.collection(),
+ # cache=self.cache,
+ # reporter=reporter,
+ # bytes_expected=expected_bytes)
+ # else:
+ self.collection = ArvPutCollection(
+ cache=self.cache,
+ reporter=reporter,
+ bytes_expected=expected_bytes)
+ # self.cache.set_collection(self.collection.manifest_locator())
+
+ def do_upload(self):
+ if not self.already_uploaded:
+ for p in self.paths:
+ if os.path.isdir(p):
+ self.collection.write_directory_tree(p)
+ elif os.path.isfile(p):
+ self.collection.write_file(p, os.path.basename(p))
+ self.cache.destroy()
+ self.already_uploaded = True
def manifest(self):
return self.collection.manifest()
@@ -436,24 +450,69 @@ class ArvPutUploader(object):
class ArvPutCollection(object):
- def __init__(self, locator=None, cache=None, reporter=None,
- bytes_expected=None, **kwargs):
- self.collection_flush_time = 60
+ def __init__(self, cache=None, reporter=None, bytes_expected=None,
+ name=None, owner_uuid=None, ensure_unique_name=False,
+ num_retries=None, replication=None):
+ self.collection_flush_time = 60 # Secs
self.bytes_written = 0
- self._seen_inputs = []
self.cache = cache
self.reporter = reporter
+ self.num_retries = num_retries
self.bytes_expected = bytes_expected
+ locator = self.cache.collection() if self.cache else None
+
if locator is None:
self.collection = arvados.collection.Collection()
- self.collection.save_new()
+ self.collection.save_new(name=name, owner_uuid=owner_uuid,
+ ensure_unique_name=ensure_unique_name,
+ num_retries=num_retries)
+ if self.cache:
+ self.cache.set_collection(self.collection.manifest_locator())
else:
self.collection = arvados.collection.Collection(locator)
+ def save(self):
+ self.collection.save(num_retries=self.num_retries)
+
def manifest_locator(self):
return self.collection.manifest_locator()
-
+
+ def portable_data_hash(self):
+ return self.collection.portable_data_hash()
+
+ def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ return self.collection.manifest_text(stream_name, strip, normalize)
+
+ def _write(self, source_fd, output, first_block=True):
+ start_time = time.time()
+ while True:
+ data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+ if not data:
+ break
+ output.write(data)
+ output.flush() # Commit block to Keep
+ self.bytes_written += len(data)
+ # Is it time to update the collection?
+ if (time.time() - start_time) > self.collection_flush_time:
+ self.collection.save(num_retries=self.num_retries)
+ start_time = time.time()
+ # Once the first block of each file is written, mark the file as uploaded in the cache
+ if first_block:
+ if self.cache:
+ self.cache.file_uploaded(source_fd.name)
+ self.collection.save(num_retries=self.num_retries)
+ print "FLUSHED COLLECTION!!!"
+ first_block = False
+ self.report_progress()
+
+ def write_stdin(self, filename):
+ with self.collection as c:
+ output = c.open(filename, 'w')
+ self._write(sys.stdin, output)
+ output.close()
+ self.collection.save()
+
def write_file(self, source, filename):
if self.cache and source in self.cache.dirty_files():
print "DIRTY: Removing file %s from collection to be uploaded again" % source
@@ -461,23 +520,30 @@ class ArvPutCollection(object):
resume_offset = 0
resume_upload = False
-
- print "FIND file %s" % filename
- if self.collection.find(filename):
+ try:
+ print "FIND file %s" % filename
+ collection_file = self.collection.find(filename)
+ except IOError:
+ # Not found
+ collection_file = None
+
+ if collection_file:
print "File %s already in the collection, checking!" % source
- if os.path.getsize(source) == self.collection.find(filename).size():
+ if os.path.getsize(source) == collection_file.size():
print "WARNING: file %s already uploaded, skipping!" % source
# File already there, skip it.
+ self.bytes_written += os.path.getsize(source)
return
- elif os.path.getsize(source) > self.collection.find(filename).size():
+ elif os.path.getsize(source) > collection_file.size():
print "WARNING: RESUMING file %s" % source
# File partially uploaded, resume!
resume_upload = True
- resume_offset = self.collection.find(filename).size()
+ resume_offset = collection_file.size()
+ self.bytes_written += resume_offset
else:
# Source file smaller than uploaded file, what happened here?
# TODO: Raise exception of some kind?
- pass
+ return
with open(source, 'r') as source_fd:
with self.collection as c:
@@ -490,29 +556,12 @@ class ArvPutCollection(object):
print "Writing file, source: %s, filename: %s" % (source, filename)
output = c.open(filename, 'w')
first_block = True
-
- start_time = time.time()
- while True:
- data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
- if not data:
- break
- output.write(data)
- output.flush() # Commit block to Keep
- self.bytes_written += len(data)
- # Is it time to update the collection?
- if (time.time() - start_time) > self.collection_flush_time:
- self.collection.save()
- start_time = time.time()
- # Once a block is written on each file, mark it as uploaded on the cache
- if first_block:
- if self.cache:
- self.cache.file_uploaded(source)
- first_block = False
- # File write finished
+
+ self._write(source_fd, output, first_block)
output.close()
self.collection.save() # One last save...
- def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
+ def write_directory_tree(self, path, stream_name='.'):
if os.path.isdir(path):
for item in os.listdir(path):
print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
@@ -655,6 +704,172 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
raise ValueError("Not a valid project UUID: {}".format(project_uuid))
return query.execute(num_retries=num_retries)['uuid']
+def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+ global api_client
+
+ args = parse_arguments(arguments)
+ status = 0
+ if api_client is None:
+ api_client = arvados.api('v1')
+
+ # Determine the name to use
+ if args.name:
+ if args.stream or args.raw:
+ print >>stderr, "Cannot use --name with --stream or --raw"
+ sys.exit(1)
+ collection_name = args.name
+ else:
+ collection_name = "Saved at {} by {}@{}".format(
+ datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
+ pwd.getpwuid(os.getuid()).pw_name,
+ socket.gethostname())
+
+ if args.project_uuid and (args.stream or args.raw):
+ print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+ sys.exit(1)
+
+ # Determine the parent project
+ try:
+ project_uuid = desired_project_uuid(api_client, args.project_uuid,
+ args.retries)
+ except (apiclient_errors.Error, ValueError) as error:
+ print >>stderr, error
+ sys.exit(1)
+
+ # write_copies diverges from args.replication here.
+ # args.replication is how many copies we will instruct Arvados to
+ # maintain (by passing it in collections().create()) after all
+ # data is written -- and if None was given, we'll use None there.
+ # Meanwhile, write_copies is how many copies of each data block we
+ # write to Keep, which has to be a number.
+ #
+ # If we simply changed args.replication from None to a default
+ # here, we'd end up erroneously passing the default replication
+ # level (instead of None) to collections().create().
+ write_copies = (args.replication or
+ api_client._rootDesc.get('defaultCollectionReplication', 2))
+
+ if args.progress:
+ reporter = progress_writer(human_progress)
+ elif args.batch_progress:
+ reporter = progress_writer(machine_progress)
+ else:
+ reporter = None
+ bytes_expected = expected_bytes_for(args.paths)
+
+ resume_cache = None
+ if args.resume:
+ try:
+ resume_cache = ArvPutCollectionCache(args.paths)
+ except (IOError, OSError, ValueError):
+ pass # Couldn't open cache directory/file. Continue without it.
+ except ResumeCacheConflict:
+ print >>stderr, "\n".join([
+ "arv-put: Another process is already uploading this data.",
+ " Use --no-resume if this is really what you want."])
+ sys.exit(1)
+
+ if args.stream or args.raw:
+ writer = ArvPutCollection(cache=resume_cache,
+ reporter=reporter,
+ bytes_expected=bytes_expected,
+ num_retries=args.retries,
+ replication=write_copies)
+ else:
+ writer = ArvPutCollection(cache=resume_cache,
+ reporter=reporter,
+ bytes_expected=bytes_expected,
+ num_retries=args.retries,
+ replication=write_copies,
+ name=collection_name,
+ owner_uuid=project_uuid,
+ ensure_unique_name=True)
+
+ # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+ # the originals.
+ orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+ for sigcode in CAUGHT_SIGNALS}
+
+ if writer.bytes_written > 0: # We're resuming a previous upload. TODO
+ print >>stderr, "\n".join([
+ "arv-put: Resuming previous upload from last checkpoint.",
+ " Use the --no-resume option to start over."])
+
+ writer.report_progress()
+ for path in args.paths: # Copy file data to Keep.
+ if path == '-':
+ writer.write_stdin(args.filename)
+ elif os.path.isdir(path):
+ writer.write_directory_tree(path)  #, os.path.join('.', os.path.basename(path))) TODO: Check what happens with multiple directory arguments
+ else:
+ writer.write_file(path, args.filename or os.path.basename(path))
+
+ if args.progress: # Print newline to split stderr from stdout for humans.
+ print >>stderr
+
+ output = None
+ if args.stream:
+ if args.normalize:
+ output = writer.manifest_text(normalize=True)
+ else:
+ output = writer.manifest_text()
+ elif args.raw:
+ output = ','.join(writer.data_locators()) # TODO
+ else:
+ try:
+ # manifest_text = writer.manifest_text()
+ # if args.normalize:
+ # manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
+ # replication_attr = 'replication_desired'
+ # if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
+ # # API called it 'redundancy' before #3410.
+ # replication_attr = 'redundancy'
+ # # Register the resulting collection in Arvados.
+ # collection = api_client.collections().create(
+ # body={
+ # 'owner_uuid': project_uuid,
+ # 'name': collection_name,
+ # 'manifest_text': manifest_text,
+ # replication_attr: args.replication,
+ # },
+ # ensure_unique_name=True
+ # ).execute(num_retries=args.retries)
+ #
+ # print >>stderr, "Collection saved as '%s'" % collection['name']
+ #
+ writer.save()
+ if args.portable_data_hash:
+ output = writer.portable_data_hash()
+ else:
+ output = writer.manifest_locator()
+ with open('/tmp/lucas.txt', 'w') as f:
+ f.write(output)
+
+ except apiclient_errors.Error as error:
+ print >>stderr, (
+ "arv-put: Error creating Collection on project: {}.".format(
+ error))
+ status = 1
+
+ # Print the locator (uuid) of the new collection.
+ if output is None:
+ status = status or 1
+ else:
+ stdout.write(output)
+ if not output.endswith('\n'):
+ stdout.write('\n')
+
+ for sigcode, orig_handler in orig_signal_handlers.items():
+ signal.signal(sigcode, orig_handler)
+
+ if status != 0:
+ sys.exit(status)
+
+ if resume_cache is not None:
+ resume_cache.destroy()
+
+ return output
+
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
global api_client
@@ -825,4 +1040,4 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
return output
if __name__ == '__main__':
- main()
+ main_new()
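The core of the resumable upload in put.py above is the checkpointed write loop: _write() streams the source in KEEP_BLOCK_SIZE chunks, flushes each block to Keep, and saves the collection at most once per collection_flush_time, while write_file() compares the source size against what the collection already holds to decide whether to skip, resume from an offset, or re-upload. A rough standalone sketch of that loop using plain files instead of the Arvados collection API (names and constants here are illustrative):

    import os
    import time

    BLOCK_SIZE = 64 * 1024 * 1024   # stand-in for arvados.config.KEEP_BLOCK_SIZE
    CHECKPOINT_INTERVAL = 60        # seconds, mirrors collection_flush_time

    def checkpointed_copy(src_path, dst_path, checkpoint):
        # Resume from whatever the destination already holds, like write_file()
        # does when the collection's copy is smaller than the source.
        resume_offset = os.path.getsize(dst_path) if os.path.exists(dst_path) else 0
        last_save = time.time()
        with open(src_path, 'rb') as src, open(dst_path, 'ab') as dst:
            src.seek(resume_offset)
            while True:
                data = src.read(BLOCK_SIZE)
                if not data:
                    break
                dst.write(data)
                dst.flush()                        # commit the block, like output.flush()
                if time.time() - last_save > CHECKPOINT_INTERVAL:
                    checkpoint()                   # persist progress for a later resume
                    last_save = time.time()
        checkpoint()                               # final save, like the trailing collection.save()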
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index c4ce823..0990075 100755
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -8,6 +8,7 @@ import pwd
import re
import shutil
import subprocess
+import multiprocessing
import sys
import tempfile
import time
@@ -267,23 +268,51 @@ class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
# shutil.rmtree(tmpdir)
# self.assertEqual(True, c.manifest())
- def test_write_directory_twice(self):
- data = 'b' * 1024 * 1024
- tmpdir = tempfile.mkdtemp()
- for size in [1, 5, 10, 70]:
- with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
- for _ in range(size):
- f.write(data)
- os.mkdir(os.path.join(tmpdir, 'subdir1'))
- for size in [2, 4, 6]:
- with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
- for _ in range(size):
- f.write(data)
- c = arv_put.ArvPutUploader([tmpdir])
- d = arv_put.ArvPutUploader([tmpdir])
- print "ESCRIDIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
- shutil.rmtree(tmpdir)
- self.assertEqual(0, d.bytes_written())
+ def fake_reporter(self, written, expected):
+ # Use this callback as an intra-block pause to be able to simulate an interruption
+ print "Written %d / %d bytes" % (written, expected)
+ time.sleep(10)
+
+ def bg_uploader(self, paths):
+ return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)
+
+ # def test_resume_large_file_upload(self):
+ # import multiprocessing
+ # data = 'x' * 1024 * 1024 # 1 MB
+ # _, filename = tempfile.mkstemp()
+ # fileobj = open(filename, 'w')
+ # for _ in range(200):
+ # fileobj.write(data)
+ # fileobj.close()
+ # uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
+ # uploader.start()
+ # time.sleep(5)
+ # uploader.terminate()
+ # time.sleep(1)
+ # # cache = arv_put.ArvPutCollectionCache([filename])
+ # # print "Collection detected: %s" % cache.collection()
+ # # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
+ # # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
+ # # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
+ # os.unlink(filename)
+
+ # def test_write_directory_twice(self):
+ # data = 'b' * 1024 * 1024
+ # tmpdir = tempfile.mkdtemp()
+ # for size in [1, 5, 10, 70]:
+ # with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+ # for _ in range(size):
+ # f.write(data)
+ # os.mkdir(os.path.join(tmpdir, 'subdir1'))
+ # for size in [2, 4, 6]:
+ # with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+ # for _ in range(size):
+ # f.write(data)
+ # c = arv_put.ArvPutUploader([tmpdir])
+ # d = arv_put.ArvPutUploader([tmpdir])
+ # print "ESCRIBIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
+ # shutil.rmtree(tmpdir)
+ # self.assertEqual(0, d.bytes_written())
class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
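The disabled test_resume_large_file_upload above exercises resuming by killing the uploader partway through from a second process. A small sketch of that interruption pattern with multiprocessing (slow_upload here is a stand-in for ArvPutUploader.do_upload(), not real arv-put code):

    import multiprocessing
    import time

    def slow_upload(path):
        # Stand-in for the real uploader: pretend each block takes a while to write.
        for _ in range(200):
            time.sleep(0.1)

    if __name__ == '__main__':
        worker = multiprocessing.Process(target=slow_upload, args=('/tmp/example-data',))
        worker.start()
        time.sleep(2)        # let a few "blocks" go through...
        worker.terminate()   # ...then simulate the interruption
        worker.join()
        # A real test would now reopen the resume cache and assert that the
        # partially uploaded size is smaller than the source file size.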
-----------------------------------------------------------------------
hooks/post-receive
--