[ARVADOS] updated: 1.1.0-174-g36d6196

Git user git at public.curoverse.com
Tue Nov 28 10:05:55 EST 2017


Summary of changes:
 sdk/go/arvados/collection_fs.go            |   6 +-
 sdk/go/arvados/collection_fs_test.go       |  62 ++++++-------
 sdk/go/keepclient/block_cache.go           |  11 ++-
 sdk/go/keepclient/collectionreader.go      | 144 +----------------------------
 sdk/go/keepclient/collectionreader_test.go |   9 +-
 5 files changed, 54 insertions(+), 178 deletions(-)

       via  36d61969535234a6ef7c2ce271c9fe18af2f7cee (commit)
       via  eb99f0ea588defa3a9e23e76eeac66d277013566 (commit)
      from  95ec747218f048e5bbfb986ff4eaeba2d3d2f80b (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 36d61969535234a6ef7c2ce271c9fe18af2f7cee
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Nov 28 09:20:51 2017 -0500

    12483: Use io.Seek* constants instead of deprecated os.SEEK_*.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 1aafe81..56717fd 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -568,11 +568,11 @@ func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
 	size := f.inode.Size()
 	ptr := f.ptr
 	switch whence {
-	case os.SEEK_SET:
+	case io.SeekStart:
 		ptr.off = off
-	case os.SEEK_CUR:
+	case io.SeekCurrent:
 		ptr.off += off
-	case os.SEEK_END:
+	case io.SeekEnd:
 		ptr.off = size + off
 	}
 	if ptr.off < 0 {
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index 5a7da9f..fac0b61 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -222,7 +222,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(err, check.Equals, io.EOF)
 	c.Check(string(buf[:3]), check.DeepEquals, "foo")
 
-	pos, err := f.Seek(-2, os.SEEK_CUR)
+	pos, err := f.Seek(-2, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(1))
 	c.Check(err, check.IsNil)
 
@@ -231,11 +231,11 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(n, check.Equals, 1)
 	c.Check(err, check.IsNil)
 
-	pos, err = f.Seek(0, os.SEEK_CUR)
+	pos, err = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(2))
 	c.Check(err, check.IsNil)
 
-	pos, err = f.Seek(0, os.SEEK_SET)
+	pos, err = f.Seek(0, io.SeekStart)
 	c.Check(pos, check.Equals, int64(0))
 	c.Check(err, check.IsNil)
 
@@ -245,13 +245,13 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(string(rbuf), check.Equals, "f*o")
 
 	// Write multiple blocks in one call
-	f.Seek(1, os.SEEK_SET)
+	f.Seek(1, io.SeekStart)
 	n, err = f.Write([]byte("0123456789abcdefg"))
 	c.Check(n, check.Equals, 17)
 	c.Check(err, check.IsNil)
-	pos, err = f.Seek(0, os.SEEK_CUR)
+	pos, err = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(18))
-	pos, err = f.Seek(-18, os.SEEK_CUR)
+	pos, err = f.Seek(-18, io.SeekCurrent)
 	c.Check(err, check.IsNil)
 	n, err = io.ReadFull(f, buf)
 	c.Check(n, check.Equals, 18)
@@ -264,32 +264,32 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 
 	// truncate to current size
 	err = f.Truncate(18)
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
 
 	// shrink to zero some data
 	f.Truncate(15)
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "f0123456789abcd")
 
 	// grow to partial block/extent
 	f.Truncate(20)
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
 
 	f.Truncate(0)
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	f2.Write([]byte("12345678abcdefghijkl"))
 
 	// grow to block/extent boundary
 	f.Truncate(64)
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(len(buf2), check.Equals, 64)
@@ -297,7 +297,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 
 	// shrink to block/extent boundary
 	err = f.Truncate(32)
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(len(buf2), check.Equals, 32)
@@ -305,7 +305,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 
 	// shrink to partial block/extent
 	err = f.Truncate(15)
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "12345678abcdefg")
@@ -322,7 +322,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "")
-	f2.Seek(0, os.SEEK_SET)
+	f2.Seek(0, io.SeekStart)
 	buf2, err = ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "123")
@@ -350,21 +350,21 @@ func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
 		defer f.Close()
 		fi, err = f.Stat()
 		c.Check(fi.Size(), check.Equals, size)
-		pos, err := f.Seek(0, os.SEEK_END)
+		pos, err := f.Seek(0, io.SeekEnd)
 		c.Check(pos, check.Equals, size)
 	}
 
-	f.Seek(2, os.SEEK_END)
+	f.Seek(2, io.SeekEnd)
 	checkSize(0)
 	f.Write([]byte{1})
 	checkSize(3)
 
-	f.Seek(2, os.SEEK_CUR)
+	f.Seek(2, io.SeekCurrent)
 	checkSize(3)
 	f.Write([]byte{})
 	checkSize(5)
 
-	f.Seek(8, os.SEEK_SET)
+	f.Seek(8, io.SeekStart)
 	checkSize(5)
 	n, err := f.Read(make([]byte, 1))
 	c.Check(n, check.Equals, 0)
@@ -466,7 +466,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
 				case 0:
 					f.Truncate(int64(rand.Intn(64)))
 				case 1:
-					f.Seek(int64(rand.Intn(64)), os.SEEK_SET)
+					f.Seek(int64(rand.Intn(64)), io.SeekStart)
 				case 2:
 					_, err := f.Write([]byte("beep boop"))
 					c.Check(err, check.IsNil)
@@ -521,13 +521,13 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
 				}
 				copy(expect[woff:], wbytes)
 				f.Truncate(int64(trunc))
-				pos, err := f.Seek(int64(woff), os.SEEK_SET)
+				pos, err := f.Seek(int64(woff), io.SeekStart)
 				c.Check(pos, check.Equals, int64(woff))
 				c.Check(err, check.IsNil)
 				n, err := f.Write(wbytes)
 				c.Check(n, check.Equals, len(wbytes))
 				c.Check(err, check.IsNil)
-				pos, err = f.Seek(0, os.SEEK_SET)
+				pos, err = f.Seek(0, io.SeekStart)
 				c.Check(pos, check.Equals, int64(0))
 				c.Check(err, check.IsNil)
 				buf, err := ioutil.ReadAll(f)
@@ -815,7 +815,7 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 	f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
 	c.Assert(err, check.IsNil)
 	defer f.Close()
-	pos, err := f.Seek(0, os.SEEK_END)
+	pos, err := f.Seek(0, io.SeekEnd)
 	c.Check(pos, check.Equals, int64(0))
 	c.Check(err, check.IsNil)
 	fi, err = f.Stat()
@@ -827,16 +827,16 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 	f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
 	c.Assert(err, check.IsNil)
 	f.Write([]byte{1, 2, 3})
-	f.Seek(0, os.SEEK_SET)
+	f.Seek(0, io.SeekStart)
 	n, _ = f.Read(buf[:1])
 	c.Check(n, check.Equals, 1)
 	c.Check(buf[:1], check.DeepEquals, []byte{1})
-	pos, err = f.Seek(0, os.SEEK_CUR)
+	pos, err = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(1))
 	f.Write([]byte{4, 5, 6})
-	pos, err = f.Seek(0, os.SEEK_CUR)
+	pos, err = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(6))
-	f.Seek(0, os.SEEK_SET)
+	f.Seek(0, io.SeekStart)
 	n, err = f.Read(buf)
 	c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
 	c.Check(err, check.Equals, io.EOF)
@@ -844,14 +844,14 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 
 	f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
 	c.Assert(err, check.IsNil)
-	pos, err = f.Seek(0, os.SEEK_CUR)
+	pos, err = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(0))
 	c.Check(err, check.IsNil)
 	f.Read(buf[:3])
-	pos, _ = f.Seek(0, os.SEEK_CUR)
+	pos, _ = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(3))
 	f.Write([]byte{7, 8, 9})
-	pos, err = f.Seek(0, os.SEEK_CUR)
+	pos, err = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(9))
 	f.Close()
 
@@ -860,9 +860,9 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 	n, err = f.Write([]byte{3, 2, 1})
 	c.Check(n, check.Equals, 3)
 	c.Check(err, check.IsNil)
-	pos, _ = f.Seek(0, os.SEEK_CUR)
+	pos, _ = f.Seek(0, io.SeekCurrent)
 	c.Check(pos, check.Equals, int64(3))
-	pos, _ = f.Seek(0, os.SEEK_SET)
+	pos, _ = f.Seek(0, io.SeekStart)
 	c.Check(pos, check.Equals, int64(0))
 	n, err = f.Read(buf)
 	c.Check(n, check.Equals, 0)

commit eb99f0ea588defa3a9e23e76eeac66d277013566
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Nov 27 18:23:55 2017 -0500

    12483: Remove dead code, update tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index 539975e..bac4a24 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -7,6 +7,8 @@ package keepclient
 import (
 	"io"
 	"sort"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 )
@@ -66,6 +68,13 @@ func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (
 // necessary.
 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
 	cacheKey := locator[:32]
+	bufsize := BLOCKSIZE
+	if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
+		datasize, err := strconv.ParseInt(parts[1], 10, 32)
+		if err == nil && datasize >= 0 {
+			bufsize = int(datasize)
+		}
+	}
 	c.mtx.Lock()
 	if c.cache == nil {
 		c.cache = make(map[string]*cacheBlock)
@@ -81,7 +90,7 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
 			rdr, size, _, err := kc.Get(locator)
 			var data []byte
 			if err == nil {
-				data = make([]byte, size, BLOCKSIZE)
+				data = make([]byte, size, bufsize)
 				_, err = io.ReadFull(rdr, data)
 				err2 := rdr.Close()
 				if err == nil {
diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index c071d3b..fa309f6 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -6,25 +6,12 @@ package keepclient
 
 import (
 	"errors"
-	"fmt"
-	"io"
 	"os"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
-const (
-	// After reading a data block from Keep, cfReader slices it up
-	// and sends the slices to a buffered channel to be consumed
-	// by the caller via Read().
-	//
-	// dataSliceSize is the maximum size of the slices, and
-	// therefore the maximum number of bytes that will be returned
-	// by a single call to Read().
-	dataSliceSize = 1 << 20
-)
-
 // ErrNoManifest indicates the given collection has no manifest
 // information (e.g., manifest_text was excluded by a "select"
 // parameter when retrieving the collection record).
@@ -38,8 +25,11 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
 	if !ok {
 		return nil, ErrNoManifest
 	}
-	m := manifest.Manifest{Text: mText}
-	return kc.ManifestFileReader(m, filename)
+	fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc)
+	if err != nil {
+		return nil, err
+	}
+	return fs.OpenFile(filename, os.O_RDONLY, 0)
 }
 
 func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
@@ -49,127 +39,3 @@ func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (
 	}
 	return fs.OpenFile(filename, os.O_RDONLY, 0)
 }
-
-type file struct {
-	kc       *KeepClient
-	segments []*manifest.FileSegment
-	size     int64 // total file size
-	offset   int64 // current read offset
-
-	// current/latest segment accessed -- might or might not match pos
-	seg           *manifest.FileSegment
-	segStart      int64 // position of segment relative to file
-	segData       []byte
-	segNext       []*manifest.FileSegment
-	readaheadDone bool
-}
-
-// Close implements io.Closer.
-func (f *file) Close() error {
-	f.kc = nil
-	f.segments = nil
-	f.segData = nil
-	return nil
-}
-
-// Read implements io.Reader.
-func (f *file) Read(buf []byte) (int, error) {
-	if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
-		// f.seg does not cover the current read offset
-		// (f.pos).  Iterate over f.segments to find the one
-		// that does.
-		f.seg = nil
-		f.segStart = 0
-		f.segData = nil
-		f.segNext = f.segments
-		for len(f.segNext) > 0 {
-			seg := f.segNext[0]
-			f.segNext = f.segNext[1:]
-			segEnd := f.segStart + int64(seg.Len)
-			if segEnd > f.offset {
-				f.seg = seg
-				break
-			}
-			f.segStart = segEnd
-		}
-		f.readaheadDone = false
-	}
-	if f.seg == nil {
-		return 0, io.EOF
-	}
-	if f.segData == nil {
-		data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
-		if err != nil {
-			return 0, err
-		}
-		if len(data) < f.seg.Offset+f.seg.Len {
-			return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
-		}
-		f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
-	}
-	// dataOff and dataLen denote a portion of f.segData
-	// corresponding to a portion of the file at f.offset.
-	dataOff := int(f.offset - f.segStart)
-	dataLen := f.seg.Len - dataOff
-
-	if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
-		// If we have already read more than just the first
-		// few bytes of this file, and we have already
-		// consumed a noticeable portion of this segment, and
-		// there's more data for this file in the next segment
-		// ... then there's a good chance we are going to need
-		// the data for that next segment soon. Start getting
-		// it into the cache now.
-		go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
-		f.readaheadDone = true
-	}
-
-	n := len(buf)
-	if n > dataLen {
-		n = dataLen
-	}
-	copy(buf[:n], f.segData[dataOff:dataOff+n])
-	f.offset += int64(n)
-	return n, nil
-}
-
-// Seek implements io.Seeker.
-func (f *file) Seek(offset int64, whence int) (int64, error) {
-	var want int64
-	switch whence {
-	case io.SeekStart:
-		want = offset
-	case io.SeekCurrent:
-		want = f.offset + offset
-	case io.SeekEnd:
-		want = f.size + offset
-	default:
-		return f.offset, fmt.Errorf("invalid whence %d", whence)
-	}
-	if want < 0 {
-		return f.offset, fmt.Errorf("attempted seek to %d", want)
-	}
-	if want > f.size {
-		want = f.size
-	}
-	f.offset = want
-	return f.offset, nil
-}
-
-// Size returns the file size in bytes.
-func (f *file) Size() int64 {
-	return f.size
-}
-
-func (f *file) load(m manifest.Manifest, path string) error {
-	f.segments = nil
-	f.size = 0
-	for seg := range m.FileSegmentIterByName(path) {
-		f.segments = append(f.segments, seg)
-		f.size += int64(seg.Len)
-	}
-	if f.segments == nil {
-		return os.ErrNotExist
-	}
-	return nil
-}
diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go
index df8bcb3..5d1e2a1 100644
--- a/sdk/go/keepclient/collectionreader_test.go
+++ b/sdk/go/keepclient/collectionreader_test.go
@@ -166,11 +166,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
 
 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
 	h := md5.New()
-	var testdata []byte
 	buf := make([]byte, 4096)
 	locs := make([]string, len(buf))
+	testdata := make([]byte, 0, len(buf)*len(buf))
 	filesize := 0
-	for i := 0; i < len(locs); i++ {
+	for i := range locs {
 		_, err := rand.Read(buf[:i])
 		h.Write(buf[:i])
 		locs[i], _, err = s.kc.PutB(buf[:i])
@@ -219,11 +219,12 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
 	c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
 	c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
 
+	expectPos := curPos + size + 12345
 	curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
 	c.Check(err, check.IsNil)
-	c.Check(curPos, check.Equals, size)
+	c.Check(curPos, check.Equals, expectPos)
 
-	curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+	curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
 	c.Check(err, check.IsNil)
 	c.Check(curPos, check.Equals, int64(8))
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list