[ARVADOS] updated: a117d9790fc566653aead700f8cb207858c8fca9

git at public.curoverse.com git at public.curoverse.com
Mon Dec 22 22:38:17 EST 2014


Summary of changes:
 sdk/python/arvados/arvfile.py    | 160 +++++++++++++++++++++++++++++----------
 sdk/python/arvados/collection.py |  47 +++++++++++-
 sdk/python/arvados/keep.py       |  10 +--
 sdk/python/tests/test_arvfile.py | 150 ++++++++++++++++++++++++++++++++++++
 4 files changed, 317 insertions(+), 50 deletions(-)
 create mode 100644 sdk/python/tests/test_arvfile.py

       via  a117d9790fc566653aead700f8cb207858c8fca9 (commit)
      from  1811fb602be08a1f9ff9f71070861d8a2af60849 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit a117d9790fc566653aead700f8cb207858c8fca9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 22 22:39:42 2014 -0500

    3198: Async put, read prefetch via BlockManager.  Added arvfile tests (forgot
    to add before).  No testing yet.

diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index cd1605e..bb89b9d 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -6,6 +6,9 @@ from .ranges import *
 from arvados.retry import retry_method
 import config
 import hashlib
+import hashlib
+import threading
+import Queue
 
 def split(path):
     """split(path) -> streamname, filename
@@ -205,27 +208,120 @@ class StreamFileReader(ArvadosFileReaderBase):
 
 
 class BufferBlock(object):
-    def __init__(self, locator, starting_size=2**14):
-        self.locator = locator
+    WRITABLE = 0
+    PENDING = 1
+    COMMITTED = 2
+
+    def __init__(self, blockid, starting_size=2**14):
+        self.blockid = blockid
         self.buffer_block = bytearray(starting_size)
         self.buffer_view = memoryview(self.buffer_block)
         self.write_pointer = 0
+        self.state = BufferBlock.WRITABLE
+        self._locator = None
 
     def append(self, data):
-        while (self.write_pointer+len(data)) > len(self.buffer_block):
-            new_buffer_block = bytearray(len(self.buffer_block) * 2)
-            new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
-            self.buffer_block = new_buffer_block
-            self.buffer_view = memoryview(self.buffer_block)
-        self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
-        self.write_pointer += len(data)
+        if self.state == BufferBlock.WRITABLE:
+            while (self.write_pointer+len(data)) > len(self.buffer_block):
+                new_buffer_block = bytearray(len(self.buffer_block) * 2)
+                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
+                self.buffer_block = new_buffer_block
+                self.buffer_view = memoryview(self.buffer_block)
+            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
+            self.write_pointer += len(data)
+            self._locator = None
+        else:
+            raise AssertionError("Buffer block is not writable")
 
     def size(self):
         return self.write_pointer
 
-    def calculate_locator(self):
-        return "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
+    def locator(self):
+        if self._locator is None
+            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
+        return self._locator
+
+class AsyncKeepWriteErrors(Exception):
+    def __init__(self, errors):
+        self.errors = errors
 
+class BlockManager(object):
+    def __init__(self, keep):
+        self._keep = keep
+        self._bufferblocks = {}
+        self._put_queue = None
+        self._put_errors = None
+        self._threads = None
+        self._continue_worker = True
+        self._prefetch_queue = None
+        self._prefetch_thread = None
+
+    def alloc_bufferblock(self):
+        bb = BufferBlock("bufferblock%i" % len(self._bufferblocks))
+        self._bufferblocks[bb.blockid] = bb
+        return bb
+
+    def commit_bufferblock(self, block):
+        def worker(self):
+            while self._continue_worker:
+                try:
+                    b = self._put_queue.get()
+                    b._locator = self._keep.put(item)
+                    b.state = BufferBlock.COMMITTED
+                    b.buffer_view = None
+                    b.buffer_block = None
+                except Exception as e:
+                    self._error.put(e)
+                finally:
+                    self._queue.task_done()
+
+        if self._threads is None:
+            self._put_queue = Queue.Queue()
+            self._put_errors = Queue.Queue()
+            self._threads = [threading.Thread(target=worker, args=(self,)),
+                             threading.Thread(target=worker, args=(self,))]
+
+        block.state = BufferBlock.PENDING
+        self._queue.put(block)
+
+    def get_block(self, locator, num_retries):
+        if locator in self._bufferblocks:
+            bb = self._bufferblocks[locator]
+            if bb.state != BufferBlock.COMMITTED:
+                return bb.buffer_view[0:bb.write_pointer].tobytes()
+            else:
+                locator = bb._locator
+        return self._keep.get(locator, num_retries=num_retries)
+
+    def commit_all(self):
+        for k,v in self._bufferblocks:
+            if v.state == BufferBlock.WRITABLE:
+                self.commit_bufferblock(v)
+        self._queue.join()
+        if not self._errors.empty():
+            e = []
+            try:
+                while True:
+                    e.append(self._errors.get(False))
+            except Queue.Empty:
+                pass
+            raise AsyncKeepWriteErrors(e)
+
+    def block_prefetch(self, locator):
+        def worker(keep, loc):
+            while self._continue_worker:
+                try:
+                    b = self._prefetch_queue.get()
+                    keep.get(loc)
+                except:
+                    pass
+
+        if locator in self._bufferblocks:
+            return
+        if self._prefetch_thread is None:
+            self._prefetch_queue = Queue.Queue()
+            self._prefetch_thread = threading.Thread(target=worker, args=(self,))
+        self._prefetch_queue.put(locator)
 
 class ArvadosFile(object):
     def __init__(self, stream=[], segments=[], keep=None):
@@ -238,7 +334,6 @@ class ArvadosFile(object):
         for s in segments:
             self.add_segment(stream, s.range_start, s.range_size)
         self._current_bblock = None
-        self._bufferblocks = None
         self._keep = keep
 
     def set_unmodified(self):
@@ -265,36 +360,23 @@ class ArvadosFile(object):
         self._segments = new_segs
         self._modified = True
 
-    def _keepget(self, locator, num_retries):
-        if self._bufferblocks and locator in self._bufferblocks:
-            bb = self._bufferblocks[locator]
-            return bb.buffer_view[0:bb.write_pointer].tobytes()
-        else:
-            return self._keep.get(locator, num_retries=num_retries)
-
     def readfrom(self, offset, size, num_retries):
         if size == 0 or offset >= self.size():
             return ''
         if self._keep is None:
             self._keep = KeepClient(num_retries=num_retries)
         data = []
-        # TODO: initiate prefetch on all blocks in the range (offset, offset + size + config.KEEP_BLOCK_SIZE)
+
+        for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
+            self.bbm.block_prefetch(lr.locator)
 
         for lr in locators_and_ranges(self._segments, offset, size):
             # TODO: if data is empty, wait on block get, otherwise only
             # get more data if the block is already in the cache.
-            data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+            data.append(self.bbm.get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
         return ''.join(data)
 
-    def _init_bufferblock(self):
-        if self._bufferblocks is None:
-            self._bufferblocks = {}
-        self._current_bblock = BufferBlock("bufferblock%i" % len(self._bufferblocks))
-        self._bufferblocks[self._current_bblock.locator] = self._current_bblock
-
     def _repack_writes(self):
-        pass
-         # TODO: fixme
         '''Test if the buffer block has more data than is referenced by actual segments
         (this happens when a buffered write over-writes a file range written in
         a previous buffered write).  Re-pack the buffer block for efficiency
@@ -304,20 +386,19 @@ class ArvadosFile(object):
 
         # Sum up the segments to get the total bytes of the file referencing
         # into the buffer block.
-        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.locator]
+        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
         write_total = sum([s.range_size for s in bufferblock_segs])
 
         if write_total < self._current_bblock.size():
             # There is more data in the buffer block than is actually accounted for by segments, so
             # re-pack into a new buffer by copying over to a new buffer block.
-            new_bb = BufferBlock(self._current_bblock.locator, starting_size=write_total)
+            new_bb = BufferBlock(self._current_bblock.blockid, starting_size=write_total)
             for t in bufferblock_segs:
                 new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
                 t.segment_offset = new_bb.size() - t.range_size
 
             self._current_bblock = new_bb
-            self._bufferblocks[self._current_bblock.locator] = self._current_bblock
-
+            self.bbm[self._current_bblock.blockid] = self._current_bblock
 
     def writeto(self, offset, data, num_retries):
         if len(data) == 0:
@@ -331,16 +412,17 @@ class ArvadosFile(object):
 
         self._modified = True
 
-        if self._current_bblock is None:
-            self._init_bufferblock()
+        if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE::
+            self._current_bblock = self.bbm.alloc_bufferblock()
 
-        if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
             self._repack_writes()
-            if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
-                self._init_bufferblock()
+            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
+                self.bbm.commit_bufferblock(self._current_bblock)
+                self._current_bblock = self.bbm.alloc_bufferblock()
 
         self._current_bblock.append(data)
-        replace_range(self._segments, offset, len(data), self._current_bblock.locator, self._current_bblock.write_pointer - len(data))
+        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
 
     def add_segment(self, blocks, pos, size):
         self._modified = True
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index ea9f5de..da359e8 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -810,8 +810,11 @@ class Collection(CollectionBase):
 
     @_populate_first
     def __iter__(self):
-        for k in self._items.keys():
-            yield k
+        self._items.iterkeys()
+
+    @_populate_first
+    def iterkeys(self):
+        self._items.iterkeys()
 
     @_populate_first
     def __getitem__(self, k):
@@ -860,9 +863,47 @@ class Collection(CollectionBase):
         return self._items.items()
 
     @_populate_first
+    def manifest_text(self, strip=False, normalize=False):
+        if self.modified() or self._manifest_text is None or normalize:
+            return export_manifest(self, stream_name=".", portable_locators=strip)
+        else:
+            if strip:
+                return self.stripped_manifest()
+            else:
+                return self._manifest_text
+
+    def portable_data_hash(self):
+        stripped = self.manifest_text(strip=True)
+        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+
+    @_populate_first
+    def commit_bufferblocks(self):
+        pass
+
+    @_populate_first
     def save(self):
-        self._my_keep().put(self.portable_manifest_text())
+        if self.modified():
+            self._my_keep().put(self.manifest_text(strip=True))
+            if re.match(util.collection_uuid_pattern, self._manifest_locator):
+                self._api_response = self._api_client.collections().update(
+                    uuid=self._manifest_locator,
+                    body={'manifest_text': self.manifest_text(strip=False)}
+                    }).execute(
+                        num_retries=self.num_retries)
+            else:
+                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
+            self.set_unmodified()
 
+    @_populate_first
+    def save_as(self, name, owner_uuid=None):
+        self._my_keep().put(self.manifest_text(strip=True))
+        body = {"manifest_text": self.manifest_text(strip=False),
+                "name": name}
+        if owner_uuid:
+            body["owner_uuid"] = owner_uuid
+        self._api_response = self._api_client.collections().create(body=body}).execute(num_retries=self.num_retries)
+        self._manifest_locator = self._api_response["uuid"]
+        self.set_unmodified()
 
 
 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index f4c8596..0c5eac8 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -171,8 +171,7 @@ class KeepBlockCache(object):
 
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
+        with self._cache_lock:
             # Select all slots except those where ready.is_set() and content is
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
@@ -183,14 +182,11 @@ class KeepBlockCache(object):
                         del self._cache[i]
                         break
                 sm = sum([slot.size() for slot in self._cache])
-        finally:
-            self._cache_lock.release()
 
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
+        with self._cache_lock:
             # Test if the locator is already in the cache
             for i in xrange(0, len(self._cache)):
                 if self._cache[i].locator == locator:
@@ -205,8 +201,6 @@ class KeepBlockCache(object):
             n = KeepBlockCache.CacheSlot(locator)
             self._cache.insert(0, n)
             return n, True
-        finally:
-            self._cache_lock.release()
 
 class KeepClient(object):
 
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
new file mode 100644
index 0000000..9edb2c5
--- /dev/null
+++ b/sdk/python/tests/test_arvfile.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python
+
+import bz2
+import gzip
+import io
+import mock
+import os
+import unittest
+import hashlib
+
+import arvados
+from arvados import StreamReader, StreamFileReader, Range, import_manifest, export_manifest
+
+import arvados_testutil as tutil
+
+
+class ArvadosFileWriterTestCase(unittest.TestCase):
+    class MockKeep(object):
+        def __init__(self, blocks):
+            self.blocks = blocks
+        def get(self, locator, num_retries=0):
+            return self.blocks[locator]
+        def put(self, data):
+            pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+            self.blocks[pdh] = str(data)
+            return pdh
+
+    def test_truncate(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        self.assertEqual(writer.size(), 10)
+        writer.seek(5)
+        self.assertEqual("56789", writer.read(8))
+        writer.truncate(8)
+        writer.seek(5, os.SEEK_SET)
+        self.assertEqual("567", writer.read(8))
+
+    def test_append(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        writer.seek(5, os.SEEK_SET)
+        self.assertEqual("56789", writer.read(8))
+        writer.seek(10, os.SEEK_SET)
+        writer.write("foo")
+        self.assertEqual(writer.size(), 13)
+        writer.seek(5, os.SEEK_SET)
+        self.assertEqual("56789foo", writer.read(8))
+
+    def test_write0(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        self.assertEqual("0123456789", writer.readfrom(0, 13))
+        writer.seek(0, os.SEEK_SET)
+        writer.write("foo")
+        self.assertEqual(writer.size(), 10)
+        self.assertEqual("foo3456789", writer.readfrom(0, 13))
+        self.assertEqual(". acbd18db4cc2f85cedef654fccc4a4d8+3 781e5e245d69b566979b86e28d23f2c7+10 0:3:count.txt 6:7:count.txt\n", export_manifest(c))
+
+    def test_write1(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        self.assertEqual("0123456789", writer.readfrom(0, 13))
+        writer.seek(3, os.SEEK_SET)
+        writer.write("foo")
+        self.assertEqual(writer.size(), 10)
+        self.assertEqual("012foo6789", writer.readfrom(0, 13))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", export_manifest(c))
+
+    def test_write2(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        self.assertEqual("0123456789", writer.readfrom(0, 13))
+        writer.seek(7, os.SEEK_SET)
+        writer.write("foo")
+        self.assertEqual(writer.size(), 10)
+        self.assertEqual("0123456foo", writer.readfrom(0, 13))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:7:count.txt 10:3:count.txt\n", export_manifest(c))
+
+    def test_write3(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt 0:10:count.txt\n',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        self.assertEqual("012345678901234", writer.readfrom(0, 15))
+        writer.seek(7, os.SEEK_SET)
+        writer.write("foobar")
+        self.assertEqual(writer.size(), 20)
+        self.assertEqual("0123456foobar34", writer.readfrom(0, 15))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 3858f62230ac3c915f300c664312c63f+6 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", export_manifest(c))
+
+    def test_write4(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:4:count.txt 0:4:count.txt 0:4:count.txt',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        self.assertEqual("012301230123", writer.readfrom(0, 15))
+        writer.seek(2, os.SEEK_SET)
+        writer.write("abcdefg")
+        self.assertEqual(writer.size(), 12)
+        self.assertEqual("01abcdefg123", writer.readfrom(0, 15))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 7ac66c0f148de9519b8bd264312c4d64+7 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", export_manifest(c))
+
+    def test_write_large(self):
+        c = import_manifest('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+                              keep=ArvadosFileWriterTestCase.MockKeep({}))
+        writer = c.open("count.txt", "r+")
+        text = ''.join(["0123456789" for a in xrange(0, 100)])
+        for b in xrange(0, 100000):
+            writer.write(text)
+        self.assertEqual(writer.size(), 100000000)
+        self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", export_manifest(c))
+
+    def test_write_rewrite0(self):
+        c = import_manifest('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+                              keep=ArvadosFileWriterTestCase.MockKeep({}))
+        writer = c.open("count.txt", "r+")
+        for b in xrange(0, 10):
+            writer.seek(0, os.SEEK_SET)
+            writer.write("0123456789")
+        writer.arvadosfile._repack_writes()
+        self.assertEqual(writer.size(), 10)
+        self.assertEqual("0123456789", writer.readfrom(0, 20))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", export_manifest(c))
+
+    def test_write_rewrite1(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        for b in xrange(0, 10):
+            writer.seek(10, os.SEEK_SET)
+            writer.write("abcdefghij")
+        writer.arvadosfile._repack_writes()
+        self.assertEqual(writer.size(), 20)
+        self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n", export_manifest(c))
+
+    def test_write_rewrite2(self):
+        c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt',
+                              keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+        writer = c.open("count.txt", "r+")
+        for b in xrange(0, 10):
+            writer.seek(5, os.SEEK_SET)
+            writer.write("abcdefghij")
+        writer.arvadosfile._repack_writes()
+        self.assertEqual(writer.size(), 15)
+        self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", export_manifest(c))

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list