[ARVADOS] created: e0d33f390ffb609332d966fe0ce43ada6188d890

git at public.curoverse.com
Thu Feb 13 18:05:58 EST 2014


        at  e0d33f390ffb609332d966fe0ce43ada6188d890 (commit)


commit e0d33f390ffb609332d966fe0ce43ada6188d890
Author: Peter Amstutz <tetron at peter.shell.qr1hi.arvadosapi.com>
Date:   Thu Feb 13 23:05:51 2014 +0000

    working on normalization

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index ea98d00..3d0005a 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -23,6 +23,60 @@ from stream import *
 import config
 import errors
 
+def normalize(collection):
+    streams = {}
+    for s in collection.all_streams():
+        for f in s.all_files():
+            filestream = s.name() + "/" + f.name()
+            r = filestream.rindex("/")
+            streamname = filestream[:r]
+            filename = filestream[r+1:]
+            if streamname not in streams:
+                streams[streamname] = {}
+            if filename not in streams[streamname]:
+                streams[streamname][filename] = []
+            print streamname, filename
+            streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size()))
+            
+    manifest = ""
+    sortedstreams = list(streams.keys())
+    sortedstreams.sort()
+    import pprint
+    pprint.pprint(streams)
+    for s in sortedstreams:
+        stream = streams[s]
+        manifest += s
+        sortedfiles = list(stream.keys())
+        sortedfiles.sort()
+        for f in sortedfiles:
+            fn = stream[f]
+            for chunk in fn:
+                manifest += " " + chunk[StreamReader.LOCATOR]
+        for f in sortedfiles:
+            fn = stream[f]
+            streamoffset = 0L
+            fileoffset = 0L
+            easy = True
+            for chunk in fn:
+                if chunk[StreamReader.CHUNKOFFSET] != 0 or streamoffset != fileoffset:
+                    easy = False
+                streamoffset += chunk[StreamReader.BLOCKSIZE]
+                fileoffset += chunk[StreamReader.CHUNKSIZE]
+
+            if easy:
+                manifest += " " + "{0}:{1}:{2}".format(0, fileoffset, f)
+            else:
+                streamoffset = 0
+                fileoffset = 0
+                # not easy
+                for chunk in fn:
+                    manifest += " " + "{0}:{1}:{2}".format(streamoffset + chunk[StreamReader.CHUNKOFFSET], chunk[StreamReader.CHUNKSIZE], f)
+                    streamoffset += chunk[StreamReader.BLOCKSIZE]
+                    fileoffset += chunk[StreamReader.CHUNKSIZE]
+
+        manifest += "\n"
+    return manifest
+
 class CollectionReader(object):
     def __init__(self, manifest_locator_or_text):
         if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 0d0caee..d587c81 100644
--- a/sdk/python/arvados/stream.py
+++ b/sdk/python/arvados/stream.py
@@ -101,6 +101,9 @@ class StreamFileReader(object):
         if data != '':
             yield data
 
+    def stream_offset(self):
+        return self._pos
+
     def as_manifest(self):
         if self.size() == 0:
             return ("%s %s 0:0:%s\n"
@@ -114,7 +117,7 @@ class StreamReader(object):
         self._current_datablock_data = None
         self._current_datablock_pos = 0
         self._current_datablock_index = -1
-        self._pos = 0
+        self._pos = 0L
 
         self._stream_name = None
         self.data_locators = []
@@ -127,7 +130,7 @@ class StreamReader(object):
                 self.data_locators += [tok]
             elif re.search(r'^\d+:\d+:\S+', tok):
                 pos, size, name = tok.split(':',2)
-                self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
+                self.files += [[long(pos), long(size), name.replace('\\040', ' ')]]
             else:
                 raise errors.SyntaxError("Invalid manifest format")
 
@@ -163,6 +166,47 @@ class StreamReader(object):
                 resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
         return resp
 
+    LOCATOR = 0
+    BLOCKSIZE = 1
+    CHUNKOFFSET = 2
+    CHUNKSIZE = 3
+
+    def locators_and_ranges(self, range_start, range_size):
+        '''returns list of [block locator, blocksize, chunk offset, chunk size] that satisfies the range'''
+        print 'locators_and_ranges', range_start, range_size
+        resp = []
+        return_all_tokens = False
+        range_start = long(range_start)
+        range_size = long(range_size)
+        range_end = range_start + range_size
+        block_start = 0L
+        for locator in self.data_locators:
+            sizehint = re.search(r'[0-9a-f]{32}\+(\d+)', locator)
+            if not sizehint:
+                raise Exception("Manifest must include block sizes to be normalized")
+            block_size = long(sizehint.group(1))
+            block_end = block_start + block_size
+            if range_end < block_start:
+                # range ends before this block starts, so don't look at any more locators
+                break
+            if range_start > block_end:
+                # range starts after this block ends, so go to next block
+                next
+            elif range_start >= block_start and range_end <= block_end:
+                # range starts and ends in this block
+                resp.append([locator, block_size, range_start - block_start, range_size])
+            elif range_start >= block_start:
+                # range starts in this block
+                resp.append([locator, block_size, range_start - block_start, block_end - range_start])
+            elif range_start < block_start and range_end > block_end:
+                # range starts in a previous block and extends to further blocks
+                resp.append([locator, block_size, 0L, block_size])
+            elif range_start < block_start and range_end <= block_end:
+                # range starts in a previous block and ends in this block
+                resp.append([locator, block_size, 0L, range_end - block_start])
+            block_start = block_end
+        return resp
+
     def name(self):
         return self._stream_name
 
diff --git a/sdk/python/test_collections.py b/sdk/python/test_collections.py
index 913c888..1879b5f 100644
--- a/sdk/python/test_collections.py
+++ b/sdk/python/test_collections.py
@@ -205,3 +205,51 @@ class LocalCollectionGzipDecompressionTest(unittest.TestCase):
         self.assertEqual(got,
                          n_lines_in,
                          "decompression returned %d lines instead of %d" % (got, n_lines_in))
+
+class NormalizedCollectionTest(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def runTest(self):
+        m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt"""
+        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m1)),
+                         """. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt
+""")
+
+        m2 = """. 204e43b8a1185621ca55a94839582e6f+67108864 b9677abbac956bd3e86b1deb28dfac03+67108864 fc15aff2a762b13f521baf042140acec+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:227212247:var-GS000016015-ASM.tsv.bz2
+"""
+        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m2)), m2)
+
+        m3 = """. 5348b82a029fd9e971a811ce1f71360b+43 3:40:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt"""
+        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m3)),
+                         """. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 3:40:md5sum.txt 43:41:md5sum.txt 84:43:md5sum.txt
+""")
+
+        m4 = """. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar"""
+        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m4)),
+                         """./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+""")
+
+        m5 = """. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 204e43b8a1185621ca55a94839582e6f+67108864 3:3:bar"""
+        self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m5)),
+                         """./foo 204e43b8a1185621ca55a94839582e6f+67108864 204e43b8a1185621ca55a94839582e6f+67108864 0:3:bar 67108867:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+""")
+
+        with open('gatkmanifest') as f6:
+            m6 = f6.read()
+            m6n = arvados.collection.normalize(arvados.CollectionReader(m6))
+            with open('gatkmanifest_normalized', 'w') as f6n:
+                f6n.write(m6n)
+            
+            #self.assertEqual(arvados.collection.normalize(arvados.CollectionReader(m6)), m6)
+

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list