[ARVADOS] updated: 9411197dfa8ff4c7d935a395a04b5846c7b52ffd

Git user git at public.curoverse.com
Mon Nov 7 10:43:11 EST 2016


Summary of changes:
 services/keepstore/azure_blob_volume.go | 13 +++++++++++++
 services/keepstore/s3_volume.go         | 10 ++++++++++
 2 files changed, 23 insertions(+)

  discards  b09ef955c31f026c7c15e81038543283bef5742d (commit)
  discards  3c672b9c30b598ab3585acfd2716586cda245990 (commit)
  discards  0b5306a5bf134dcc6da2dcab40ed45277c566d28 (commit)
  discards  dccf36ede6f880d8fcec936f684fb8d5f06771d6 (commit)
  discards  1df4ab124c117a232c5360f6bac95d5409a5b598 (commit)
  discards  cbeb6e5991ac022252310ef61bbd44c6a3d60de8 (commit)
  discards  f8e257317bdd584737e419fabc9ff74203d07680 (commit)
  discards  03dbaf0ae698188507b9beb4f4e590d7da862bcb (commit)
  discards  e3210822c04c9fb1028a63664658110918e35d70 (commit)
  discards  7d53152a4998c8d6c087fa8699cb0866532a9a30 (commit)
  discards  da3bfbf9df9201d6d8ee5444a60dd027721f079f (commit)
  discards  7a7bff01c9270fdadf09d3a13f113a6b80b094ac (commit)
  discards  f52561a96bcc133d85a9d1b15fca26d441911199 (commit)
  discards  d50f618aa74c82afb71d8bc3fc9f926fa5a26d0b (commit)
  discards  b56f0ca375f3e7785824dcb22f4e2967164fd44c (commit)
  discards  de69c4e7c1dbfff2d0adadf7ca602d7f02d2326c (commit)
       via  9411197dfa8ff4c7d935a395a04b5846c7b52ffd (commit)
       via  49a0efed4d774c060db94b9702760d33a4134a17 (commit)
       via  fecb5eb18b9cf15459de8eba44b6e545962d8cd4 (commit)
       via  fa774e69987932acfcabe81ca44d4d6c4fb596bf (commit)
       via  d3512add65497d1af8b8bbceff2296c803873f95 (commit)
       via  d3c5a48fddf2f07d93667f9fa8ca2456f1d8f63c (commit)
       via  4f42bd3f3b2c0526690c3368c9172ed89773e6f1 (commit)
       via  f3b231c69407299133a6eb5ff6066ae6136608e9 (commit)
       via  97b8ba6c2d2023f66cab62b7062cd0dbff837c67 (commit)
       via  ec27d93c1d8918ec509ec3c64ed11dcd51f28374 (commit)
       via  da13bb400f87fdd4157146e2d0b171b730fa3208 (commit)
       via  8040d45d59041859350c56cae195eb09a65a8dde (commit)
       via  9b1a9a3a7de01dc07270b950101d11ae96786de4 (commit)
       via  24c98a345046c650247e6515eeb6d3389e54b68c (commit)
       via  863570108a2c901a8eff22dc8a9bc72635ba7b95 (commit)
       via  39536d8dd7f0a6ab89e106cd065830f1cbb067b1 (commit)
       via  36bbbee25ab89a499f4015fb39845cc2d911aa63 (commit)
       via  1c1f12b1f2c32cdee5fab278f38a65ec246cbbf0 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (b09ef955c31f026c7c15e81038543283bef5742d)
            \
             N -- N -- N (9411197dfa8ff4c7d935a395a04b5846c7b52ffd)

When this happens we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the
N branch, starting from the common base, B.

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list those revisions
in full below.


commit 9411197dfa8ff4c7d935a395a04b5846c7b52ffd
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 7 00:51:35 2016 -0500

    10467: Check for Go 1.7.

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 2797ec3..e326c22 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -158,8 +158,8 @@ sanity_checks() {
     echo -n 'go: '
     go version \
         || fatal "No go binary. See http://golang.org/doc/install"
-    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
-        || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
+    [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 7 ]] \
+        || fatal "Go >= 1.7 required. See http://golang.org/doc/install"
     echo -n 'gcc: '
     gcc --version | egrep ^gcc \
         || fatal "No gcc. Try: apt-get install build-essential"

commit 49a0efed4d774c060db94b9702760d33a4134a17
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Nov 7 00:40:16 2016 -0500

    10467: Update keep-exercise to new SDK.

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 9dc8f94..4131d75 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -47,7 +47,7 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
-	kc, err := keepclient.MakeKeepClient(&arv)
+	kc, err := keepclient.MakeKeepClient(arv)
 	if err != nil {
 		log.Fatal(err)
 	}

commit fecb5eb18b9cf15459de8eba44b6e545962d8cd4
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 23:41:00 2016 -0500

    10467: Interrupt Compare operation if caller disconnects.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 542a9ca..6ca31c3 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -285,7 +285,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
 }
 
 // Compare the given data with existing stored data.
-func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
 	trashed, _, err := v.checkTrashed(loc)
 	if err != nil {
 		return err
@@ -298,7 +298,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
 		return v.translateError(err)
 	}
 	defer rdr.Close()
-	return compareReaderWithBuf(rdr, expect, loc[:32])
+	return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
 }
 
 // Put stores a Keep block as a block blob in the container.
diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go
index a4af563..82cb789 100644
--- a/services/keepstore/collision.go
+++ b/services/keepstore/collision.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"io"
@@ -49,7 +50,7 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro
 	return <-outcome
 }
 
-func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error {
 	bufLen := 1 << 20
 	if bufLen > len(expect) && len(expect) > 0 {
 		// No need for bufLen to be longer than
@@ -67,7 +68,18 @@ func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
 	// expected to equal the next N bytes read from
 	// rdr.
 	for {
-		n, err := rdr.Read(buf)
+		ready := make(chan bool)
+		var n int
+		var err error
+		go func() {
+			n, err = rdr.Read(buf)
+			close(ready)
+		}()
+		select {
+		case <-ready:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
 		if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
 			return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
 		}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 2c680d3..289dce1 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -647,8 +647,10 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+	if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
 		return n, err
+	} else if ctx.Err() != nil {
+		return 0, ErrClientDisconnect
 	}
 
 	// Choose a Keep volume to write to.
@@ -699,10 +701,13 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
 	var bestErr error = NotFoundError
 	for _, vol := range KeepVM.AllWritable() {
-		if err := vol.Compare(hash, buf); err == CollisionError {
+		err := vol.Compare(ctx, hash, buf)
+		if ctx.Err() != nil {
+			return 0, ctx.Err()
+		} else if err == CollisionError {
 			// Stop if we have a block with same hash but
 			// different content. (It will be impossible
 			// to tell which one is wanted if we have
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 4f5b3b1..33919a3 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -232,6 +232,27 @@ func (v *S3Volume) Start() error {
 	return nil
 }
 
+func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+	ready := make(chan bool)
+	go func() {
+		rdr, err = v.getReader(loc)
+		close(ready)
+	}()
+	select {
+	case <-ready:
+		return
+	case <-ctx.Done():
+		theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+		go func() {
+			<-ready
+			if err == nil {
+				rdr.Close()
+			}
+		}()
+		return nil, ctx.Err()
+	}
+}
+
 // getReader wraps (Bucket)GetReader.
 //
 // In situations where (Bucket)GetReader would fail because the block
@@ -265,25 +286,13 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-	ready := make(chan bool)
-	var rdr io.ReadCloser
-	var err error
-	go func() {
-		rdr, err = v.getReader(loc)
-		close(ready)
-	}()
-	select {
-	case <-ctx.Done():
-		theConfig.debugLogf("s3: abandoning getReader() because %s", ctx.Err())
-		return 0, ctx.Err()
-	case <-ready:
-		if err != nil {
-			return 0, err
-		}
+	rdr, err := v.getReaderWithContext(ctx, loc)
+	if err != nil {
+		return 0, err
 	}
 
 	var n int
-	ready = make(chan bool)
+	ready := make(chan bool)
 	go func() {
 		defer close(ready)
 
@@ -312,13 +321,13 @@ func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error)
 }
 
 // Compare the given data with the stored data.
-func (v *S3Volume) Compare(loc string, expect []byte) error {
-	rdr, err := v.getReader(loc)
+func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+	rdr, err := v.getReaderWithContext(ctx, loc)
 	if err != nil {
 		return err
 	}
 	defer rdr.Close()
-	return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+	return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
 }
 
 // Put writes a block.
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 01bb6e2..57e18ab 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -55,7 +55,7 @@ type Volume interface {
 	// CollisionError or DiskHashError (depending on whether the
 	// data on disk matches the expected hash), or whatever error
 	// was encountered opening/reading the stored data.
-	Compare(loc string, data []byte) error
+	Compare(ctx context.Context, loc string, data []byte) error
 
 	// Put writes a block to an underlying storage device.
 	//
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 4c26335..d910926 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -122,7 +122,7 @@ func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
 
-	err := v.Compare(TestHash, TestBlock)
+	err := v.Compare(context.TODO(), TestHash, TestBlock)
 	if err != os.ErrNotExist {
 		t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
 	}
@@ -137,7 +137,7 @@ func testCompareSameContent(t TB, factory TestableVolumeFactory, testHash string
 	v.PutRaw(testHash, testData)
 
 	// Compare the block locator with same content
-	err := v.Compare(testHash, testData)
+	err := v.Compare(context.TODO(), testHash, testData)
 	if err != nil {
 		t.Errorf("Got err %q, expected nil", err)
 	}
@@ -155,7 +155,7 @@ func testCompareWithCollision(t TB, factory TestableVolumeFactory, testHash stri
 	v.PutRaw(testHash, testDataA)
 
 	// Compare the block locator with different content; collision
-	err := v.Compare(TestHash, testDataB)
+	err := v.Compare(context.TODO(), TestHash, testDataB)
 	if err == nil {
 		t.Errorf("Got err nil, expected error due to collision")
 	}
@@ -171,7 +171,7 @@ func testCompareWithCorruptStoredData(t TB, factory TestableVolumeFactory, testH
 
 	v.PutRaw(TestHash, testDataB)
 
-	err := v.Compare(testHash, testDataA)
+	err := v.Compare(context.TODO(), testHash, testDataA)
 	if err == nil || err == CollisionError {
 		t.Errorf("Got err %+v, expected non-collision error", err)
 	}
@@ -480,7 +480,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
-	err = v.Compare(TestHash, TestBlock)
+	err = v.Compare(context.TODO(), TestHash, TestBlock)
 	if err == nil || !os.IsNotExist(err) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
@@ -816,7 +816,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 			return err
 		}
 
-		err = v.Compare(TestHash, TestBlock)
+		err = v.Compare(context.TODO(), TestHash, TestBlock)
 		if err != nil {
 			return err
 		}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index acbd7c9..931c10e 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -96,7 +96,7 @@ func (v *MockVolume) gotCall(method string) {
 	}
 }
 
-func (v *MockVolume) Compare(loc string, buf []byte) error {
+func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
 	v.gotCall("Compare")
 	<-v.Gate
 	if v.Bad {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 1c676b1..5239ed3 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -183,11 +183,14 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
 	if v.locker != nil {
 		v.locker.Lock()
 		defer v.locker.Unlock()
 	}
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
 	f, err := os.Open(path)
 	if err != nil {
 		return err
@@ -222,7 +225,7 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
 	}
 	var read int
 	size := int(stat.Size())
-	err = v.getFunc(path, func(rdr io.Reader) error {
+	err = v.getFunc(ctx, path, func(rdr io.Reader) error {
 		read, err = io.ReadFull(rdr, buf[:size])
 		return err
 	})
@@ -232,13 +235,13 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
 // Compare returns nil if Get(loc) would return the same content as
 // expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
 	path := v.blockPath(loc)
 	if _, err := v.stat(path); err != nil {
 		return v.translateError(err)
 	}
-	return v.getFunc(path, func(rdr io.Reader) error {
-		return compareReaderWithBuf(rdr, expect, loc[:32])
+	return v.getFunc(ctx, path, func(rdr io.Reader) error {
+		return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
 	})
 }
 
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index fad1f12..870489a 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -234,7 +234,7 @@ func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
 
 	v.Put(context.TODO(), TestHash, TestBlock)
 	mockErr := errors.New("Mock error")
-	err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+	err := v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
 		return mockErr
 	})
 	if err != mockErr {
@@ -247,7 +247,7 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) {
 	defer v.Teardown()
 
 	funcCalled := false
-	err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+	err := v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
 		funcCalled = true
 		return nil
 	})
@@ -269,7 +269,7 @@ func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
 	v.locker = mtx
 
 	funcCalled := make(chan struct{})
-	go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+	go v.getFunc(context.TODO(), v.blockPath(TestHash), func(rdr io.Reader) error {
 		funcCalled <- struct{}{}
 		return nil
 	})
@@ -299,25 +299,25 @@ func TestUnixVolumeCompare(t *testing.T) {
 	defer v.Teardown()
 
 	v.Put(context.TODO(), TestHash, TestBlock)
-	err := v.Compare(TestHash, TestBlock)
+	err := v.Compare(context.TODO(), TestHash, TestBlock)
 	if err != nil {
 		t.Errorf("Got err %q, expected nil", err)
 	}
 
-	err = v.Compare(TestHash, []byte("baddata"))
+	err = v.Compare(context.TODO(), TestHash, []byte("baddata"))
 	if err != CollisionError {
 		t.Errorf("Got err %q, expected %q", err, CollisionError)
 	}
 
 	v.Put(context.TODO(), TestHash, []byte("baddata"))
-	err = v.Compare(TestHash, TestBlock)
+	err = v.Compare(context.TODO(), TestHash, TestBlock)
 	if err != DiskHashError {
 		t.Errorf("Got err %q, expected %q", err, DiskHashError)
 	}
 
 	p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
 	os.Chmod(p, 000)
-	err = v.Compare(TestHash, TestBlock)
+	err = v.Compare(context.TODO(), TestHash, TestBlock)
 	if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
 		t.Errorf("Got err %q, expected %q", err, "permission denied")
 	}

commit fa774e69987932acfcabe81ca44d4d6c4fb596bf
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 22:11:52 2016 -0500

    10467: Use ErrClientDisconnect. Convert type assertion panic to 500 error.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 588ee04..2c680d3 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -192,8 +192,11 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	bufs.Put(buf)
 
 	if err != nil {
-		ke := err.(*KeepError)
-		http.Error(resp, ke.Error(), ke.HTTPCode)
+		code := http.StatusInternalServerError
+		if err, ok := err.(*KeepError); ok {
+			code = err.HTTPCode
+		}
+		http.Error(resp, err.Error(), code)
 		return
 	}
 
@@ -572,7 +575,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 		size, err := vol.Get(ctx, hash, buf)
 		select {
 		case <-ctx.Done():
-			return 0, ctx.Err()
+			return 0, ErrClientDisconnect
 		default:
 		}
 		if err != nil {
@@ -655,7 +658,7 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 			return vol.Replication(), nil // success!
 		}
 		if ctx.Err() != nil {
-			return 0, ctx.Err()
+			return 0, ErrClientDisconnect
 		}
 	}
 
@@ -669,7 +672,7 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 	for _, vol := range writables {
 		err := vol.Put(ctx, hash, block)
 		if ctx.Err() != nil {
-			return 0, ctx.Err()
+			return 0, ErrClientDisconnect
 		}
 		if err == nil {
 			return vol.Replication(), nil // success!

commit d3512add65497d1af8b8bbceff2296c803873f95
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 19:44:22 2016 -0500

    10467: Fix context error not propagated.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 141cca6..588ee04 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -654,6 +654,9 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 		if err := vol.Put(ctx, hash, block); err == nil {
 			return vol.Replication(), nil // success!
 		}
+		if ctx.Err() != nil {
+			return 0, ctx.Err()
+		}
 	}
 
 	writables := KeepVM.AllWritable()
@@ -665,10 +668,8 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 	allFull := true
 	for _, vol := range writables {
 		err := vol.Put(ctx, hash, block)
-		select {
-		case <-ctx.Done():
+		if ctx.Err() != nil {
 			return 0, ctx.Err()
-		default:
 		}
 		if err == nil {
 			return vol.Replication(), nil // success!

commit d3c5a48fddf2f07d93667f9fa8ca2456f1d8f63c
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 19:41:02 2016 -0500

    10467: Fix ctx not propagated.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index ba2078f..141cca6 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -651,7 +651,7 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 	// Choose a Keep volume to write to.
 	// If this volume fails, try all of the volumes in order.
 	if vol := KeepVM.NextWritable(); vol != nil {
-		if err := vol.Put(context.TODO(), hash, block); err == nil {
+		if err := vol.Put(ctx, hash, block); err == nil {
 			return vol.Replication(), nil // success!
 		}
 	}

commit 4f42bd3f3b2c0526690c3368c9172ed89773e6f1
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 19:35:05 2016 -0500

    10467: Fix context usage: ensure cancel always gets called.

diff --git a/services/keepstore/config_test.go b/services/keepstore/config_test.go
new file mode 100644
index 0000000..eaa0904
--- /dev/null
+++ b/services/keepstore/config_test.go
@@ -0,0 +1,9 @@
+package main
+
+import (
+	"log"
+)
+
+func init() {
+	theConfig.debugLogf = log.Printf
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 210e2b4..ba2078f 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -72,7 +72,8 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 
 // GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
-	ctx := contextForResponse(context.TODO(), resp)
+	ctx, cancel := contextForResponse(context.TODO(), resp)
+	defer cancel()
 
 	if theConfig.RequireSignatures {
 		locator := req.URL.Path[1:] // strip leading slash
@@ -111,20 +112,20 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's
-// CloseNotifier. If resp does not implement http.CloseNotifier,
-// return parent.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) context.Context {
-	cn, ok := resp.(http.CloseNotifier)
-	if !ok {
-		return parent
-	}
+// Return a new context that gets cancelled by resp's CloseNotifier.
+func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
 	ctx, cancel := context.WithCancel(parent)
-	go func(c <-chan bool) {
-		<-c
-		cancel()
-	}(cn.CloseNotify())
-	return ctx
+	if cn, ok := resp.(http.CloseNotifier); ok {
+		go func(c <-chan bool) {
+			select {
+			case <-c:
+				theConfig.debugLogf("cancel context")
+				cancel()
+			case <-ctx.Done():
+			}
+		}(cn.CloseNotify())
+	}
+	return ctx, cancel
 }
 
 // Get a buffer from the pool -- but give up and return a non-nil
@@ -150,7 +151,8 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
 
 // PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
-	ctx := contextForResponse(context.TODO(), resp)
+	ctx, cancel := contextForResponse(context.TODO(), resp)
+	defer cancel()
 
 	hash := mux.Vars(req)["hash"]
 

commit f3b231c69407299133a6eb5ff6066ae6136608e9
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 19:23:08 2016 -0500

    10467: Tidy up s3 early-cancel.

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 7ef590f..4f5b3b1 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -350,20 +350,16 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 	ready := make(chan bool)
 	go func() {
 		defer func() {
-			select {
-			case <-ctx.Done():
+			if ctx.Err() != nil {
 				theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
-			default:
 			}
 		}()
 		defer close(ready)
 		err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
 		if err != nil {
-			err = v.translateError(err)
 			return
 		}
 		err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
-		err = v.translateError(err)
 	}()
 	select {
 	case <-ctx.Done():
@@ -378,7 +374,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 		theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
 		return ctx.Err()
 	case <-ready:
-		return err
+		return v.translateError(err)
 	}
 }
 

commit 97b8ba6c2d2023f66cab62b7062cd0dbff837c67
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 19:04:00 2016 -0500

    10467: Use context instead of http.CloseNotifier to interrupt buffer waits.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 69807d9..210e2b4 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -89,7 +89,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	// isn't here, we can return 404 now instead of waiting for a
 	// buffer.
 
-	buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
+	buf, err := getBufferWithContext(ctx, bufs, BlockSize)
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
@@ -128,23 +128,16 @@ func contextForResponse(parent context.Context, resp http.ResponseWriter) contex
 }
 
 // Get a buffer from the pool -- but give up and return a non-nil
-// error if resp implements http.CloseNotifier and tells us that the
-// client has disconnected before we get a buffer.
-func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
-	var closeNotifier <-chan bool
-	if resp, ok := resp.(http.CloseNotifier); ok {
-		closeNotifier = resp.CloseNotify()
-	}
-	var buf []byte
+// error if ctx ends before we get a buffer.
+func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
 	bufReady := make(chan []byte)
 	go func() {
 		bufReady <- bufs.Get(bufSize)
-		close(bufReady)
 	}()
 	select {
-	case buf = <-bufReady:
+	case buf := <-bufReady:
 		return buf, nil
-	case <-closeNotifier:
+	case <-ctx.Done():
 		go func() {
 			// Even if closeNotifier happened first, we
 			// need to keep waiting for our buf so we can
@@ -180,7 +173,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
+	buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return

commit ec27d93c1d8918ec509ec3c64ed11dcd51f28374
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 6 18:47:53 2016 -0500

    10467: Fix panic: cannot call CloseNotify() after ServeHTTP finishes.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 5dc68df..69807d9 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -120,10 +120,10 @@ func contextForResponse(parent context.Context, resp http.ResponseWriter) contex
 		return parent
 	}
 	ctx, cancel := context.WithCancel(parent)
-	go func() {
-		<-cn.CloseNotify()
+	go func(c <-chan bool) {
+		<-c
 		cancel()
-	}()
+	}(cn.CloseNotify())
 	return ctx
 }
 

commit da13bb400f87fdd4157146e2d0b171b730fa3208
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Nov 5 17:17:28 2016 -0400

    10467: Abort S3 and release buffer if caller disconnects during S3 PUT request.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index b21f68d..542a9ca 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -302,7 +302,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
 }
 
 // Put stores a Keep block as a block blob in the container.
-func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index bb57dcd..0123bfb 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -455,7 +455,7 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
 			data[i] = byte((i + 7) & 0xff)
 		}
 		hash := fmt.Sprintf("%x", md5.Sum(data))
-		err := v.Put(hash, data)
+		err := v.Put(context.TODO(), hash, data)
 		if err != nil {
 			t.Error(err)
 		}
@@ -501,7 +501,7 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
 	allDone := make(chan struct{})
 	v.azHandler.race = make(chan chan struct{})
 	go func() {
-		err := v.Put(TestHash, TestBlock)
+		err := v.Put(context.TODO(), TestHash, TestBlock)
 		if err != nil {
 			t.Error(err)
 		}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index e254853..1821383 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -49,7 +49,7 @@ func TestGetHandler(t *testing.T) {
 	defer KeepVM.Close()
 
 	vols := KeepVM.AllWritable()
-	if err := vols[0].Put(TestHash, TestBlock); err != nil {
+	if err := vols[0].Put(context.TODO(), TestHash, TestBlock); err != nil {
 		t.Error(err)
 	}
 
@@ -289,10 +289,10 @@ func TestIndexHandler(t *testing.T) {
 	defer KeepVM.Close()
 
 	vols := KeepVM.AllWritable()
-	vols[0].Put(TestHash, TestBlock)
-	vols[1].Put(TestHash2, TestBlock2)
-	vols[0].Put(TestHash+".meta", []byte("metadata"))
-	vols[1].Put(TestHash2+".meta", []byte("metadata"))
+	vols[0].Put(context.TODO(), TestHash, TestBlock)
+	vols[1].Put(context.TODO(), TestHash2, TestBlock2)
+	vols[0].Put(context.TODO(), TestHash+".meta", []byte("metadata"))
+	vols[1].Put(context.TODO(), TestHash2+".meta", []byte("metadata"))
 
 	theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
@@ -478,7 +478,7 @@ func TestDeleteHandler(t *testing.T) {
 	defer KeepVM.Close()
 
 	vols := KeepVM.AllWritable()
-	vols[0].Put(TestHash, TestBlock)
+	vols[0].Put(context.TODO(), TestHash, TestBlock)
 
 	// Explicitly set the BlobSignatureTTL to 0 for these
 	// tests, to ensure the MockVolume deletes the blocks
@@ -573,7 +573,7 @@ func TestDeleteHandler(t *testing.T) {
 
 	// A DELETE request on a block newer than BlobSignatureTTL
 	// should return success but leave the block on the volume.
-	vols[0].Put(TestHash, TestBlock)
+	vols[0].Put(context.TODO(), TestHash, TestBlock)
 	theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
 
 	response = IssueRequest(superuserExistingBlockReq)
@@ -941,7 +941,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
 	KeepVM = MakeTestVolumeManager(2)
 	defer KeepVM.Close()
 
-	if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+	if err := KeepVM.AllWritable()[0].Put(context.TODO(), TestHash, TestBlock); err != nil {
 		t.Error(err)
 	}
 
@@ -986,7 +986,7 @@ func TestGetHandlerNoBufferLeak(t *testing.T) {
 	defer KeepVM.Close()
 
 	vols := KeepVM.AllWritable()
-	if err := vols[0].Put(TestHash, TestBlock); err != nil {
+	if err := vols[0].Put(context.TODO(), TestHash, TestBlock); err != nil {
 		t.Error(err)
 	}
 
@@ -1041,7 +1041,7 @@ func TestUntrashHandler(t *testing.T) {
 	KeepVM = MakeTestVolumeManager(2)
 	defer KeepVM.Close()
 	vols := KeepVM.AllWritable()
-	vols[0].Put(TestHash, TestBlock)
+	vols[0].Put(context.TODO(), TestHash, TestBlock)
 
 	theConfig.systemAuthToken = "DATA MANAGER TOKEN"
 
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index ac2d712..5dc68df 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -157,6 +157,8 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufS
 
 // PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+	ctx := contextForResponse(context.TODO(), resp)
+
 	hash := mux.Vars(req)["hash"]
 
 	// Detect as many error conditions as possible before reading
@@ -191,7 +193,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	replication, err := PutBlock(buf, hash)
+	replication, err := PutBlock(ctx, buf, hash)
 	bufs.Put(buf)
 
 	if err != nil {
@@ -611,7 +613,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
-// PutBlock(block, hash)
+// PutBlock(ctx, block, hash)
 //   Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 //   The MD5 checksum of the block must be identical to the content id HASH.
@@ -636,7 +638,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 	// Check that BLOCK's checksum matches HASH.
 	blockhash := fmt.Sprintf("%x", md5.Sum(block))
 	if blockhash != hash {
@@ -654,7 +656,7 @@ func PutBlock(block []byte, hash string) (int, error) {
 	// Choose a Keep volume to write to.
 	// If this volume fails, try all of the volumes in order.
 	if vol := KeepVM.NextWritable(); vol != nil {
-		if err := vol.Put(hash, block); err == nil {
+		if err := vol.Put(context.TODO(), hash, block); err == nil {
 			return vol.Replication(), nil // success!
 		}
 	}
@@ -667,7 +669,12 @@ func PutBlock(block []byte, hash string) (int, error) {
 
 	allFull := true
 	for _, vol := range writables {
-		err := vol.Put(hash, block)
+		err := vol.Put(ctx, hash, block)
+		select {
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		default:
+		}
 		if err == nil {
 			return vol.Replication(), nil // success!
 		}
diff --git a/services/keepstore/handlers_with_generic_volume_test.go b/services/keepstore/handlers_with_generic_volume_test.go
index 8abf8e0..2c273ae 100644
--- a/services/keepstore/handlers_with_generic_volume_test.go
+++ b/services/keepstore/handlers_with_generic_volume_test.go
@@ -78,12 +78,12 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 	setupHandlersWithGenericVolumeTest(t, factory)
 
 	// PutBlock
-	if _, err := PutBlock(testBlock, testHash); err != nil {
+	if _, err := PutBlock(context.TODO(), testBlock, testHash); err != nil {
 		t.Fatalf("Error during PutBlock: %s", err)
 	}
 
 	// Check that PutBlock succeeds again even after CompareAndTouch
-	if _, err := PutBlock(testBlock, testHash); err != nil {
+	if _, err := PutBlock(context.TODO(), testBlock, testHash); err != nil {
 		t.Fatalf("Error during PutBlock: %s", err)
 	}
 
@@ -107,7 +107,7 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
 	testableVolumes[1].PutRaw(testHash, badData)
 
 	// Check that PutBlock with good data succeeds
-	if _, err := PutBlock(testBlock, testHash); err != nil {
+	if _, err := PutBlock(context.TODO(), testBlock, testHash); err != nil {
 		t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
 	}
 
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 8413b7d..a2e8044 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -62,7 +62,7 @@ func TestGetBlock(t *testing.T) {
 	defer KeepVM.Close()
 
 	vols := KeepVM.AllReadable()
-	if err := vols[1].Put(TestHash, TestBlock); err != nil {
+	if err := vols[1].Put(context.TODO(), TestHash, TestBlock); err != nil {
 		t.Error(err)
 	}
 
@@ -107,7 +107,7 @@ func TestGetBlockCorrupt(t *testing.T) {
 	defer KeepVM.Close()
 
 	vols := KeepVM.AllReadable()
-	vols[0].Put(TestHash, BadBlock)
+	vols[0].Put(context.TODO(), TestHash, BadBlock)
 
 	// Check that GetBlock returns failure.
 	buf := make([]byte, BlockSize)
@@ -132,7 +132,7 @@ func TestPutBlockOK(t *testing.T) {
 	defer KeepVM.Close()
 
 	// Check that PutBlock stores the data as expected.
-	if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+	if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
 		t.Fatalf("PutBlock: n %d err %v", n, err)
 	}
 
@@ -163,7 +163,7 @@ func TestPutBlockOneVol(t *testing.T) {
 	vols[0].(*MockVolume).Bad = true
 
 	// Check that PutBlock stores the data as expected.
-	if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+	if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
 		t.Fatalf("PutBlock: n %d err %v", n, err)
 	}
 
@@ -191,7 +191,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 
 	// Check that PutBlock returns the expected error when the hash does
 	// not match the block.
-	if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+	if _, err := PutBlock(context.TODO(), BadBlock, TestHash); err != RequestHashError {
 		t.Errorf("Expected RequestHashError, got %v", err)
 	}
 
@@ -215,8 +215,8 @@ func TestPutBlockCorrupt(t *testing.T) {
 
 	// Store a corrupted block under TestHash.
 	vols := KeepVM.AllWritable()
-	vols[0].Put(TestHash, BadBlock)
-	if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+	vols[0].Put(context.TODO(), TestHash, BadBlock)
+	if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
 		t.Errorf("PutBlock: n %d err %v", n, err)
 	}
 
@@ -247,10 +247,10 @@ func TestPutBlockCollision(t *testing.T) {
 
 	// Store one block, then attempt to store the other. Confirm that
 	// PutBlock reported a CollisionError.
-	if _, err := PutBlock(b1, locator); err != nil {
+	if _, err := PutBlock(context.TODO(), b1, locator); err != nil {
 		t.Error(err)
 	}
-	if _, err := PutBlock(b2, locator); err == nil {
+	if _, err := PutBlock(context.TODO(), b2, locator); err == nil {
 		t.Error("PutBlock did not report a collision")
 	} else if err != CollisionError {
 		t.Errorf("PutBlock returned %v", err)
@@ -272,7 +272,7 @@ func TestPutBlockTouchFails(t *testing.T) {
 	// Store a block and then make the underlying volume bad,
 	// so a subsequent attempt to update the file timestamp
 	// will fail.
-	vols[0].Put(TestHash, BadBlock)
+	vols[0].Put(context.TODO(), TestHash, BadBlock)
 	oldMtime, err := vols[0].Mtime(TestHash)
 	if err != nil {
 		t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
@@ -281,7 +281,7 @@ func TestPutBlockTouchFails(t *testing.T) {
 	// vols[0].Touch will fail on the next call, so the volume
 	// manager will store a copy on vols[1] instead.
 	vols[0].(*MockVolume).Touchable = false
-	if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+	if n, err := PutBlock(context.TODO(), TestBlock, TestHash); err != nil || n < 1 {
 		t.Fatalf("PutBlock: n %d err %v", n, err)
 	}
 	vols[0].(*MockVolume).Touchable = true
@@ -401,11 +401,11 @@ func TestIndex(t *testing.T) {
 	defer KeepVM.Close()
 
 	vols := KeepVM.AllReadable()
-	vols[0].Put(TestHash, TestBlock)
-	vols[1].Put(TestHash2, TestBlock2)
-	vols[0].Put(TestHash3, TestBlock3)
-	vols[0].Put(TestHash+".meta", []byte("metadata"))
-	vols[1].Put(TestHash2+".meta", []byte("metadata"))
+	vols[0].Put(context.TODO(), TestHash, TestBlock)
+	vols[1].Put(context.TODO(), TestHash2, TestBlock2)
+	vols[0].Put(context.TODO(), TestHash3, TestBlock3)
+	vols[0].Put(context.TODO(), TestHash+".meta", []byte("metadata"))
+	vols[1].Put(context.TODO(), TestHash2+".meta", []byte("metadata"))
 
 	buf := new(bytes.Buffer)
 	vols[0].IndexTo("", buf)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index d53d106..e42b6e4 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"crypto/rand"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -94,6 +95,6 @@ func GenerateRandomAPIToken() string {
 
 // Put block
 var PutContent = func(content []byte, locator string) (err error) {
-	_, err = PutBlock(content, locator)
+	_, err = PutBlock(context.TODO(), content, locator)
 	return
 }
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 6339cf8..7ef590f 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -1,12 +1,14 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/base64"
 	"encoding/hex"
 	"flag"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"log"
 	"net/http"
 	"os"
@@ -320,24 +322,64 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
 }
 
 // Put writes a block.
-func (v *S3Volume) Put(loc string, block []byte) error {
+func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
 	var opts s3.Options
-	if len(block) > 0 {
+	size := len(block)
+	if size > 0 {
 		md5, err := hex.DecodeString(loc)
 		if err != nil {
 			return err
 		}
 		opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
 	}
-	err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
-	if err != nil {
-		return v.translateError(err)
+
+	// Send the block data through a pipe, so that (if we need to)
+	// we can close the pipe early and abandon our PutReader()
+	// goroutine, without worrying about PutReader() accessing our
+	// block buffer after we release it.
+	bufr, bufw := io.Pipe()
+	go func() {
+		io.Copy(bufw, bytes.NewReader(block))
+		bufw.Close()
+	}()
+
+	var err error
+	ready := make(chan bool)
+	go func() {
+		defer func() {
+			select {
+			case <-ctx.Done():
+				theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+			default:
+			}
+		}()
+		defer close(ready)
+		err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+		if err != nil {
+			err = v.translateError(err)
+			return
+		}
+		err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+		err = v.translateError(err)
+	}()
+	select {
+	case <-ctx.Done():
+		theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+		// Our pipe might be stuck in Write(), waiting for
+		// PutReader() to read. If so, un-stick it. This means
+		// PutReader will get corrupt data, but that's OK: the
+		// size and MD5 won't match, so the write will fail.
+		go io.Copy(ioutil.Discard, bufr)
+		// CloseWithError() will return once pending I/O is done.
+		bufw.CloseWithError(ctx.Err())
+		theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+		return ctx.Err()
+	case <-ready:
+		return err
 	}
-	err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
-	return v.translateError(err)
 }
 
 // Touch sets the timestamp for the given locator to the current time.
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index db3f4c6..b720777 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -270,7 +270,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		// Check for current Mtime after Put (applies to all
 		// scenarios)
 		loc, blk = setupScenario()
-		err = v.Put(loc, blk)
+		err = v.Put(context.TODO(), loc, blk)
 		c.Check(err, check.IsNil)
 		t, err := v.Mtime(loc)
 		c.Check(err, check.IsNil)
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 267175d..857f86a 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -220,15 +220,15 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 	// Put test content
 	vols := KeepVM.AllWritable()
 	if testData.CreateData {
-		vols[0].Put(testData.Locator1, testData.Block1)
-		vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
+		vols[0].Put(context.TODO(), testData.Locator1, testData.Block1)
+		vols[0].Put(context.TODO(), testData.Locator1+".meta", []byte("metadata"))
 
 		if testData.CreateInVolume1 {
-			vols[0].Put(testData.Locator2, testData.Block2)
-			vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
+			vols[0].Put(context.TODO(), testData.Locator2, testData.Block2)
+			vols[0].Put(context.TODO(), testData.Locator2+".meta", []byte("metadata"))
 		} else {
-			vols[1].Put(testData.Locator2, testData.Block2)
-			vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
+			vols[1].Put(context.TODO(), testData.Locator2, testData.Block2)
+			vols[1].Put(context.TODO(), testData.Locator2+".meta", []byte("metadata"))
 		}
 	}
 
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 19a5996..01bb6e2 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -85,7 +85,7 @@ type Volume interface {
 	//
 	// Put should not verify that loc==hash(block): this is the
 	// caller's responsibility.
-	Put(loc string, block []byte) error
+	Put(ctx context.Context, loc string, block []byte) error
 
 	// Touch sets the timestamp for the given locator to the
 	// current time.
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index a0fd3e1..4c26335 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -187,12 +187,12 @@ func testPutBlockWithSameContent(t TB, factory TestableVolumeFactory, testHash s
 		return
 	}
 
-	err := v.Put(testHash, testData)
+	err := v.Put(context.TODO(), testHash, testData)
 	if err != nil {
 		t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
 	}
 
-	err = v.Put(testHash, testData)
+	err = v.Put(context.TODO(), testHash, testData)
 	if err != nil {
 		t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
 	}
@@ -210,7 +210,7 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
 
 	v.PutRaw(testHash, testDataA)
 
-	putErr := v.Put(testHash, testDataB)
+	putErr := v.Put(context.TODO(), testHash, testDataB)
 	buf := make([]byte, BlockSize)
 	n, getErr := v.Get(context.TODO(), testHash, buf)
 	if putErr == nil {
@@ -239,17 +239,17 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 		return
 	}
 
-	err := v.Put(TestHash, TestBlock)
+	err := v.Put(context.TODO(), TestHash, TestBlock)
 	if err != nil {
 		t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
 	}
 
-	err = v.Put(TestHash2, TestBlock2)
+	err = v.Put(context.TODO(), TestHash2, TestBlock2)
 	if err != nil {
 		t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
 	}
 
-	err = v.Put(TestHash3, TestBlock3)
+	err = v.Put(context.TODO(), TestHash3, TestBlock3)
 	if err != nil {
 		t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
 	}
@@ -295,7 +295,7 @@ func testPutAndTouch(t TB, factory TestableVolumeFactory) {
 		return
 	}
 
-	if err := v.Put(TestHash, TestBlock); err != nil {
+	if err := v.Put(context.TODO(), TestHash, TestBlock); err != nil {
 		t.Error(err)
 	}
 
@@ -315,7 +315,7 @@ func testPutAndTouch(t TB, factory TestableVolumeFactory) {
 	}
 
 	// Write the same block again.
-	if err := v.Put(TestHash, TestBlock); err != nil {
+	if err := v.Put(context.TODO(), TestHash, TestBlock); err != nil {
 		t.Error(err)
 	}
 
@@ -438,7 +438,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 		return
 	}
 
-	v.Put(TestHash, TestBlock)
+	v.Put(context.TODO(), TestHash, TestBlock)
 
 	if err := v.Trash(TestHash); err != nil {
 		t.Error(err)
@@ -464,7 +464,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 		return
 	}
 
-	v.Put(TestHash, TestBlock)
+	v.Put(context.TODO(), TestHash, TestBlock)
 	v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
 	if err := v.Trash(TestHash); err != nil {
@@ -560,7 +560,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	}
 
 	// Put a new block to read-only volume should result in error
-	err = v.Put(TestHash2, TestBlock2)
+	err = v.Put(context.TODO(), TestHash2, TestBlock2)
 	if err == nil {
 		t.Errorf("Expected error when putting block in a read-only volume")
 	}
@@ -582,7 +582,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	}
 
 	// Overwriting an existing block in read-only volume should result in error
-	err = v.Put(TestHash, TestBlock)
+	err = v.Put(context.TODO(), TestHash, TestBlock)
 	if err == nil {
 		t.Errorf("Expected error when putting block in a read-only volume")
 	}
@@ -653,7 +653,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 
 	sem := make(chan int)
 	go func(sem chan int) {
-		err := v.Put(TestHash, TestBlock)
+		err := v.Put(context.TODO(), TestHash, TestBlock)
 		if err != nil {
 			t.Errorf("err1: %v", err)
 		}
@@ -661,7 +661,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 	}(sem)
 
 	go func(sem chan int) {
-		err := v.Put(TestHash2, TestBlock2)
+		err := v.Put(context.TODO(), TestHash2, TestBlock2)
 		if err != nil {
 			t.Errorf("err2: %v", err)
 		}
@@ -669,7 +669,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 	}(sem)
 
 	go func(sem chan int) {
-		err := v.Put(TestHash3, TestBlock3)
+		err := v.Put(context.TODO(), TestHash3, TestBlock3)
 		if err != nil {
 			t.Errorf("err3: %v", err)
 		}
@@ -721,7 +721,7 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
 	wdata[0] = 'a'
 	wdata[BlockSize-1] = 'z'
 	hash := fmt.Sprintf("%x", md5.Sum(wdata))
-	err := v.Put(hash, wdata)
+	err := v.Put(context.TODO(), hash, wdata)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 917942e..acbd7c9 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -126,7 +126,7 @@ func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
 	return 0, os.ErrNotExist
 }
 
-func (v *MockVolume) Put(loc string, block []byte) error {
+func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
 	v.gotCall("Put")
 	<-v.Gate
 	if v.Bad {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 02f0f9f..1c676b1 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -246,7 +246,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-func (v *UnixVolume) Put(loc string, block []byte) error {
+func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
@@ -271,6 +271,11 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 		v.locker.Lock()
 		defer v.locker.Unlock()
 	}
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
 	if _, err := tmpfile.Write(block); err != nil {
 		log.Printf("%s: writing to %s: %s\n", v, bpath, err)
 		tmpfile.Close()
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 72fa819..fad1f12 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -46,7 +46,7 @@ func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
 		v.ReadOnly = orig
 	}(v.ReadOnly)
 	v.ReadOnly = false
-	err := v.Put(locator, data)
+	err := v.Put(context.TODO(), locator, data)
 	if err != nil {
 		v.t.Fatal(err)
 	}
@@ -118,7 +118,7 @@ func TestReplicationDefault1(t *testing.T) {
 func TestGetNotFound(t *testing.T) {
 	v := NewTestableUnixVolume(t, false, false)
 	defer v.Teardown()
-	v.Put(TestHash, TestBlock)
+	v.Put(context.TODO(), TestHash, TestBlock)
 
 	buf := make([]byte, BlockSize)
 	n, err := v.Get(context.TODO(), TestHash2, buf)
@@ -136,7 +136,7 @@ func TestPut(t *testing.T) {
 	v := NewTestableUnixVolume(t, false, false)
 	defer v.Teardown()
 
-	err := v.Put(TestHash, TestBlock)
+	err := v.Put(context.TODO(), TestHash, TestBlock)
 	if err != nil {
 		t.Error(err)
 	}
@@ -154,7 +154,7 @@ func TestPutBadVolume(t *testing.T) {
 	defer v.Teardown()
 
 	os.Chmod(v.Root, 000)
-	err := v.Put(TestHash, TestBlock)
+	err := v.Put(context.TODO(), TestHash, TestBlock)
 	if err == nil {
 		t.Error("Write should have failed")
 	}
@@ -172,7 +172,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
 		t.Errorf("got err %v, expected nil", err)
 	}
 
-	err = v.Put(TestHash, TestBlock)
+	err = v.Put(context.TODO(), TestHash, TestBlock)
 	if err != MethodDisabledError {
 		t.Errorf("got err %v, expected MethodDisabledError", err)
 	}
@@ -232,7 +232,7 @@ func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
 	v := NewTestableUnixVolume(t, false, false)
 	defer v.Teardown()
 
-	v.Put(TestHash, TestBlock)
+	v.Put(context.TODO(), TestHash, TestBlock)
 	mockErr := errors.New("Mock error")
 	err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
 		return mockErr
@@ -263,7 +263,7 @@ func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
 	v := NewTestableUnixVolume(t, false, false)
 	defer v.Teardown()
 
-	v.Put(TestHash, TestBlock)
+	v.Put(context.TODO(), TestHash, TestBlock)
 
 	mtx := NewMockMutex()
 	v.locker = mtx
@@ -298,7 +298,7 @@ func TestUnixVolumeCompare(t *testing.T) {
 	v := NewTestableUnixVolume(t, false, false)
 	defer v.Teardown()
 
-	v.Put(TestHash, TestBlock)
+	v.Put(context.TODO(), TestHash, TestBlock)
 	err := v.Compare(TestHash, TestBlock)
 	if err != nil {
 		t.Errorf("Got err %q, expected nil", err)
@@ -309,7 +309,7 @@ func TestUnixVolumeCompare(t *testing.T) {
 		t.Errorf("Got err %q, expected %q", err, CollisionError)
 	}
 
-	v.Put(TestHash, []byte("baddata"))
+	v.Put(context.TODO(), TestHash, []byte("baddata"))
 	err = v.Compare(TestHash, TestBlock)
 	if err != DiskHashError {
 		t.Errorf("Got err %q, expected %q", err, DiskHashError)

commit 8040d45d59041859350c56cae195eb09a65a8dde
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Nov 5 16:10:34 2016 -0400

    10467: Move http request context setup out to func.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 8e1ec7d..ac2d712 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -72,6 +72,8 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 
 // GetBlockHandler is a HandleFunc to address Get block requests.
 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+	ctx := contextForResponse(context.TODO(), resp)
+
 	if theConfig.RequireSignatures {
 		locator := req.URL.Path[1:] // strip leading slash
 		if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
@@ -94,13 +96,6 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 	defer bufs.Put(buf)
 
-	ctx, cancel := context.WithCancel(context.TODO())
-	if resp, ok := resp.(http.CloseNotifier); ok {
-		go func() {
-			<-resp.CloseNotify()
-			cancel()
-		}()
-	}
 	size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
 	if err != nil {
 		code := http.StatusInternalServerError
@@ -116,6 +111,22 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	resp.Write(buf[:size])
 }
 
+// Return a new context that gets cancelled by resp's
+// CloseNotifier. If resp does not implement http.CloseNotifier,
+// return parent.
+func contextForResponse(parent context.Context, resp http.ResponseWriter) context.Context {
+	cn, ok := resp.(http.CloseNotifier)
+	if !ok {
+		return parent
+	}
+	ctx, cancel := context.WithCancel(parent)
+	go func() {
+		<-cn.CloseNotify()
+		cancel()
+	}()
+	return ctx
+}
+
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if resp implements http.CloseNotifier and tells us that the
 // client has disconnected before we get a buffer.

commit 9b1a9a3a7de01dc07270b950101d11ae96786de4
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Nov 5 15:56:45 2016 -0400

    10467: Return "context done" error instead of 404 if client hangs up during GET.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 143f925..8e1ec7d 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -562,6 +562,11 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 
 	for _, vol := range KeepVM.AllReadable() {
 		size, err := vol.Get(ctx, hash, buf)
+		select {
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		default:
+		}
 		if err != nil {
 			// IsNotExist is an expected error and may be
 			// ignored. All other errors are logged. In

commit 24c98a345046c650247e6515eeb6d3389e54b68c
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Nov 5 14:53:59 2016 -0400

    10467: Add Debug flag to config.

diff --git a/services/keepstore/config.go b/services/keepstore/config.go
index 9c318d1..dc06ef5 100644
--- a/services/keepstore/config.go
+++ b/services/keepstore/config.go
@@ -13,6 +13,7 @@ import (
 )
 
 type Config struct {
+	Debug  bool
 	Listen string
 
 	PIDFile string
@@ -32,6 +33,7 @@ type Config struct {
 
 	blobSigningKey  []byte
 	systemAuthToken string
+	debugLogf       func(string, ...interface{})
 }
 
 var theConfig = DefaultConfig()
@@ -52,6 +54,13 @@ func DefaultConfig() *Config {
 // Start should be called exactly once: after setting all public
 // fields, and before using the config.
 func (cfg *Config) Start() error {
+	if cfg.Debug {
+		cfg.debugLogf = log.Printf
+		cfg.debugLogf("debugging enabled")
+	} else {
+		cfg.debugLogf = func(string, ...interface{}) {}
+	}
+
 	if cfg.MaxBuffers < 0 {
 		return fmt.Errorf("MaxBuffers must be greater than zero")
 	}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 16269ae..6339cf8 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -272,7 +272,7 @@ func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error)
 	}()
 	select {
 	case <-ctx.Done():
-		// Client hung up before we could even send our S3 request
+		theConfig.debugLogf("s3: abandoning getReader() because %s", ctx.Err())
 		return 0, ctx.Err()
 	case <-ready:
 		if err != nil {
@@ -297,9 +297,11 @@ func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error)
 	}()
 	select {
 	case <-ctx.Done():
+		theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
 		rdr.Close()
 		// Must wait for ReadFull to return, to ensure it
 		// doesn't write to buf after we return.
+		theConfig.debugLogf("s3: waiting for ReadFull() to fail")
 		<-ready
 		return 0, ctx.Err()
 	case <-ready:

commit 863570108a2c901a8eff22dc8a9bc72635ba7b95
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Nov 4 21:25:59 2016 -0400

    10467: Abort S3 request and release buffer if caller disconnects while server is waiting for S3.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index d2c4620..b21f68d 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"flag"
 	"fmt"
@@ -176,7 +177,7 @@ func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, err
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
 	trashed, _, err := v.checkTrashed(loc)
 	if err != nil {
 		return 0, err
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index c8c898f..bb57dcd 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"encoding/base64"
 	"encoding/xml"
@@ -459,7 +460,7 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
 			t.Error(err)
 		}
 		gotData := make([]byte, len(data))
-		gotLen, err := v.Get(hash, gotData)
+		gotLen, err := v.Get(context.TODO(), hash, gotData)
 		if err != nil {
 			t.Error(err)
 		}
@@ -510,7 +511,7 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
 	v.azHandler.race <- continuePut
 	go func() {
 		buf := make([]byte, len(TestBlock))
-		_, err := v.Get(TestHash, buf)
+		_, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			t.Error(err)
 		}
@@ -553,7 +554,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 	go func() {
 		defer close(allDone)
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash, buf)
+		n, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			t.Error(err)
 			return
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index dc9bcb1..e254853 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -11,6 +11,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"fmt"
 	"net/http"
@@ -564,7 +565,7 @@ func TestDeleteHandler(t *testing.T) {
 	}
 	// Confirm the block has been deleted
 	buf := make([]byte, BlockSize)
-	_, err := vols[0].Get(TestHash, buf)
+	_, err := vols[0].Get(context.TODO(), TestHash, buf)
 	var blockDeleted = os.IsNotExist(err)
 	if !blockDeleted {
 		t.Error("superuserExistingBlockReq: block not deleted")
@@ -588,7 +589,7 @@ func TestDeleteHandler(t *testing.T) {
 			expectedDc, responseDc)
 	}
 	// Confirm the block has NOT been deleted.
-	_, err = vols[0].Get(TestHash, buf)
+	_, err = vols[0].Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("testing delete on new block: %s\n", err)
 	}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 54b8b48..143f925 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -9,6 +9,7 @@ package main
 
 import (
 	"container/list"
+	"context"
 	"crypto/md5"
 	"encoding/json"
 	"fmt"
@@ -93,7 +94,14 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 	defer bufs.Put(buf)
 
-	size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+	ctx, cancel := context.WithCancel(context.TODO())
+	if resp, ok := resp.(http.CloseNotifier); ok {
+		go func() {
+			<-resp.CloseNotify()
+			cancel()
+		}()
+	}
+	size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
 	if err != nil {
 		code := http.StatusInternalServerError
 		if err, ok := err.(*KeepError); ok {
@@ -548,12 +556,12 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
 	// Attempt to read the requested hash from a keep volume.
 	errorToCaller := NotFoundError
 
 	for _, vol := range KeepVM.AllReadable() {
-		size, err := vol.Get(hash, buf)
+		size, err := vol.Get(ctx, hash, buf)
 		if err != nil {
 			// IsNotExist is an expected error and may be
 			// ignored. All other errors are logged. In
diff --git a/services/keepstore/handlers_with_generic_volume_test.go b/services/keepstore/handlers_with_generic_volume_test.go
index dda7edc..8abf8e0 100644
--- a/services/keepstore/handlers_with_generic_volume_test.go
+++ b/services/keepstore/handlers_with_generic_volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 )
 
 // A TestableVolumeManagerFactory creates a volume manager with at least two TestableVolume instances.
@@ -46,7 +47,7 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 
 	// Get should pass
 	buf := make([]byte, len(testBlock))
-	n, err := GetBlock(testHash, buf, nil)
+	n, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error while getting block %s", err)
 	}
@@ -66,7 +67,7 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
 
 	// Get should fail
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testHash, buf, nil)
+	size, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err == nil {
 		t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
 	}
@@ -88,7 +89,7 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 
 	// Check that PutBlock stored the data as expected
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testHash, buf, nil)
+	size, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
 	} else if bytes.Compare(buf[:size], testBlock) != 0 {
@@ -113,7 +114,7 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
 	// Put succeeded and overwrote the badData in one volume,
 	// and Get should return the testBlock now, ignoring the bad data.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testHash, buf, nil)
+	size, err := GetBlock(context.TODO(), testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
 	} else if bytes.Compare(buf[:size], testBlock) != 0 {
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index dc6af0f..8413b7d 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -67,7 +68,7 @@ func TestGetBlock(t *testing.T) {
 
 	// Check that GetBlock returns success.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != nil {
 		t.Errorf("GetBlock error: %s", err)
 	}
@@ -88,7 +89,7 @@ func TestGetBlockMissing(t *testing.T) {
 
 	// Check that GetBlock returns failure.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != NotFoundError {
 		t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
 	}
@@ -110,7 +111,7 @@ func TestGetBlockCorrupt(t *testing.T) {
 
 	// Check that GetBlock returns failure.
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != DiskHashError {
 		t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
 	}
@@ -137,7 +138,7 @@ func TestPutBlockOK(t *testing.T) {
 
 	vols := KeepVM.AllReadable()
 	buf := make([]byte, BlockSize)
-	n, err := vols[1].Get(TestHash, buf)
+	n, err := vols[1].Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatalf("Volume #0 Get returned error: %v", err)
 	}
@@ -167,7 +168,7 @@ func TestPutBlockOneVol(t *testing.T) {
 	}
 
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(TestHash, buf, nil)
+	size, err := GetBlock(context.TODO(), TestHash, buf, nil)
 	if err != nil {
 		t.Fatalf("GetBlock: %v", err)
 	}
@@ -195,7 +196,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 	}
 
 	// Confirm that GetBlock fails to return anything.
-	if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
+	if result, err := GetBlock(context.TODO(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
 		t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
 			string(result), err)
 	}
@@ -221,7 +222,7 @@ func TestPutBlockCorrupt(t *testing.T) {
 
 	// The block on disk should now match TestBlock.
 	buf := make([]byte, BlockSize)
-	if size, err := GetBlock(TestHash, buf, nil); err != nil {
+	if size, err := GetBlock(context.TODO(), TestHash, buf, nil); err != nil {
 		t.Errorf("GetBlock: %v", err)
 	} else if bytes.Compare(buf[:size], TestBlock) != 0 {
 		t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
@@ -296,7 +297,7 @@ func TestPutBlockTouchFails(t *testing.T) {
 			oldMtime, newMtime)
 	}
 	buf := make([]byte, BlockSize)
-	n, err := vols[1].Get(TestHash, buf)
+	n, err := vols[1].Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatalf("vols[1]: %v", err)
 	}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index a0cf450..16269ae 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"encoding/base64"
 	"encoding/hex"
 	"flag"
@@ -261,18 +262,48 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 
 // Get a block: copy the block data into buf, and return the number of
 // bytes copied.
-func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
-	rdr, err := v.getReader(loc)
-	if err != nil {
-		return 0, err
+func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+	ready := make(chan bool)
+	var rdr io.ReadCloser
+	var err error
+	go func() {
+		rdr, err = v.getReader(loc)
+		close(ready)
+	}()
+	select {
+	case <-ctx.Done():
+		// Client hung up before we could even send our S3 request
+		return 0, ctx.Err()
+	case <-ready:
+		if err != nil {
+			return 0, err
+		}
 	}
-	defer rdr.Close()
-	n, err := io.ReadFull(rdr, buf)
-	switch err {
-	case nil, io.EOF, io.ErrUnexpectedEOF:
-		return n, nil
-	default:
-		return 0, v.translateError(err)
+
+	var n int
+	ready = make(chan bool)
+	go func() {
+		defer close(ready)
+
+		defer rdr.Close()
+		n, err = io.ReadFull(rdr, buf)
+
+		switch err {
+		case nil, io.EOF, io.ErrUnexpectedEOF:
+			err = nil
+		default:
+			err = v.translateError(err)
+		}
+	}()
+	select {
+	case <-ctx.Done():
+		rdr.Close()
+		// Must wait for ReadFull to return, to ensure it
+		// doesn't write to buf after we return.
+		<-ready
+		return 0, ctx.Err()
+	case <-ready:
+		return n, err
 	}
 }
 
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 76dcbc9..db3f4c6 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"io/ioutil"
@@ -223,7 +224,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		// Check canGet
 		loc, blk := setupScenario()
 		buf := make([]byte, len(blk))
-		_, err := v.Get(loc, buf)
+		_, err := v.Get(context.TODO(), loc, buf)
 		c.Check(err == nil, check.Equals, scenario.canGet)
 		if err != nil {
 			c.Check(os.IsNotExist(err), check.Equals, true)
@@ -233,7 +234,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 		loc, blk = setupScenario()
 		err = v.Trash(loc)
 		c.Check(err == nil, check.Equals, scenario.canTrash)
-		_, err = v.Get(loc, buf)
+		_, err = v.Get(context.TODO(), loc, buf)
 		c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
 		if err != nil {
 			c.Check(os.IsNotExist(err), check.Equals, true)
@@ -248,7 +249,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			// should be able to Get after Untrash --
 			// regardless of timestamps, errors, race
 			// conditions, etc.
-			_, err = v.Get(loc, buf)
+			_, err = v.Get(context.TODO(), loc, buf)
 			c.Check(err, check.IsNil)
 		}
 
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 5ec413d..267175d 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"container/list"
+	"context"
 	"testing"
 	"time"
 )
@@ -291,7 +292,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
 	// Verify Locator1 to be un/deleted as expected
 	buf := make([]byte, BlockSize)
-	size, err := GetBlock(testData.Locator1, buf, nil)
+	size, err := GetBlock(context.TODO(), testData.Locator1, buf, nil)
 	if testData.ExpectLocator1 {
 		if size == 0 || err != nil {
 			t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -304,7 +305,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
 	// Verify Locator2 to be un/deleted as expected
 	if testData.Locator1 != testData.Locator2 {
-		size, err = GetBlock(testData.Locator2, buf, nil)
+		size, err = GetBlock(context.TODO(), testData.Locator2, buf, nil)
 		if testData.ExpectLocator2 {
 			if size == 0 || err != nil {
 				t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
@@ -323,7 +324,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 		locatorFoundIn := 0
 		for _, volume := range KeepVM.AllReadable() {
 			buf := make([]byte, BlockSize)
-			if _, err := volume.Get(testData.Locator1, buf); err == nil {
+			if _, err := volume.Get(context.TODO(), testData.Locator1, buf); err == nil {
 				locatorFoundIn = locatorFoundIn + 1
 			}
 		}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 6e01e75..19a5996 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"io"
 	"sync/atomic"
 	"time"
@@ -47,7 +48,7 @@ type Volume interface {
 	// any of the data.
 	//
 	// len(buf) will not exceed BlockSize.
-	Get(loc string, buf []byte) (int, error)
+	Get(ctx context.Context, loc string, buf []byte) (int, error)
 
 	// Compare the given data with the stored data (i.e., what Get
 	// would return). If equal, return nil. If not, return
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 1738fe9..a0fd3e1 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"os"
@@ -92,7 +93,7 @@ func testGet(t TB, factory TestableVolumeFactory) {
 	v.PutRaw(TestHash, TestBlock)
 
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, buf)
+	n, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -109,7 +110,7 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
 	defer v.Teardown()
 
 	buf := make([]byte, BlockSize)
-	if _, err := v.Get(TestHash2, buf); err == nil {
+	if _, err := v.Get(context.TODO(), TestHash2, buf); err == nil {
 		t.Errorf("Expected error while getting non-existing block %v", TestHash2)
 	}
 }
@@ -211,7 +212,7 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
 
 	putErr := v.Put(testHash, testDataB)
 	buf := make([]byte, BlockSize)
-	n, getErr := v.Get(testHash, buf)
+	n, getErr := v.Get(context.TODO(), testHash, buf)
 	if putErr == nil {
 		// Put must not return a nil error unless it has
 		// overwritten the existing data.
@@ -254,7 +255,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 	}
 
 	data := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, data)
+	n, err := v.Get(context.TODO(), TestHash, data)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -263,7 +264,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 		}
 	}
 
-	n, err = v.Get(TestHash2, data)
+	n, err = v.Get(context.TODO(), TestHash2, data)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -272,7 +273,7 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 		}
 	}
 
-	n, err = v.Get(TestHash3, data)
+	n, err = v.Get(context.TODO(), TestHash3, data)
 	if err != nil {
 		t.Error(err)
 	} else {
@@ -443,7 +444,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 		t.Error(err)
 	}
 	data := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, data)
+	n, err := v.Get(context.TODO(), TestHash, data)
 	if err != nil {
 		t.Error(err)
 	} else if bytes.Compare(data[:n], TestBlock) != 0 {
@@ -470,7 +471,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 		t.Error(err)
 	}
 	data := make([]byte, BlockSize)
-	if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
+	if _, err := v.Get(context.TODO(), TestHash, data); err == nil || !os.IsNotExist(err) {
 		t.Errorf("os.IsNotExist(%v) should have been true", err)
 	}
 
@@ -553,7 +554,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	buf := make([]byte, BlockSize)
 
 	// Get from read-only volume should succeed
-	_, err := v.Get(TestHash, buf)
+	_, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("got err %v, expected nil", err)
 	}
@@ -563,7 +564,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	if err == nil {
 		t.Errorf("Expected error when putting block in a read-only volume")
 	}
-	_, err = v.Get(TestHash2, buf)
+	_, err = v.Get(context.TODO(), TestHash2, buf)
 	if err == nil {
 		t.Errorf("Expected error when getting block whose put in read-only volume failed")
 	}
@@ -600,7 +601,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 	sem := make(chan int)
 	go func() {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash, buf)
+		n, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			t.Errorf("err1: %v", err)
 		}
@@ -612,7 +613,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 
 	go func() {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash2, buf)
+		n, err := v.Get(context.TODO(), TestHash2, buf)
 		if err != nil {
 			t.Errorf("err2: %v", err)
 		}
@@ -624,7 +625,7 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 
 	go func() {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash3, buf)
+		n, err := v.Get(context.TODO(), TestHash3, buf)
 		if err != nil {
 			t.Errorf("err3: %v", err)
 		}
@@ -682,7 +683,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 
 	// Double check that we actually wrote the blocks we expected to write.
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, buf)
+	n, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("Get #1: %v", err)
 	}
@@ -690,7 +691,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 		t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
 	}
 
-	n, err = v.Get(TestHash2, buf)
+	n, err = v.Get(context.TODO(), TestHash2, buf)
 	if err != nil {
 		t.Errorf("Get #2: %v", err)
 	}
@@ -698,7 +699,7 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 		t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
 	}
 
-	n, err = v.Get(TestHash3, buf)
+	n, err = v.Get(context.TODO(), TestHash3, buf)
 	if err != nil {
 		t.Errorf("Get #3: %v", err)
 	}
@@ -725,7 +726,7 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
 		t.Fatal(err)
 	}
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(hash, buf)
+	n, err := v.Get(context.TODO(), hash, buf)
 	if err != nil {
 		t.Error(err)
 	}
@@ -752,7 +753,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
 
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash, buf)
+	n, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -771,7 +772,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 			t.Fatal(err)
 		}
 	} else {
-		_, err = v.Get(TestHash, buf)
+		_, err = v.Get(context.TODO(), TestHash, buf)
 		if err == nil || !os.IsNotExist(err) {
 			t.Errorf("os.IsNotExist(%v) should have been true", err)
 		}
@@ -784,7 +785,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	}
 
 	// Get the block - after trash and untrash sequence
-	n, err = v.Get(TestHash, buf)
+	n, err = v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -802,7 +803,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
 	checkGet := func() error {
 		buf := make([]byte, BlockSize)
-		n, err := v.Get(TestHash, buf)
+		n, err := v.Get(context.TODO(), TestHash, buf)
 		if err != nil {
 			return err
 		}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 6ab386a..917942e 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"errors"
 	"fmt"
@@ -113,7 +114,7 @@ func (v *MockVolume) Compare(loc string, buf []byte) error {
 	}
 }
 
-func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
+func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
 	v.gotCall("Get")
 	<-v.Gate
 	if v.Bad {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index b5753de..02f0f9f 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bufio"
+	"context"
 	"flag"
 	"fmt"
 	"io"
@@ -210,7 +211,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
 	path := v.blockPath(loc)
 	stat, err := v.stat(path)
 	if err != nil {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 887247d..72fa819 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"io"
@@ -120,7 +121,7 @@ func TestGetNotFound(t *testing.T) {
 	v.Put(TestHash, TestBlock)
 
 	buf := make([]byte, BlockSize)
-	n, err := v.Get(TestHash2, buf)
+	n, err := v.Get(context.TODO(), TestHash2, buf)
 	switch {
 	case os.IsNotExist(err):
 		break
@@ -166,7 +167,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
 	v.PutRaw(TestHash, TestBlock)
 
 	buf := make([]byte, BlockSize)
-	_, err := v.Get(TestHash, buf)
+	_, err := v.Get(context.TODO(), TestHash, buf)
 	if err != nil {
 		t.Errorf("got err %v, expected nil", err)
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list