[ARVADOS] created: 1b16fe46cccf4af94349ddad730b5c7f3bc03718
Git user
git at public.curoverse.com
Fri Jan 27 14:32:30 EST 2017
at 1b16fe46cccf4af94349ddad730b5c7f3bc03718 (commit)
commit 1b16fe46cccf4af94349ddad730b5c7f3bc03718
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Jan 27 14:32:19 2017 -0500
10990: Use AsyncStream to minimize store-and-forward latency
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index 7d03b68..3c1974e 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -1,7 +1,7 @@
package keepclient
import (
- "io/ioutil"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"sort"
"sync"
"time"
@@ -48,7 +48,7 @@ func (c *BlockCache) Sweep() {
// Get returns data from the cache, first retrieving it from Keep if
// necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+func (c *BlockCache) Get(kc *KeepClient, locator string) (*streamer.StreamReader, error) {
c.setupOnce.Do(c.setup)
cacheKey := locator[:32]
c.mtx.Lock()
@@ -60,13 +60,12 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
}
c.cache[cacheKey] = b
go func() {
- rdr, _, _, err := kc.Get(locator)
- var data []byte
+ rdr, bufsize, _, err := kc.Get(locator)
+ c.mtx.Lock()
if err == nil {
- data, err = ioutil.ReadAll(rdr)
+ b.data = streamer.AsyncStreamFromReader(int(bufsize), rdr)
}
- c.mtx.Lock()
- b.data, b.err = data, err
+ b.err = err
c.mtx.Unlock()
close(b.fetched)
go c.Sweep()
@@ -81,7 +80,8 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
c.mtx.Lock()
b.lastUse = time.Now()
c.mtx.Unlock()
- return b.data, b.err
+
+ return b.data.MakeStreamReader(), b.err
}
func (c *BlockCache) setup() {
@@ -97,7 +97,7 @@ func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
type cacheBlock struct {
- data []byte
+ data *streamer.AsyncStream
err error
fetched chan struct{}
lastUse time.Time
diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index 61fabef..a87041c 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -3,10 +3,10 @@ package keepclient
import (
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"os"
-
- "git.curoverse.com/arvados.git/sdk/go/manifest"
)
// A Reader implements, io.Reader, io.Seeker, and io.Closer, and has a
@@ -67,7 +67,7 @@ type file struct {
// current/latest segment accessed -- might or might not match pos
seg *manifest.FileSegment
segStart int64 // position of segment relative to file
- segData []byte
+ segData *streamer.StreamReader
segNext []*manifest.FileSegment
readaheadDone bool
}
@@ -76,6 +76,9 @@ type file struct {
func (f *file) Close() error {
f.kc = nil
f.segments = nil
+ if f.segData != nil {
+ f.segData.Close()
+ }
f.segData = nil
return nil
}
@@ -88,6 +91,9 @@ func (f *file) Read(buf []byte) (int, error) {
// that does.
f.seg = nil
f.segStart = 0
+ if f.segData != nil {
+ f.segData.Close()
+ }
f.segData = nil
f.segNext = f.segments
for len(f.segNext) > 0 {
@@ -110,17 +116,17 @@ func (f *file) Read(buf []byte) (int, error) {
if err != nil {
return 0, err
}
- if len(data) < f.seg.Offset+f.seg.Len {
- return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
+ if int(data.Len()) < f.seg.Offset+f.seg.Len {
+ return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, data.Len(), f.seg.Locator)
}
- f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
+ f.segData = data
}
// dataOff and dataLen denote a portion of f.segData
// corresponding to a portion of the file at f.offset.
dataOff := int(f.offset - f.segStart)
- dataLen := f.seg.Len - dataOff
+ dataLen := f.seg.Len - (f.seg.Offset + dataOff)
- if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
+ if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && uint64(dataOff+dataLen) > f.segData.Len()/16 {
// If we have already read more than just the first
// few bytes of this file, and we have already
// consumed a noticeable portion of this segment, and
@@ -136,9 +142,10 @@ func (f *file) Read(buf []byte) (int, error) {
if n > dataLen {
n = dataLen
}
- copy(buf[:n], f.segData[dataOff:dataOff+n])
- f.offset += int64(n)
- return n, nil
+ f.segData.Seek(int64(f.seg.Offset+dataOff), io.SeekStart)
+ count, err := f.segData.Read(buf[:n])
+ f.offset += int64(count)
+ return count, err
}
// Seek implements io.Seeker.
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
index a46ca4c..c8d012f 100644
--- a/sdk/go/streamer/streamer.go
+++ b/sdk/go/streamer/streamer.go
@@ -37,6 +37,7 @@ package streamer
import (
"errors"
+ "fmt"
"io"
)
@@ -152,3 +153,29 @@ func (this *AsyncStream) Close() error {
close(this.wait_zero_readers)
return nil
}
+
+func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
+ var want int64
+ switch whence {
+ case io.SeekStart:
+ want = offset
+ case io.SeekCurrent:
+ want = int64(this.offset) + offset
+ case io.SeekEnd:
+ want = int64(this.Len()) + offset
+ default:
+ return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
+ }
+ if want < 0 {
+ return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
+ }
+ if want > int64(this.Len()) {
+ want = int64(this.Len())
+ }
+ this.offset = int(want)
+ return want, nil
+}
+
+func (this *StreamReader) Len() uint64 {
+ return uint64(len(this.stream.buffer))
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list