[ARVADOS] updated: 1.1.0-138-gda2f8a1

Git user git at public.curoverse.com
Sat Nov 11 17:28:50 EST 2017


Summary of changes:
 sdk/go/arvados/collection_fs.go      | 233 ++++++++++++++++++++++++-----------
 sdk/go/arvados/collection_fs_test.go | 202 ++++++++++++++++++++++++++++--
 sdk/go/keepclient/block_cache.go     |   6 +-
 sdk/go/keepclient/keepclient.go      |   2 +-
 4 files changed, 357 insertions(+), 86 deletions(-)

       via  da2f8a1023b3b625a462718f8836ac27c25b957d (commit)
       via  9009baaf0e3aa800de8af11c741d0adc46563200 (commit)
       via  6ce00fb7121813f187b555435a3f01c2aa380f93 (commit)
      from  cad7d333436703d48c2811de8a26caef9fc130ad (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit da2f8a1023b3b625a462718f8836ac27c25b957d
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sat Nov 11 16:48:54 2017 -0500

    12483: Add tests for concurrent writers and random write sequences.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index fc2a2b9..5a3afbd 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -263,7 +263,7 @@ func (fn *filenode) Truncate(size int64) error {
 	fn.Lock()
 	defer fn.Unlock()
 	if size < fn.fileinfo.size {
-		ptr := fn.seek(filenodePtr{off: size})
+		ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
 		if ptr.extentOff == 0 {
 			fn.extents = fn.extents[:ptr.extentIdx]
 		} else {
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index e635bb7..ff1d738 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -5,10 +5,13 @@
 package arvados
 
 import (
+	"fmt"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"net/http"
 	"os"
+	"sync"
 	"testing"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
@@ -298,6 +301,105 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
 }
 
+func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
+	maxBlockSize = 8
+	defer func() { maxBlockSize = 2 << 26 }()
+
+	var wg sync.WaitGroup
+	for n := 0; n < 128; n++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+			c.Assert(err, check.IsNil)
+			defer f.Close()
+			for i := 0; i < 6502; i++ {
+				switch rand.Int() & 3 {
+				case 0:
+					f.Truncate(int64(rand.Intn(64)))
+				case 1:
+					f.Seek(int64(rand.Intn(64)), os.SEEK_SET)
+				case 2:
+					_, err := f.Write([]byte("beep boop"))
+					c.Check(err, check.IsNil)
+				case 3:
+					_, err := ioutil.ReadAll(f)
+					c.Check(err, check.IsNil)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+
+	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+	c.Assert(err, check.IsNil)
+	defer f.Close()
+	buf, err := ioutil.ReadAll(f)
+	c.Check(err, check.IsNil)
+	c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
+}
+
+func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
+	maxBlockSize = 8
+	defer func() { maxBlockSize = 2 << 26 }()
+
+	var wg sync.WaitGroup
+	for n := 0; n < 128; n++ {
+		wg.Add(1)
+		go func(n int) {
+			defer wg.Done()
+			expect := make([]byte, 0, 64)
+			wbytes := []byte("there's no simple explanation for anything important that any of us do")
+			f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+			c.Assert(err, check.IsNil)
+			defer f.Close()
+			for i := 0; i < 6502; i++ {
+				trunc := rand.Intn(65)
+				woff := rand.Intn(trunc + 1)
+				wbytes = wbytes[:rand.Intn(64-woff+1)]
+				for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
+					buf[i] = 0
+				}
+				expect = expect[:trunc]
+				if trunc < woff+len(wbytes) {
+					expect = expect[:woff+len(wbytes)]
+				}
+				copy(expect[woff:], wbytes)
+				func() {
+					defer func() {
+						if e := recover(); e != nil {
+							c.Logf("trunc %d, woff %d, wlen %d", trunc, woff, len(wbytes))
+							c.Logf("%#v", f.(*file).inode.(*filenode).extents)
+							panic(e)
+						}
+					}()
+					f.Truncate(int64(trunc))
+					pos, err := f.Seek(int64(woff), os.SEEK_SET)
+					c.Check(pos, check.Equals, int64(woff))
+					c.Check(err, check.IsNil)
+					n, err := f.Write(wbytes)
+					c.Check(n, check.Equals, len(wbytes))
+					c.Check(err, check.IsNil)
+					pos, err = f.Seek(0, os.SEEK_SET)
+					c.Check(pos, check.Equals, int64(0))
+					c.Check(err, check.IsNil)
+					buf, err := ioutil.ReadAll(f)
+					c.Check(string(buf), check.Equals, string(expect))
+					c.Check(err, check.IsNil)
+				}()
+			}
+		}(n)
+	}
+	wg.Wait()
+
+	root, err := s.fs.Open("/")
+	c.Assert(err, check.IsNil)
+	defer root.Close()
+	fi, err := root.Readdir(-1)
+	c.Check(err, check.IsNil)
+	c.Logf("Readdir(): %#v", fi)
+}
+
 // Gocheck boilerplate
 func Test(t *testing.T) {
 	check.TestingT(t)

commit 9009baaf0e3aa800de8af11c741d0adc46563200
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sat Nov 11 15:32:47 2017 -0500

    12483: Add (File)Truncate().
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 1669f88..fc2a2b9 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -35,6 +35,7 @@ type File interface {
 	Size() int64
 	Readdir(int) ([]os.FileInfo, error)
 	Stat() (os.FileInfo, error)
+	Truncate(int64) error
 }
 
 type keepClient interface {
@@ -122,6 +123,7 @@ type inode interface {
 	Parent() inode
 	Read([]byte, filenodePtr) (int, filenodePtr, error)
 	Write([]byte, filenodePtr) (int, filenodePtr, error)
+	Truncate(int64) error
 	Readdir() []os.FileInfo
 	Stat() os.FileInfo
 	sync.Locker
@@ -257,6 +259,48 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
 	return
 }
 
+func (fn *filenode) Truncate(size int64) error {
+	fn.Lock()
+	defer fn.Unlock()
+	if size < fn.fileinfo.size {
+		ptr := fn.seek(filenodePtr{off: size})
+		if ptr.extentOff == 0 {
+			fn.extents = fn.extents[:ptr.extentIdx]
+		} else {
+			fn.extents = fn.extents[:ptr.extentIdx+1]
+			e := fn.extents[ptr.extentIdx]
+			if e, ok := e.(writableExtent); ok {
+				e.Truncate(ptr.extentOff)
+			} else {
+				fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
+			}
+		}
+		fn.fileinfo.size = size
+		fn.repacked++
+		return nil
+	}
+	for size > fn.fileinfo.size {
+		grow := size - fn.fileinfo.size
+		var e writableExtent
+		var ok bool
+		if len(fn.extents) == 0 {
+			e = &memExtent{}
+			fn.extents = append(fn.extents, e)
+		} else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
+			e = &memExtent{}
+			fn.extents = append(fn.extents, e)
+		} else {
+			fn.repacked++
+		}
+		if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
+			grow = maxgrow
+		}
+		e.Truncate(e.Len() + int(grow))
+		fn.fileinfo.size += grow
+	}
+	return nil
+}
+
 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
 	fn.Lock()
 	defer fn.Unlock()
@@ -435,6 +479,10 @@ func (f *file) Seek(off int64, whence int) (pos int64, err error) {
 	return f.ptr.off, nil
 }
 
+func (f *file) Truncate(size int64) error {
+	return f.inode.Truncate(size)
+}
+
 func (f *file) Write(p []byte) (n int, err error) {
 	if !f.writable {
 		return 0, ErrReadOnlyFile
@@ -628,6 +676,10 @@ func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
 	return 0, ptr, ErrInvalidOperation
 }
 
+func (dn *dirnode) Truncate(int64) error {
+	return ErrInvalidOperation
+}
+
 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
 	name = strings.TrimSuffix(name, "/")
 	if name == "." || name == "" {
@@ -720,6 +772,12 @@ func (me *memExtent) Truncate(n int) {
 		newbuf := make([]byte, n, newsize)
 		copy(newbuf, me.buf)
 		me.buf = newbuf
+	} else {
+		// Zero unused part when shrinking, in case we grow
+		// and start using it again later.
+		for i := n; i < len(me.buf); i++ {
+			me.buf[i] = 0
+		}
 	}
 	me.buf = me.buf[:n]
 }
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index e4eeeff..e635bb7 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -236,6 +236,66 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	buf2, err := ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+	// truncate to current size
+	err = f.Truncate(18)
+	f2.Seek(0, os.SEEK_SET)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+	// shrink to zero some data
+	f.Truncate(15)
+	f2.Seek(0, os.SEEK_SET)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(string(buf2), check.Equals, "f0123456789abcd")
+
+	// grow to partial block/extent
+	f.Truncate(20)
+	f2.Seek(0, os.SEEK_SET)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
+
+	f.Truncate(0)
+	f2.Write([]byte("12345678abcdefghijkl"))
+
+	// grow to block/extent boundary
+	f.Truncate(64)
+	f2.Seek(0, os.SEEK_SET)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(len(buf2), check.Equals, 64)
+	c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 8)
+
+	// shrink to block/extent boundary
+	err = f.Truncate(32)
+	f2.Seek(0, os.SEEK_SET)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(len(buf2), check.Equals, 32)
+	c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 4)
+
+	// shrink to partial block/extent
+	err = f.Truncate(15)
+	f2.Seek(0, os.SEEK_SET)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(string(buf2), check.Equals, "12345678abcdefg")
+	c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 2)
+
+	// Truncate to size=3 while f2's ptr is at 15
+	err = f.Truncate(3)
+	c.Check(err, check.IsNil)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(string(buf2), check.Equals, "")
+	f2.Seek(0, os.SEEK_SET)
+	buf2, err = ioutil.ReadAll(f2)
+	c.Check(err, check.IsNil)
+	c.Check(string(buf2), check.Equals, "123")
+	c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
 }
 
 // Gocheck boilerplate

commit 6ce00fb7121813f187b555435a3f01c2aa380f93
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sat Nov 11 13:26:37 2017 -0500

    12483: Simplify extent packing, reduce type casting.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 44b3c72..1669f88 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -23,9 +23,9 @@ var (
 	ErrFileExists       = errors.New("file exists")
 	ErrInvalidOperation = errors.New("invalid operation")
 	ErrPermission       = os.ErrPermission
-)
 
-const maxBlockSize = 1 << 26
+	maxBlockSize = 1 << 26
+)
 
 type File interface {
 	io.Reader
@@ -38,7 +38,7 @@ type File interface {
 }
 
 type keepClient interface {
-	ReadAt(locator string, p []byte, off int64) (int, error)
+	ReadAt(locator string, p []byte, off int) (int, error)
 }
 
 type fileinfo struct {
@@ -177,7 +177,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
 	} else if ptr.repacked == fn.repacked {
 		// extentIdx and extentOff accurately reflect ptr.off,
 		// but might have fallen off the end of an extent
-		if int64(ptr.extentOff) >= fn.extents[ptr.extentIdx].Len() {
+		if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
 			ptr.extentIdx++
 			ptr.extentOff = 0
 		}
@@ -200,7 +200,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
 		// sum(fn.extents[i].Len()) -- but that can't happen
 		// because we have ensured fn.fileinfo.size is always
 		// accurate.
-		extLen := fn.extents[ptr.extentIdx].Len()
+		extLen := int64(fn.extents[ptr.extentIdx].Len())
 		if off+extLen > ptr.off {
 			ptr.extentOff = int(ptr.off - off)
 			break
@@ -214,7 +214,7 @@ func (fn *filenode) appendExtent(e extent) {
 	fn.Lock()
 	defer fn.Unlock()
 	fn.extents = append(fn.extents, e)
-	fn.fileinfo.size += e.Len()
+	fn.fileinfo.size += int64(e.Len())
 	fn.repacked++
 }
 
@@ -246,7 +246,7 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
 	if n > 0 {
 		ptr.off += int64(n)
 		ptr.extentOff += n
-		if int64(ptr.extentOff) == fn.extents[ptr.extentIdx].Len() {
+		if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
 			ptr.extentIdx++
 			ptr.extentOff = 0
 			if ptr.extentIdx < len(fn.extents) && err == io.EOF {
@@ -280,76 +280,90 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 			_, curWritable = fn.extents[cur].(writableExtent)
 		}
 		var prevAppendable bool
-		if prev >= 0 && fn.extents[prev].Len() < int64(maxBlockSize) {
+		if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
 			_, prevAppendable = fn.extents[prev].(writableExtent)
 		}
-		if ptr.extentOff > 0 {
-			if !curWritable {
-				// Split a non-writable block.
-				if max := int(fn.extents[cur].Len()) - ptr.extentOff; max <= len(cando) {
-					cando = cando[:max]
-					fn.extents = append(fn.extents, nil)
-					copy(fn.extents[cur+1:], fn.extents[cur:])
-				} else {
-					fn.extents = append(fn.extents, nil, nil)
-					copy(fn.extents[cur+2:], fn.extents[cur:])
-					fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
-				}
-				cur++
-				prev++
-				e := &memExtent{}
-				e.Truncate(len(cando))
-				fn.extents[cur] = e
-				fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
-				ptr.extentIdx++
-				ptr.extentOff = 0
-				fn.repacked++
-				ptr.repacked++
+		if ptr.extentOff > 0 && !curWritable {
+			// Split a non-writable block.
+			if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
+				// Truncate cur, and insert a new
+				// extent after it.
+				cando = cando[:max]
+				fn.extents = append(fn.extents, nil)
+				copy(fn.extents[cur+1:], fn.extents[cur:])
+			} else {
+				// Split cur into two copies, truncate
+				// the one on the left, shift the one
+				// on the right, and insert a new
+				// extent between them.
+				fn.extents = append(fn.extents, nil, nil)
+				copy(fn.extents[cur+2:], fn.extents[cur:])
+				fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
 			}
-		} else if len(fn.extents) == 0 {
-			// File has no extents yet.
+			cur++
+			prev++
 			e := &memExtent{}
 			e.Truncate(len(cando))
-			fn.fileinfo.size += e.Len()
-			fn.extents = append(fn.extents, e)
+			fn.extents[cur] = e
+			fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
+			ptr.extentIdx++
+			ptr.extentOff = 0
+			fn.repacked++
+			ptr.repacked++
 		} else if curWritable {
 			if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
 				cando = cando[:fit]
 			}
 		} else {
 			if prevAppendable {
-				// Grow prev.
-				if cangrow := int(maxBlockSize - fn.extents[prev].Len()); cangrow < len(cando) {
+				// Shrink cando if needed to fit in prev extent.
+				if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
 					cando = cando[:cangrow]
 				}
-				ptr.extentIdx--
-				ptr.extentOff = int(fn.extents[prev].Len())
-				fn.extents[prev].(*memExtent).Truncate(ptr.extentOff + len(cando))
-			} else {
-				// Insert an extent between prev and cur. It will be the new prev.
-				fn.extents = append(fn.extents, nil)
-				copy(fn.extents[cur+1:], fn.extents[cur:])
-				e := &memExtent{}
-				e.Truncate(len(cando))
-				fn.extents[cur] = e
-				cur++
-				prev++
 			}
 
 			if cur == len(fn.extents) {
-				// There is no cur.
-			} else if el := int(fn.extents[cur].Len()); el <= len(cando) {
-				// Drop cur.
+				// ptr is at EOF, filesize is changing.
+				fn.fileinfo.size += int64(len(cando))
+			} else if el := fn.extents[cur].Len(); el <= len(cando) {
+				// cando is long enough that we won't
+				// need cur any more. shrink cando to
+				// be exactly as long as cur
+				// (otherwise we'd accidentally shift
+				// the effective position of all
+				// extents after cur).
 				cando = cando[:el]
 				copy(fn.extents[cur:], fn.extents[cur+1:])
 				fn.extents = fn.extents[:len(fn.extents)-1]
 			} else {
-				// Shrink cur.
+				// shrink cur by the same #bytes we're growing prev
 				fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
 			}
 
-			ptr.repacked++
-			fn.repacked++
+			if prevAppendable {
+				// Grow prev.
+				ptr.extentIdx--
+				ptr.extentOff = fn.extents[prev].Len()
+				fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
+				ptr.repacked++
+				fn.repacked++
+			} else {
+				// Insert an extent between prev and cur, and advance prev/cur.
+				fn.extents = append(fn.extents, nil)
+				if cur < len(fn.extents) {
+					copy(fn.extents[cur+1:], fn.extents[cur:])
+					ptr.repacked++
+					fn.repacked++
+				} else {
+					// appending a new extent does
+					// not invalidate any ptrs
+				}
+				e := &memExtent{}
+				e.Truncate(len(cando))
+				fn.extents[cur] = e
+				cur++
+				prev++
+			}
 		}
 
 		// Finally we can copy bytes from cando to the current extent.
@@ -359,7 +373,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 
 		ptr.off += int64(len(cando))
 		ptr.extentOff += len(cando)
-		if fn.extents[ptr.extentIdx].Len() == int64(ptr.extentOff) {
+		if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
 			ptr.extentOff = 0
 			ptr.extentIdx++
 		}
@@ -528,10 +542,14 @@ func (dn *dirnode) loadManifest(txt string) {
 				// FIXME: broken manifest
 				continue
 			}
+			// Map the stream offset/range coordinates to
+			// block/offset/range coordinates and add
+			// corresponding storedExtents to the filenode
 			var pos int64
 			for _, e := range extents {
-				if pos+e.Len() < offset {
-					pos += e.Len()
+				next := pos + int64(e.Len())
+				if next < offset {
+					pos = next
 					continue
 				}
 				if pos > offset+length {
@@ -541,7 +559,7 @@ func (dn *dirnode) loadManifest(txt string) {
 				if pos < offset {
 					blkOff = int(offset - pos)
 				}
-				blkLen := int(e.Len()) - blkOff
+				blkLen := e.Len() - blkOff
 				if pos+int64(blkOff+blkLen) > offset+length {
 					blkLen = int(offset + length - pos - int64(blkOff))
 				}
@@ -551,7 +569,7 @@ func (dn *dirnode) loadManifest(txt string) {
 					offset:  blkOff,
 					length:  blkLen,
 				})
-				pos += e.Len()
+				pos = next
 			}
 			f.Close()
 		}
@@ -664,8 +682,10 @@ func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, err
 
 type extent interface {
 	io.ReaderAt
-	Len() int64
-	Slice(int, int) extent
+	Len() int
+	// Return a new extent with a subsection of the data from this
+	// one. length<0 means length=Len()-off.
+	Slice(off int, length int) extent
 }
 
 type writableExtent interface {
@@ -678,15 +698,17 @@ type memExtent struct {
 	buf []byte
 }
 
-func (me *memExtent) Len() int64 {
-	return int64(len(me.buf))
+func (me *memExtent) Len() int {
+	return len(me.buf)
 }
 
-func (me *memExtent) Slice(n, size int) extent {
-	if size < 0 {
-		size = len(me.buf) - n
+func (me *memExtent) Slice(off, length int) extent {
+	if length < 0 {
+		length = len(me.buf) - off
 	}
-	return &memExtent{buf: me.buf[n : n+size]}
+	buf := make([]byte, length)
+	copy(buf, me.buf[off:])
+	return &memExtent{buf: buf}
 }
 
 func (me *memExtent) Truncate(n int) {
@@ -710,7 +732,7 @@ func (me *memExtent) WriteAt(p []byte, off int) {
 }
 
 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
-	if off > me.Len() {
+	if off > int64(me.Len()) {
 		err = io.EOF
 		return
 	}
@@ -728,8 +750,8 @@ type storedExtent struct {
 	length  int
 }
 
-func (se storedExtent) Len() int64 {
-	return int64(se.length)
+func (se storedExtent) Len() int {
+	return se.length
 }
 
 func (se storedExtent) Slice(n, size int) extent {
@@ -742,20 +764,23 @@ func (se storedExtent) Slice(n, size int) extent {
 }
 
 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
-	maxlen := int(int64(se.length) - off)
+	if off > int64(se.length) {
+		return 0, io.EOF
+	}
+	maxlen := se.length - int(off)
 	if len(p) > maxlen {
 		p = p[:maxlen]
-		n, err = se.cache.ReadAt(se.locator, p, off+int64(se.offset))
+		n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
 		if err == nil {
 			err = io.EOF
 		}
 		return
 	}
-	return se.cache.ReadAt(se.locator, p, off+int64(se.offset))
+	return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
 type blockCache interface {
-	ReadAt(locator string, p []byte, off int64) (n int, err error)
+	ReadAt(locator string, p []byte, off int) (n int, err error)
 }
 
 type keepBlockCache struct {
@@ -764,7 +789,7 @@ type keepBlockCache struct {
 
 var scratch = make([]byte, 2<<26)
 
-func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int64) (int, error) {
+func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
 	return kbc.kc.ReadAt(locator, p, off)
 }
 
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index ae57078..e4eeeff 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -21,12 +21,12 @@ type keepClientStub struct {
 	blocks map[string][]byte
 }
 
-func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int64) (int, error) {
+func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
 	buf := kcs.blocks[locator[:32]]
 	if buf == nil {
 		return 0, os.ErrNotExist
 	}
-	return copy(p, buf[int(off):]), nil
+	return copy(p, buf[off:]), nil
 }
 
 type CollectionFSSuite struct {
@@ -177,13 +177,21 @@ func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
+	maxBlockSize = 8
+	defer func() { maxBlockSize = 2 << 26 }()
+
 	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
 	c.Assert(err, check.IsNil)
+	defer f.Close()
 	st, err := f.Stat()
 	c.Assert(err, check.IsNil)
 	c.Check(st.Size(), check.Equals, int64(3))
 
-	buf := make([]byte, 4)
+	f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+	c.Assert(err, check.IsNil)
+	defer f2.Close()
+
+	buf := make([]byte, 64)
 	n, err := f.Read(buf)
 	c.Check(n, check.Equals, 3)
 	c.Check(err, check.Equals, io.EOF)
@@ -193,6 +201,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(pos, check.Equals, int64(1))
 	c.Check(err, check.IsNil)
 
+	// Split a storedExtent in two, and insert a memExtent
 	n, err = f.Write([]byte("*"))
 	c.Check(n, check.Equals, 1)
 	c.Check(err, check.IsNil)
@@ -205,11 +214,28 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(pos, check.Equals, int64(0))
 	c.Check(err, check.IsNil)
 
-	buf = make([]byte, 4)
-	buf, err = ioutil.ReadAll(f)
-	c.Check(len(buf), check.Equals, 3)
+	rbuf, err := ioutil.ReadAll(f)
+	c.Check(len(rbuf), check.Equals, 3)
+	c.Check(err, check.IsNil)
+	c.Check(string(rbuf), check.Equals, "f*o")
+
+	// Write multiple blocks in one call
+	f.Seek(1, os.SEEK_SET)
+	n, err = f.Write([]byte("0123456789abcdefg"))
+	c.Check(n, check.Equals, 17)
+	c.Check(err, check.IsNil)
+	pos, err = f.Seek(0, os.SEEK_CUR)
+	c.Check(pos, check.Equals, int64(18))
+	pos, err = f.Seek(-18, os.SEEK_CUR)
+	c.Check(err, check.IsNil)
+	n, err = io.ReadFull(f, buf)
+	c.Check(n, check.Equals, 18)
+	c.Check(err, check.Equals, io.ErrUnexpectedEOF)
+	c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
+
+	buf2, err := ioutil.ReadAll(f2)
 	c.Check(err, check.IsNil)
-	c.Check(string(buf), check.Equals, "f*o")
+	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
 }
 
 // Gocheck boilerplate
diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go
index c1138fa..539975e 100644
--- a/sdk/go/keepclient/block_cache.go
+++ b/sdk/go/keepclient/block_cache.go
@@ -51,15 +51,15 @@ func (c *BlockCache) Sweep() {
 
 // ReadAt returns data from the cache, first retrieving it from Keep if
 // necessary.
-func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int64) (int, error) {
+func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
 	buf, err := c.Get(kc, locator)
 	if err != nil {
 		return 0, err
 	}
-	if off > int64(len(buf)) {
+	if off > len(buf) {
 		return 0, io.ErrUnexpectedEOF
 	}
-	return copy(p, buf[int(off):]), nil
+	return copy(p, buf[off:]), nil
 }
 
 // Get returns data from the cache, first retrieving it from Keep if
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index c3d63ed..4bc0fc5 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -294,7 +294,7 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
 
 // ReadAt() retrieves a portion of block from the cache if it's
 // present, otherwise from the network.
-func (kc *KeepClient) ReadAt(locator string, p []byte, off int64) (int, error) {
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
 	return kc.cache().ReadAt(kc, locator, p, off)
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list