[ARVADOS] updated: 57e0202fcbec42bf9f9aabade183c66551be0a88

git at public.curoverse.com git at public.curoverse.com
Wed Aug 6 10:27:22 EDT 2014


Summary of changes:
 crunch_scripts/split-fastq.py | 106 ++++++++++++++++++++++++++----------------
 1 file changed, 67 insertions(+), 39 deletions(-)

       via  57e0202fcbec42bf9f9aabade183c66551be0a88 (commit)
       via  f2263977500c541ae8eefdd1462264593b979d63 (commit)
       via  290274d8413e674d087542f247e944c1277bae2b (commit)
       via  86f97fe1bf8ebb7fb9b3c8c730687074d44d4ad8 (commit)
       via  a73069f409eab6d7ee0c4935097805b228e0e67c (commit)
       via  363d128327963a7c1d93992613a741af4d5f55fd (commit)
      from  d94d24f806debac76b29896bf486067be3cb084c (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 57e0202fcbec42bf9f9aabade183c66551be0a88
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Aug 5 20:57:03 2014 +0000

    Disable chunking

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index bbf267f..ece593d 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -18,7 +18,7 @@ prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
 
 manifest_list = []
 
-chunking = arvados.getjobparam('chunking')
+chunking = False #arvados.getjobparam('chunking')
 
 def nextline(reader, start):
     n = -1

commit f2263977500c541ae8eefdd1462264593b979d63
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Aug 5 20:10:17 2014 +0000

    Chunking fastq seems to work, but is very slow.

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index e8c8712..bbf267f 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -16,55 +16,73 @@ inp = arvados.CollectionReader(arvados.getjobparam('reads'))
 
 prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
 
-manifest_text = ""
+manifest_list = []
+
+chunking = arvados.getjobparam('chunking')
 
-def readline(reader, start):
-    line = ""
+def nextline(reader, start):
     n = -1
-    while n == -1:
-        r = reader.readfrom(start, 1024)
+    while True:
+        r = reader.readfrom(start, 128)
         if r == '':
             break
         n = string.find(r, "\n")
-        line += r[0:n]
-        start += len(r)
-    return line
+        if n > -1:
+            break
+        else:
+            start += 128
+    return n
 
 def splitfastq(p):
     for i in xrange(0, len(p)):
         p[i]["start"] = 0
         p[i]["end"] = 0
 
-    while True:
-        recordsize = [0, 0]
+    count = 0
+    recordsize = [0, 0]
 
-        # read 4 lines starting at "start"
-        for ln in xrange(0, 4):
-            for i in xrange(0, len(p)):
-                r = readline(p[i]["reader"], p[i]["start"])
-                if r == '':
-                    return
-                recordsize[i] += len(r)
+    global piece
+    finish = False
+    while not finish:
+        for i in xrange(0, len(p)):
+            recordsize[i] = 0
 
-        splitnow = False
+        # read next 4 lines
+        for i in xrange(0, len(p)):
+            for ln in xrange(0, 4):
+                r = nextline(p[i]["reader"], p[i]["end"]+recordsize[i])
+                if r == -1:
+                    finish = True
+                    break
+                recordsize[i] += (r+1)
+
+        splitnow = finish
         for i in xrange(0, len(p)):
-            if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= arvados.BLOCKSIZE:
+            if ((p[i]["end"] - p[i]["start"]) + recordsize[i]) >= (64*1024*1024):
                 splitnow = True
 
         if splitnow:
             for i in xrange(0, len(p)):
-                global piece
-                global manifest_text
+                global manifest_list
+                print "Finish piece ./_%s/%s (%s %s)" % (piece, p[i]["reader"].name(), p[i]["start"], p[i]["end"])
                 manifest = []
-                manifest.extend("./_" + str(piece))
-                manifest.extend([d[arvados.LOCATOR] for d in p["reader"]._stream._data_locators])
-                manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR], seg[arvados.BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
-                manifest_text += manifest.join(" ") + "\n"
+                manifest.extend(["./_" + str(piece)])
+                manifest.extend([d[arvados.LOCATOR] for d in p[i]["reader"]._stream._data_locators])
+
+                print p[i]
+                print arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])
+
+                manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR]+seg[arvados.OFFSET], seg[arvados.SEGMENTSIZE], p[i]["reader"].name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
+                manifest_list.append(manifest)
+                print "Finish piece %s" % (" ".join(manifest))
                 p[i]["start"] = p[i]["end"]
+            piece += 1
         else:
             for i in xrange(0, len(p)):
                 p[i]["end"] += recordsize[i]
-
+            count += 1
+            if count % 10000 == 0:
+                print "Record %s at %s" % (count, p[i]["end"])
 
 for s in inp.all_streams():
     if s.name() == ".":
@@ -77,15 +95,17 @@ for s in inp.all_streams():
                     p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
                 else:
                     p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"]
-                splitfastq(p)
-                #m0 = p[0]["reader"].as_manifest()[1:]
-                #m1 = p[1]["reader"].as_manifest()[1:]
-                #manifest_text += "./_" + str(piece) + m0
-                #manifest_text += "./_" + str(piece) + m1
-                piece += 1
+                if chunking:
+                    splitfastq(p)
+                else:
+                    m0 = p[0]["reader"].as_manifest()[1:]
+                    m1 = p[1]["reader"].as_manifest()[1:]
+                    manifest_list.append(["./_" + str(piece), m0[:-1]])
+                    manifest_list.append(["./_" + str(piece), m1[:-1]])
+                    piece += 1
 
 # No pairs found so just put each fastq file into a separate directory
-if manifest_text == "":
+if len(manifest_list) == 0:
     for s in inp.all_streams():
         prog = re.compile("(.*?).fastq(.gz)?$")
         if s.name() == ".":
@@ -94,9 +114,13 @@ if manifest_text == "":
                 if result != None:
                     p = [{}]
                     p[0]["reader"] = s.files()[result.group(0)]
-                    splitfastq(p)
-                    #m0 = p[0]["reader"].as_manifest()[1:]
-                    #manifest_text += "./_" + str(piece) + m0
-                    piece += 1
+                    if chunking:
+                        splitfastq(p)
+                    else:
+                        m0 = p[0]["reader"].as_manifest()[1:]
+                        manifest_list.append(["./_" + str(piece), m0])
+                        piece += 1
+
+manifest_text = "\n".join(" ".join(m) for m in manifest_list)
 
 arvados.current_task().set_output(manifest_text)

commit 290274d8413e674d087542f247e944c1277bae2b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Aug 5 13:38:20 2014 -0400

    arvados.LOCATOR

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index 128e9f6..e8c8712 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -57,8 +57,8 @@ def splitfastq(p):
                 global manifest_text
                 manifest = []
                 manifest.extend("./_" + str(piece))
-                manifest.extend([d[LOCATOR] for d in p["reader"]._stream._data_locators])
-                manifest.extend(["{}:{}:{}".format(seg[LOCATOR], seg[BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
+                manifest.extend([d[arvados.LOCATOR] for d in p["reader"]._stream._data_locators])
+                manifest.extend(["{}:{}:{}".format(seg[arvados.LOCATOR], seg[arvados.BLOCKSIZE], self.name().replace(' ', '\\040')) for seg in arvados.locators_and_ranges(p[i]["reader"].segments, p[i]["start"], p[i]["end"] - p[i]["start"])])
                 manifest_text += manifest.join(" ") + "\n"
                 p[i]["start"] = p[i]["end"]
         else:

commit 86f97fe1bf8ebb7fb9b3c8c730687074d44d4ad8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Aug 5 13:37:11 2014 -0400

    import string

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index 846d84f..128e9f6 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -3,6 +3,7 @@
 import arvados
 import re
 import hashlib
+import string
 
 api = arvados.api('v1')
 

commit a73069f409eab6d7ee0c4935097805b228e0e67c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Aug 5 13:36:14 2014 -0400

    Fix syntax error

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index a279198..846d84f 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -75,7 +75,7 @@ for s in inp.all_streams():
                 if result.group(2) != None:
                     p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
                 else:
-                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"
+                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"]
                 splitfastq(p)
                 #m0 = p[0]["reader"].as_manifest()[1:]
                 #m1 = p[1]["reader"].as_manifest()[1:]

commit 363d128327963a7c1d93992613a741af4d5f55fd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Aug 5 13:35:05 2014 -0400

    Check for None

diff --git a/crunch_scripts/split-fastq.py b/crunch_scripts/split-fastq.py
index fc61142..a279198 100755
--- a/crunch_scripts/split-fastq.py
+++ b/crunch_scripts/split-fastq.py
@@ -13,7 +13,7 @@ manifest_text = ""
 
 inp = arvados.CollectionReader(arvados.getjobparam('reads'))
 
-prog = re.compile("(.*?)_1.fastq(.gz)?$")
+prog = re.compile(r'(.*?)_1.fastq(.gz)?$')
 
 manifest_text = ""
 
@@ -72,7 +72,10 @@ for s in inp.all_streams():
             if result != None:
                 p = [{}, {}]
                 p[0]["reader"] = s.files()[result.group(0)]
-                p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
+                if result.group(2) != None:
+                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq" + result.group(2)]
+                else:
+                    p[1]["reader"] = s.files()[result.group(1) + "_2.fastq"
                 splitfastq(p)
                 #m0 = p[0]["reader"].as_manifest()[1:]
                 #m1 = p[1]["reader"].as_manifest()[1:]

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list