[ARVADOS] created: de69c4e7c1dbfff2d0adadf7ca602d7f02d2326c

Git user git at public.curoverse.com
Fri Nov 4 21:26:09 EDT 2016


        at  de69c4e7c1dbfff2d0adadf7ca602d7f02d2326c (commit)


commit de69c4e7c1dbfff2d0adadf7ca602d7f02d2326c
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Nov 4 21:25:59 2016 -0400

    10467: Abort S3 request and release buffer if caller disconnects while server is waiting for S3.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index d2163f6..2011596 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"flag"
 	"fmt"
@@ -163,7 +164,7 @@ func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
 	trashed, _, err := v.checkTrashed(loc)
 	if err != nil {
 		return 0, err
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index c8c898f..bb57dcd 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"encoding/base64"
 	"encoding/xml"
@@ -459,7 +460,7 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
 			t.Error(err)
 		}
 		gotData := make([]byte, len(data))
-		gotLen, err := v.Get(hash, gotData)
+		gotLen, err := v.Get(context.TODO(), hash, gotData)
 		if err != nil {
 			t.Error(err)
 		}
@@ -510,7 +511,7 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
 	v.azHandler.race <- continuePut
 	go func() {
 		buf := make([]byte, len(TestBlock))
-		_, err := v.Get(TestHash, buf)
+		_, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			t.Error(err)
 		}
@@ -553,7 +554,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 	go func() {
 		defer close(allDone)
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash, buf)
+		n, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			t.Error(err)
 			return
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index dc9bcb1..e254853 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -11,6 +11,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -564,7 +565,7 @@ func TestDeleteHandler(t *testing.T) {
 	}
 	// Confirm the block has been deleted
 	buf := make([]byte, BlockSize)
-	_, err := vols[0].Get(TestHash, buf)
+	_, err := vols[0].Get(context.TODO(), TestHash, buf)
 	var blockDeleted = os.IsNotExist(err)
 	if !blockDeleted {
 		t.Error("superuserExistingBlockReq: block not deleted")
@@ -588,7 +589,7 @@ func TestDeleteHandler(t *testing.T) {
 			expectedDc, responseDc)
 	}
 	// Confirm the block has NOT been deleted.
-	_, err = vols[0].Get(TestHash, buf)
+	_, err = vols[0].Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("testing delete on new block: %s\n", err)
 	}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 54b8b48..143f925 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -9,6 +9,7 @@ package main
 
 import (
 	"container/list"
+	"context"
 	"crypto/md5"
 	"encoding/json"
 	"fmt"
@@ -93,7 +94,14 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 	defer bufs.Put(buf)
 
-	size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+	ctx, cancel := context.WithCancel(context.TODO())
+	if resp, ok := resp.(http.CloseNotifier); ok {
+		go func() {
+			<-resp.CloseNotify()
+			cancel()
+		}()
+	}
+	size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
 	if err != nil {
 		code := http.StatusInternalServerError
 		if err, ok := err.(*KeepError); ok {
@@ -548,12 +556,12 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
 	// Attempt to read the requested hash from a keep volume.
 	errorToCaller := NotFoundError
 
 	for _, vol := range KeepVM.AllReadable() {
-		size, err := vol.Get(hash, buf)
+		size, err := vol.Get(ctx, hash, buf)
 		if err != nil {
 			// IsNotExist is an expected error and may be
 			// ignored. All other errors are logged. In
diff --git a/services/keepstore/handlers_with_generic_volume_test.go b/services/keepstore/handlers_with_generic_volume_test.go
index dda7edc..8abf8e0 100644
--- a/services/keepstore/handlers_with_generic_volume_test.go
+++ b/services/keepstore/handlers_with_generic_volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 )
 
 // A TestableVolumeManagerFactory creates a volume manager with at least two TestableVolume instances.
@@ -46,7 +47,7 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 
 	// Get should pass
 	buf := make([]byte, len(testBlock))
-	n, err := GetBlock(testHash, buf, nil)
+	n, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error while getting block %s", err)
 	}
@@ -66,7 +67,7 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
 
 	// Get should fail
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testHash, buf, nil)
+	size, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err == nil {
 		t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
 	}
@@ -88,7 +89,7 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 
 	// Check that PutBlock stored the data as expected
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testHash, buf, nil)
+	size, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
 	} else if bytes.Compare(buf[:size], testBlock) != 0 {
@@ -113,7 +114,7 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
 	// Put succeeded and overwrote the badData in one volume,
 	// and Get should return the testBlock now, ignoring the bad data.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testHash, buf, nil)
+	size, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
 	} else if bytes.Compare(buf[:size], testBlock) != 0 {
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index dc6af0f..8413b7d 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -67,7 +68,7 @@ func TestGetBlock(t *testing.T) {
 
 	// Check that GetBlock returns success.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != nil {
 		t.Errorf("GetBlock error: %s", err)
 	}
@@ -88,7 +89,7 @@ func TestGetBlockMissing(t *testing.T) {
 
 	// Check that GetBlock returns failure.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != NotFoundError {
 		t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
 	}
@@ -110,7 +111,7 @@ func TestGetBlockCorrupt(t *testing.T) {
 
 	// Check that GetBlock returns failure.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != DiskHashError {
 		t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
 	}
@@ -137,7 +138,7 @@ func TestPutBlockOK(t *testing.T) {
 
 	vols := KeepVM.AllReadable()
 	buf := make([]byte, BlockSize)
-	n, err := vols[1].Get(TestHash, buf)
+	n, err := vols[1].Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatalf("Volume #0 Get returned error: %v", err)
 	}
@@ -167,7 +168,7 @@ func TestPutBlockOneVol(t *testing.T) {
 	}
 
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != nil {
 		t.Fatalf("GetBlock: %v", err)
 	}
@@ -195,7 +196,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 	}
 
 	// Confirm that GetBlock fails to return anything.
-	if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
+	if result, err := GetBlock(context.TODO(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
 		t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
 			string(result), err)
 	}
@@ -221,7 +222,7 @@ func TestPutBlockCorrupt(t *testing.T) {
 
 	// The block on disk should now match TestBlock.
 	buf := make([]byte, BlockSize)
-	if size, err := GetBlock(TestHash, buf, nil); err != nil {
+	if size, err := GetBlock(context.TODO(), TestHash, buf, nil); err != nil {
 		t.Errorf("GetBlock: %v", err)
 	} else if bytes.Compare(buf[:size], TestBlock) != 0 {
 		t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
@@ -296,7 +297,7 @@ func TestPutBlockTouchFails(t *testing.T) {
 			oldMtime, newMtime)
 	}
 	buf := make([]byte, BlockSize)
-	n, err := vols[1].Get(TestHash, buf)
+	n, err := vols[1].Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatalf("vols[1]: %v", err)
 	}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index caed35b..cdd38bf 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"encoding/base64"
 	"encoding/hex"
 	"flag"
@@ -242,18 +243,48 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
-func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
-	rdr, err := v.getReader(loc)
-	if err != nil {
-		return 0, err
+func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+	ready := make(chan bool)
+	var rdr io.ReadCloser
+	var err error
+	go func() {
+		rdr, err = v.getReader(loc)
+		close(ready)
+	}()
+	select {
+	case <-ctx.Done():
+		// Client hung up before getReader returned (the S3 request may already be in flight)
+		return 0, ctx.Err()
+	case <-ready:
+		if err != nil {
+			return 0, err
+		}
 	}
-	defer rdr.Close()
-	n, err := io.ReadFull(rdr, buf)
-	switch err {
-	case nil, io.EOF, io.ErrUnexpectedEOF:
-		return n, nil
-	default:
-		return 0, v.translateError(err)
+
+	var n int
+	ready = make(chan bool)
+	go func() {
+		defer close(ready)
+
+		defer rdr.Close()
+		n, err = io.ReadFull(rdr, buf)
+
+		switch err {
+		case nil, io.EOF, io.ErrUnexpectedEOF:
+			err = nil
+		default:
+			err = v.translateError(err)
+		}
+	}()
+	select {
+	case <-ctx.Done():
+		rdr.Close()
+		// Must wait for ReadFull to return, to ensure it
+		// doesn't write to buf after we return.
+		<-ready
+		return 0, ctx.Err()
+	case <-ready:
+		return n, err
 	}
 }
 
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 76dcbc9..db3f4c6 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"io/ioutil"
@@ -223,7 +224,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		// Check canGet
 		loc, blk := setupScenario()
 		buf := make([]byte, len(blk))
-		_, err := v.Get(loc, buf)
+		_, err := v.Get(context.TODO(), loc, buf)
 		c.Check(err == nil, check.Equals, scenario.canGet)
 		if err != nil {
 			c.Check(os.IsNotExist(err), check.Equals, true)
@@ -233,7 +234,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		loc, blk = setupScenario()
 		err = v.Trash(loc)
 		c.Check(err == nil, check.Equals, scenario.canTrash)
-		_, err = v.Get(loc, buf)
+		_, err = v.Get(context.TODO(), loc, buf)
 		c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
 		if err != nil {
 			c.Check(os.IsNotExist(err), check.Equals, true)
@@ -248,7 +249,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			// should be able to Get after Untrash --
 			// regardless of timestamps, errors, race
 			// conditions, etc.
-			_, err = v.Get(loc, buf)
+			_, err = v.Get(context.TODO(), loc, buf)
 			c.Check(err, check.IsNil)
 		}
 
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 5ec413d..267175d 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"container/list"
+	"context"
 	"testing"
 	"time"
 )
@@ -291,7 +292,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
 	// Verify Locator1 to be un/deleted as expected
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testData.Locator1, buf, nil)
+	size, err := GetBlock(context.TODO(), testData.Locator1, buf, nil)
 	if testData.ExpectLocator1 {
 		if size == 0 || err != nil {
 			t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -304,7 +305,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
 	// Verify Locator2 to be un/deleted as expected
 	if testData.Locator1 != testData.Locator2 {
-		size, err = GetBlock(testData.Locator2, buf, nil)
+		size, err = GetBlock(context.TODO(), testData.Locator2, buf, nil)
 		if testData.ExpectLocator2 {
 			if size == 0 || err != nil {
 				t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
@@ -323,7 +324,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 		locatorFoundIn := 0
 		for _, volume := range KeepVM.AllReadable() {
 			buf := make([]byte, BlockSize)
-			if _, err := volume.Get(testData.Locator1, buf); err == nil {
+			if _, err := volume.Get(context.TODO(), testData.Locator1, buf); err == nil {
 				locatorFoundIn = locatorFoundIn + 1
 			}
 		}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 6e01e75..19a5996 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"io"
 	"sync/atomic"
 	"time"
@@ -47,7 +48,7 @@ type Volume interface {
 	// any of the data.
 	//
 	// len(buf) will not exceed BlockSize.
-	Get(loc string, buf []byte) (int, error)
+	Get(ctx context.Context, loc string, buf []byte) (int, error)
 
 	// Compare the given data with the stored data (i.e., what Get
 	// would return). If equal, return nil. If not, return
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 1738fe9..a0fd3e1 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"os"
@@ -92,7 +93,7 @@ func testGet(t TB, factory TestableVolumeFactory) {
 	v.PutRaw(TestHash, TestBlock)
 
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, buf)
+	n, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -109,7 +110,7 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
 	defer v.Teardown()
 
 	buf := make([]byte, BlockSize)
-	if _, err := v.Get(TestHash2, buf); err == nil {
+	if _, err := v.Get(context.TODO(), TestHash2, buf); err == nil {
 		t.Errorf("Expected error while getting non-existing block %v", TestHash2)
 	}
 }
@@ -211,7 +212,7 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
 
 	putErr := v.Put(testHash, testDataB)
 	buf := make([]byte, BlockSize)
-	n, getErr := v.Get(testHash, buf)
+	n, getErr := v.Get(context.TODO(), testHash, buf)
 	if putErr == nil {
 		// Put must not return a nil error unless it has
 		// overwritten the existing data.
@@ -254,7 +255,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 	}
 
 	data := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, data)
+	n, err := v.Get(context.TODO(), TestHash, data)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -263,7 +264,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 		}
 	}
 
-	n, err = v.Get(TestHash2, data)
+	n, err = v.Get(context.TODO(), TestHash2, data)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -272,7 +273,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 		}
 	}
 
-	n, err = v.Get(TestHash3, data)
+	n, err = v.Get(context.TODO(), TestHash3, data)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -443,7 +444,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 		t.Error(err)
 	}
 	data := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, data)
+	n, err := v.Get(context.TODO(), TestHash, data)
 	if err != nil {
 		t.Error(err)
 	} else if bytes.Compare(data[:n], TestBlock) != 0 {
@@ -470,7 +471,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 		t.Error(err)
 	}
 	data := make([]byte, BlockSize)
-	if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
+	if _, err := v.Get(context.TODO(), TestHash, data); err == nil || !os.IsNotExist(err) {
 		t.Errorf("os.IsNotExist(%v) should have been true", err)
 	}
 
@@ -553,7 +554,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	buf := make([]byte, BlockSize)
 
 	// Get from read-only volume should succeed
-	_, err := v.Get(TestHash, buf)
+	_, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("got err %v, expected nil", err)
 	}
@@ -563,7 +564,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	if err == nil {
 		t.Errorf("Expected error when putting block in a read-only volume")
 	}
-	_, err = v.Get(TestHash2, buf)
+	_, err = v.Get(context.TODO(), TestHash2, buf)
 	if err == nil {
 		t.Errorf("Expected error when getting block whose put in read-only volume failed")
 	}
@@ -600,7 +601,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 	sem := make(chan int)
 	go func() {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash, buf)
+		n, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			t.Errorf("err1: %v", err)
 		}
@@ -612,7 +613,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 
 	go func() {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash2, buf)
+		n, err := v.Get(context.TODO(), TestHash2, buf)
 		if err != nil {
 			t.Errorf("err2: %v", err)
 		}
@@ -624,7 +625,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 
 	go func() {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash3, buf)
+		n, err := v.Get(context.TODO(), TestHash3, buf)
 		if err != nil {
 			t.Errorf("err3: %v", err)
 		}
@@ -682,7 +683,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 
 	// Double check that we actually wrote the blocks we expected to write.
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, buf)
+	n, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("Get #1: %v", err)
 	}
@@ -690,7 +691,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 		t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
 	}
 
-	n, err = v.Get(TestHash2, buf)
+	n, err = v.Get(context.TODO(), TestHash2, buf)
 	if err != nil {
 		t.Errorf("Get #2: %v", err)
 	}
@@ -698,7 +699,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 		t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
 	}
 
-	n, err = v.Get(TestHash3, buf)
+	n, err = v.Get(context.TODO(), TestHash3, buf)
 	if err != nil {
 		t.Errorf("Get #3: %v", err)
 	}
@@ -725,7 +726,7 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
 		t.Fatal(err)
 	}
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(hash, buf)
+	n, err := v.Get(context.TODO(), hash, buf)
 	if err != nil {
 		t.Error(err)
 	}
@@ -752,7 +753,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, buf)
+	n, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -771,7 +772,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 			t.Fatal(err)
 		}
 	} else {
-		_, err = v.Get(TestHash, buf)
+		_, err = v.Get(context.TODO(), TestHash, buf)
 		if err == nil || !os.IsNotExist(err) {
 			t.Errorf("os.IsNotExist(%v) should have been true", err)
 		}
@@ -784,7 +785,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	}
 
 	// Get the block - after trash and untrash sequence
-	n, err = v.Get(TestHash, buf)
+	n, err = v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -802,7 +803,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
 	checkGet := func() error {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash, buf)
+		n, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			return err
 		}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 6ab386a..917942e 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"errors"
 	"fmt"
@@ -113,7 +114,7 @@ func (v *MockVolume) Compare(loc string, buf []byte) error {
 	}
 }
 
-func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
+func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
 	v.gotCall("Get")
 	<-v.Gate
 	if v.Bad {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index b5753de..02f0f9f 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bufio"
+	"context"
 	"flag"
 	"fmt"
 	"io"
@@ -210,7 +211,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
 	path := v.blockPath(loc)
 	stat, err := v.stat(path)
 	if err != nil {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 887247d..72fa819 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -120,7 +121,7 @@ func TestGetNotFound(t *testing.T) {
 	v.Put(TestHash, TestBlock)
 
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash2, buf)
+	n, err := v.Get(context.TODO(), TestHash2, buf)
 	switch {
 	case os.IsNotExist(err):
 		break
@@ -166,7 +167,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
 	v.PutRaw(TestHash, TestBlock)
 
 	buf := make([]byte, BlockSize)
-	_, err := v.Get(TestHash, buf)
+	_, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("got err %v, expected nil", err)
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list