[ARVADOS] created: e0d33f390ffb609332d966fe0ce43ada6188d890
git at public.curoverse.com
git at public.curoverse.com
Thu Feb 13 18:05:58 EST 2014
at e0d33f390ffb609332d966fe0ce43ada6188d890 (commit)
commit e0d33f390ffb609332d966fe0ce43ada6188d890
Author: Peter Amstutz <tetron at peter.shell.qr1hi.arvadosapi.com>
Date: Thu Feb 13 23:05:51 2014 +0000
working on normalization
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index ea98d00..3d0005a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -23,6 +23,60 @@ from stream import *
import config
import errors
def normalize(collection):
    """Return normalized manifest text for collection.

    Normalization groups every file under its true stream (re-splitting
    "streamname/filename" on the last "/", so a file named "foo/bar" in
    stream "." is moved to stream "./foo"), sorts streams and files
    lexically, and emits one manifest line per stream: the stream name,
    every referenced block locator (in file order, duplicates kept), then
    one or more pos:size:name tokens per file.
    """
    # Bucket each file's chunk list by (stream name, file name).
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            # NOTE(review): names are not \040-escaped here -- confirm
            # callers never pass names containing spaces.
            bystream = streams.setdefault(streamname, {})
            bystream.setdefault(filename, []).extend(
                s.locators_and_ranges(f.stream_offset(), f.size()))

    manifest = ""
    for streamname in sorted(streams):
        stream = streams[streamname]
        manifest += streamname
        sortedfiles = sorted(stream)

        # First pass: emit every block locator referenced by this stream,
        # in file order (a locator appears once per chunk that uses it).
        for filename in sortedfiles:
            for chunk in stream[filename]:
                manifest += " " + chunk[StreamReader.LOCATOR]

        # Second pass: emit the file tokens.
        for filename in sortedfiles:
            chunks = stream[filename]
            # "Easy" case: the chunks tile their blocks exactly (each chunk
            # starts at offset 0 and the running stream/file offsets stay in
            # step), so the whole file collapses to a single 0:size:name token.
            streamoffset = 0
            fileoffset = 0
            easy = True
            for chunk in chunks:
                if chunk[StreamReader.CHUNKOFFSET] != 0 or streamoffset != fileoffset:
                    easy = False
                streamoffset += chunk[StreamReader.BLOCKSIZE]
                fileoffset += chunk[StreamReader.CHUNKSIZE]

            if easy:
                manifest += " " + "{0}:{1}:{2}".format(0, fileoffset, filename)
            else:
                # General case: one token per chunk, positioned at the
                # chunk's offset within the concatenated block sequence.
                streamoffset = 0
                for chunk in chunks:
                    manifest += " " + "{0}:{1}:{2}".format(
                        streamoffset + chunk[StreamReader.CHUNKOFFSET],
                        chunk[StreamReader.CHUNKSIZE],
                        filename)
                    streamoffset += chunk[StreamReader.BLOCKSIZE]

        manifest += "\n"
    return manifest
+
class CollectionReader(object):
def __init__(self, manifest_locator_or_text):
if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 0d0caee..d587c81 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -101,6 +101,9 @@ class StreamFileReader(object):
if data != '':
yield data
def stream_offset(self):
    """Return this file's starting byte offset within its stream."""
    return self._pos
+
def as_manifest(self):
if self.size() == 0:
return ("%s %s 0:0:%s\n"
@@ -114,7 +117,7 @@ class StreamReader(object):
self._current_datablock_data = None
self._current_datablock_pos = 0
self._current_datablock_index = -1
- self._pos = 0
+ self._pos = 0L
self._stream_name = None
self.data_locators = []
@@ -127,7 +130,7 @@ class StreamReader(object):
self.data_locators += [tok]
elif re.search(r'^\d+:\d+:\S+', tok):
pos, size, name = tok.split(':',2)
- self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
+ self.files += [[long(pos), long(size), name.replace('\\040', ' ')]]
else:
raise errors.SyntaxError("Invalid manifest format")
@@ -163,6 +166,47 @@ class StreamReader(object):
resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
return resp
# Indices into the [locator, block size, chunk offset, chunk size]
# lists returned by locators_and_ranges().
LOCATOR = 0
BLOCKSIZE = 1
CHUNKOFFSET = 2
CHUNKSIZE = 3

def locators_and_ranges(self, range_start, range_size):
    """Return the chunks covering a byte range of this stream.

    Each chunk is a list [block locator, block size, offset within the
    block, chunk size].  Walks self.data_locators in order, treating the
    blocks as one concatenated byte sequence.

    Raises Exception if a locator carries no "+<size>" hint, since block
    sizes are required to place the range.
    """
    resp = []
    range_start = int(range_start)
    range_size = int(range_size)
    range_end = range_start + range_size
    block_start = 0
    for locator in self.data_locators:
        sizehint = re.search(r'[0-9a-f]{32}\+(\d+)', locator)
        if not sizehint:
            raise Exception("Manifest must include block sizes to be normalized")
        block_size = int(sizehint.group(1))
        block_end = block_start + block_size
        if range_end <= block_start:
            # Range ends at or before this block starts; later blocks
            # cannot overlap either, so stop scanning.
            # (Was "<", which emitted a spurious zero-size chunk when the
            # range ended exactly on a block boundary.)
            break
        if range_start >= block_end:
            # Range starts at or after this block ends; fall through so
            # block_start still advances below.
            # (Was a bare "next", a no-op expression; ">=" also fixes the
            # zero-size chunk emitted when the range began exactly at the
            # next block boundary.)
            pass
        elif range_start >= block_start and range_end <= block_end:
            # Range starts and ends in this block.
            resp.append([locator, block_size, range_start - block_start, range_size])
        elif range_start >= block_start:
            # Range starts in this block and continues past it.
            resp.append([locator, block_size, range_start - block_start, block_end - range_start])
        elif range_end > block_end:
            # Range started in an earlier block and covers this whole block.
            resp.append([locator, block_size, 0, block_size])
        else:
            # Range started in an earlier block and ends in this block.
            resp.append([locator, block_size, 0, range_end - block_start])
        block_start = block_end
    return resp
+
def name(self):
return self._stream_name
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index 913c888..1879b5f 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -205,3 +205,51 @@ class LocalCollectionGzipDecompressionTest(unittest.TestCase):
self.assertEqual(got,
n_lines_in,
"decompression returned %d lines instead of %d" % (got, n_lines_in))
+
class NormalizedCollectionTest(unittest.TestCase):
    """Exercises arvados.collection.normalize() against hand-written manifests."""

    def setUp(self):
        pass

    def runTest(self):
        # Three single-block streams for the same file collapse into one
        # stream line: blocks concatenated, one 0:127 (43+41+43) token.
        m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt"""
        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m1)),
                         """. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt
""")

        # An already-normalized manifest must round-trip unchanged.
        m2 = """. 204e43b8a1185621ca55a94839582e6f+67108864 b9677abbac956bd3e86b1deb28dfac03+67108864 fc15aff2a762b13f521baf042140acec+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:227212247:var-GS000016015-ASM.tsv.bz2
"""
        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m2)), m2)

        # A non-zero starting offset defeats the single-token "easy" case,
        # so each merged segment keeps its own pos:size:name token.
        m3 = """. 5348b82a029fd9e971a811ce1f71360b+43 3:40:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt"""
        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m3)),
                         """. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 3:40:md5sum.txt 43:41:md5sum.txt 84:43:md5sum.txt
""")

        # "foo/bar" in stream "." is re-rooted into stream "./foo" and
        # merged with the existing ./foo bar file; streams sort lexically.
        m4 = """. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
./foo 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar"""
        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m4)),
                         """./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
""")

        # Same block referenced twice: the locator is repeated per chunk.
        m5 = """. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
./foo 204e43b8a1185621ca55a94839582e6f+67108864 3:3:bar"""
        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m5)),
                         """./foo 204e43b8a1185621ca55a94839582e6f+67108864 204e43b8a1185621ca55a94839582e6f+67108864 0:3:bar 67108867:3:bar
./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
""")

        # Round-trip a large local fixture only when it is present.  This
        # was previously an unconditional open(), which made the whole test
        # error out on any machine without the developer's local file.
        import os
        if os.path.exists('gatkmanifest'):
            with open('gatkmanifest') as f6:
                m6 = f6.read()
            m6n = arvados.collection.normalize(arvados.CollectionReader(m6))
            with open('gatkmanifest_normalized', 'w') as f6n:
                f6n.write(m6n)
            # TODO(review): assert normalize() is idempotent on m6n once
            # the fixture is checked in.
+
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list