[ARVADOS] updated: 1.1.0-174-g36d6196
Git user
git at public.curoverse.com
Tue Nov 28 10:05:55 EST 2017
Summary of changes:
sdk/go/arvados/collection_fs.go | 6 +-
sdk/go/arvados/collection_fs_test.go | 62 ++++++-------
sdk/go/keepclient/block_cache.go | 11 ++-
sdk/go/keepclient/collectionreader.go | 144 +----------------------------
sdk/go/keepclient/collectionreader_test.go | 9 +-
5 files changed, 54 insertions(+), 178 deletions(-)
via 36d61969535234a6ef7c2ce271c9fe18af2f7cee (commit)
via eb99f0ea588defa3a9e23e76eeac66d277013566 (commit)
from 95ec747218f048e5bbfb986ff4eaeba2d3d2f80b (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 36d61969535234a6ef7c2ce271c9fe18af2f7cee
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Nov 28 09:20:51 2017 -0500
12483: Use io.Seek* constants instead of deprecated os.SEEK_*.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 1aafe81..56717fd 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -568,11 +568,11 @@ func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
size := f.inode.Size()
ptr := f.ptr
switch whence {
- case os.SEEK_SET:
+ case io.SeekStart:
ptr.off = off
- case os.SEEK_CUR:
+ case io.SeekCurrent:
ptr.off += off
- case os.SEEK_END:
+ case io.SeekEnd:
ptr.off = size + off
}
if ptr.off < 0 {
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index 5a7da9f..fac0b61 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -222,7 +222,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
c.Check(err, check.Equals, io.EOF)
c.Check(string(buf[:3]), check.DeepEquals, "foo")
- pos, err := f.Seek(-2, os.SEEK_CUR)
+ pos, err := f.Seek(-2, io.SeekCurrent)
c.Check(pos, check.Equals, int64(1))
c.Check(err, check.IsNil)
@@ -231,11 +231,11 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
c.Check(n, check.Equals, 1)
c.Check(err, check.IsNil)
- pos, err = f.Seek(0, os.SEEK_CUR)
+ pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(2))
c.Check(err, check.IsNil)
- pos, err = f.Seek(0, os.SEEK_SET)
+ pos, err = f.Seek(0, io.SeekStart)
c.Check(pos, check.Equals, int64(0))
c.Check(err, check.IsNil)
@@ -245,13 +245,13 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
c.Check(string(rbuf), check.Equals, "f*o")
// Write multiple blocks in one call
- f.Seek(1, os.SEEK_SET)
+ f.Seek(1, io.SeekStart)
n, err = f.Write([]byte("0123456789abcdefg"))
c.Check(n, check.Equals, 17)
c.Check(err, check.IsNil)
- pos, err = f.Seek(0, os.SEEK_CUR)
+ pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(18))
- pos, err = f.Seek(-18, os.SEEK_CUR)
+ pos, err = f.Seek(-18, io.SeekCurrent)
c.Check(err, check.IsNil)
n, err = io.ReadFull(f, buf)
c.Check(n, check.Equals, 18)
@@ -264,32 +264,32 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
// truncate to current size
err = f.Truncate(18)
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
// shrink to zero some data
f.Truncate(15)
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "f0123456789abcd")
// grow to partial block/extent
f.Truncate(20)
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
f.Truncate(0)
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
f2.Write([]byte("12345678abcdefghijkl"))
// grow to block/extent boundary
f.Truncate(64)
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(len(buf2), check.Equals, 64)
@@ -297,7 +297,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
// shrink to block/extent boundary
err = f.Truncate(32)
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(len(buf2), check.Equals, 32)
@@ -305,7 +305,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
// shrink to partial block/extent
err = f.Truncate(15)
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "12345678abcdefg")
@@ -322,7 +322,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "")
- f2.Seek(0, os.SEEK_SET)
+ f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "123")
@@ -350,21 +350,21 @@ func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
defer f.Close()
fi, err = f.Stat()
c.Check(fi.Size(), check.Equals, size)
- pos, err := f.Seek(0, os.SEEK_END)
+ pos, err := f.Seek(0, io.SeekEnd)
c.Check(pos, check.Equals, size)
}
- f.Seek(2, os.SEEK_END)
+ f.Seek(2, io.SeekEnd)
checkSize(0)
f.Write([]byte{1})
checkSize(3)
- f.Seek(2, os.SEEK_CUR)
+ f.Seek(2, io.SeekCurrent)
checkSize(3)
f.Write([]byte{})
checkSize(5)
- f.Seek(8, os.SEEK_SET)
+ f.Seek(8, io.SeekStart)
checkSize(5)
n, err := f.Read(make([]byte, 1))
c.Check(n, check.Equals, 0)
@@ -466,7 +466,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
case 0:
f.Truncate(int64(rand.Intn(64)))
case 1:
- f.Seek(int64(rand.Intn(64)), os.SEEK_SET)
+ f.Seek(int64(rand.Intn(64)), io.SeekStart)
case 2:
_, err := f.Write([]byte("beep boop"))
c.Check(err, check.IsNil)
@@ -521,13 +521,13 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
}
copy(expect[woff:], wbytes)
f.Truncate(int64(trunc))
- pos, err := f.Seek(int64(woff), os.SEEK_SET)
+ pos, err := f.Seek(int64(woff), io.SeekStart)
c.Check(pos, check.Equals, int64(woff))
c.Check(err, check.IsNil)
n, err := f.Write(wbytes)
c.Check(n, check.Equals, len(wbytes))
c.Check(err, check.IsNil)
- pos, err = f.Seek(0, os.SEEK_SET)
+ pos, err = f.Seek(0, io.SeekStart)
c.Check(pos, check.Equals, int64(0))
c.Check(err, check.IsNil)
buf, err := ioutil.ReadAll(f)
@@ -815,7 +815,7 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
c.Assert(err, check.IsNil)
defer f.Close()
- pos, err := f.Seek(0, os.SEEK_END)
+ pos, err := f.Seek(0, io.SeekEnd)
c.Check(pos, check.Equals, int64(0))
c.Check(err, check.IsNil)
fi, err = f.Stat()
@@ -827,16 +827,16 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
c.Assert(err, check.IsNil)
f.Write([]byte{1, 2, 3})
- f.Seek(0, os.SEEK_SET)
+ f.Seek(0, io.SeekStart)
n, _ = f.Read(buf[:1])
c.Check(n, check.Equals, 1)
c.Check(buf[:1], check.DeepEquals, []byte{1})
- pos, err = f.Seek(0, os.SEEK_CUR)
+ pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(1))
f.Write([]byte{4, 5, 6})
- pos, err = f.Seek(0, os.SEEK_CUR)
+ pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(6))
- f.Seek(0, os.SEEK_SET)
+ f.Seek(0, io.SeekStart)
n, err = f.Read(buf)
c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
c.Check(err, check.Equals, io.EOF)
@@ -844,14 +844,14 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
c.Assert(err, check.IsNil)
- pos, err = f.Seek(0, os.SEEK_CUR)
+ pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(0))
c.Check(err, check.IsNil)
f.Read(buf[:3])
- pos, _ = f.Seek(0, os.SEEK_CUR)
+ pos, _ = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(3))
f.Write([]byte{7, 8, 9})
- pos, err = f.Seek(0, os.SEEK_CUR)
+ pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(9))
f.Close()
@@ -860,9 +860,9 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
n, err = f.Write([]byte{3, 2, 1})
c.Check(n, check.Equals, 3)
c.Check(err, check.IsNil)
- pos, _ = f.Seek(0, os.SEEK_CUR)
+ pos, _ = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(3))
- pos, _ = f.Seek(0, os.SEEK_SET)
+ pos, _ = f.Seek(0, io.SeekStart)
c.Check(pos, check.Equals, int64(0))
n, err = f.Read(buf)
c.Check(n, check.Equals, 0)
commit eb99f0ea588defa3a9e23e76eeac66d277013566
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Nov 27 18:23:55 2017 -0500
12483: Remove dead code, update tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index 539975e..bac4a24 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -7,6 +7,8 @@ package keepclient
import (
"io"
"sort"
+ "strconv"
+ "strings"
"sync"
"time"
)
@@ -66,6 +68,13 @@ func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (
// necessary.
func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
cacheKey := locator[:32]
+ bufsize := BLOCKSIZE
+ if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
+ datasize, err := strconv.ParseInt(parts[1], 10, 32)
+ if err == nil && datasize >= 0 {
+ bufsize = int(datasize)
+ }
+ }
c.mtx.Lock()
if c.cache == nil {
c.cache = make(map[string]*cacheBlock)
@@ -81,7 +90,7 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
rdr, size, _, err := kc.Get(locator)
var data []byte
if err == nil {
- data = make([]byte, size, BLOCKSIZE)
+ data = make([]byte, size, bufsize)
_, err = io.ReadFull(rdr, data)
err2 := rdr.Close()
if err == nil {
diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go
index c071d3b..fa309f6 100644
--- a/sdk/go/keepclient/collectionreader.go
+++ b/sdk/go/keepclient/collectionreader.go
@@ -6,25 +6,12 @@ package keepclient
import (
"errors"
- "fmt"
- "io"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
-const (
- // After reading a data block from Keep, cfReader slices it up
- // and sends the slices to a buffered channel to be consumed
- // by the caller via Read().
- //
- // dataSliceSize is the maximum size of the slices, and
- // therefore the maximum number of bytes that will be returned
- // by a single call to Read().
- dataSliceSize = 1 << 20
-)
-
// ErrNoManifest indicates the given collection has no manifest
// information (e.g., manifest_text was excluded by a "select"
// parameter when retrieving the collection record).
@@ -38,8 +25,11 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
if !ok {
return nil, ErrNoManifest
}
- m := manifest.Manifest{Text: mText}
- return kc.ManifestFileReader(m, filename)
+ fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc)
+ if err != nil {
+ return nil, err
+ }
+ return fs.OpenFile(filename, os.O_RDONLY, 0)
}
func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
@@ -49,127 +39,3 @@ func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (
}
return fs.OpenFile(filename, os.O_RDONLY, 0)
}
-
-type file struct {
- kc *KeepClient
- segments []*manifest.FileSegment
- size int64 // total file size
- offset int64 // current read offset
-
- // current/latest segment accessed -- might or might not match pos
- seg *manifest.FileSegment
- segStart int64 // position of segment relative to file
- segData []byte
- segNext []*manifest.FileSegment
- readaheadDone bool
-}
-
-// Close implements io.Closer.
-func (f *file) Close() error {
- f.kc = nil
- f.segments = nil
- f.segData = nil
- return nil
-}
-
-// Read implements io.Reader.
-func (f *file) Read(buf []byte) (int, error) {
- if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
- // f.seg does not cover the current read offset
- // (f.pos). Iterate over f.segments to find the one
- // that does.
- f.seg = nil
- f.segStart = 0
- f.segData = nil
- f.segNext = f.segments
- for len(f.segNext) > 0 {
- seg := f.segNext[0]
- f.segNext = f.segNext[1:]
- segEnd := f.segStart + int64(seg.Len)
- if segEnd > f.offset {
- f.seg = seg
- break
- }
- f.segStart = segEnd
- }
- f.readaheadDone = false
- }
- if f.seg == nil {
- return 0, io.EOF
- }
- if f.segData == nil {
- data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
- if err != nil {
- return 0, err
- }
- if len(data) < f.seg.Offset+f.seg.Len {
- return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
- }
- f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
- }
- // dataOff and dataLen denote a portion of f.segData
- // corresponding to a portion of the file at f.offset.
- dataOff := int(f.offset - f.segStart)
- dataLen := f.seg.Len - dataOff
-
- if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
- // If we have already read more than just the first
- // few bytes of this file, and we have already
- // consumed a noticeable portion of this segment, and
- // there's more data for this file in the next segment
- // ... then there's a good chance we are going to need
- // the data for that next segment soon. Start getting
- // it into the cache now.
- go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
- f.readaheadDone = true
- }
-
- n := len(buf)
- if n > dataLen {
- n = dataLen
- }
- copy(buf[:n], f.segData[dataOff:dataOff+n])
- f.offset += int64(n)
- return n, nil
-}
-
-// Seek implements io.Seeker.
-func (f *file) Seek(offset int64, whence int) (int64, error) {
- var want int64
- switch whence {
- case io.SeekStart:
- want = offset
- case io.SeekCurrent:
- want = f.offset + offset
- case io.SeekEnd:
- want = f.size + offset
- default:
- return f.offset, fmt.Errorf("invalid whence %d", whence)
- }
- if want < 0 {
- return f.offset, fmt.Errorf("attempted seek to %d", want)
- }
- if want > f.size {
- want = f.size
- }
- f.offset = want
- return f.offset, nil
-}
-
-// Size returns the file size in bytes.
-func (f *file) Size() int64 {
- return f.size
-}
-
-func (f *file) load(m manifest.Manifest, path string) error {
- f.segments = nil
- f.size = 0
- for seg := range m.FileSegmentIterByName(path) {
- f.segments = append(f.segments, seg)
- f.size += int64(seg.Len)
- }
- if f.segments == nil {
- return os.ErrNotExist
- }
- return nil
-}
diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go
index df8bcb3..5d1e2a1 100644
--- a/sdk/go/keepclient/collectionreader_test.go
+++ b/sdk/go/keepclient/collectionreader_test.go
@@ -166,11 +166,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
h := md5.New()
- var testdata []byte
buf := make([]byte, 4096)
locs := make([]string, len(buf))
+ testdata := make([]byte, 0, len(buf)*len(buf))
filesize := 0
- for i := 0; i < len(locs); i++ {
+ for i := range locs {
_, err := rand.Read(buf[:i])
h.Write(buf[:i])
locs[i], _, err = s.kc.PutB(buf[:i])
@@ -219,11 +219,12 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
+ expectPos := curPos + size + 12345
curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
c.Check(err, check.IsNil)
- c.Check(curPos, check.Equals, size)
+ c.Check(curPos, check.Equals, expectPos)
- curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+ curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
c.Check(err, check.IsNil)
c.Check(curPos, check.Equals, int64(8))
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list