[ARVADOS] updated: 559f3e22645f92ba9d5b13c5a55bf495f38e9b8e
Git user
git at public.curoverse.com
Tue Feb 7 16:44:11 EST 2017
Summary of changes:
sdk/go/manifest/manifest.go | 178 +++++++++++++++++++++++++++------------
sdk/go/manifest/manifest_test.go | 71 +++++++++++++++-
2 files changed, 190 insertions(+), 59 deletions(-)
via 559f3e22645f92ba9d5b13c5a55bf495f38e9b8e (commit)
via e79722b0c9344451cf02c2986976c40a9daeb9dc (commit)
from 6045f3a69038dbc2f14d9141d450341c9611ec14 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 559f3e22645f92ba9d5b13c5a55bf495f38e9b8e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Feb 7 16:43:30 2017 -0500
9397: Improve manifest normalization efficiency
diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go
index cc16090..d1c95b7 100644
--- a/sdk/go/manifest/manifest.go
+++ b/sdk/go/manifest/manifest.go
@@ -16,9 +16,6 @@ import (
var ErrInvalidToken = errors.New("Invalid token")
-var LocatorPattern = regexp.MustCompile(
- "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
-
type Manifest struct {
Text string
Err error
@@ -30,12 +27,6 @@ type BlockLocator struct {
Hints []string
}
-type DataSegment struct {
- BlockLocator
- Locator string
- StreamOffset uint64
-}
-
// FileSegment is a portion of a file that is contained within a
// single block.
type FileSegment struct {
@@ -52,10 +43,13 @@ type FileStreamSegment struct {
Name string
}
+type BlockRange struct{ Begin, End int64 }
+
// Represents a single line from a manifest.
type ManifestStream struct {
StreamName string
Blocks []string
+ BlockRanges []BlockRange
FileStreamSegments []FileStreamSegment
Err error
}
@@ -84,7 +78,7 @@ func unescapeSeq(seq string) string {
}
func EscapeName(s string) string {
- return strings.Replace(s, " ", "\\040", -1)
+ return strings.Replace(s, " ", `\040`, -1)
}
func UnescapeName(s string) string {
@@ -92,11 +86,11 @@ func UnescapeName(s string) string {
}
func ParseBlockLocator(s string) (b BlockLocator, err error) {
- if !LocatorPattern.MatchString(s) {
+ if !blockdigest.LocatorPattern.MatchString(s) {
err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
"\"%s\".",
s,
- LocatorPattern.String())
+ blockdigest.LocatorPattern.String())
} else {
tokens := strings.Split(s, "+")
var blockSize int64
@@ -145,6 +139,36 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm
return ch
}
+func (s *ManifestStream) FirstBlock(range_start int64) int64 {
+ // range_start/block_start is the inclusive lower bound
+ // range_end/block_end is the exclusive upper bound
+
+ hi := int64(len(s.BlockRanges))
+ var lo int64
+ i := ((hi + lo) / 2)
+ block_start := s.BlockRanges[i].Begin
+ block_end := s.BlockRanges[i].End
+
+ // perform a binary search for the first block
+ // assumes that all of the blocks are contiguous, so range_start is guaranteed
+ // to either fall into the range of a block or be outside the block range entirely
+ for !(range_start >= block_start && range_start < block_end) {
+ if lo == i {
+ // must be out of range, fail
+ return -1
+ }
+ if range_start > block_start {
+ lo = i
+ } else {
+ hi = i
+ i = ((hi + lo) / 2)
+ block_start = s.BlockRanges[i].Begin
+ block_end = s.BlockRanges[i].End
+ }
+ }
+ return i
+}
+
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
blockLens := make([]int, 0, len(s.Blocks))
// This is what streamName+"/"+fileName will look like:
@@ -229,6 +253,19 @@ func parseManifestStream(s string) (m ManifestStream) {
return
}
+ m.BlockRanges = make([]BlockRange, len(m.Blocks))
+ var streamoffset int64
+ for i, b := range m.Blocks {
+ bl, err := ParseBlockLocator(b)
+ if err != nil {
+ m.Err = err
+ return
+ }
+ m.BlockRange[i].Begin = streamoffset
+ m.BlockRange[i].End = streamoffset + int64(bl.Size)
+ streamoffset = m.BlockRange[i].End
+ }
+
if len(fileTokens) == 0 {
m.Err = fmt.Errorf("No file tokens found")
return
@@ -262,6 +299,7 @@ func (m *Manifest) SegmentManifest() *SegmentedManifest {
files := make(SegmentedManifest)
for stream := range m.StreamIter() {
+ currentStreamfiles := make(map[string]bool)
for _, f := range stream.FileStreamSegments {
sn := stream.StreamName
if sn != "." && !strings.HasPrefix(sn, "./") {
@@ -275,12 +313,13 @@ func (m *Manifest) SegmentManifest() *SegmentedManifest {
if files[streamname] == nil {
files[streamname] = make(SegmentedStream)
}
- if files[streamname][filename] == nil {
- var segs SegmentedFile
- for seg := range m.FileSegmentIterByName(path) {
+ if !currentStreamfiles[path] {
+ segs := files[streamname][filename]
+ for seg := range stream.FileSegmentIterByName(path) {
segs = append(segs, *seg)
}
files[streamname][filename] = segs
+ currentStreamfiles[path] = true
}
}
}
@@ -352,9 +391,17 @@ func (stream *SegmentedStream) NormalizeStream(name string) string {
func (m *Manifest) NormalizeManifest() string {
segments := m.SegmentManifest()
+
+ var sortedstreams []string
+ for k, _ := range *segments {
+ sortedstreams = append(sortedstreams, k)
+ }
+ sort.Strings(sortedstreams)
+
var manifest string
- for k, v := range *segments {
- manifest += v.NormalizeStream(k)
+ for _, k := range sortedstreams {
+ stream := (*segments)[k]
+ manifest += stream.NormalizeStream(k)
}
return manifest
}
@@ -377,18 +424,32 @@ func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
} else if stream, ok := (*m)[streamname]; ok {
// refers to a single file in a stream
newstream := make(SegmentedStream)
+ if relocate_filename == "" {
+ relocate_filename = filename
+ }
newstream[relocate_filename] = stream[filename]
return newstream.NormalizeStream(relocate_stream)
} else {
// refers to multiple streams
manifest := ""
- prefix := streamname
+ prefix := path
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
- for k, v := range *m {
+ if !strings.HasSuffix(relocate, "/") {
+ relocate += "/"
+ }
+
+ var sortedstreams []string
+ for k, _ := range *m {
+ sortedstreams = append(sortedstreams, k)
+ }
+ sort.Strings(sortedstreams)
+
+ for _, k := range sortedstreams {
if strings.HasPrefix(k, prefix) {
- manifest += v.NormalizeStream(relocate_stream + k[len(prefix)-1:])
+ v := (*m)[k]
+ manifest += v.NormalizeStream(relocate + k[len(prefix):])
}
}
return manifest
diff --git a/sdk/go/manifest/manifest_test.go b/sdk/go/manifest/manifest_test.go
index 7d7d45f..fe83a73 100644
--- a/sdk/go/manifest/manifest_test.go
+++ b/sdk/go/manifest/manifest_test.go
@@ -253,7 +253,72 @@ func TestBlockIterWithBadManifest(t *testing.T) {
}
func TestNormalizeManifest(t *testing.T) {
- m := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
- normalized := m.ManifestForPath("", "")
- expectEqual(t, normalized, ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:20:one 20:20:two\n")
+ m1 := Manifest{Text: `. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
+`}
+ expectEqual(t, m1.NormalizeManifest(),
+ `. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt
+`)
+
+ m2 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 b9677abbac956bd3e86b1deb28dfac03+67108864 fc15aff2a762b13f521baf042140acec+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:227212247:var-GS000016015-ASM.tsv.bz2
+`}
+ expectEqual(t, m2.NormalizeManifest(), m2.Text)
+
+ m3 := Manifest{Text: `. 5348b82a029fd9e971a811ce1f71360b+43 3:40:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
+`}
+ expectEqual(t, m3.NormalizeManifest(), `. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 3:124:md5sum.txt
+`)
+
+ m4 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar
+`}
+
+ expectEqual(t, m4.NormalizeManifest(),
+ `./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
+ expectEqual(t, m4.ManifestForPath("./foo", "."), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo", "./baz"), "./baz 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "."), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "./baz"), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:baz 67108864:3:baz\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "./quux/"), "./quux 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath(".", "."), `./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+ expectEqual(t, m4.ManifestForPath(".", "./zip"), `./zip/foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zip/zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
+ m5 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 204e43b8a1185621ca55a94839582e6f+67108864 3:3:bar
+`}
+ expectEqual(t, m5.NormalizeManifest(),
+ `./foo 204e43b8a1185621ca55a94839582e6f+67108864 0:6:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
+ // with self.data_file('1000G_ref_manifest') as f6:
+ // m6 = f6.read()
+ // self.assertEqual(arvados.CollectionReader(m6, self.api_client).manifest_text(normalize=True), m6)
+
+ // with self.data_file('jlake_manifest') as f7:
+ // m7 = f7.read()
+ // self.assertEqual(arvados.CollectionReader(m7, self.api_client).manifest_text(normalize=True), m7)
+
+ m8 := Manifest{Text: `./a\040b\040c 59ca0efa9f5633cb0371bbc0355478d8+13 0:13:hello\040world.txt
+`}
+ expectEqual(t, m8.NormalizeManifest(), m8.Text)
+
+ m9 := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
+ expectEqual(t, m9.ManifestForPath("", ""), ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:20:one 20:20:two\n")
+
+ m10 := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
+ expectEqual(t, m10.ManifestForPath("./two", "./three"), ". acbd18db4cc2f85cedef654fccc4a4d8+40 20:20:three\n")
+
}
commit e79722b0c9344451cf02c2986976c40a9daeb9dc
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Feb 7 13:47:24 2017 -0500
9397: Syntax/logic fixes
diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go
index f2fb9b5..cc16090 100644
--- a/sdk/go/manifest/manifest.go
+++ b/sdk/go/manifest/manifest.go
@@ -84,7 +84,7 @@ func unescapeSeq(seq string) string {
}
func EscapeName(s string) string {
- return s.ReplaceAll(" ", "\\040")
+ return strings.Replace(s, " ", "\\040", -1)
}
func UnescapeName(s string) string {
@@ -148,7 +148,10 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
blockLens := make([]int, 0, len(s.Blocks))
// This is what streamName+"/"+fileName will look like:
- target := "./" + filepath
+ target := filepath
+ if !strings.HasPrefix(target, "./") {
+ target = "./" + target
+ }
for _, fTok := range s.FileStreamSegments {
wantPos := fTok.SegPos
wantLen := fTok.SegLen
@@ -252,6 +255,7 @@ func SplitPath(path string) (streamname, filename string) {
streamname = path
filename = ""
}
+ return
}
func (m *Manifest) SegmentManifest() *SegmentedManifest {
@@ -260,10 +264,10 @@ func (m *Manifest) SegmentManifest() *SegmentedManifest {
for stream := range m.StreamIter() {
for _, f := range stream.FileStreamSegments {
sn := stream.StreamName
- if sn != "." && !sn.StartsWith("./") {
+ if sn != "." && !strings.HasPrefix(sn, "./") {
sn = "./" + sn
}
- if sn.EndsWith("/") {
+ if strings.HasSuffix(sn, "/") {
sn = sn[0 : len(sn)-1]
}
path := sn + "/" + f.Name
@@ -272,24 +276,24 @@ func (m *Manifest) SegmentManifest() *SegmentedManifest {
files[streamname] = make(SegmentedStream)
}
if files[streamname][filename] == nil {
- var segs []FileSegment
- for seg := range FileSegmentIterByName(name) {
- segs = append(segs, seg)
+ var segs SegmentedFile
+ for seg := range m.FileSegmentIterByName(path) {
+ segs = append(segs, *seg)
}
files[streamname][filename] = segs
}
}
}
- return files
+ return &files
}
func (stream *SegmentedStream) NormalizeStream(name string) string {
var sortedfiles []string
- for k, _ := range stream {
- sortedfiles.append(sortedfiles, k)
+ for k, _ := range *stream {
+ sortedfiles = append(sortedfiles, k)
}
- strings.Sort(sortedfiles)
+ sort.Strings(sortedfiles)
stream_tokens := []string{EscapeName(name)}
@@ -298,12 +302,12 @@ func (stream *SegmentedStream) NormalizeStream(name string) string {
// Go through each file and add each referenced block exactly once.
for _, streamfile := range sortedfiles {
- for _, segment := range stream[streamfile] {
+ for _, segment := range (*stream)[streamfile] {
if _, ok := blocks[segment.Locator]; !ok {
stream_tokens = append(stream_tokens, segment.Locator)
blocks[segment.Locator] = streamoffset
- b, err := ParseBlockLocator(segment.Locator)
- streamoffset += b.Size
+ b, _ := ParseBlockLocator(segment.Locator)
+ streamoffset += int64(b.Size)
}
}
}
@@ -316,40 +320,40 @@ func (stream *SegmentedStream) NormalizeStream(name string) string {
// Add in file segments
span_start := int64(-1)
span_end := int64(0)
- fout = EscapeName(streamfile)
- for _, segment := range stream[streamfile] {
+ fout := EscapeName(streamfile)
+ for _, segment := range (*stream)[streamfile] {
// Collapse adjacent segments
- streamoffset = blocks[segment.Locator] + segment.Offset
+ streamoffset = blocks[segment.Locator] + int64(segment.Offset)
if span_start == -1 {
span_start = streamoffset
- span_end = streamoffset + segment.Len
+ span_end = streamoffset + int64(segment.Len)
} else {
if streamoffset == span_end {
- span_end += segment.Len
+ span_end += int64(segment.Len)
} else {
- stream_tokens = append(stream_tokens, fmt.Sprintf("%i:%i:%s", span_start, span_end-span_start, fout))
+ stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
span_start = streamoffset
- span_end = streamoffset + segment.Len
+ span_end = streamoffset + int64(segment.Len)
}
}
}
if span_start != -1 {
- stream_tokens = append(stream_tokens, fmt.Sprintf("%i:%i:%s", span_start, span_end-span_start, fout))
+ stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
}
- if len(stream[streamfile]) == 0 {
+ if len((*stream)[streamfile]) == 0 {
stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
}
}
- return string.Join(" ", stream_tokens) + "\n"
+ return strings.Join(stream_tokens, " ") + "\n"
}
func (m *Manifest) NormalizeManifest() string {
segments := m.SegmentManifest()
var manifest string
- for k, v := range segments {
+ for k, v := range *segments {
manifest += v.NormalizeStream(k)
}
return manifest
@@ -359,42 +363,40 @@ func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
if path == "" {
path = "."
}
+ if relocate == "" {
+ relocate = "."
+ }
streamname, filename := SplitPath(path)
var relocate_stream, relocate_filename string
- if relocate != "" {
- relocate_stream, relocate_filename = SplitPath(relocate)
- } else {
- relocate_stream = streamname
- relocate_filename = filename
- }
+ relocate_stream, relocate_filename = SplitPath(relocate)
- if stream, ok := m[path]; ok {
+ if stream, ok := (*m)[path]; ok {
// refers to a single stream
- return stream.NormalizeManifest(relocate)
- } else if stream, ok := m[stream]; ok {
+ return stream.NormalizeStream(relocate)
+ } else if stream, ok := (*m)[streamname]; ok {
// refers to a single file in a stream
newstream := make(SegmentedStream)
newstream[relocate_filename] = stream[filename]
- return newstream.NormalizeManifest(relocate_stream)
+ return newstream.NormalizeStream(relocate_stream)
} else {
// refers to multiple streams
manifest := ""
prefix := streamname
- if !prefix.EndsWith("/") {
+ if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
- for k, v := range m {
- if k.StartsWith(prefix) {
- manifest += v.NormalizeManifest(relocate_stream + k[len(prefix)-1:])
+ for k, v := range *m {
+ if strings.HasPrefix(k, prefix) {
+ manifest += v.NormalizeStream(relocate_stream + k[len(prefix)-1:])
}
}
return manifest
}
}
-func (m *Manifest) ManifestForPath(path string) string {
- return m.SegmentedManifest().ManifestForPath(path)
+func (m *Manifest) ManifestForPath(path, relocate string) string {
+ return m.SegmentManifest().ManifestForPath(path, relocate)
}
func (m *Manifest) StreamIter() <-chan ManifestStream {
@@ -421,9 +423,12 @@ func (m *Manifest) StreamIter() <-chan ManifestStream {
func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
ch := make(chan *FileSegment)
+ if !strings.HasPrefix(filepath, "./") {
+ filepath = "./" + filepath
+ }
go func() {
for stream := range m.StreamIter() {
- if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
+ if !strings.HasPrefix(filepath, stream.StreamName+"/") {
continue
}
stream.sendFileSegmentIterByName(filepath, ch)
diff --git a/sdk/go/manifest/manifest_test.go b/sdk/go/manifest/manifest_test.go
index 0d58a9e..7d7d45f 100644
--- a/sdk/go/manifest/manifest_test.go
+++ b/sdk/go/manifest/manifest_test.go
@@ -253,7 +253,7 @@ func TestBlockIterWithBadManifest(t *testing.T) {
}
func TestNormalizeManifest(t *testing.T) {
- m := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 10:10:one 20:10:two 30:10:two\n"}
- normalized := m.NormalizedManifestForPath("")
- expectEqual(t, normalized, ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:20:one 20:40:two\n")
+ m := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
+ normalized := m.ManifestForPath("", "")
+ expectEqual(t, normalized, ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:20:one 20:20:two\n")
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list