[ARVADOS] created: ab6a70e86dd041f3b4da167c59e3e91309f14365
Git user
git at public.curoverse.com
Tue Feb 7 19:14:24 EST 2017
at ab6a70e86dd041f3b4da167c59e3e91309f14365 (commit)
commit ab6a70e86dd041f3b4da167c59e3e91309f14365
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Feb 6 17:16:01 2017 -0500
9397: Add manifest normalization and sub-manifest extraction by path.
Introduces "SegmentedManifest" which stores streams -> files -> file segments.
Enables reexport of manifest in normalized form, as well as extraction of
individual files, streams or sets of streams. Also adds binary search for
efficiently determining first block to access for some stream offset.
diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go
index 22b1c97..362baf8 100644
--- a/sdk/go/manifest/manifest.go
+++ b/sdk/go/manifest/manifest.go
@@ -9,15 +9,13 @@ import (
"fmt"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"regexp"
+ "sort"
"strconv"
"strings"
)
var ErrInvalidToken = errors.New("Invalid token")
-var LocatorPattern = regexp.MustCompile(
- "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
-
type Manifest struct {
Text string
Err error
@@ -29,12 +27,6 @@ type BlockLocator struct {
Hints []string
}
-type DataSegment struct {
- BlockLocator
- Locator string
- StreamOffset uint64
-}
-
// FileSegment is a portion of a file that is contained within a
// single block.
type FileSegment struct {
@@ -55,10 +47,20 @@ type FileStreamSegment struct {
type ManifestStream struct {
StreamName string
Blocks []string
+ BlockOffsets []uint64
FileStreamSegments []FileStreamSegment
Err error
}
+// SegmentedFile is the list of segments that reference a file's
+// content.
+type SegmentedFile []FileSegment
+
+// SegmentedStream maps each file name in a stream to the list of
+// segments referencing that file's content.
+type SegmentedStream map[string]SegmentedFile
+
+// SegmentedManifest maps each stream name to its SegmentedStream.
+type SegmentedManifest map[string]SegmentedStream
+
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
func unescapeSeq(seq string) string {
@@ -73,16 +75,20 @@ func unescapeSeq(seq string) string {
return string([]byte{byte(i)})
}
+// EscapeName encodes a stream or file name for use as a manifest
+// token. Every byte with a value of 32 (space) or below is written
+// as a backslash followed by its three-digit octal code, so a space
+// becomes `\040` and a newline becomes `\012`. This keeps a name from
+// introducing a token separator (space) or a stream separator
+// (newline) into the manifest text. All other bytes, including
+// backslashes, are copied through unchanged.
+func EscapeName(s string) string {
+	escaped := make([]byte, 0, len(s))
+	for i := 0; i < len(s); i++ {
+		c := s[i]
+		if c <= ' ' {
+			escaped = append(escaped, fmt.Sprintf("\\%03o", c)...)
+		} else {
+			escaped = append(escaped, c)
+		}
+	}
+	return string(escaped)
+}
+
func UnescapeName(s string) string {
return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
}
func ParseBlockLocator(s string) (b BlockLocator, err error) {
- if !LocatorPattern.MatchString(s) {
+ if !blockdigest.LocatorPattern.MatchString(s) {
err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
"\"%s\".",
s,
- LocatorPattern.String())
+ blockdigest.LocatorPattern.String())
} else {
tokens := strings.Split(s, "+")
var blockSize int64
@@ -123,7 +129,7 @@ func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
}
func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
- ch := make(chan *FileSegment)
+ ch := make(chan *FileSegment, 64)
go func() {
s.sendFileSegmentIterByName(filepath, ch)
close(ch)
@@ -131,10 +137,42 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm
return ch
}
+// FirstBlock returns the index of the block that contains stream
+// offset range_start, or -1 if range_start lies outside every block.
+//
+// offsets holds the starting stream offset of each block, followed by
+// one final entry marking the end of the last block, so block i spans
+// the half-open range [offsets[i], offsets[i+1]).
+func FirstBlock(offsets []uint64, range_start uint64) int {
+	// range_start/block_start is the inclusive lower bound
+	// range_end/block_end is the exclusive upper bound
+
+	if len(offsets) < 2 {
+		// fewer than two entries means there are no blocks, so
+		// nothing can contain range_start
+		return -1
+	}
+
+	hi := len(offsets) - 1
+	var lo int
+	i := ((hi + lo) / 2)
+	block_start := offsets[i]
+	block_end := offsets[i+1]
+
+	// perform a binary search for the first block
+	// assumes that all of the blocks are contiguous, so range_start is guaranteed
+	// to either fall into the range of a block or be outside the block range entirely
+	for !(range_start >= block_start && range_start < block_end) {
+		if lo == i {
+			// must be out of range, fail
+			return -1
+		}
+		if range_start > block_start {
+			lo = i
+		} else {
+			hi = i
+		}
+		// Move the probe after narrowing in either direction.
+		// Doing this only when hi shrinks would leave i stuck at
+		// lo and report in-range upper offsets as missing.
+		i = ((hi + lo) / 2)
+		block_start = offsets[i]
+		block_end = offsets[i+1]
+	}
+	return i
+}
+
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
- blockLens := make([]int, 0, len(s.Blocks))
// This is what streamName+"/"+fileName will look like:
- target := "./" + filepath
+ target := filepath
+ if !strings.HasPrefix(target, "./") {
+ target = "./" + target
+ }
for _, fTok := range s.FileStreamSegments {
wantPos := fTok.SegPos
wantLen := fTok.SegLen
@@ -147,43 +185,41 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
continue
}
- // Linear search for blocks containing data for this
- // file
- var blockPos uint64 = 0 // position of block in stream
- for i, loc := range s.Blocks {
- if blockPos >= wantPos+wantLen {
+
+ // Binary search to determine first block in the stream
+ i := FirstBlock(s.BlockOffsets, wantPos)
+ if i == -1 {
+ // error
+ break
+ }
+ for i < len(s.Blocks) {
+ blockPos := s.BlockOffsets[i]
+ blockEnd := s.BlockOffsets[i+1]
+ if blockEnd <= wantPos {
+ // current block comes before current file span
+ // (shouldn't happen, FirstBlock() should start us
+ // on the right block)
break
}
- if len(blockLens) <= i {
- blockLens = blockLens[:i+1]
- b, err := ParseBlockLocator(loc)
- if err != nil {
- // Unparseable locator -> unusable
- // stream.
- ch <- nil
- return
- }
- blockLens[i] = b.Size
- }
- blockLen := uint64(blockLens[i])
- if blockPos+blockLen <= wantPos {
- blockPos += blockLen
- continue
+ if blockPos >= wantPos+wantLen {
+ // current block comes after current file span
+ break
}
+
fseg := FileSegment{
- Locator: loc,
+ Locator: s.Blocks[i],
Offset: 0,
- Len: blockLens[i],
+ Len: int(blockEnd - blockPos),
}
if blockPos < wantPos {
fseg.Offset = int(wantPos - blockPos)
fseg.Len -= fseg.Offset
}
- if blockPos+blockLen > wantPos+wantLen {
+ if blockEnd > wantPos+wantLen {
fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
}
ch <- &fseg
- blockPos += blockLen
+ i += 1
}
}
}
@@ -212,6 +248,19 @@ func parseManifestStream(s string) (m ManifestStream) {
return
}
+ m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
+ var streamoffset uint64
+ for i, b := range m.Blocks {
+ bl, err := ParseBlockLocator(b)
+ if err != nil {
+ m.Err = err
+ return
+ }
+ m.BlockOffsets[i] = streamoffset
+ streamoffset += uint64(bl.Size)
+ }
+ m.BlockOffsets[len(m.Blocks)] = streamoffset
+
if len(fileTokens) == 0 {
m.Err = fmt.Errorf("No file tokens found")
return
@@ -229,6 +278,183 @@ func parseManifestStream(s string) (m ManifestStream) {
return
}
+// SplitPath divides path at its last "/". Everything before the
+// slash is returned as streamname and everything after it as
+// filename. When path contains no slash, the whole path is returned
+// as streamname and filename is empty.
+func SplitPath(path string) (streamname, filename string) {
+	slash := strings.LastIndex(path, "/")
+	if slash < 0 {
+		// No separator: the entire path names a stream.
+		return path, ""
+	}
+	return path[:slash], path[slash+1:]
+}
+
+// SegmentManifest walks every stream of m and groups the file
+// segments by stream name and then by file name. A file token whose
+// name contains a "/" is filed under the stream formed by joining the
+// stream name with the directory part of the file name. Segments for
+// the same file found in several manifest streams are appended in the
+// order the streams are iterated.
+func (m *Manifest) SegmentManifest() *SegmentedManifest {
+	segmented := make(SegmentedManifest)
+
+	for stream := range m.StreamIter() {
+		// Canonical form of the stream name: "." or "./..." with
+		// no trailing slash.
+		base := stream.StreamName
+		if base != "." && !strings.HasPrefix(base, "./") {
+			base = "./" + base
+		}
+		if strings.HasSuffix(base, "/") {
+			base = base[:len(base)-1]
+		}
+
+		// Paths already collected from this stream. A file may be
+		// listed by several tokens, but one iterator call returns
+		// all of its segments.
+		seen := make(map[string]bool)
+		for _, fss := range stream.FileStreamSegments {
+			fullpath := base + "/" + fss.Name
+			streamname, filename := SplitPath(fullpath)
+			if segmented[streamname] == nil {
+				segmented[streamname] = make(SegmentedStream)
+			}
+			if seen[fullpath] {
+				continue
+			}
+			seen[fullpath] = true
+
+			collected := segmented[streamname][filename]
+			for seg := range stream.FileSegmentIterByName(fullpath) {
+				collected = append(collected, *seg)
+			}
+			segmented[streamname][filename] = collected
+		}
+	}
+
+	return &segmented
+}
+
+// NormalizeStream renders the stream as one line of manifest text
+// (terminated by a newline) under the given stream name. Files are
+// emitted in sorted name order, each referenced block locator appears
+// exactly once, and adjacent segments of a file are merged into a
+// single pos:len:name token.
+func (stream *SegmentedStream) NormalizeStream(name string) string {
+	files := *stream
+
+	names := make([]string, 0, len(files))
+	for fn := range files {
+		names = append(names, fn)
+	}
+	sort.Strings(names)
+
+	tokens := []string{EscapeName(name)}
+
+	// First pass: list every referenced block once, in order of
+	// first use, and remember where each starts in the new stream.
+	blockStart := make(map[string]int64)
+	var nextStart int64
+	for _, fn := range names {
+		for _, seg := range files[fn] {
+			if _, listed := blockStart[seg.Locator]; listed {
+				continue
+			}
+			tokens = append(tokens, seg.Locator)
+			blockStart[seg.Locator] = nextStart
+			// The parse error is ignored here, as before.
+			loc, _ := ParseBlockLocator(seg.Locator)
+			nextStart += int64(loc.Size)
+		}
+	}
+
+	// A stream line needs at least one locator; use the empty block.
+	if len(tokens) == 1 {
+		tokens = append(tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
+	}
+
+	// Second pass: emit the file tokens.
+	for _, fn := range names {
+		escaped := EscapeName(fn)
+		// spanStart == -1 means no span has been opened yet.
+		spanStart, spanEnd := int64(-1), int64(0)
+		for _, seg := range files[fn] {
+			pos := blockStart[seg.Locator] + int64(seg.Offset)
+			switch {
+			case spanStart == -1:
+				spanStart, spanEnd = pos, pos+int64(seg.Len)
+			case pos == spanEnd:
+				// Continues the current span: extend it.
+				spanEnd += int64(seg.Len)
+			default:
+				// Gap or jump: flush the span and open a new one.
+				tokens = append(tokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, escaped))
+				spanStart, spanEnd = pos, pos+int64(seg.Len)
+			}
+		}
+
+		if spanStart != -1 {
+			tokens = append(tokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, escaped))
+		}
+
+		// A file with no segments is still listed, as zero length.
+		if len(files[fn]) == 0 {
+			tokens = append(tokens, fmt.Sprintf("0:0:%s", escaped))
+		}
+	}
+
+	return strings.Join(tokens, " ") + "\n"
+}
+
+// NormalizeManifest returns the manifest text of m in normalized
+// form: the manifest is segmented by stream and file, and each stream
+// is then rendered with NormalizeStream in sorted stream-name order.
+func (m *Manifest) NormalizeManifest() string {
+	segmented := *m.SegmentManifest()
+
+	names := make([]string, 0, len(segmented))
+	for sn := range segmented {
+		names = append(names, sn)
+	}
+	sort.Strings(names)
+
+	lines := make([]string, 0, len(names))
+	for _, sn := range names {
+		// Copy to a local so the pointer-receiver method can be
+		// called; map elements are not addressable.
+		stream := segmented[sn]
+		lines = append(lines, stream.NormalizeStream(sn))
+	}
+	return strings.Join(lines, "")
+}
+
+// ManifestForPath returns normalized manifest text for the part of m
+// selected by path, renamed to relocate. An empty path or relocate
+// is treated as ".".
+//
+// path is resolved in this order:
+//
+// 1. If path names a stream, that stream alone is returned under the
+// name relocate.
+//
+// 2. If path names an existing file in a stream, only that file is
+// returned. relocate is split at its last "/" into a stream name and
+// a file name; if the file-name part is empty, the original file name
+// is kept.
+//
+// 3. Otherwise path is treated as a prefix: every stream whose name
+// starts with path + "/" is returned with that prefix replaced by
+// relocate.
+//
+// If nothing matches, the result is the empty string.
+func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
+	if path == "" {
+		path = "."
+	}
+	if relocate == "" {
+		relocate = "."
+	}
+
+	streamname, filename := SplitPath(path)
+	relocate_stream, relocate_filename := SplitPath(relocate)
+
+	if stream, ok := (*m)[path]; ok {
+		// refers to a single stream
+		return stream.NormalizeStream(relocate)
+	}
+
+	if stream, ok := (*m)[streamname]; ok {
+		// Only take the single-file branch when the file exists.
+		// Otherwise path may name a directory that has only
+		// sub-streams, or nothing at all. Fall through to the
+		// prefix match rather than fabricating an empty file.
+		if segments, found := stream[filename]; found {
+			// refers to a single file in a stream
+			newstream := make(SegmentedStream)
+			if relocate_filename == "" {
+				relocate_filename = filename
+			}
+			newstream[relocate_filename] = segments
+			return newstream.NormalizeStream(relocate_stream)
+		}
+	}
+
+	// refers to multiple streams
+	prefix := path
+	if !strings.HasSuffix(prefix, "/") {
+		prefix += "/"
+	}
+	if !strings.HasSuffix(relocate, "/") {
+		relocate += "/"
+	}
+
+	var sortedstreams []string
+	for k := range *m {
+		sortedstreams = append(sortedstreams, k)
+	}
+	sort.Strings(sortedstreams)
+
+	manifest := ""
+	for _, k := range sortedstreams {
+		if strings.HasPrefix(k, prefix) {
+			v := (*m)[k]
+			manifest += v.NormalizeStream(relocate + k[len(prefix):])
+		}
+	}
+	return manifest
+}
+
+// ManifestForPath segments m with SegmentManifest and returns the
+// result of calling ManifestForPath on the segmented manifest with
+// the same path and relocate arguments.
+func (m *Manifest) ManifestForPath(path, relocate string) string {
+	segmented := m.SegmentManifest()
+	return segmented.ManifestForPath(path, relocate)
+}
+
func (m *Manifest) StreamIter() <-chan ManifestStream {
ch := make(chan ManifestStream)
go func(input string) {
@@ -252,10 +478,13 @@ func (m *Manifest) StreamIter() <-chan ManifestStream {
}
func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
- ch := make(chan *FileSegment)
+ ch := make(chan *FileSegment, 64)
+ if !strings.HasPrefix(filepath, "./") {
+ filepath = "./" + filepath
+ }
go func() {
for stream := range m.StreamIter() {
- if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
+ if !strings.HasPrefix(filepath, stream.StreamName+"/") {
continue
}
stream.sendFileSegmentIterByName(filepath, ch)
diff --git a/sdk/go/manifest/manifest_test.go b/sdk/go/manifest/manifest_test.go
index 2fe4272..f9dda3e 100644
--- a/sdk/go/manifest/manifest_test.go
+++ b/sdk/go/manifest/manifest_test.go
@@ -251,3 +251,74 @@ func TestBlockIterWithBadManifest(t *testing.T) {
}
}
}
+
+func TestNormalizeManifest(t *testing.T) {
+ m1 := Manifest{Text: `. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
+`}
+ expectEqual(t, m1.NormalizeManifest(),
+ `. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt
+`)
+
+ m2 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 b9677abbac956bd3e86b1deb28dfac03+67108864 fc15aff2a762b13f521baf042140acec+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:227212247:var-GS000016015-ASM.tsv.bz2
+`}
+ expectEqual(t, m2.NormalizeManifest(), m2.Text)
+
+ m3 := Manifest{Text: `. 5348b82a029fd9e971a811ce1f71360b+43 3:40:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
+`}
+ expectEqual(t, m3.NormalizeManifest(), `. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 3:124:md5sum.txt
+`)
+
+ m4 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar
+`}
+
+ expectEqual(t, m4.NormalizeManifest(),
+ `./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
+ expectEqual(t, m4.ManifestForPath("./foo", "."), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo", "./baz"), "./baz 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "."), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "./baz"), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:baz 67108864:3:baz\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "./quux/"), "./quux 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath(".", "."), `./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+ expectEqual(t, m4.ManifestForPath(".", "./zip"), `./zip/foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zip/zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
+ m5 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 204e43b8a1185621ca55a94839582e6f+67108864 3:3:bar
+`}
+ expectEqual(t, m5.NormalizeManifest(),
+ `./foo 204e43b8a1185621ca55a94839582e6f+67108864 0:6:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
+ m8 := Manifest{Text: `./a\040b\040c 59ca0efa9f5633cb0371bbc0355478d8+13 0:13:hello\040world.txt
+`}
+ expectEqual(t, m8.NormalizeManifest(), m8.Text)
+
+ m9 := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
+ expectEqual(t, m9.ManifestForPath("", ""), ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:20:one 20:20:two\n")
+
+ m10 := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
+ expectEqual(t, m10.ManifestForPath("./two", "./three"), ". acbd18db4cc2f85cedef654fccc4a4d8+40 20:20:three\n")
+
+ m11 := Manifest{Text: arvadostest.PathologicalManifest}
+ expectEqual(t, m11.NormalizeManifest(), `. acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3+Z+K@xyzzy d41d8cd98f00b204e9800998ecf8427e+0 0:1:f 1:4:ooba 5:1:r 5:4:rbaz 9:0:zero@0 9:0:zero@1 9:0:zero@4 9:0:zero@9
+./foo acbd18db4cc2f85cedef654fccc4a4d8+3 d41d8cd98f00b204e9800998ecf8427e+0 0:3:foo 0:3:foo 3:0:zero
+./foo\040bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:baz 0:3:baz\040waz
+./overlapReverse acbd18db4cc2f85cedef654fccc4a4d8+3 2:1:o 2:1:ofoo 0:3:ofoo 1:2:oo
+./segmented acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 d41d8cd98f00b204e9800998ecf8427e+0 0:1:frob 5:1:frob 1:1:frob 6:0:frob 3:1:frob 1:2:oof 0:1:oof
+`)
+
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list