[ARVADOS] updated: 0b5306a5bf134dcc6da2dcab40ed45277c566d28
Git user
git at public.curoverse.com
Sun Nov 6 23:43:24 EST 2016
Summary of changes:
services/keepstore/azure_blob_volume.go | 4 +--
services/keepstore/collision.go | 16 +++++++++--
services/keepstore/handlers.go | 11 ++++++--
services/keepstore/s3_volume.go | 47 ++++++++++++++++++-------------
services/keepstore/volume.go | 2 +-
services/keepstore/volume_generic_test.go | 12 ++++----
services/keepstore/volume_test.go | 2 +-
services/keepstore/volume_unix.go | 13 +++++----
services/keepstore/volume_unix_test.go | 14 ++++-----
9 files changed, 75 insertions(+), 46 deletions(-)
via 0b5306a5bf134dcc6da2dcab40ed45277c566d28 (commit)
from dccf36ede6f880d8fcec936f684fb8d5f06771d6 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 0b5306a5bf134dcc6da2dcab40ed45277c566d28
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 6 23:41:00 2016 -0500
10467: Interrupt Compare operation if caller disconnects.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 08958b2..59e7cae 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -272,7 +272,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
}
// Compare the given data with existing stored data.
-func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return err
@@ -285,7 +285,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
return v.translateError(err)
}
defer rdr.Close()
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
}
// Put stores a Keep block as a block blob in the container.
diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go
index a4af563..82cb789 100644
--- a/services/keepstore/collision.go
+++ b/services/keepstore/collision.go
@@ -2,6 +2,7 @@ package main
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io"
@@ -49,7 +50,7 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro
return <-outcome
}
-func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error {
bufLen := 1 << 20
if bufLen > len(expect) && len(expect) > 0 {
// No need for bufLen to be longer than
@@ -67,7 +68,18 @@ func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
// expected to equal the next N bytes read from
// rdr.
for {
- n, err := rdr.Read(buf)
+ ready := make(chan bool)
+ var n int
+ var err error
+ go func() {
+ n, err = rdr.Read(buf)
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 2c680d3..289dce1 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -647,8 +647,10 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
return n, err
+ } else if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
}
// Choose a Keep volume to write to.
@@ -699,10 +701,13 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
// the relevant block's modification time in order to protect it from
// premature garbage collection. Otherwise, it returns a non-nil
// error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Compare(hash, buf); err == CollisionError {
+ err := vol.Compare(ctx, hash, buf)
+ if ctx.Err() != nil {
+ return 0, ctx.Err()
+ } else if err == CollisionError {
// Stop if we have a block with same hash but
// different content. (It will be impossible
// to tell which one is wanted if we have
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 82e321e..f174bd8 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -222,6 +222,27 @@ func (v *S3Volume) Start() error {
return nil
}
+func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+ ready := make(chan bool)
+ go func() {
+ rdr, err = v.getReader(loc)
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ return
+ case <-ctx.Done():
+ theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+ go func() {
+ <-ready
+ if err == nil {
+ rdr.Close()
+ }
+ }()
+ return nil, ctx.Err()
+ }
+}
+
// getReader wraps (Bucket)GetReader.
//
// In situations where (Bucket)GetReader would fail because the block
@@ -255,25 +276,13 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
// Get a block: copy the block data into buf, and return the number of
// bytes copied.
func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
- ready := make(chan bool)
- var rdr io.ReadCloser
- var err error
- go func() {
- rdr, err = v.getReader(loc)
- close(ready)
- }()
- select {
- case <-ctx.Done():
- theConfig.debugLogf("s3: abandoning getReader() because %s", ctx.Err())
- return 0, ctx.Err()
- case <-ready:
- if err != nil {
- return 0, err
- }
+ rdr, err := v.getReaderWithContext(ctx, loc)
+ if err != nil {
+ return 0, err
}
var n int
- ready = make(chan bool)
+ ready := make(chan bool)
go func() {
defer close(ready)
@@ -302,13 +311,13 @@ func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error)
}
// Compare the given data with the stored data.
-func (v *S3Volume) Compare(loc string, expect []byte) error {
- rdr, err := v.getReader(loc)
+func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+ rdr, err := v.getReaderWithContext(ctx, loc)
if err != nil {
return err
}
defer rdr.Close()
- return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+ return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
}
// Put writes a block.
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 01bb6e2..57e18ab 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -55,7 +55,7 @@ type Volume interface {
// CollisionError or DiskHashError (depending on whether the
// data on disk matches the expected hash), or whatever error
// was encountered opening/reading the stored data.
- Compare(loc string, data []byte) error
+ Compare(ctx context.Context, loc string, data []byte) error
// Put writes a block to an underlying storage device.
//
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 4c26335..d910926 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -122,7 +122,7 @@ func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- err := v.Compare(TestHash, TestBlock)
+ err := v.Compare(context.TODO(), TestHash, TestBlock)
if err != os.ErrNotExist {
t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
}
@@ -137,7 +137,7 @@ func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string
v.PutRaw(testHash, testData)
// Compare the block locator with same content
- err := v.Compare(testHash, testData)
+ err := v.Compare(context.TODO(), testHash, testData)
if err != nil {
t.Errorf("Got err %q, expected nil", err)
}
@@ -155,7 +155,7 @@ func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash stri
v.PutRaw(testHash, testDataA)
// Compare the block locator with different content; collision
- err := v.Compare(TestHash, testDataB)
+ err := v.Compare(context.TODO(), TestHash, testDataB)
if err == nil {
t.Errorf("Got err nil, expected error due to collision")
}
@@ -171,7 +171,7 @@ func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testH
v.PutRaw(TestHash, testDataB)
- err := v.Compare(testHash, testDataA)
+ err := v.Compare(context.TODO(), testHash, testDataA)
if err == nil || err == CollisionError {
t.Errorf("Got err %+v, expected non-collision error", err)
}
@@ -480,7 +480,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
t.Fatalf("os.IsNotExist(%v) should have been true", err)
}
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.TODO(), TestHash, TestBlock)
if err == nil || !os.IsNotExist(err) {
t.Fatalf("os.IsNotExist(%v) should have been true", err)
}
@@ -816,7 +816,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
return err
}
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.TODO(), TestHash, TestBlock)
if err != nil {
return err
}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index acbd7c9..931c10e 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -96,7 +96,7 @@ func (v *MockVolume) gotCall(method string) {
}
}
-func (v *MockVolume) Compare(loc string, buf []byte) error {
+func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
v.gotCall("Compare")
<-v.Gate
if v.Bad {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 1c676b1..5239ed3 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -183,11 +183,14 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
// Lock the locker (if one is in use), open the file for reading, and
// call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
}
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
f, err := os.Open(path)
if err != nil {
return err
@@ -222,7 +225,7 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
}
var read int
size := int(stat.Size())
- err = v.getFunc(path, func(rdr io.Reader) error {
+ err = v.getFunc(ctx, path, func(rdr io.Reader) error {
read, err = io.ReadFull(rdr, buf[:size])
return err
})
@@ -232,13 +235,13 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
// Compare returns nil if Get(loc) would return the same content as
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
}
- return v.getFunc(path, func(rdr io.Reader) error {
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return v.getFunc(ctx, path, func(rdr io.Reader) error {
+ return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
})
}
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index fad1f12..870489a 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -234,7 +234,7 @@ func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
v.Put(context.TODO(), TestHash, TestBlock)
mockErr := errors.New("Mock error")
- err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ err := v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
return mockErr
})
if err != mockErr {
@@ -247,7 +247,7 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) {
defer v.Teardown()
funcCalled := false
- err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ err := v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
funcCalled = true
return nil
})
@@ -269,7 +269,7 @@ func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
v.locker = mtx
funcCalled := make(chan struct{})
- go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ go v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
funcCalled <- struct{}{}
return nil
})
@@ -299,25 +299,25 @@ func TestUnixVolumeCompare(t *testing.T) {
defer v.Teardown()
v.Put(context.TODO(), TestHash, TestBlock)
- err := v.Compare(TestHash, TestBlock)
+ err := v.Compare(context.TODO(), TestHash, TestBlock)
if err != nil {
t.Errorf("Got err %q, expected nil", err)
}
- err = v.Compare(TestHash, []byte("baddata"))
+ err = v.Compare(context.TODO(), TestHash, []byte("baddata"))
if err != CollisionError {
t.Errorf("Got err %q, expected %q", err, CollisionError)
}
v.Put(context.TODO(), TestHash, []byte("baddata"))
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.TODO(), TestHash, TestBlock)
if err != DiskHashError {
t.Errorf("Got err %q, expected %q", err, DiskHashError)
}
p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
os.Chmod(p, 000)
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.TODO(), TestHash, TestBlock)
if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
t.Errorf("Got err %q, expected %q", err, "permission denied")
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list