[ARVADOS] updated: dcb4db28681b6949a56a1de579891cb375c423fe

Git user git at public.curoverse.com
Wed Jul 6 23:21:14 EDT 2016


Summary of changes:
 sdk/python/arvados/commands/put.py | 329 ++++++++++++++++++++++++++++++-------
 sdk/python/tests/test_arv_put.py   |  63 +++++--
 2 files changed, 318 insertions(+), 74 deletions(-)

       via  dcb4db28681b6949a56a1de579891cb375c423fe (commit)
      from  3847357436978a97470e0769f3200d354a2bd08e (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit dcb4db28681b6949a56a1de579891cb375c423fe
Author: Lucas Di Pentima <lucas at curoverse.com>
Date:   Thu Jul 7 00:20:58 2016 -0300

    9463: Polishing the last details to make the integration tests work ok

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 6e216ee..6bb1a0b 100755
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -310,11 +310,10 @@ class ArvPutCollectionCache(object):
     
     def _save(self):
         """
-        Atomically save (create temp file & rename() it)
+        Atomically save
         """
         # TODO: Should be a good idea to avoid _save() spamming? when writing 
         # lots of small files.
-        print "SAVE START"
         try:
             new_cache_fd, new_cache_name = tempfile.mkstemp(
                 dir=os.path.dirname(self.filename))
@@ -323,23 +322,24 @@ class ArvPutCollectionCache(object):
             json.dump(self.data, new_cache)
             os.rename(new_cache_name, self.filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
-            print "SAVE ERROR: %s" % error
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
                 pass
         else:
-            print "SAVE DONE!! %s" % self.filename
             self.cache_file.close()
             self.cache_file = new_cache
     
     def file_uploaded(self, path):
+        print "About to register an uploaded file: %s" % path
         if path in self.files.keys():
             self.data['uploaded'][path] = self.files[path]
             self._save()
+            print "Already registered the uploaded file!"
     
-    def set_collection(self, uuid):
-        self.data['col_locator'] = uuid
+    def set_collection(self, loc):
+        self.data['col_locator'] = loc
+        self._save()
     
     def collection(self):
         return self.data['col_locator']
@@ -406,27 +406,41 @@ class ArvPutCollectionCache(object):
         self.cache_file.close()
 
     def destroy(self):
-        # try:
-        #     os.unlink(self.filename)
-        # except OSError as error:
-        #     if error.errno != errno.ENOENT:  # That's what we wanted anyway.
-        #         raise
+        try:
+            os.unlink(self.filename)
+        except OSError as error:
+            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
+                raise
         self.close()
 
 class ArvPutUploader(object):
-    def __init__(self, paths):
+    def __init__(self, paths, reporter=None):
+        expected_bytes = expected_bytes_for(paths)
         self.cache = ArvPutCollectionCache(paths)
-        if self.cache.collection() is not None:
-            self.collection = ArvPutCollection(locator=self.cache.collection(), cache=self.cache)
-        else:
-            self.collection = ArvPutCollection(cache=self.cache)
-            self.cache.set_collection(self.collection.manifest_locator())
-        for p in paths:
-            if os.path.isdir(p):
-                self.collection.write_directory_tree(p)
-            elif os.path.isfile(p):
-                self.collection.write_file(p)
-        self.cache.destroy()
+        self.paths = paths
+        self.already_uploaded = False
+        # if self.cache.collection() is not None:
+        #     self.collection = ArvPutCollection(
+        #                         locator=self.cache.collection(),
+        #                         cache=self.cache,
+        #                         reporter=reporter,
+        #                         bytes_expected=expected_bytes)
+        # else:
+        self.collection = ArvPutCollection(
+                            cache=self.cache, 
+                            reporter=reporter,
+                            bytes_expected=expected_bytes)
+            # self.cache.set_collection(self.collection.manifest_locator())
+    
+    def do_upload(self):
+        if not self.already_uploaded:
+            for p in paths:
+                if os.path.isdir(p):
+                    self.collection.write_directory_tree(p)
+                elif os.path.isfile(p):
+                    self.collection.write_file(p, os.path.basename(p))
+            self.cache.destroy()
+            self.already_uploaded = True
     
     def manifest(self):
         return self.collection.manifest()
@@ -436,24 +450,69 @@ class ArvPutUploader(object):
 
 
 class ArvPutCollection(object):
-    def __init__(self, locator=None, cache=None, reporter=None, 
-                    bytes_expected=None, **kwargs):
-        self.collection_flush_time = 60
+    def __init__(self, cache=None, reporter=None, bytes_expected=None, 
+                name=None, owner_uuid=None, ensure_unique_name=False, 
+                num_retries=None, replication=None):
+        self.collection_flush_time = 60 # Secs
         self.bytes_written = 0
-        self._seen_inputs = []
         self.cache = cache
         self.reporter = reporter
+        self.num_retries=num_retries
         self.bytes_expected = bytes_expected
         
+        locator = self.cache.collection() if self.cache else None
+        
         if locator is None:
             self.collection = arvados.collection.Collection()
-            self.collection.save_new()
+            self.collection.save_new(name=name, owner_uuid=owner_uuid, 
+                                        ensure_unique_name=ensure_unique_name,
+                                        num_retries=num_retries)
+            if self.cache:
+                self.cache.set_collection(self.collection.manifest_locator())
         else:
             self.collection = arvados.collection.Collection(locator)
     
+    def save(self):
+        self.collection.save(num_retries=self.num_retries)
+    
     def manifest_locator(self):
         return self.collection.manifest_locator()
-            
+    
+    def portable_data_hash(self):
+        return self.collectin.portable_data_hash()
+    
+    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+        return self.collection.manifest_text(stream_name, strip, normalize)
+    
+    def _write(self, source_fd, output, first_block=True):
+        start_time = time.time()
+        while True:
+            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+            if not data:
+                break
+            output.write(data)
+            output.flush() # Commit block to Keep
+            self.bytes_written += len(data)
+            # Is it time to update the collection?
+            if (time.time() - start_time) > self.collection_flush_time:
+                self.collection.save(num_retries=self.num_retries)
+                start_time = time.time()
+            # Once a block is written on each file, mark it as uploaded on the cache
+            if first_block:
+                if self.cache:
+                    self.cache.file_uploaded(source_fd.name)
+                    self.collection.save(num_retries=self.num_retries)
+                    print "FLUSHED COLLECTION!!!"
+                first_block = False
+            self.report_progress()
+    
+    def write_stdin(self, filename):
+        with self.collection as c:
+            output = c.open(filename, 'w')
+            self._write(sys.stdin, output)
+            output.close()
+            self.collection.save()
+    
     def write_file(self, source, filename):
         if self.cache and source in self.cache.dirty_files():
             print "DIRTY: Removing file %s from collection to be uploaded again" % source
@@ -461,23 +520,30 @@ class ArvPutCollection(object):
         
         resume_offset = 0
         resume_upload = False
-
-        print "FIND file %s" % filename
-        if self.collection.find(filename):
+        try:
+            print "FIND file %s" % filename
+            collection_file = self.collection.find(filename)
+        except IOError:
+            # Not found
+            collection_file = None
+        
+        if collection_file:
             print "File %s already in the collection, checking!" % source
-            if os.path.getsize(source) == self.collection.find(filename).size():
+            if os.path.getsize(source) == collection_file.size():
                 print "WARNING: file %s already uploaded, skipping!" % source
                 # File already there, skip it.
+                self.bytes_written += os.path.getsize(source)
                 return
-            elif os.path.getsize(source) > self.collection.find(filename).size():
+            elif os.path.getsize(source) > collection_file.size():
                 print "WARNING: RESUMING file %s" % source
                 # File partially uploaded, resume!
                 resume_upload = True
-                resume_offset = self.collection.find(filename).size()
+                resume_offset = collection_file.size()
+                self.bytes_written += resume_offset
             else:
                 # Source file smaller than uploaded file, what happened here?
                 # TODO: Raise exception of some kind?
-                pass
+                return
 
         with open(source, 'r') as source_fd:
             with self.collection as c:
@@ -490,29 +556,12 @@ class ArvPutCollection(object):
                     print "Writing file, source: %s, filename: %s" % (source, filename)
                     output = c.open(filename, 'w')
                     first_block = True
-                    
-                start_time = time.time()
-                while True:
-                    data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
-                    if not data:
-                        break
-                    output.write(data)
-                    output.flush() # Commit block to Keep
-                    self.bytes_written += len(data)
-                    # Is it time to update the collection?
-                    if (time.time() - start_time) > self.collection_flush_time:
-                        self.collection.save()
-                        start_time = time.time()
-                    # Once a block is written on each file, mark it as uploaded on the cache
-                    if first_block:
-                        if self.cache:
-                            self.cache.file_uploaded(source)
-                        first_block = False
-                # File write finished
+                
+                self._write(source_fd, output, first_block)
                 output.close()
                 self.collection.save() # One last save...
 
-    def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
+    def write_directory_tree(self, path, stream_name='.'):
         if os.path.isdir(path):
             for item in os.listdir(path):
                 print "Checking path: '%s' - stream_name: '%s'" % (path, stream_name)
@@ -655,6 +704,172 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
+def main_new(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+    global api_client
+
+    args = parse_arguments(arguments)
+    status = 0
+    if api_client is None:
+        api_client = arvados.api('v1')
+
+    # Determine the name to use
+    if args.name:
+        if args.stream or args.raw:
+            print >>stderr, "Cannot use --name with --stream or --raw"
+            sys.exit(1)
+        collection_name = args.name
+    else:
+        collection_name = "Saved at {} by {}@{}".format(
+            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
+            pwd.getpwuid(os.getuid()).pw_name,
+            socket.gethostname())
+
+    if args.project_uuid and (args.stream or args.raw):
+        print >>stderr, "Cannot use --project-uuid with --stream or --raw"
+        sys.exit(1)
+
+    # Determine the parent project
+    try:
+        project_uuid = desired_project_uuid(api_client, args.project_uuid,
+                                            args.retries)
+    except (apiclient_errors.Error, ValueError) as error:
+        print >>stderr, error
+        sys.exit(1)
+
+    # write_copies diverges from args.replication here.
+    # args.replication is how many copies we will instruct Arvados to
+    # maintain (by passing it in collections().create()) after all
+    # data is written -- and if None was given, we'll use None there.
+    # Meanwhile, write_copies is how many copies of each data block we
+    # write to Keep, which has to be a number.
+    #
+    # If we simply changed args.replication from None to a default
+    # here, we'd end up erroneously passing the default replication
+    # level (instead of None) to collections().create().
+    write_copies = (args.replication or
+                    api_client._rootDesc.get('defaultCollectionReplication', 2))
+
+    if args.progress:
+        reporter = progress_writer(human_progress)
+    elif args.batch_progress:
+        reporter = progress_writer(machine_progress)
+    else:
+        reporter = None
+    bytes_expected = expected_bytes_for(args.paths)
+
+    resume_cache = None
+    if args.resume:
+        try:
+            resume_cache = ArvPutCollectionCache(args.paths)
+        except (IOError, OSError, ValueError):
+            pass  # Couldn't open cache directory/file.  Continue without it.
+        except ResumeCacheConflict:
+            print >>stderr, "\n".join([
+                "arv-put: Another process is already uploading this data.",
+                "         Use --no-resume if this is really what you want."])
+            sys.exit(1)
+
+    if args.stream or args.raw:
+        writer = ArvPutCollection(cache=resume_cache, 
+                        reporter=reporter,
+                        bytes_expected=bytes_expected,
+                        num_retries=args.retries,
+                        replication=write_copies)
+    else:
+        writer = ArvPutCollection(cache=resume_cache, 
+                        reporter=reporter,
+                        bytes_expected=bytes_expected,
+                        num_retries=args.retries,
+                        replication=write_copies,
+                        name=collection_name,
+                        owner_uuid=project_uuid,
+                        ensure_unique_name=True)
+
+    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
+    # the originals.
+    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+                            for sigcode in CAUGHT_SIGNALS}
+
+    if writer.bytes_written > 0:  # We're resuming a previous upload. TODO
+        print >>stderr, "\n".join([
+                "arv-put: Resuming previous upload from last checkpoint.",
+                "         Use the --no-resume option to start over."])
+
+    writer.report_progress()
+    for path in args.paths:  # Copy file data to Keep.
+        if path == '-':
+            writer.write_stdin(args.filename)
+        elif os.path.isdir(path):
+            writer.write_directory_tree(path)#, os.path.join('.', os.path.basename(path))) TODO: Check what happens with multiple directories params
+        else:
+            writer.write_file(path, args.filename or os.path.basename(path))
+
+    if args.progress:  # Print newline to split stderr from stdout for humans.
+        print >>stderr
+
+    output = None
+    if args.stream:
+        if args.normalize:
+            output = writer.manifest_text(normalize=True)
+        else:
+            output = writer.manifest_text()
+    elif args.raw:
+        output = ','.join(writer.data_locators()) # TODO
+    else:
+        try:
+    #         manifest_text = writer.manifest_text()
+    #         if args.normalize:
+    #             manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
+    #         replication_attr = 'replication_desired'
+    #         if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
+    #             # API called it 'redundancy' before #3410.
+    #             replication_attr = 'redundancy'
+    #         # Register the resulting collection in Arvados.
+    #         collection = api_client.collections().create(
+    #             body={
+    #                 'owner_uuid': project_uuid,
+    #                 'name': collection_name,
+    #                 'manifest_text': manifest_text,
+    #                 replication_attr: args.replication,
+    #                 },
+    #             ensure_unique_name=True
+    #             ).execute(num_retries=args.retries)
+    #
+    #         print >>stderr, "Collection saved as '%s'" % collection['name']
+    #
+            writer.save()
+            if args.portable_data_hash:
+                output = writer.portable_data_hash()
+            else:
+                output = writer.manifest_locator()
+            with open('/tmp/lucas.txt', 'w') as f:
+                f.write(output)
+
+        except apiclient_errors.Error as error:
+            print >>stderr, (
+                "arv-put: Error creating Collection on project: {}.".format(
+                    error))
+            status = 1
+
+    # Print the locator (uuid) of the new collection.
+    if output is None:
+        status = status or 1
+    else:
+        stdout.write(output)
+        if not output.endswith('\n'):
+            stdout.write('\n')
+
+    for sigcode, orig_handler in orig_signal_handlers.items():
+        signal.signal(sigcode, orig_handler)
+
+    if status != 0:
+        sys.exit(status)
+
+    if resume_cache is not None:
+        resume_cache.destroy()
+
+    return output
+
 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     global api_client
 
@@ -825,4 +1040,4 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     return output
 
 if __name__ == '__main__':
-    main()
+    main_new()
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index c4ce823..0990075 100755
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -8,6 +8,7 @@ import pwd
 import re
 import shutil
 import subprocess
+import multiprocessing
 import sys
 import tempfile
 import time
@@ -267,23 +268,51 @@ class ArvadosPutCollectionTest(run_test_server.TestCaseWithServers):
     #     shutil.rmtree(tmpdir)
     #     self.assertEqual(True, c.manifest())
 
-    def test_write_directory_twice(self):
-        data = 'b' * 1024 * 1024
-        tmpdir = tempfile.mkdtemp()
-        for size in [1, 5, 10, 70]:
-            with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
-                for _ in range(size):
-                    f.write(data)
-        os.mkdir(os.path.join(tmpdir, 'subdir1'))
-        for size in [2, 4, 6]:
-            with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
-                for _ in range(size):
-                    f.write(data)
-        c = arv_put.ArvPutUploader([tmpdir])
-        d = arv_put.ArvPutUploader([tmpdir])
-        print "ESCRIDIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
-        shutil.rmtree(tmpdir)
-        self.assertEqual(0, d.bytes_written())
+    def fake_reporter(self, written, expected):
+        # Use this callback as a intra-block pause to be able to simulate an interruption
+        print "Written %d / %d bytes" % (written, expected)
+        time.sleep(10)
+    
+    def bg_uploader(self, paths):
+        return arv_put.ArvPutUploader(paths, reporter=self.fake_reporter)
+
+    # def test_resume_large_file_upload(self):
+    #     import multiprocessing
+    #     data = 'x' * 1024 * 1024 # 1 MB
+    #     _, filename = tempfile.mkstemp()
+    #     fileobj = open(filename, 'w')
+    #     for _ in range(200):
+    #         fileobj.write(data)
+    #     fileobj.close()
+    #     uploader = multiprocessing.Process(target=self.bg_uploader, args=([filename],))
+    #     uploader.start()
+    #     time.sleep(5)
+    #     uploader.terminate()
+    #     time.sleep(1)
+    #     # cache = arv_put.ArvPutCollectionCache([filename])
+    #     # print "Collection detected: %s" % cache.collection()
+    #     # c = arv_put.ArvPutCollection(locator=cache.collection(), cache=cache)
+    #     # print "UPLOADED: %d" % c.collection[os.path.basename(filename)].size()
+    #     # self.assertLess(c.collection[os.path.basename(filename)].size(), os.path.getsize(filename))
+    #     os.unlink(filename)
+
+    # def test_write_directory_twice(self):
+    #     data = 'b' * 1024 * 1024
+    #     tmpdir = tempfile.mkdtemp()
+    #     for size in [1, 5, 10, 70]:
+    #         with open(os.path.join(tmpdir, 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     os.mkdir(os.path.join(tmpdir, 'subdir1'))
+    #     for size in [2, 4, 6]:
+    #         with open(os.path.join(tmpdir, 'subdir1', 'file_%d' % size), 'w') as f:
+    #             for _ in range(size):
+    #                 f.write(data)
+    #     c = arv_put.ArvPutUploader([tmpdir])
+    #     d = arv_put.ArvPutUploader([tmpdir])
+    #     print "ESCRIBIERON: c: %d, d: %d" % (c.bytes_written(), d.bytes_written())
+    #     shutil.rmtree(tmpdir)
+    #     self.assertEqual(0, d.bytes_written())
         
 
 class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list