[ARVADOS] updated: a117d9790fc566653aead700f8cb207858c8fca9
git at public.curoverse.com
Mon Dec 22 22:38:17 EST 2014
Summary of changes:
sdk/python/arvados/arvfile.py | 160 +++++++++++++++++++++++++++++----------
sdk/python/arvados/collection.py | 47 +++++++++++-
sdk/python/arvados/keep.py | 10 +--
sdk/python/tests/test_arvfile.py | 150 ++++++++++++++++++++++++++++++++++++
4 files changed, 317 insertions(+), 50 deletions(-)
create mode 100644 sdk/python/tests/test_arvfile.py
via a117d9790fc566653aead700f8cb207858c8fca9 (commit)
from 1811fb602be08a1f9ff9f71070861d8a2af60849 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a117d9790fc566653aead700f8cb207858c8fca9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 22 22:39:42 2014 -0500
3198: Async put, read prefetch via BlockManager. Added arvfile tests (forgot
to add before). No testing yet.
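As a rough sketch of how the new BlockManager is meant to be driven (inferred from the diff below; the InMemoryKeep stand-in and the exact call sequence are illustrative, not part of this commit):

    import hashlib
    from arvados.arvfile import BlockManager

    class InMemoryKeep(object):
        # Illustrative stand-in for a real KeepClient.
        def __init__(self):
            self.blocks = {}
        def put(self, data):
            loc = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
            self.blocks[loc] = str(data)
            return loc
        def get(self, locator, num_retries=0):
            return self.blocks[locator]

    bbm = BlockManager(InMemoryKeep())
    bb = bbm.alloc_bufferblock()   # writable block with id "bufferblock0"
    bb.append("hello world")       # buffered in memory; buffer doubles as needed
    bbm.commit_bufferblock(bb)     # queued for an async Keep put (state PENDING)
    bbm.commit_all()               # block until all puts finish; raises
                                   # AsyncKeepWriteErrors if any put failed
    bbm.block_prefetch(bb.locator())  # hint that a locator will be read soon;
                                      # fetched on a background thread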
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index cd1605e..bb89b9d 100644
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -6,6 +6,9 @@ from .ranges import *
from arvados.retry import retry_method
import config
import hashlib
+import threading
+import Queue
def split(path):
"""split(path) -> streamname, filename
@@ -205,27 +208,120 @@ class StreamFileReader(ArvadosFileReaderBase):
class BufferBlock(object):
- def __init__(self, locator, starting_size=2**14):
- self.locator = locator
+ WRITABLE = 0
+ PENDING = 1
+ COMMITTED = 2
+
+ def __init__(self, blockid, starting_size=2**14):
+ self.blockid = blockid
self.buffer_block = bytearray(starting_size)
self.buffer_view = memoryview(self.buffer_block)
self.write_pointer = 0
+ self.state = BufferBlock.WRITABLE
+ self._locator = None
def append(self, data):
- while (self.write_pointer+len(data)) > len(self.buffer_block):
- new_buffer_block = bytearray(len(self.buffer_block) * 2)
- new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
- self.buffer_block = new_buffer_block
- self.buffer_view = memoryview(self.buffer_block)
- self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
- self.write_pointer += len(data)
+ if self.state == BufferBlock.WRITABLE:
+ while (self.write_pointer+len(data)) > len(self.buffer_block):
+ new_buffer_block = bytearray(len(self.buffer_block) * 2)
+ new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
+ self.buffer_block = new_buffer_block
+ self.buffer_view = memoryview(self.buffer_block)
+ self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
+ self.write_pointer += len(data)
+ self._locator = None
+ else:
+ raise AssertionError("Buffer block is not writable")
def size(self):
return self.write_pointer
- def calculate_locator(self):
- return "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
+ def locator(self):
+ if self._locator is None:
+ self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
+ return self._locator
+
+class AsyncKeepWriteErrors(Exception):
+ def __init__(self, errors):
+ self.errors = errors
+class BlockManager(object):
+ def __init__(self, keep):
+ self._keep = keep
+ self._bufferblocks = {}
+ self._put_queue = None
+ self._put_errors = None
+ self._threads = None
+ self._continue_worker = True
+ self._prefetch_queue = None
+ self._prefetch_thread = None
+
+ def alloc_bufferblock(self):
+ bb = BufferBlock("bufferblock%i" % len(self._bufferblocks))
+ self._bufferblocks[bb.blockid] = bb
+ return bb
+
+ def commit_bufferblock(self, block):
+ def worker(self):
+ while self._continue_worker:
+ try:
+ b = self._put_queue.get()
+ b._locator = self._keep.put(b.buffer_view[0:b.write_pointer].tobytes())
+ b.state = BufferBlock.COMMITTED
+ b.buffer_view = None
+ b.buffer_block = None
+ except Exception as e:
+ self._put_errors.put(e)
+ finally:
+ self._put_queue.task_done()
+
+ if self._threads is None:
+ self._put_queue = Queue.Queue()
+ self._put_errors = Queue.Queue()
+ self._threads = [threading.Thread(target=worker, args=(self,)),
+ threading.Thread(target=worker, args=(self,))]
+ for t in self._threads:
+ t.daemon = True
+ t.start()
+
+ block.state = BufferBlock.PENDING
+ self._put_queue.put(block)
+
+ def get_block(self, locator, num_retries):
+ if locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ if bb.state != BufferBlock.COMMITTED:
+ return bb.buffer_view[0:bb.write_pointer].tobytes()
+ else:
+ locator = bb._locator
+ return self._keep.get(locator, num_retries=num_retries)
+
+ def commit_all(self):
+ for k, v in self._bufferblocks.items():
+ if v.state == BufferBlock.WRITABLE:
+ self.commit_bufferblock(v)
+ if self._put_queue is not None:
+ self._put_queue.join()
+ if not self._put_errors.empty():
+ e = []
+ try:
+ while True:
+ e.append(self._put_errors.get(False))
+ except Queue.Empty:
+ pass
+ raise AsyncKeepWriteErrors(e)
+
+ def block_prefetch(self, locator):
+ def worker(self):
+ while self._continue_worker:
+ try:
+ b = self._prefetch_queue.get()
+ self._keep.get(b)
+ except Exception:
+ pass
+
+ if locator in self._bufferblocks:
+ return
+ if self._prefetch_thread is None:
+ self._prefetch_queue = Queue.Queue()
+ self._prefetch_thread = threading.Thread(target=worker, args=(self,))
+ self._prefetch_thread.daemon = True
+ self._prefetch_thread.start()
+ self._prefetch_queue.put(locator)
class ArvadosFile(object):
def __init__(self, stream=[], segments=[], keep=None):
@@ -238,7 +334,6 @@ class ArvadosFile(object):
for s in segments:
self.add_segment(stream, s.range_start, s.range_size)
self._current_bblock = None
- self._bufferblocks = None
self._keep = keep
def set_unmodified(self):
@@ -265,36 +360,23 @@ class ArvadosFile(object):
self._segments = new_segs
self._modified = True
- def _keepget(self, locator, num_retries):
- if self._bufferblocks and locator in self._bufferblocks:
- bb = self._bufferblocks[locator]
- return bb.buffer_view[0:bb.write_pointer].tobytes()
- else:
- return self._keep.get(locator, num_retries=num_retries)
-
def readfrom(self, offset, size, num_retries):
if size == 0 or offset >= self.size():
return ''
if self._keep is None:
self._keep = KeepClient(num_retries=num_retries)
data = []
- # TODO: initiate prefetch on all blocks in the range (offset, offset + size + config.KEEP_BLOCK_SIZE)
+
+ for lr in locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE):
+ self.bbm.block_prefetch(lr.locator)
for lr in locators_and_ranges(self._segments, offset, size):
# TODO: if data is empty, wait on block get, otherwise only
# get more data if the block is already in the cache.
- data.append(self._keepget(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
+ data.append(self.bbm.get_block(lr.locator, num_retries=num_retries)[lr.segment_offset:lr.segment_offset+lr.segment_size])
return ''.join(data)
- def _init_bufferblock(self):
- if self._bufferblocks is None:
- self._bufferblocks = {}
- self._current_bblock = BufferBlock("bufferblock%i" % len(self._bufferblocks))
- self._bufferblocks[self._current_bblock.locator] = self._current_bblock
-
def _repack_writes(self):
- pass
- # TODO: fixme
'''Test if the buffer block has more data than is referenced by actual segments
(this happens when a buffered write over-writes a file range written in
a previous buffered write). Re-pack the buffer block for efficiency
@@ -304,20 +386,19 @@ class ArvadosFile(object):
# Sum up the segments to get the total bytes of the file referencing
# into the buffer block.
- bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.locator]
+ bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
write_total = sum([s.range_size for s in bufferblock_segs])
if write_total < self._current_bblock.size():
# There is more data in the buffer block than is actually accounted for by segments, so
# re-pack into a new buffer by copying over to a new buffer block.
- new_bb = BufferBlock(self._current_bblock.locator, starting_size=write_total)
+ new_bb = BufferBlock(self._current_bblock.blockid, starting_size=write_total)
for t in bufferblock_segs:
new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
t.segment_offset = new_bb.size() - t.range_size
self._current_bblock = new_bb
- self._bufferblocks[self._current_bblock.locator] = self._current_bblock
-
+ self.bbm._bufferblocks[self._current_bblock.blockid] = self._current_bblock
def writeto(self, offset, data, num_retries):
if len(data) == 0:
@@ -331,16 +412,17 @@ class ArvadosFile(object):
self._modified = True
- if self._current_bblock is None:
- self._init_bufferblock()
+ if self._current_bblock is None or self._current_bblock.state != BufferBlock.WRITABLE:
+ self._current_bblock = self.bbm.alloc_bufferblock()
- if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
+ if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
self._repack_writes()
- if (self._current_bblock.write_pointer + len(data)) > config.KEEP_BLOCK_SIZE:
- self._init_bufferblock()
+ if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
+ self.bbm.commit_bufferblock(self._current_bblock)
+ self._current_bblock = self.bbm.alloc_bufferblock()
self._current_bblock.append(data)
- replace_range(self._segments, offset, len(data), self._current_bblock.locator, self._current_bblock.write_pointer - len(data))
+ replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
def add_segment(self, blocks, pos, size):
self._modified = True
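To make the _repack_writes() logic above concrete: repeated overwrites of the same byte range keep appending to the current buffer block, so the buffer can hold more data than the file's segments actually reference; repacking copies just the referenced bytes into a fresh buffer block. A hypothetical walk-through (mirroring test_write_rewrite0 in the new test file below):

    # af is an ArvadosFile whose block manager is assumed to be wired up.
    af.writeto(0, "0123456789", num_retries=0)  # buffer block holds 10 bytes
    af.writeto(0, "0123456789", num_retries=0)  # holds 20 bytes, but segments
                                                # still reference only 10 of them
    af._repack_writes()                         # copies the 10 live bytes into a
                                                # new buffer block; stale copy dropped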
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index ea9f5de..da359e8 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -810,8 +810,11 @@ class Collection(CollectionBase):
@_populate_first
def __iter__(self):
- for k in self._items.keys():
- yield k
+ return self._items.iterkeys()
+
+ @_populate_first
+ def iterkeys(self):
+ return self._items.iterkeys()
@_populate_first
def __getitem__(self, k):
@@ -860,9 +863,47 @@ class Collection(CollectionBase):
return self._items.items()
@_populate_first
+ def manifest_text(self, strip=False, normalize=False):
+ if self.modified() or self._manifest_text is None or normalize:
+ return export_manifest(self, stream_name=".", portable_locators=strip)
+ else:
+ if strip:
+ return self.stripped_manifest()
+ else:
+ return self._manifest_text
+
+ def portable_data_hash(self):
+ stripped = self.manifest_text(strip=True)
+ return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+
+ @_populate_first
+ def commit_bufferblocks(self):
+ pass
+
+ @_populate_first
def save(self):
- self._my_keep().put(self.portable_manifest_text())
+ if self.modified():
+ self._my_keep().put(self.manifest_text(strip=True))
+ if re.match(util.collection_uuid_pattern, self._manifest_locator):
+ self._api_response = self._api_client.collections().update(
+ uuid=self._manifest_locator,
+ body={'manifest_text': self.manifest_text(strip=False)}
+ ).execute(
+ num_retries=self.num_retries)
+ else:
+ raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
+ self.set_unmodified()
+ @_populate_first
+ def save_as(self, name, owner_uuid=None):
+ self._my_keep().put(self.manifest_text(strip=True))
+ body = {"manifest_text": self.manifest_text(strip=False),
+ "name": name}
+ if owner_uuid:
+ body["owner_uuid"] = owner_uuid
+ self._api_response = self._api_client.collections().create(body=body).execute(num_retries=self.num_retries)
+ self._manifest_locator = self._api_response["uuid"]
+ self.set_unmodified()
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
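A usage sketch for the save()/save_as() methods added above (the api_client and keep wiring are assumed, and the names api, keep_client, and the collection name are illustrative):

    # d41d8cd... is the empty-block locator (md5 of the empty string).
    c = import_manifest(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:count.txt\n",
                        api_client=api, keep=keep_client)
    f = c.open("count.txt", "r+")
    f.write("hello")
    c.save_as("my new collection")  # creates a collection record and remembers
                                    # its uuid in _manifest_locator
    f.write(" world")
    c.save()                        # updates that existing record in place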
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index f4c8596..0c5eac8 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -171,8 +171,7 @@ class KeepBlockCache(object):
def cap_cache(self):
'''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
+ with self._cache_lock:
# Select all slots except those where ready.is_set() and content is
# None (that means there was an error reading the block).
self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
@@ -183,14 +182,11 @@ class KeepBlockCache(object):
del self._cache[i]
break
sm = sum([slot.size() for slot in self._cache])
- finally:
- self._cache_lock.release()
def reserve_cache(self, locator):
'''Reserve a cache slot for the specified locator,
or return the existing slot.'''
- self._cache_lock.acquire()
- try:
+ with self._cache_lock:
# Test if the locator is already in the cache
for i in xrange(0, len(self._cache)):
if self._cache[i].locator == locator:
@@ -205,8 +201,6 @@ class KeepBlockCache(object):
n = KeepBlockCache.CacheSlot(locator)
self._cache.insert(0, n)
return n, True
- finally:
- self._cache_lock.release()
class KeepClient(object):
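The keep.py changes above are a mechanical refactor: a with statement on the lock is equivalent to the acquire/try/finally pattern it replaces, with no way to forget the release. In general:

    lock.acquire()    # old pattern
    try:
        do_work()
    finally:
        lock.release()

    with lock:        # new pattern, same semantics
        do_work()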
diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py
new file mode 100644
index 0000000..9edb2c5
--- /dev/null
+++ b/sdk/python/tests/test_arvfile.py
@@ -0,0 +1,150 @@
+#!/usr/bin/env python
+
+import bz2
+import gzip
+import io
+import mock
+import os
+import unittest
+import hashlib
+
+import arvados
+from arvados import StreamReader, StreamFileReader, Range, import_manifest, export_manifest
+
+import arvados_testutil as tutil
+
+
+class ArvadosFileWriterTestCase(unittest.TestCase):
+ class MockKeep(object):
+ def __init__(self, blocks):
+ self.blocks = blocks
+ def get(self, locator, num_retries=0):
+ return self.blocks[locator]
+ def put(self, data):
+ pdh = "%s+%i" % (hashlib.md5(data).hexdigest(), len(data))
+ self.blocks[pdh] = str(data)
+ return pdh
+
+ def test_truncate(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 10)
+ writer.seek(5)
+ self.assertEqual("56789", writer.read(8))
+ writer.truncate(8)
+ writer.seek(5, os.SEEK_SET)
+ self.assertEqual("567", writer.read(8))
+
+ def test_append(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ writer.seek(5, os.SEEK_SET)
+ self.assertEqual("56789", writer.read(8))
+ writer.seek(10, os.SEEK_SET)
+ writer.write("foo")
+ self.assertEqual(writer.size(), 13)
+ writer.seek(5, os.SEEK_SET)
+ self.assertEqual("56789foo", writer.read(8))
+
+ def test_write0(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ self.assertEqual("0123456789", writer.readfrom(0, 13))
+ writer.seek(0, os.SEEK_SET)
+ writer.write("foo")
+ self.assertEqual(writer.size(), 10)
+ self.assertEqual("foo3456789", writer.readfrom(0, 13))
+ self.assertEqual(". acbd18db4cc2f85cedef654fccc4a4d8+3 781e5e245d69b566979b86e28d23f2c7+10 0:3:count.txt 6:7:count.txt\n", export_manifest(c))
+
+ def test_write1(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ self.assertEqual("0123456789", writer.readfrom(0, 13))
+ writer.seek(3, os.SEEK_SET)
+ writer.write("foo")
+ self.assertEqual(writer.size(), 10)
+ self.assertEqual("012foo6789", writer.readfrom(0, 13))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:count.txt 10:3:count.txt 6:4:count.txt\n", export_manifest(c))
+
+ def test_write2(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ self.assertEqual("0123456789", writer.readfrom(0, 13))
+ writer.seek(7, os.SEEK_SET)
+ writer.write("foo")
+ self.assertEqual(writer.size(), 10)
+ self.assertEqual("0123456foo", writer.readfrom(0, 13))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:7:count.txt 10:3:count.txt\n", export_manifest(c))
+
+ def test_write3(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt 0:10:count.txt\n',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ self.assertEqual("012345678901234", writer.readfrom(0, 15))
+ writer.seek(7, os.SEEK_SET)
+ writer.write("foobar")
+ self.assertEqual(writer.size(), 20)
+ self.assertEqual("0123456foobar34", writer.readfrom(0, 15))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 3858f62230ac3c915f300c664312c63f+6 0:7:count.txt 10:6:count.txt 3:7:count.txt\n", export_manifest(c))
+
+ def test_write4(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:4:count.txt 0:4:count.txt 0:4:count.txt',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ self.assertEqual("012301230123", writer.readfrom(0, 15))
+ writer.seek(2, os.SEEK_SET)
+ writer.write("abcdefg")
+ self.assertEqual(writer.size(), 12)
+ self.assertEqual("01abcdefg123", writer.readfrom(0, 15))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 7ac66c0f148de9519b8bd264312c4d64+7 0:2:count.txt 10:7:count.txt 1:3:count.txt\n", export_manifest(c))
+
+ def test_write_large(self):
+ c = import_manifest('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+ keep=ArvadosFileWriterTestCase.MockKeep({}))
+ writer = c.open("count.txt", "r+")
+ text = ''.join(["0123456789" for a in xrange(0, 100)])
+ for b in xrange(0, 100000):
+ writer.write(text)
+ self.assertEqual(writer.size(), 100000000)
+ self.assertEqual(". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", export_manifest(c))
+
+ def test_write_rewrite0(self):
+ c = import_manifest('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+ keep=ArvadosFileWriterTestCase.MockKeep({}))
+ writer = c.open("count.txt", "r+")
+ for b in xrange(0, 10):
+ writer.seek(0, os.SEEK_SET)
+ writer.write("0123456789")
+ writer.arvadosfile._repack_writes()
+ self.assertEqual(writer.size(), 10)
+ self.assertEqual("0123456789", writer.readfrom(0, 20))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n", export_manifest(c))
+
+ def test_write_rewrite1(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ for b in xrange(0, 10):
+ writer.seek(10, os.SEEK_SET)
+ writer.write("abcdefghij")
+ writer.arvadosfile._repack_writes()
+ self.assertEqual(writer.size(), 20)
+ self.assertEqual("0123456789abcdefghij", writer.readfrom(0, 20))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n", export_manifest(c))
+
+ def test_write_rewrite2(self):
+ c = import_manifest('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt',
+ keep=ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}))
+ writer = c.open("count.txt", "r+")
+ for b in xrange(0, 10):
+ writer.seek(5, os.SEEK_SET)
+ writer.write("abcdefghij")
+ writer.arvadosfile._repack_writes()
+ self.assertEqual(writer.size(), 15)
+ self.assertEqual("01234abcdefghij", writer.readfrom(0, 20))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:5:count.txt 10:10:count.txt\n", export_manifest(c))
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list