[ARVADOS] created: 1b16fe46cccf4af94349ddad730b5c7f3bc03718

Git user git at public.curoverse.com
Fri Jan 27 14:32:30 EST 2017


        at  1b16fe46cccf4af94349ddad730b5c7f3bc03718 (commit)


commit 1b16fe46cccf4af94349ddad730b5c7f3bc03718
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jan 27 14:32:19 2017 -0500

    10990: Use AsyncStream to minimize store-and-forward latency

diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index 7d03b68..3c1974e 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -1,7 +1,7 @@
 package keepclient
 
 import (
-	"io/ioutil"
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	"sort"
 	"sync"
 	"time"
@@ -48,7 +48,8 @@ func (c *BlockCache) Sweep() {
 
-// Get returns data from the cache, first retrieving it from Keep if
-// necessary.
-func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
+// Get returns a new reader for a block's data, first starting to
+// retrieve it from Keep if necessary; data may still be arriving when
+// Get returns. Callers should Close the returned reader.
+func (c *BlockCache) Get(kc *KeepClient, locator string) (*streamer.StreamReader, error) {
 	c.setupOnce.Do(c.setup)
 	cacheKey := locator[:32]
 	c.mtx.Lock()
@@ -60,13 +61,15 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
 		}
 		c.cache[cacheKey] = b
 		go func() {
-			rdr, _, _, err := kc.Get(locator)
-			var data []byte
+			rdr, bufsize, _, err := kc.Get(locator)
+			var data *streamer.AsyncStream
 			if err == nil {
-				data, err = ioutil.ReadAll(rdr)
+				// NOTE(review): assumes bufsize is the exact block size
+				// (never -1 for "unknown"); confirm kc.Get guarantees it.
+				data = streamer.AsyncStreamFromReader(int(bufsize), rdr)
 			}
 			c.mtx.Lock()
 			b.data, b.err = data, err
 			c.mtx.Unlock()
 			close(b.fetched)
 			go c.Sweep()
@@ -81,7 +84,10 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
 	c.mtx.Lock()
 	b.lastUse = time.Now()
 	c.mtx.Unlock()
-	return b.data, b.err
+	if b.err != nil {
+		return nil, b.err
+	}
+	return b.data.MakeStreamReader(), nil
 }
 
 func (c *BlockCache) setup() {
@@ -97,7 +103,7 @@ func (ts timeSlice) Less(i, j int) bool { return ts[i].Before(ts[j]) }
 func (ts timeSlice) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
 
 type cacheBlock struct {
-	data    []byte
+	data    *streamer.AsyncStream
 	err     error
 	fetched chan struct{}
 	lastUse time.Time
diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index 61fabef..a87041c 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -3,10 +3,11 @@ package keepclient
 import (
 	"errors"
 	"fmt"
 	"io"
 	"os"
 
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // A Reader implements, io.Reader, io.Seeker, and io.Closer, and has a
@@ -67,7 +68,7 @@ type file struct {
 	// current/latest segment accessed -- might or might not match pos
 	seg           *manifest.FileSegment
 	segStart      int64 // position of segment relative to file
-	segData       []byte
+	segData       *streamer.StreamReader
 	segNext       []*manifest.FileSegment
 	readaheadDone bool
 }
@@ -76,6 +77,9 @@ type file struct {
 func (f *file) Close() error {
 	f.kc = nil
 	f.segments = nil
+	if f.segData != nil {
+		f.segData.Close()
+	}
 	f.segData = nil
 	return nil
 }
@@ -88,6 +92,9 @@ func (f *file) Read(buf []byte) (int, error) {
 		// that does.
 		f.seg = nil
 		f.segStart = 0
+		if f.segData != nil {
+			f.segData.Close()
+		}
 		f.segData = nil
 		f.segNext = f.segments
 		for len(f.segNext) > 0 {
@@ -110,17 +117,25 @@ func (f *file) Read(buf []byte) (int, error) {
 		if err != nil {
 			return 0, err
 		}
-		if len(data) < f.seg.Offset+f.seg.Len {
-			return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
+		if blockLen := data.Len(); int(blockLen) < f.seg.Offset+f.seg.Len {
+			data.Close()
+			return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, blockLen, f.seg.Locator)
 		}
-		f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
+		if f.segData != nil {
+			// Release the reader for the previous block before replacing it.
+			f.segData.Close()
+		}
+		f.segData = data
 	}
-	// dataOff and dataLen denote a portion of f.segData
-	// corresponding to a portion of the file at f.offset.
+	// dataOff and dataLen denote the portion of the current segment
+	// corresponding to the file at f.offset. f.segData reads the whole
+	// block, so that portion starts at block offset f.seg.Offset+dataOff.
 	dataOff := int(f.offset - f.segStart)
 	dataLen := f.seg.Len - dataOff
 
-	if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
+	// NOTE(review): dataOff+dataLen == f.seg.Len here, so the last
+	// clause holds for any non-empty segment; dataOff may be intended.
+	if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > f.seg.Len/16 {
 		// If we have already read more than just the first
 		// few bytes of this file, and we have already
 		// consumed a noticeable portion of this segment, and
@@ -136,9 +151,12 @@ func (f *file) Read(buf []byte) (int, error) {
 	if n > dataLen {
 		n = dataLen
 	}
-	copy(buf[:n], f.segData[dataOff:dataOff+n])
-	f.offset += int64(n)
-	return n, nil
+	if _, err := f.segData.Seek(int64(f.seg.Offset+dataOff), io.SeekStart); err != nil {
+		return 0, err
+	}
+	count, err := f.segData.Read(buf[:n])
+	f.offset += int64(count)
+	return count, err
 }
 
 // Seek implements io.Seeker.
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
index a46ca4c..c8d012f 100644
--- a/sdk/go/streamer/streamer.go
+++ b/sdk/go/streamer/streamer.go
@@ -37,6 +37,7 @@ package streamer
 
 import (
 	"errors"
+	"fmt"
 	"io"
 )
 
@@ -152,3 +153,36 @@ func (this *AsyncStream) Close() error {
 	close(this.wait_zero_readers)
 	return nil
 }
+
+// Seek implements io.Seeker by setting the offset used by the next
+// Read. A target beyond Len() is clamped to Len(). A negative target
+// or an unrecognized whence returns an error and leaves the offset
+// unchanged.
+func (this *StreamReader) Seek(offset int64, whence int) (int64, error) {
+	var want int64
+	switch whence {
+	case io.SeekStart:
+		want = offset
+	case io.SeekCurrent:
+		want = int64(this.offset) + offset
+	case io.SeekEnd:
+		want = int64(this.Len()) + offset
+	default:
+		return int64(this.offset), fmt.Errorf("invalid whence %d", whence)
+	}
+	if want < 0 {
+		return int64(this.offset), fmt.Errorf("attempted seek to %d", want)
+	}
+	if want > int64(this.Len()) {
+		want = int64(this.Len())
+	}
+	this.offset = int(want)
+	return want, nil
+}
+
+// Len returns the length, in bytes, of the underlying stream's buffer.
+// NOTE(review): Seek treats this as the total stream size, which holds
+// only if the buffer is allocated at full size up front -- confirm.
+func (this *StreamReader) Len() uint64 {
+	return uint64(len(this.stream.buffer))
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list