[ARVADOS] updated: 1.1.0-138-gda2f8a1
Git user
git at public.curoverse.com
Sat Nov 11 17:28:50 EST 2017
Summary of changes:
sdk/go/arvados/collection_fs.go | 233 ++++++++++++++++++++++++-----------
sdk/go/arvados/collection_fs_test.go | 202 ++++++++++++++++++++++++++++--
sdk/go/keepclient/block_cache.go | 6 +-
sdk/go/keepclient/keepclient.go | 2 +-
4 files changed, 357 insertions(+), 86 deletions(-)
via da2f8a1023b3b625a462718f8836ac27c25b957d (commit)
via 9009baaf0e3aa800de8af11c741d0adc46563200 (commit)
via 6ce00fb7121813f187b555435a3f01c2aa380f93 (commit)
from cad7d333436703d48c2811de8a26caef9fc130ad (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit da2f8a1023b3b625a462718f8836ac27c25b957d
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sat Nov 11 16:48:54 2017 -0500
12483: Add tests for concurrent writers and random write sequences.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index fc2a2b9..5a3afbd 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -263,7 +263,7 @@ func (fn *filenode) Truncate(size int64) error {
fn.Lock()
defer fn.Unlock()
if size < fn.fileinfo.size {
- ptr := fn.seek(filenodePtr{off: size})
+ ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
if ptr.extentOff == 0 {
fn.extents = fn.extents[:ptr.extentIdx]
} else {
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index e635bb7..ff1d738 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -5,10 +5,13 @@
package arvados
import (
+ "fmt"
"io"
"io/ioutil"
+ "math/rand"
"net/http"
"os"
+ "sync"
"testing"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
@@ -298,6 +301,105 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
}
+func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var wg sync.WaitGroup
+ for n := 0; n < 128; n++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ for i := 0; i < 6502; i++ {
+ switch rand.Int() & 3 {
+ case 0:
+ f.Truncate(int64(rand.Intn(64)))
+ case 1:
+ f.Seek(int64(rand.Intn(64)), os.SEEK_SET)
+ case 2:
+ _, err := f.Write([]byte("beep boop"))
+ c.Check(err, check.IsNil)
+ case 3:
+ _, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ }
+ }
+ }()
+ }
+ wg.Wait()
+
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
+}
+
+func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var wg sync.WaitGroup
+ for n := 0; n < 128; n++ {
+ wg.Add(1)
+ go func(n int) {
+ defer wg.Done()
+ expect := make([]byte, 0, 64)
+ wbytes := []byte("there's no simple explanation for anything important that any of us do")
+ f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ for i := 0; i < 6502; i++ {
+ trunc := rand.Intn(65)
+ woff := rand.Intn(trunc + 1)
+ wbytes = wbytes[:rand.Intn(64-woff+1)]
+ for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
+ buf[i] = 0
+ }
+ expect = expect[:trunc]
+ if trunc < woff+len(wbytes) {
+ expect = expect[:woff+len(wbytes)]
+ }
+ copy(expect[woff:], wbytes)
+ func() {
+ defer func() {
+ if e := recover(); e != nil {
+ c.Logf("trunc %d, woff %d, wlen %d", trunc, woff, len(wbytes))
+ c.Logf("%#v", f.(*file).inode.(*filenode).extents)
+ panic(e)
+ }
+ }()
+ f.Truncate(int64(trunc))
+ pos, err := f.Seek(int64(woff), os.SEEK_SET)
+ c.Check(pos, check.Equals, int64(woff))
+ c.Check(err, check.IsNil)
+ n, err := f.Write(wbytes)
+ c.Check(n, check.Equals, len(wbytes))
+ c.Check(err, check.IsNil)
+ pos, err = f.Seek(0, os.SEEK_SET)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+ buf, err := ioutil.ReadAll(f)
+ c.Check(string(buf), check.Equals, string(expect))
+ c.Check(err, check.IsNil)
+ }()
+ }
+ }(n)
+ }
+ wg.Wait()
+
+ root, err := s.fs.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err := root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Logf("Readdir(): %#v", fi)
+}
+
// Gocheck boilerplate
func Test(t *testing.T) {
check.TestingT(t)
commit 9009baaf0e3aa800de8af11c741d0adc46563200
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sat Nov 11 15:32:47 2017 -0500
12483: Add (File)Truncate().
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 1669f88..fc2a2b9 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -35,6 +35,7 @@ type File interface {
Size() int64
Readdir(int) ([]os.FileInfo, error)
Stat() (os.FileInfo, error)
+ Truncate(int64) error
}
type keepClient interface {
@@ -122,6 +123,7 @@ type inode interface {
Parent() inode
Read([]byte, filenodePtr) (int, filenodePtr, error)
Write([]byte, filenodePtr) (int, filenodePtr, error)
+ Truncate(int64) error
Readdir() []os.FileInfo
Stat() os.FileInfo
sync.Locker
@@ -257,6 +259,48 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
return
}
+func (fn *filenode) Truncate(size int64) error {
+ fn.Lock()
+ defer fn.Unlock()
+ if size < fn.fileinfo.size {
+ ptr := fn.seek(filenodePtr{off: size})
+ if ptr.extentOff == 0 {
+ fn.extents = fn.extents[:ptr.extentIdx]
+ } else {
+ fn.extents = fn.extents[:ptr.extentIdx+1]
+ e := fn.extents[ptr.extentIdx]
+ if e, ok := e.(writableExtent); ok {
+ e.Truncate(ptr.extentOff)
+ } else {
+ fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
+ }
+ }
+ fn.fileinfo.size = size
+ fn.repacked++
+ return nil
+ }
+ for size > fn.fileinfo.size {
+ grow := size - fn.fileinfo.size
+ var e writableExtent
+ var ok bool
+ if len(fn.extents) == 0 {
+ e = &memExtent{}
+ fn.extents = append(fn.extents, e)
+ } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
+ e = &memExtent{}
+ fn.extents = append(fn.extents, e)
+ } else {
+ fn.repacked++
+ }
+ if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
+ grow = maxgrow
+ }
+ e.Truncate(e.Len() + int(grow))
+ fn.fileinfo.size += grow
+ }
+ return nil
+}
+
func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
fn.Lock()
defer fn.Unlock()
@@ -435,6 +479,10 @@ func (f *file) Seek(off int64, whence int) (pos int64, err error) {
return f.ptr.off, nil
}
+func (f *file) Truncate(size int64) error {
+ return f.inode.Truncate(size)
+}
+
func (f *file) Write(p []byte) (n int, err error) {
if !f.writable {
return 0, ErrReadOnlyFile
@@ -628,6 +676,10 @@ func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
return 0, ptr, ErrInvalidOperation
}
+func (dn *dirnode) Truncate(int64) error {
+ return ErrInvalidOperation
+}
+
func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
name = strings.TrimSuffix(name, "/")
if name == "." || name == "" {
@@ -720,6 +772,12 @@ func (me *memExtent) Truncate(n int) {
newbuf := make([]byte, n, newsize)
copy(newbuf, me.buf)
me.buf = newbuf
+ } else {
+ // Zero unused part when shrinking, in case we grow
+ // and start using it again later.
+ for i := n; i < len(me.buf); i++ {
+ me.buf[i] = 0
+ }
}
me.buf = me.buf[:n]
}
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index e4eeeff..e635bb7 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -236,6 +236,66 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
buf2, err := ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+ // truncate to current size
+ err = f.Truncate(18)
+ f2.Seek(0, os.SEEK_SET)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+ // shrink to zero some data
+ f.Truncate(15)
+ f2.Seek(0, os.SEEK_SET)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcd")
+
+ // grow to partial block/extent
+ f.Truncate(20)
+ f2.Seek(0, os.SEEK_SET)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
+
+ f.Truncate(0)
+ f2.Write([]byte("12345678abcdefghijkl"))
+
+ // grow to block/extent boundary
+ f.Truncate(64)
+ f2.Seek(0, os.SEEK_SET)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf2), check.Equals, 64)
+ c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 8)
+
+ // shrink to block/extent boundary
+ err = f.Truncate(32)
+ f2.Seek(0, os.SEEK_SET)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf2), check.Equals, 32)
+ c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 4)
+
+ // shrink to partial block/extent
+ err = f.Truncate(15)
+ f2.Seek(0, os.SEEK_SET)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "12345678abcdefg")
+ c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 2)
+
+ // Truncate to size=3 while f2's ptr is at 15
+ err = f.Truncate(3)
+ c.Check(err, check.IsNil)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "")
+ f2.Seek(0, os.SEEK_SET)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "123")
+ c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
}
// Gocheck boilerplate
commit 6ce00fb7121813f187b555435a3f01c2aa380f93
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sat Nov 11 13:26:37 2017 -0500
12483: Simplify extent packing, reduce type casting.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 44b3c72..1669f88 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -23,9 +23,9 @@ var (
ErrFileExists = errors.New("file exists")
ErrInvalidOperation = errors.New("invalid operation")
ErrPermission = os.ErrPermission
-)
-const maxBlockSize = 1 << 26
+ maxBlockSize = 1 << 26
+)
type File interface {
io.Reader
@@ -38,7 +38,7 @@ type File interface {
}
type keepClient interface {
- ReadAt(locator string, p []byte, off int64) (int, error)
+ ReadAt(locator string, p []byte, off int) (int, error)
}
type fileinfo struct {
@@ -177,7 +177,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
} else if ptr.repacked == fn.repacked {
// extentIdx and extentOff accurately reflect ptr.off,
// but might have fallen off the end of an extent
- if int64(ptr.extentOff) >= fn.extents[ptr.extentIdx].Len() {
+ if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
ptr.extentIdx++
ptr.extentOff = 0
}
@@ -200,7 +200,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
// sum(fn.extents[i].Len()) -- but that can't happen
// because we have ensured fn.fileinfo.size is always
// accurate.
- extLen := fn.extents[ptr.extentIdx].Len()
+ extLen := int64(fn.extents[ptr.extentIdx].Len())
if off+extLen > ptr.off {
ptr.extentOff = int(ptr.off - off)
break
@@ -214,7 +214,7 @@ func (fn *filenode) appendExtent(e extent) {
fn.Lock()
defer fn.Unlock()
fn.extents = append(fn.extents, e)
- fn.fileinfo.size += e.Len()
+ fn.fileinfo.size += int64(e.Len())
fn.repacked++
}
@@ -246,7 +246,7 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
if n > 0 {
ptr.off += int64(n)
ptr.extentOff += n
- if int64(ptr.extentOff) == fn.extents[ptr.extentIdx].Len() {
+ if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
ptr.extentIdx++
ptr.extentOff = 0
if ptr.extentIdx < len(fn.extents) && err == io.EOF {
@@ -280,76 +280,90 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
_, curWritable = fn.extents[cur].(writableExtent)
}
var prevAppendable bool
- if prev >= 0 && fn.extents[prev].Len() < int64(maxBlockSize) {
+ if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
_, prevAppendable = fn.extents[prev].(writableExtent)
}
- if ptr.extentOff > 0 {
- if !curWritable {
- // Split a non-writable block.
- if max := int(fn.extents[cur].Len()) - ptr.extentOff; max <= len(cando) {
- cando = cando[:max]
- fn.extents = append(fn.extents, nil)
- copy(fn.extents[cur+1:], fn.extents[cur:])
- } else {
- fn.extents = append(fn.extents, nil, nil)
- copy(fn.extents[cur+2:], fn.extents[cur:])
- fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
- }
- cur++
- prev++
- e := &memExtent{}
- e.Truncate(len(cando))
- fn.extents[cur] = e
- fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
- ptr.extentIdx++
- ptr.extentOff = 0
- fn.repacked++
- ptr.repacked++
+ if ptr.extentOff > 0 && !curWritable {
+ // Split a non-writable block.
+ if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
+ // Truncate cur, and insert a new
+ // extent after it.
+ cando = cando[:max]
+ fn.extents = append(fn.extents, nil)
+ copy(fn.extents[cur+1:], fn.extents[cur:])
+ } else {
+ // Split cur into two copies, truncate
+ // the one on the left, shift the one
+ // on the right, and insert a new
+ // extent between them.
+ fn.extents = append(fn.extents, nil, nil)
+ copy(fn.extents[cur+2:], fn.extents[cur:])
+ fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
}
- } else if len(fn.extents) == 0 {
- // File has no extents yet.
+ cur++
+ prev++
e := &memExtent{}
e.Truncate(len(cando))
- fn.fileinfo.size += e.Len()
- fn.extents = append(fn.extents, e)
+ fn.extents[cur] = e
+ fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
+ ptr.extentIdx++
+ ptr.extentOff = 0
+ fn.repacked++
+ ptr.repacked++
} else if curWritable {
if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
cando = cando[:fit]
}
} else {
if prevAppendable {
- // Grow prev.
- if cangrow := int(maxBlockSize - fn.extents[prev].Len()); cangrow < len(cando) {
+ // Shrink cando if needed to fit in prev extent.
+ if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
cando = cando[:cangrow]
}
- ptr.extentIdx--
- ptr.extentOff = int(fn.extents[prev].Len())
- fn.extents[prev].(*memExtent).Truncate(ptr.extentOff + len(cando))
- } else {
- // Insert an extent between prev and cur. It will be the new prev.
- fn.extents = append(fn.extents, nil)
- copy(fn.extents[cur+1:], fn.extents[cur:])
- e := &memExtent{}
- e.Truncate(len(cando))
- fn.extents[cur] = e
- cur++
- prev++
}
if cur == len(fn.extents) {
- // There is no cur.
- } else if el := int(fn.extents[cur].Len()); el <= len(cando) {
- // Drop cur.
+ // ptr is at EOF, filesize is changing.
+ fn.fileinfo.size += int64(len(cando))
+ } else if el := fn.extents[cur].Len(); el <= len(cando) {
+ // cando is long enough that we won't
+ // need cur any more. shrink cando to
+ // be exactly as long as cur
+ // (otherwise we'd accidentally shift
+ // the effective position of all
+ // extents after cur).
cando = cando[:el]
copy(fn.extents[cur:], fn.extents[cur+1:])
fn.extents = fn.extents[:len(fn.extents)-1]
} else {
- // Shrink cur.
+ // shrink cur by the same #bytes we're growing prev
fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
}
- ptr.repacked++
- fn.repacked++
+ if prevAppendable {
+ // Grow prev.
+ ptr.extentIdx--
+ ptr.extentOff = fn.extents[prev].Len()
+ fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
+ ptr.repacked++
+ fn.repacked++
+ } else {
+ // Insert an extent between prev and cur, and advance prev/cur.
+ fn.extents = append(fn.extents, nil)
+ if cur < len(fn.extents) {
+ copy(fn.extents[cur+1:], fn.extents[cur:])
+ ptr.repacked++
+ fn.repacked++
+ } else {
+ // appending a new extent does
+ // not invalidate any ptrs
+ }
+ e := &memExtent{}
+ e.Truncate(len(cando))
+ fn.extents[cur] = e
+ cur++
+ prev++
+ }
}
// Finally we can copy bytes from cando to the current extent.
@@ -359,7 +373,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
ptr.off += int64(len(cando))
ptr.extentOff += len(cando)
- if fn.extents[ptr.extentIdx].Len() == int64(ptr.extentOff) {
+ if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
ptr.extentOff = 0
ptr.extentIdx++
}
@@ -528,10 +542,14 @@ func (dn *dirnode) loadManifest(txt string) {
// FIXME: broken manifest
continue
}
+ // Map the stream offset/range coordinates to
+ // block/offset/range coordinates and add
+ // corresponding storedExtents to the filenode
var pos int64
for _, e := range extents {
- if pos+e.Len() < offset {
- pos += e.Len()
+ next := pos + int64(e.Len())
+ if next < offset {
+ pos = next
continue
}
if pos > offset+length {
@@ -541,7 +559,7 @@ func (dn *dirnode) loadManifest(txt string) {
if pos < offset {
blkOff = int(offset - pos)
}
- blkLen := int(e.Len()) - blkOff
+ blkLen := e.Len() - blkOff
if pos+int64(blkOff+blkLen) > offset+length {
blkLen = int(offset + length - pos - int64(blkOff))
}
@@ -551,7 +569,7 @@ func (dn *dirnode) loadManifest(txt string) {
offset: blkOff,
length: blkLen,
})
- pos += e.Len()
+ pos = next
}
f.Close()
}
@@ -664,8 +682,10 @@ func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, err
type extent interface {
io.ReaderAt
- Len() int64
- Slice(int, int) extent
+ Len() int
+ // Return a new extent with a subsection of the data from this
+ // one. length<0 means length=Len()-off.
+ Slice(off int, length int) extent
}
type writableExtent interface {
@@ -678,15 +698,17 @@ type memExtent struct {
buf []byte
}
-func (me *memExtent) Len() int64 {
- return int64(len(me.buf))
+func (me *memExtent) Len() int {
+ return len(me.buf)
}
-func (me *memExtent) Slice(n, size int) extent {
- if size < 0 {
- size = len(me.buf) - n
+func (me *memExtent) Slice(off, length int) extent {
+ if length < 0 {
+ length = len(me.buf) - off
}
- return &memExtent{buf: me.buf[n : n+size]}
+ buf := make([]byte, length)
+ copy(buf, me.buf[off:])
+ return &memExtent{buf: buf}
}
func (me *memExtent) Truncate(n int) {
@@ -710,7 +732,7 @@ func (me *memExtent) WriteAt(p []byte, off int) {
}
func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
- if off > me.Len() {
+ if off > int64(me.Len()) {
err = io.EOF
return
}
@@ -728,8 +750,8 @@ type storedExtent struct {
length int
}
-func (se storedExtent) Len() int64 {
- return int64(se.length)
+func (se storedExtent) Len() int {
+ return se.length
}
func (se storedExtent) Slice(n, size int) extent {
@@ -742,20 +764,23 @@ func (se storedExtent) Slice(n, size int) extent {
}
func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
- maxlen := int(int64(se.length) - off)
+ if off > int64(se.length) {
+ return 0, io.EOF
+ }
+ maxlen := se.length - int(off)
if len(p) > maxlen {
p = p[:maxlen]
- n, err = se.cache.ReadAt(se.locator, p, off+int64(se.offset))
+ n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
if err == nil {
err = io.EOF
}
return
}
- return se.cache.ReadAt(se.locator, p, off+int64(se.offset))
+ return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
}
type blockCache interface {
- ReadAt(locator string, p []byte, off int64) (n int, err error)
+ ReadAt(locator string, p []byte, off int) (n int, err error)
}
type keepBlockCache struct {
@@ -764,7 +789,7 @@ type keepBlockCache struct {
var scratch = make([]byte, 2<<26)
-func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int64) (int, error) {
+func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
return kbc.kc.ReadAt(locator, p, off)
}
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index ae57078..e4eeeff 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -21,12 +21,12 @@ type keepClientStub struct {
blocks map[string][]byte
}
-func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int64) (int, error) {
+func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
buf := kcs.blocks[locator[:32]]
if buf == nil {
return 0, os.ErrNotExist
}
- return copy(p, buf[int(off):]), nil
+ return copy(p, buf[off:]), nil
}
type CollectionFSSuite struct {
@@ -177,13 +177,21 @@ func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
}
func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
+
f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
c.Assert(err, check.IsNil)
+ defer f.Close()
st, err := f.Stat()
c.Assert(err, check.IsNil)
c.Check(st.Size(), check.Equals, int64(3))
- buf := make([]byte, 4)
+ f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f2.Close()
+
+ buf := make([]byte, 64)
n, err := f.Read(buf)
c.Check(n, check.Equals, 3)
c.Check(err, check.Equals, io.EOF)
@@ -193,6 +201,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
c.Check(pos, check.Equals, int64(1))
c.Check(err, check.IsNil)
+ // Split a storedExtent in two, and insert a memExtent
n, err = f.Write([]byte("*"))
c.Check(n, check.Equals, 1)
c.Check(err, check.IsNil)
@@ -205,11 +214,28 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
c.Check(pos, check.Equals, int64(0))
c.Check(err, check.IsNil)
- buf = make([]byte, 4)
- buf, err = ioutil.ReadAll(f)
- c.Check(len(buf), check.Equals, 3)
+ rbuf, err := ioutil.ReadAll(f)
+ c.Check(len(rbuf), check.Equals, 3)
+ c.Check(err, check.IsNil)
+ c.Check(string(rbuf), check.Equals, "f*o")
+
+ // Write multiple blocks in one call
+ f.Seek(1, os.SEEK_SET)
+ n, err = f.Write([]byte("0123456789abcdefg"))
+ c.Check(n, check.Equals, 17)
+ c.Check(err, check.IsNil)
+ pos, err = f.Seek(0, os.SEEK_CUR)
+ c.Check(pos, check.Equals, int64(18))
+ pos, err = f.Seek(-18, os.SEEK_CUR)
+ c.Check(err, check.IsNil)
+ n, err = io.ReadFull(f, buf)
+ c.Check(n, check.Equals, 18)
+ c.Check(err, check.Equals, io.ErrUnexpectedEOF)
+ c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
+
+ buf2, err := ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
- c.Check(string(buf), check.Equals, "f*o")
+ c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
}
// Gocheck boilerplate
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index c1138fa..539975e 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -51,15 +51,15 @@ func (c *BlockCache) Sweep() {
// ReadAt returns data from the cache, first retrieving it from Keep if
// necessary.
-func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int64) (int, error) {
+func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
buf, err := c.Get(kc, locator)
if err != nil {
return 0, err
}
- if off > int64(len(buf)) {
+ if off > len(buf) {
return 0, io.ErrUnexpectedEOF
}
- return copy(p, buf[int(off):]), nil
+ return copy(p, buf[off:]), nil
}
// Get returns data from the cache, first retrieving it from Keep if
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index c3d63ed..4bc0fc5 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -294,7 +294,7 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
// ReadAt() retrieves a portion of block from the cache if it's
// present, otherwise from the network.
-func (kc *KeepClient) ReadAt(locator string, p []byte, off int64) (int, error) {
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
return kc.cache().ReadAt(kc, locator, p, off)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list