[ARVADOS] created: 6045f3a69038dbc2f14d9141d450341c9611ec14
Git user
git at public.curoverse.com
Mon Feb 6 17:18:15 EST 2017
at 6045f3a69038dbc2f14d9141d450341c9611ec14 (commit)
commit 6045f3a69038dbc2f14d9141d450341c9611ec14
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Feb 6 17:16:01 2017 -0500
9397: Add manifest normalization and sub-manifest extraction by path.
diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go
index a9745ae..f2fb9b5 100644
--- a/sdk/go/manifest/manifest.go
+++ b/sdk/go/manifest/manifest.go
@@ -60,6 +60,15 @@ type ManifestStream struct {
Err error
}
+// SegmentedFile is the list of segments referencing a single file's content.
+type SegmentedFile []FileSegment
+
+// SegmentedStream maps each file name in a stream to the segments referencing that file's content.
+type SegmentedStream map[string]SegmentedFile
+
+// SegmentedManifest maps each stream name to the files of that stream.
+type SegmentedManifest map[string]SegmentedStream
+
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
func unescapeSeq(seq string) string {
@@ -74,6 +83,10 @@ func unescapeSeq(seq string) string {
return string([]byte{byte(i)})
}
+// EscapeName returns s with every space replaced by the escape sequence
+// \040, so that the name can be emitted as one space-delimited token.
+// Only spaces are escaped.
+func EscapeName(s string) string {
+	// Go strings have no methods; strings.Replace with n = -1 replaces
+	// every occurrence.
+	return strings.Replace(s, " ", "\\040", -1)
+}
+
+// UnescapeName returns s with every match of escapeSeq (a backslash followed
+// by three digits, or by another backslash) replaced by the result of
+// unescapeSeq.
func UnescapeName(s string) string {
return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
}
@@ -230,101 +243,158 @@ func parseManifestStream(s string) (m ManifestStream) {
return
}
-func (m *Manifest) NormalizeManifest() map[string]ManifestStream {
- streams := make(map[string]ManifestStream)
+// SplitPath splits path at its last "/" into the stream part (everything
+// before the slash) and the file name (everything after it). When path
+// contains no "/", the whole path is returned as the stream name and the
+// file name is empty.
+func SplitPath(path string) (streamname, filename string) {
+	pathIdx := strings.LastIndex(path, "/")
+	if pathIdx < 0 {
+		return path, ""
+	}
+	return path[:pathIdx], path[pathIdx+1:]
+}
+
+// SegmentManifest walks every stream produced by m.StreamIter() and builds a
+// SegmentedManifest: stream name -> file name -> file segments. Stream names
+// are canonicalized to start with "./" (unless the name is exactly ".") and
+// to have no trailing "/". The segments of each file are collected once, the
+// first time the file is seen.
+func (m *Manifest) SegmentManifest() *SegmentedManifest {
+	files := make(SegmentedManifest)
for stream := range m.StreamIter() {
- ms := streams[stream.StreamName]
-
- if ms.StreamName == "" { // new stream
- streams[stream.StreamName] = stream
- } else {
- ms.Blocks = append(ms.Blocks, stream.Blocks...)
- ms.FileStreamSegments = append(ms.FileStreamSegments, stream.FileStreamSegments...)
+		for _, f := range stream.FileStreamSegments {
+			sn := stream.StreamName
+			if sn != "." && !strings.HasPrefix(sn, "./") {
+				sn = "./" + sn
+			}
+			sn = strings.TrimSuffix(sn, "/")
+			// f.Name may itself contain "/", so join and re-split to get
+			// the innermost stream and the bare file name.
+			path := sn + "/" + f.Name
+			streamname, filename := SplitPath(path)
+			if files[streamname] == nil {
+				files[streamname] = make(SegmentedStream)
+			}
+			if files[streamname][filename] == nil {
+				var segs []FileSegment
+				// The original passed an undefined identifier `name`; the
+				// file being processed is f.Name.
+				// NOTE(review): FileSegmentIterByName is declared outside
+				// this hunk -- confirm its receiver and element type.
+				for seg := range FileSegmentIterByName(f.Name) {
+					segs = append(segs, seg)
+				}
+				files[streamname][filename] = segs
+			}
}
}
- return streams
+	// The declared result is a pointer, so return the map's address.
+	return &files
}
-func (m *Manifest) NormalizedManifestForPath(path string) string {
- normalized := m.NormalizeManifest()
-
- var streams []string
- for _, stream := range normalized {
- streams = append(streams, stream.StreamName)
+// NormalizeStream renders the stream as a single manifest line named name.
+// The line consists of the escaped stream name, every referenced block
+// locator exactly once (in order of first use, walking files in sorted name
+// order), and then one "offset:length:filename" token per contiguous span of
+// each file. Tokens are joined with single spaces and the line ends with a
+// newline.
+func (stream *SegmentedStream) NormalizeStream(name string) string {
+	// stream is a pointer to a map; dereference once so it can be ranged
+	// over and indexed.
+	files := *stream
+	var sortedfiles []string
+	for k := range files {
+		sortedfiles = append(sortedfiles, k)
}
- sort.Strings(streams)
+	sort.Strings(sortedfiles)
- path = strings.Trim(path, "/")
- var subdir, filename string
+	streamTokens := []string{EscapeName(name)}
- if path != "" {
- if strings.Index(path, "/") == -1 {
- isStream := false
- for _, v := range streams {
- if v == "./"+path {
- isStream = true
- }
- }
- if isStream {
- subdir = path
- } else {
- filename = path
- }
- } else {
- pathIdx := strings.LastIndex(path, "/")
- if pathIdx >= 0 {
- subdir = path[0:pathIdx]
- filename = path[pathIdx+1:]
+	// blocks maps a block locator to the offset at which that block starts
+	// within the normalized stream.
+	blocks := make(map[string]int64)
+	var streamoffset int64
+
+	// Go through each file and add each referenced block exactly once.
+	for _, streamfile := range sortedfiles {
+		for _, segment := range files[streamfile] {
+			if _, ok := blocks[segment.Locator]; !ok {
+				streamTokens = append(streamTokens, segment.Locator)
+				blocks[segment.Locator] = streamoffset
+				// An unparseable locator contributes no size.
+				// NOTE(review): ParseBlockLocator is declared outside this
+				// hunk -- confirm that skipping on error is acceptable.
+				if b, err := ParseBlockLocator(segment.Locator); err == nil {
+					streamoffset += int64(b.Size)
+				}
}
}
}
- manifestForPath := ""
+	if len(streamTokens) == 1 {
+		// No block was referenced: emit the zero-length block locator so
+		// the line still has a block token.
+		streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
+	}
- for _, streamName := range streams {
- stream := normalized[streamName]
+	for _, streamfile := range sortedfiles {
+		// Add in file segments
+		spanStart := int64(-1) // -1 means no span is open yet
+		spanEnd := int64(0)
+		fout := EscapeName(streamfile)
+		for _, segment := range files[streamfile] {
+			// Collapse adjacent segments
+			streamoffset = blocks[segment.Locator] + int64(segment.Offset)
+			if spanStart == -1 {
+				spanStart = streamoffset
+				spanEnd = streamoffset + int64(segment.Len)
+			} else if streamoffset == spanEnd {
+				spanEnd += int64(segment.Len)
+			} else {
+				// Go's fmt has no %i verb; %d formats integers.
+				streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
+				spanStart = streamoffset
+				spanEnd = streamoffset + int64(segment.Len)
+			}
+		}
- if subdir != "" && stream.StreamName != "./"+subdir {
- continue
+		if spanStart != -1 {
+			streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
}
- manifestForPath += stream.StreamName + " " + strings.Join(stream.Blocks, " ") + " "
+		if len(files[streamfile]) == 0 {
+			// A file with no segments is still listed, as an empty file.
+			streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
+		}
+	}
- currentName := ""
- currentSpan := []uint64{0, 0}
- for _, fss := range stream.FileStreamSegments {
- if filename != "" && fss.Name != filename {
- continue
- }
+	return strings.Join(streamTokens, " ") + "\n"
+}
- if fss.Name != currentName && currentName != "" {
- manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + " "
- }
+// NormalizeManifest returns the manifest text obtained by segmenting m with
+// SegmentManifest and concatenating the NormalizeStream output of every
+// stream, in sorted stream-name order.
+func (m *Manifest) NormalizeManifest() string {
+	segments := m.SegmentManifest()
+	// Map iteration order is random; sort the stream names so the output
+	// is deterministic.
+	var streamnames []string
+	for k := range *segments {
+		streamnames = append(streamnames, k)
+	}
+	sort.Strings(streamnames)
+	var manifest string
+	for _, k := range streamnames {
+		// Copy into a local so the pointer-receiver method can be called.
+		v := (*segments)[k]
+		manifest += v.NormalizeStream(k)
+	}
+	return manifest
+}
- if fss.Name != currentName {
- currentName = fss.Name
- currentSpan = []uint64{0, 0}
- }
+// ManifestForPath returns normalized manifest text for the part of m that
+// path refers to. An empty path means ".". If relocate is non-empty it is
+// split with SplitPath into the stream and file names used in the output;
+// otherwise the names derived from path are used.
+//
+// Three cases are tried in order: path names a stream; path names a file
+// inside a stream; otherwise every stream whose name starts with the prefix
+// is emitted, in sorted name order.
+func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
+	if path == "" {
+		path = "."
+	}
- if currentSpan[1] == 0 {
- currentSpan = []uint64{fss.SegPos, fss.SegLen}
- } else {
- if currentSpan[1] == fss.SegPos {
- currentSpan[1] += fss.SegLen
- } else if currentSpan[0]+currentSpan[1] == fss.SegPos {
- currentSpan[1] = fss.SegPos + fss.SegLen
- } else {
- manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]+fss.SegLen) + ":" + fss.Name + " "
- currentSpan = []uint64{fss.SegPos, fss.SegPos + fss.SegLen}
- }
+	streamname, filename := SplitPath(path)
+	var relocateStream, relocateFilename string
+	if relocate != "" {
+		relocateStream, relocateFilename = SplitPath(relocate)
+	} else {
+		relocateStream = streamname
+		relocateFilename = filename
+	}
+
+	// m is a pointer to a map; dereference once so it can be indexed and
+	// ranged over.
+	streams := *m
+	if stream, ok := streams[path]; ok {
+		// refers to a single stream
+		target := relocate
+		if target == "" {
+			// No relocation requested: the stream keeps its own name.
+			target = path
+		}
+		return stream.NormalizeStream(target)
+	} else if segs, ok := streams[streamname][filename]; ok {
+		// refers to a single file in a stream (indexing a missing stream
+		// yields a nil map, for which the lookup reports ok == false)
+		newstream := make(SegmentedStream)
+		newstream[relocateFilename] = segs
+		return newstream.NormalizeStream(relocateStream)
+	} else {
+		// refers to multiple streams
+		// NOTE(review): the prefix is built from streamname (the parent of
+		// path), not from path itself -- confirm this is the intended
+		// subtree selection.
+		prefix := streamname
+		if !strings.HasSuffix(prefix, "/") {
+			prefix += "/"
+		}
+		// Sort the stream names so the output order is deterministic.
+		var names []string
+		for k := range streams {
+			names = append(names, k)
+		}
+		sort.Strings(names)
+		manifest := ""
+		for _, k := range names {
+			if strings.HasPrefix(k, prefix) {
+				v := streams[k]
+				// Keep the "/" that ends the prefix so the remainder joins
+				// onto the relocated stream name.
+				manifest += v.NormalizeStream(relocateStream + k[len(prefix)-1:])
}
}
- manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + "\n"
+		return manifest
}
+}
- return manifestForPath
+// ManifestForPath segments m with SegmentManifest and returns the manifest
+// text for path, without relocation.
+func (m *Manifest) ManifestForPath(path string) string {
+	// The segmenting method is SegmentManifest, and the segmented
+	// ManifestForPath takes (path, relocate); pass an empty relocate.
+	return m.SegmentManifest().ManifestForPath(path, "")
+}
func (m *Manifest) StreamIter() <-chan ManifestStream {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list