[ARVADOS] updated: 0b5306a5bf134dcc6da2dcab40ed45277c566d28

Git user git at public.curoverse.com
Sun Nov 6 23:43:24 EST 2016


Summary of changes:
 services/keepstore/azure_blob_volume.go   |  4 +--
 services/keepstore/collision.go           | 16 +++++++++--
 services/keepstore/handlers.go            | 11 ++++++--
 services/keepstore/s3_volume.go           | 47 ++++++++++++++++++-------------
 services/keepstore/volume.go              |  2 +-
 services/keepstore/volume_generic_test.go | 12 ++++----
 services/keepstore/volume_test.go         |  2 +-
 services/keepstore/volume_unix.go         | 13 +++++----
 services/keepstore/volume_unix_test.go    | 14 ++++-----
 9 files changed, 75 insertions(+), 46 deletions(-)

       via  0b5306a5bf134dcc6da2dcab40ed45277c566d28 (commit)
      from  dccf36ede6f880d8fcec936f684fb8d5f06771d6 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 0b5306a5bf134dcc6da2dcab40ed45277c566d28
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 23:41:00 2016 -0500

    10467: Interrupt Compare operation if caller disconnects.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 08958b2..59e7cae 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -272,7 +272,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
 }
 
 // Compare the given data with existing stored data.
-func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
 	trashed, _, err := v.checkTrashed(loc)
 	if err != nil {
 		return err
@@ -285,7 +285,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
 		return v.translateError(err)
 	}
 	defer rdr.Close()
-	return compareReaderWithBuf(rdr, expect, loc[:32])
+	return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
 }
 
 // Put stores a Keep block as a block blob in the container.
diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go
index a4af563..82cb789 100644
--- a/services/keepstore/collision.go
+++ b/services/keepstore/collision.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"io"
@@ -49,7 +50,7 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro
 	return <-outcome
 }
 
-func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error {
 	bufLen := 1 << 20
 	if bufLen > len(expect) && len(expect) > 0 {
 		// No need for bufLen to be longer than
@@ -67,7 +68,18 @@ func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
 	// expected to equal the next N bytes read from
 	// rdr.
 	for {
-		n, err := rdr.Read(buf)
+		ready := make(chan bool)
+		var n int
+		var err error
+		go func() {
+			n, err = rdr.Read(buf)
+			close(ready)
+		}()
+		select {
+		case <-ready:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
 		if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
 			return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
 		}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 2c680d3..289dce1 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -647,8 +647,10 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+	if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
 		return n, err
+	} else if ctx.Err() != nil {
+		return 0, ErrClientDisconnect
 	}
 
 	// Choose a Keep volume to write to.
@@ -699,10 +701,13 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
 	var bestErr error = NotFoundError
 	for _, vol := range KeepVM.AllWritable() {
-		if err := vol.Compare(hash, buf); err == CollisionError {
+		err := vol.Compare(ctx, hash, buf)
+		if ctx.Err() != nil {
+			return 0, ctx.Err()
+		} else if err == CollisionError {
 			// Stop if we have a block with same hash but
 			// different content. (It will be impossible
 			// to tell which one is wanted if we have
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 82e321e..f174bd8 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -222,6 +222,27 @@ func (v *S3Volume) Start() error {
 	return nil
 }
 
+func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+	ready := make(chan bool)
+	go func() {
+		rdr, err = v.getReader(loc)
+		close(ready)
+	}()
+	select {
+	case <-ready:
+		return
+	case <-ctx.Done():
+		theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+		go func() {
+			<-ready
+			if err == nil {
+				rdr.Close()
+			}
+		}()
+		return nil, ctx.Err()
+	}
+}
+
 // getReader wraps (Bucket)GetReader.
 //
 // In situations where (Bucket)GetReader would fail because the block
@@ -255,25 +276,13 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-	ready := make(chan bool)
-	var rdr io.ReadCloser
-	var err error
-	go func() {
-		rdr, err = v.getReader(loc)
-		close(ready)
-	}()
-	select {
-	case <-ctx.Done():
-		theConfig.debugLogf("s3: abandoning getReader() because %s", ctx.Err())
-		return 0, ctx.Err()
-	case <-ready:
-		if err != nil {
-			return 0, err
-		}
+	rdr, err := v.getReaderWithContext(ctx, loc)
+	if err != nil {
+		return 0, err
 	}
 
 	var n int
-	ready = make(chan bool)
+	ready := make(chan bool)
 	go func() {
 		defer close(ready)
 
@@ -302,13 +311,13 @@ func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error)
 }
 
 // Compare the given data with the stored data.
-func (v *S3Volume) Compare(loc string, expect []byte) error {
-	rdr, err := v.getReader(loc)
+func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+	rdr, err := v.getReaderWithContext(ctx, loc)
 	if err != nil {
 		return err
 	}
 	defer rdr.Close()
-	return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+	return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
 }
 
 // Put writes a block.
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 01bb6e2..57e18ab 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -55,7 +55,7 @@ type Volume interface {
 	// CollisionError or DiskHashError (depending on whether the
 	// data on disk matches the expected hash), or whatever error
 	// was encountered opening/reading the stored data.
-	Compare(loc string, data []byte) error
+	Compare(ctx context.Context, loc string, data []byte) error
 
 	// Put writes a block to an underlying storage device.
 	//
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 4c26335..d910926 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -122,7 +122,7 @@ func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
 
-	err := v.Compare(TestHash, TestBlock)
+	err := v.Compare(context.TODO(), TestHash, TestBlock)
 	if err != os.ErrNotExist {
 		t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
 	}
@@ -137,7 +137,7 @@ func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string
 	v.PutRaw(testHash, testData)
 
 	// Compare the block locator with same content
-	err := v.Compare(testHash, testData)
+	err := v.Compare(context.TODO(), testHash, testData)
 	if err != nil {
 		t.Errorf("Got err %q, expected nil", err)
 	}
@@ -155,7 +155,7 @@ func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash stri
 	v.PutRaw(testHash, testDataA)
 
 	// Compare the block locator with different content; collision
-	err := v.Compare(TestHash, testDataB)
+	err := v.Compare(context.TODO(), TestHash, testDataB)
 	if err == nil {
 		t.Errorf("Got err nil, expected error due to collision")
 	}
@@ -171,7 +171,7 @@ func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testH
 
 	v.PutRaw(TestHash, testDataB)
 
-	err := v.Compare(testHash, testDataA)
+	err := v.Compare(context.TODO(), testHash, testDataA)
 	if err == nil || err == CollisionError {
 		t.Errorf("Got err %+v, expected non-collision error", err)
 	}
@@ -480,7 +480,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
-	err = v.Compare(TestHash, TestBlock)
+	err = v.Compare(context.TODO(), TestHash, TestBlock)
 	if err == nil || !os.IsNotExist(err) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
@@ -816,7 +816,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 			return err
 		}
 
-		err = v.Compare(TestHash, TestBlock)
+		err = v.Compare(context.TODO(), TestHash, TestBlock)
 		if err != nil {
 			return err
 		}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index acbd7c9..931c10e 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -96,7 +96,7 @@ func (v *MockVolume) gotCall(method string) {
 	}
 }
 
-func (v *MockVolume) Compare(loc string, buf []byte) error {
+func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
 	v.gotCall("Compare")
 	<-v.Gate
 	if v.Bad {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 1c676b1..5239ed3 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -183,11 +183,14 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
 	if v.locker != nil {
 		v.locker.Lock()
 		defer v.locker.Unlock()
 	}
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
 	f, err := os.Open(path)
 	if err != nil {
 		return err
@@ -222,7 +225,7 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
 	}
 	var read int
 	size := int(stat.Size())
-	err = v.getFunc(path, func(rdr io.Reader) error {
+	err = v.getFunc(ctx, path, func(rdr io.Reader) error {
 		read, err = io.ReadFull(rdr, buf[:size])
 		return err
 	})
@@ -232,13 +235,13 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
 // Compare returns nil if Get(loc) would return the same content as
 // expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
 	path := v.blockPath(loc)
 	if _, err := v.stat(path); err != nil {
 		return v.translateError(err)
 	}
-	return v.getFunc(path, func(rdr io.Reader) error {
-		return compareReaderWithBuf(rdr, expect, loc[:32])
+	return v.getFunc(ctx, path, func(rdr io.Reader) error {
+		return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
 	})
 }
 
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index fad1f12..870489a 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -234,7 +234,7 @@ func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
 
 	v.Put(context.TODO(), TestHash, TestBlock)
 	mockErr := errors.New("Mock error")
-	err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+	err := v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
 		return mockErr
 	})
 	if err != mockErr {
@@ -247,7 +247,7 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) {
 	defer v.Teardown()
 
 	funcCalled := false
-	err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+	err := v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
 		funcCalled = true
 		return nil
 	})
@@ -269,7 +269,7 @@ func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
 	v.locker = mtx
 
 	funcCalled := make(chan struct{})
-	go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+	go v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
 		funcCalled <- struct{}{}
 		return nil
 	})
@@ -299,25 +299,25 @@ func TestUnixVolumeCompare(t *testing.T) {
 	defer v.Teardown()
 
 	v.Put(context.TODO(), TestHash, TestBlock)
-	err := v.Compare(TestHash, TestBlock)
+	err := v.Compare(context.TODO(), TestHash, TestBlock)
 	if err != nil {
 		t.Errorf("Got err %q, expected nil", err)
 	}
 
-	err = v.Compare(TestHash, []byte("baddata"))
+	err = v.Compare(context.TODO(), TestHash, []byte("baddata"))
 	if err != CollisionError {
 		t.Errorf("Got err %q, expected %q", err, CollisionError)
 	}
 
 	v.Put(context.TODO(), TestHash, []byte("baddata"))
-	err = v.Compare(TestHash, TestBlock)
+	err = v.Compare(context.TODO(), TestHash, TestBlock)
 	if err != DiskHashError {
 		t.Errorf("Got err %q, expected %q", err, DiskHashError)
 	}
 
 	p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
 	os.Chmod(p, 000)
-	err = v.Compare(TestHash, TestBlock)
+	err = v.Compare(context.TODO(), TestHash, TestBlock)
 	if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
 		t.Errorf("Got err %q, expected %q", err, "permission denied")
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list