[ARVADOS] updated: 069efc4726e3ba4f6c9adda9974437d23ed3d2d7

Git user git at public.curoverse.com
Wed May 4 11:09:23 EDT 2016


Summary of changes:
 build/run-build-packages-python-and-ruby.sh        |   8 +-
 build/run-build-packages.sh                        |   2 +-
 build/run-tests.sh                                 |   2 +
 doc/install/install-keepstore.html.textile.liquid  |   2 +
 sdk/cli/arvados-cli.gemspec                        |   3 +-
 sdk/cli/bin/arv                                    |   2 +-
 sdk/cli/bin/crunch-job                             |   2 +-
 sdk/cli/test/test_arv-collection-create.rb         |  18 +-
 sdk/cwl/arvados_cwl/__init__.py                    | 150 +++++++++--
 sdk/cwl/setup.py                                   |   2 +-
 sdk/cwl/tests/matcher.py                           |  23 ++
 sdk/cwl/tests/order/empty_order.json               |   1 +
 sdk/cwl/tests/order/inputs_test_order.json         |   9 +
 sdk/cwl/tests/test_job.py                          |  23 +-
 sdk/cwl/tests/test_submit.py                       | 299 +++++++++++++++------
 sdk/cwl/tests/wf/inputs_test.cwl                   |  27 ++
 sdk/go/httpserver/request_limiter.go               |  29 ++
 sdk/go/httpserver/request_limiter_test.go          | 106 ++++++++
 sdk/python/arvados/commands/keepdocker.py          |  41 +--
 sdk/python/arvados/commands/put.py                 |  28 +-
 sdk/python/arvados/keep.py                         |  46 +++-
 sdk/python/tests/arvados_testutil.py               |   3 +-
 sdk/python/tests/keepstub.py                       |  14 +
 sdk/python/tests/test_api.py                       |   2 +-
 sdk/python/tests/test_arv_put.py                   |  53 +++-
 sdk/python/tests/test_keep_client.py               | 130 +++++++++
 .../api/app/controllers/application_controller.rb  |   4 +-
 .../v1/api_client_authorizations_controller.rb     |   2 +-
 services/api/config/initializers/fix_www_decode.rb |  16 ++
 services/api/lib/eventbus.rb                       |   2 +-
 services/api/lib/load_param.rb                     |   8 +-
 services/api/lib/record_filters.rb                 |   2 +
 services/api/test/helpers/time_block.rb            |  12 +
 .../integration/collections_performance_test.rb    |  16 +-
 services/api/test/integration/websocket_test.rb    |  36 +--
 services/api/test/test_helper.rb                   |   6 +-
 services/keepstore/handler_test.go                 |  20 +-
 services/keepstore/handlers.go                     |   6 +-
 services/keepstore/keepstore.go                    |  20 +-
 services/keepstore/logging_router.go               |  56 ++--
 .../arvnodeman/computenode/driver/__init__.py      |   6 +-
 .../nodemanager/tests/test_computenode_driver.py   |  61 +++++
 tools/arvbox/lib/arvbox/docker/crunch-setup.sh     |   6 +-
 43 files changed, 1081 insertions(+), 223 deletions(-)
 create mode 100644 sdk/cwl/tests/matcher.py
 create mode 100644 sdk/cwl/tests/order/empty_order.json
 create mode 100644 sdk/cwl/tests/order/inputs_test_order.json
 create mode 100644 sdk/cwl/tests/wf/inputs_test.cwl
 create mode 100644 sdk/go/httpserver/request_limiter.go
 create mode 100644 sdk/go/httpserver/request_limiter_test.go
 create mode 100644 services/api/config/initializers/fix_www_decode.rb
 create mode 100644 services/nodemanager/tests/test_computenode_driver.py

  discards  1b5e165e9d4e803844be8c2c41eea76e92d3887e (commit)
  discards  a33399926a8a3491ebb5341615b65dd088bbc275 (commit)
  discards  13bd4f67d9a344ba9852338d1dfa80b89dcf0007 (commit)
       via  069efc4726e3ba4f6c9adda9974437d23ed3d2d7 (commit)
       via  0f241a75c642d01d74f00d8b18133959fd2370c1 (commit)
       via  c8c4236aee859423c2aa5850d1c1c34ebfff94bf (commit)
       via  fba69e99ccafb4c956e688e8fef3d2f71100ed95 (commit)
       via  bd8aa1d7990b241639de4f200223da7794f14643 (commit)
       via  497fdb2505efa9a3231c39ec696da6b749d30af2 (commit)
       via  149957a7a86cd9fae98edfdc9c797d16656b3684 (commit)
       via  721bd171547f1c1ddcce5532a96f9bb801800757 (commit)
       via  96aad215edcc0f314216c07df9246799665dd19d (commit)
       via  71cca58c580f919c01e0f5fc7b6f469cef3b03c6 (commit)
       via  e8ccb474e5dbfee3d600fdd5ac3218ccb4625eb6 (commit)
       via  12b5d95d9d4f062539c3d7d7ac8dd5421dd6b6b0 (commit)
       via  7ea58af581ad8d1af39d5cbcc18ebb69100e0a5c (commit)
       via  8b097ea832516b3f5104c62a40a6d9cf4826232a (commit)
       via  44764dd0eaf01c7a24539629c727ff31affa3b7e (commit)
       via  513c30b5c5c2b3dd72894667eb0e51e24c9d4182 (commit)
       via  25505fc5105bc776ed2d6d898f18f91d6451088b (commit)
       via  01cf080f9983313a50b902e477d7b30b03afa131 (commit)
       via  0278f7f56ed5ce972803079bdeb4ba031676a171 (commit)
       via  7fecb3e2a7fc7aa5b7230f12b3cc3878872fff68 (commit)
       via  c9c39efa9118a4d30a422c26726076bc54e6da17 (commit)
       via  a829e7e14aa6380616337e0e7dda47c4f9f7022c (commit)
       via  306747a2b8971f095a13a507b0155ea64d53c98d (commit)
       via  e62a18f3786d0f3c12865f865294a0f4d39ff548 (commit)
       via  d8c84ea91f8c4a26860e637197f234f2dd909abb (commit)
       via  5e991889ca6c18cdd901b98b5083c4cee7d260c9 (commit)
       via  e09af1d4876cb1295785db9abb5f3d2d10323097 (commit)
       via  6599824f22cdb45ab30657ac95071aa0beeee08b (commit)
       via  b6e5a10028551a1c2b1379c5c8ed039582434f25 (commit)
       via  ec9df4864de8033b4efd8b1cecfd1875fefc303e (commit)
       via  f22692a8fdf6610045db0c34c4827a0ebcb0ae0a (commit)
       via  4c360c5a3a9564f584dac973810059d2d45d08ef (commit)
       via  fd57cb09ccf75c49653f8fb66fe6f4c1da49b687 (commit)
       via  3ede45e668eaad4cb23cc11f11a1a7ea6e688c88 (commit)
       via  16ec502827d038e4afe61faae53c64b17e0a0767 (commit)
       via  1535c8d141b07f246a501d288a0d3a903ce41a56 (commit)
       via  0dc0c5650ddcd8376aea84d32d2b81b1cdba0946 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not an ancestor of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (1b5e165e9d4e803844be8c2c41eea76e92d3887e)
            \
             N -- N -- N (069efc4726e3ba4f6c9adda9974437d23ed3d2d7)

When this happens we assume that you've already received alert emails for
all of the O revisions, so we report here only the revisions on the N
branch, starting from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 069efc4726e3ba4f6c9adda9974437d23ed3d2d7
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed May 4 10:16:32 2016 -0400

    9068: Fix inconsistent receiver names.

diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 8f547a4..0f556b5 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -19,8 +19,9 @@ type LoggingResponseWriter struct {
 	sentHdr      time.Time
 }
 
-func (w *LoggingResponseWriter) CloseNotify() <-chan bool {
-	wrapped, ok := w.ResponseWriter.(http.CloseNotifier)
+// CloseNotify implements http.CloseNotifier.
+func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
+	wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
 	if !ok {
 		// If upstream doesn't implement CloseNotifier, we can
 		// satisfy the interface by returning a channel that
@@ -33,25 +34,25 @@ func (w *LoggingResponseWriter) CloseNotify() <-chan bool {
 }
 
 // WriteHeader writes header to ResponseWriter
-func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
-	if loggingWriter.sentHdr == zeroTime {
-		loggingWriter.sentHdr = time.Now()
+func (resp *LoggingResponseWriter) WriteHeader(code int) {
+	if resp.sentHdr == zeroTime {
+		resp.sentHdr = time.Now()
 	}
-	loggingWriter.Status = code
-	loggingWriter.ResponseWriter.WriteHeader(code)
+	resp.Status = code
+	resp.ResponseWriter.WriteHeader(code)
 }
 
 var zeroTime time.Time
 
-func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
-	if loggingWriter.Length == 0 && len(data) > 0 && loggingWriter.sentHdr == zeroTime {
-		loggingWriter.sentHdr = time.Now()
+func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
+	if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
+		resp.sentHdr = time.Now()
 	}
-	loggingWriter.Length += len(data)
-	if loggingWriter.Status >= 400 {
-		loggingWriter.ResponseBody += string(data)
+	resp.Length += len(data)
+	if resp.Status >= 400 {
+		resp.ResponseBody += string(data)
 	}
-	return loggingWriter.ResponseWriter.Write(data)
+	return resp.ResponseWriter.Write(data)
 }
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
@@ -59,18 +60,18 @@ type LoggingRESTRouter struct {
 	router http.Handler
 }
 
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
 	t0 := time.Now()
-	loggingWriter := LoggingResponseWriter{http.StatusOK, 0, resp, "", zeroTime}
-	loggingRouter.router.ServeHTTP(&loggingWriter, req)
-	statusText := http.StatusText(loggingWriter.Status)
-	if loggingWriter.Status >= 400 {
-		statusText = strings.Replace(loggingWriter.ResponseBody, "\n", "", -1)
+	resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
+	loggingRouter.router.ServeHTTP(&resp, req)
+	statusText := http.StatusText(resp.Status)
+	if resp.Status >= 400 {
+		statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
 	}
 	now := time.Now()
 	tTotal := now.Sub(t0)
-	tLatency := loggingWriter.sentHdr.Sub(t0)
-	tResponse := now.Sub(loggingWriter.sentHdr)
-	log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), loggingWriter.Status, loggingWriter.Length, statusText)
+	tLatency := resp.sentHdr.Sub(t0)
+	tResponse := now.Sub(resp.sentHdr)
+	log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), resp.Status, resp.Length, statusText)
 
 }

commit 0f241a75c642d01d74f00d8b18133959fd2370c1
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Apr 29 12:57:09 2016 -0400

    9068: Do not use coverage tools when using non-default test flags ({gostuff}_test=...)

diff --git a/build/run-tests.sh b/build/run-tests.sh
index c94f831..d656d91 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -494,22 +494,23 @@ do_test_once() {
             # before trying "go test". Otherwise, coverage-reporting
             # mode makes Go show the wrong line numbers when reporting
             # compilation errors.
+            go get -t "git.curoverse.com/arvados.git/$1" || return 1
             if [[ -n "${testargs[$1]}" ]]
             then
                 # "go test -check.vv giturl" doesn't work, but this
                 # does:
-                cd "$WORKSPACE/$1" && \
-                    go get -t "git.curoverse.com/arvados.git/$1" && \
-                    go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+                cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
             else
                 # The above form gets verbose even when testargs is
                 # empty, so use this form in such cases:
-                go get -t "git.curoverse.com/arvados.git/$1" && \
-                    go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
+                go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
             fi
             result="$?"
-            go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
-            rm "$WORKSPACE/tmp/.$covername.tmp"
+            if [[ -f "$WORKSPACE/tmp/.$covername.tmp" ]]
+            then
+                go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
+                rm "$WORKSPACE/tmp/.$covername.tmp"
+            fi
         elif [[ "$2" == "pip" ]]
         then
             # $3 can name a path directory for us to use, including trailing

commit c8c4236aee859423c2aa5850d1c1c34ebfff94bf
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Apr 29 12:55:24 2016 -0400

    9068: Move buffer allocation from volumes to GetBlockHandler.
    
    This makes the Volume interface more idiomatic: Get() accepts a buffer
    to read into, and returns a number of bytes read, much like the Read()
    method of an io.Reader.
    
    It also makes it possible for GetBlockHandler to notice, while waiting
    for a buffer, that the client has disconnected: In this case, it
    releases the network socket and never asks any volumes to do any work.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index f08cebf..a6b98bd 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -139,11 +139,11 @@ func (v *AzureBlobVolume) Check() error {
 // If the block is younger than azureWriteRaceInterval and is
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
 	var deadline time.Time
 	haveDeadline := false
-	buf, err := v.get(loc)
-	for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
+	size, err := v.get(loc, buf)
+	for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
 		// Seeing a brand new empty block probably means we're
 		// in a race with CreateBlob, which under the hood
 		// (apparently) does "CreateEmpty" and "CommitData"
@@ -163,34 +163,32 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
 		} else if time.Now().After(deadline) {
 			break
 		}
-		bufs.Put(buf)
 		time.Sleep(azureWriteRacePollTime)
-		buf, err = v.get(loc)
+		size, err = v.get(loc, buf)
 	}
 	if haveDeadline {
-		log.Printf("Race ended with len(buf)==%d", len(buf))
+		log.Printf("Race ended with size==%d", size)
 	}
-	return buf, err
+	return size, err
 }
 
-func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
-	expectSize := BlockSize
+func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+	expectSize := len(buf)
 	if azureMaxGetBytes < BlockSize {
 		// Unfortunately the handler doesn't tell us how long the blob
 		// is expected to be, so we have to ask Azure.
 		props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
 		if err != nil {
-			return nil, v.translateError(err)
+			return 0, v.translateError(err)
 		}
 		if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
-			return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+			return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
 		}
 		expectSize = int(props.ContentLength)
 	}
 
-	buf := bufs.Get(expectSize)
 	if expectSize == 0 {
-		return buf, nil
+		return 0, nil
 	}
 
 	// We'll update this actualSize if/when we get the last piece.
@@ -235,11 +233,10 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
 	wg.Wait()
 	for _, err := range errors {
 		if err != nil {
-			bufs.Put(buf)
-			return nil, v.translateError(err)
+			return 0, v.translateError(err)
 		}
 	}
-	return buf[:actualSize], nil
+	return actualSize, nil
 }
 
 // Compare the given data with existing stored data.
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 439b402..e3c0e27 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -425,13 +425,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
 		if err != nil {
 			t.Error(err)
 		}
-		gotData, err := v.Get(hash)
+		gotData := make([]byte, len(data))
+		gotLen, err := v.Get(hash, gotData)
 		if err != nil {
 			t.Error(err)
 		}
 		gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
-		gotLen := len(gotData)
-		bufs.Put(gotData)
 		if gotLen != size {
 			t.Error("length mismatch: got %d != %d", gotLen, size)
 		}
@@ -477,11 +476,10 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
 	// Wait for the stub's Put to create the empty blob
 	v.azHandler.race <- continuePut
 	go func() {
-		buf, err := v.Get(TestHash)
+		buf := make([]byte, len(TestBlock))
+		_, err := v.Get(TestHash, buf)
 		if err != nil {
 			t.Error(err)
-		} else {
-			bufs.Put(buf)
 		}
 		close(allDone)
 	}()
@@ -521,15 +519,15 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 	allDone := make(chan struct{})
 	go func() {
 		defer close(allDone)
-		buf, err := v.Get(TestHash)
+		buf := make([]byte, BlockSize)
+		n, err := v.Get(TestHash, buf)
 		if err != nil {
 			t.Error(err)
 			return
 		}
-		if len(buf) != 0 {
-			t.Errorf("Got %+q, expected empty buf", buf)
+		if n != 0 {
+			t.Errorf("Got %+q, expected empty buf", buf[:n])
 		}
-		bufs.Put(buf)
 	}()
 	select {
 	case <-allDone:
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 33d585a..7c17424 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) {
 			expectedDc, responseDc)
 	}
 	// Confirm the block has been deleted
-	_, err := vols[0].Get(TestHash)
+	buf := make([]byte, BlockSize)
+	_, err := vols[0].Get(TestHash, buf)
 	var blockDeleted = os.IsNotExist(err)
 	if !blockDeleted {
 		t.Error("superuserExistingBlockReq: block not deleted")
@@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) {
 			expectedDc, responseDc)
 	}
 	// Confirm the block has NOT been deleted.
-	_, err = vols[0].Get(TestHash)
+	_, err = vols[0].Get(TestHash, buf)
 	if err != nil {
 		t.Errorf("testing delete on new block: %s\n", err)
 	}
@@ -913,6 +914,65 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
 	}
 }
 
+type notifyingResponseRecorder struct {
+	*httptest.ResponseRecorder
+	closer chan bool
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
+	return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) {
+	defer func(was bool) {
+		enforcePermissions = was
+	}(enforcePermissions)
+	enforcePermissions = false
+
+	defer func(orig *bufferPool) {
+		bufs = orig
+	}(bufs)
+	bufs = newBufferPool(1, BlockSize)
+	defer bufs.Put(bufs.Get(BlockSize))
+
+	KeepVM = MakeTestVolumeManager(2)
+	defer KeepVM.Close()
+
+	if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+		t.Error(err)
+	}
+
+	resp := &notifyingResponseRecorder{
+		ResponseRecorder: httptest.NewRecorder(),
+		closer:           make(chan bool, 1),
+	}
+	if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
+		t.Fatal("notifyingResponseRecorder is broken")
+	}
+	// If anyone asks, the client has disconnected.
+	resp.closer <- true
+
+	ok := make(chan struct{})
+	go func() {
+		req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+		(&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+		ok <- struct{}{}
+	}()
+
+	select {
+	case <-time.After(20 * time.Second):
+		t.Fatal("request took >20s, close notifier must be broken")
+	case <-ok:
+	}
+
+	ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+	for i, v := range KeepVM.AllWritable() {
+		if calls := v.(*MockVolume).called["GET"]; calls != 0 {
+			t.Errorf("volume %d got %d calls, expected 0", i, calls)
+		}
+	}
+}
+
 // Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
 // leak.
 func TestGetHandlerNoBufferleak(t *testing.T) {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a188c47..f698982 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -79,26 +79,39 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		}
 	}
 
-	block, err := GetBlock(mux.Vars(req)["hash"])
+	// TODO: Probe volumes to check whether the block _might_
+	// exist. Some volumes/types could support a quick existence
+	// check without causing other operations to suffer. If all
+	// volumes support that, and assure us the block definitely
+	// isn't here, we can return 404 now instead of waiting for a
+	// buffer.
+
+	buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
 	if err != nil {
-		// This type assertion is safe because the only errors
-		// GetBlock can return are DiskHashError or NotFoundError.
-		http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
+		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
 	}
-	defer bufs.Put(block)
+	defer bufs.Put(buf)
 
-	resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+	size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+	if err != nil {
+		code := http.StatusInternalServerError
+		if err, ok := err.(*KeepError); ok {
+			code = err.HTTPCode
+		}
+		http.Error(resp, err.Error(), code)
+		return
+	}
+
+	resp.Header().Set("Content-Length", strconv.Itoa(size))
 	resp.Header().Set("Content-Type", "application/octet-stream")
-	resp.Write(block)
+	resp.Write(buf[:size])
 }
 
-var errClientDisconnected = fmt.Errorf("client disconnected")
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if resp implements http.CloseNotifier and tells us that the
 // client has disconnected before we get a buffer.
-func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte, error) {
+func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
 	var closeNotifier <-chan bool
 	if resp, ok := resp.(http.CloseNotifier); ok {
 		closeNotifier = resp.CloseNotify()
@@ -119,7 +132,7 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte,
 			// return it to the pool.
 			bufs.Put(<-bufReady)
 		}()
-		return nil, errClientDisconnected
+		return nil, ErrClientDisconnect
 	}
 }
 
@@ -146,7 +159,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	buf, err := getBufferForResponseWriter(resp, int(req.ContentLength))
+	buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
@@ -516,7 +529,6 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 }
 
-// ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
 // Once the handler has determined that system policy permits the
@@ -527,24 +539,21 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // should be the only part of the code that cares about which volume a
 // block is stored on, so it should be responsible for figuring out
 // which volume to check for fetching blocks, storing blocks, etc.
-// ==============================
 
-// GetBlock fetches and returns the block identified by "hash".
-//
-// On success, GetBlock returns a byte slice with the block data, and
-// a nil error.
+// GetBlock fetches the block identified by "hash" into the provided
+// buf, and returns the data size.
 //
 // If the block cannot be found on any volume, returns NotFoundError.
 //
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(hash string) ([]byte, error) {
+func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
 	// Attempt to read the requested hash from a keep volume.
 	errorToCaller := NotFoundError
 
 	for _, vol := range KeepVM.AllReadable() {
-		buf, err := vol.Get(hash)
+		size, err := vol.Get(hash, buf)
 		if err != nil {
 			// IsNotExist is an expected error and may be
 			// ignored. All other errors are logged. In
@@ -558,23 +567,22 @@ func GetBlock(hash string) ([]byte, error) {
 		}
 		// Check the file checksum.
 		//
-		filehash := fmt.Sprintf("%x", md5.Sum(buf))
+		filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
 		if filehash != hash {
 			// TODO: Try harder to tell a sysadmin about
 			// this.
 			log.Printf("%s: checksum mismatch for request %s (actual %s)",
 				vol, hash, filehash)
 			errorToCaller = DiskHashError
-			bufs.Put(buf)
 			continue
 		}
 		if errorToCaller == DiskHashError {
 			log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
 				vol, hash)
 		}
-		return buf, nil
+		return size, nil
 	}
-	return nil, errorToCaller
+	return 0, errorToCaller
 }
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
diff --git a/services/keepstore/handlers_with_generic_volume_test.go b/services/keepstore/handlers_with_generic_volume_test.go
index c5349d3..dda7edc 100644
--- a/services/keepstore/handlers_with_generic_volume_test.go
+++ b/services/keepstore/handlers_with_generic_volume_test.go
@@ -45,12 +45,13 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 	testableVolumes[1].PutRaw(testHash, testBlock)
 
 	// Get should pass
-	buf, err := GetBlock(testHash)
+	buf := make([]byte, len(testBlock))
+	n, err := GetBlock(testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error while getting block %s", err)
 	}
-	if bytes.Compare(buf, testBlock) != 0 {
-		t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testBlock)
+	if bytes.Compare(buf[:n], testBlock) != 0 {
+		t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf[:n], testBlock)
 	}
 }
 
@@ -64,9 +65,10 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
 	testableVolumes[1].PutRaw(testHash, badData)
 
 	// Get should fail
-	_, err := GetBlock(testHash)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(testHash, buf, nil)
 	if err == nil {
-		t.Fatalf("Expected error while getting corrupt block %v", testHash)
+		t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
 	}
 }
 
@@ -85,11 +87,12 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
 	}
 
 	// Check that PutBlock stored the data as expected
-	buf, err := GetBlock(testHash)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
-	} else if bytes.Compare(buf, testBlock) != 0 {
-		t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+	} else if bytes.Compare(buf[:size], testBlock) != 0 {
+		t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
 	}
 }
 
@@ -109,10 +112,11 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
 
 	// Put succeeded and overwrote the badData in one volume,
 	// and Get should return the testBlock now, ignoring the bad data.
-	buf, err := GetBlock(testHash)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(testHash, buf, nil)
 	if err != nil {
 		t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
-	} else if bytes.Compare(buf, testBlock) != 0 {
-		t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+	} else if bytes.Compare(buf[:size], testBlock) != 0 {
+		t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
 	}
 }
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 93ee43c..80d8670 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -91,6 +91,7 @@ var (
 	TooLongError        = &KeepError{413, "Block is too large"}
 	MethodDisabledError = &KeepError{405, "Method disabled"}
 	ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
+	ErrClientDisconnect = &KeepError{503, "Client disconnected"}
 )
 
 func (e *KeepError) Error() string {
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 2a1c3d2..c0adbc0 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -66,12 +66,13 @@ func TestGetBlock(t *testing.T) {
 	}
 
 	// Check that GetBlock returns success.
-	result, err := GetBlock(TestHash)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(TestHash, buf, nil)
 	if err != nil {
 		t.Errorf("GetBlock error: %s", err)
 	}
-	if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
-		t.Errorf("expected %s, got %s", TestBlock, result)
+	if bytes.Compare(buf[:size], TestBlock) != 0 {
+		t.Errorf("got %v, expected %v", buf[:size], TestBlock)
 	}
 }
 
@@ -86,9 +87,10 @@ func TestGetBlockMissing(t *testing.T) {
 	defer KeepVM.Close()
 
 	// Check that GetBlock returns failure.
-	result, err := GetBlock(TestHash)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(TestHash, buf, nil)
 	if err != NotFoundError {
-		t.Errorf("Expected NotFoundError, got %v", result)
+		t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
 	}
 }
 
@@ -107,9 +109,10 @@ func TestGetBlockCorrupt(t *testing.T) {
 	vols[0].Put(TestHash, BadBlock)
 
 	// Check that GetBlock returns failure.
-	result, err := GetBlock(TestHash)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(TestHash, buf, nil)
 	if err != DiskHashError {
-		t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
+		t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
 	}
 }
 
@@ -133,13 +136,14 @@ func TestPutBlockOK(t *testing.T) {
 	}
 
 	vols := KeepVM.AllReadable()
-	result, err := vols[1].Get(TestHash)
+	buf := make([]byte, BlockSize)
+	n, err := vols[1].Get(TestHash, buf)
 	if err != nil {
 		t.Fatalf("Volume #0 Get returned error: %v", err)
 	}
-	if string(result) != string(TestBlock) {
+	if string(buf[:n]) != string(TestBlock) {
 		t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
-			string(TestBlock), string(result))
+			string(TestBlock), string(buf[:n]))
 	}
 }
 
@@ -162,14 +166,14 @@ func TestPutBlockOneVol(t *testing.T) {
 		t.Fatalf("PutBlock: n %d err %v", n, err)
 	}
 
-	result, err := GetBlock(TestHash)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(TestHash, buf, nil)
 	if err != nil {
 		t.Fatalf("GetBlock: %v", err)
 	}
-	if string(result) != string(TestBlock) {
-		t.Error("PutBlock/GetBlock mismatch")
-		t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
-			string(TestBlock), string(result))
+	if bytes.Compare(buf[:size], TestBlock) != 0 {
+		t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
+			TestBlock, buf[:size])
 	}
 }
 
@@ -191,7 +195,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 	}
 
 	// Confirm that GetBlock fails to return anything.
-	if result, err := GetBlock(TestHash); err != NotFoundError {
+	if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
 		t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
 			string(result), err)
 	}
@@ -216,10 +220,11 @@ func TestPutBlockCorrupt(t *testing.T) {
 	}
 
 	// The block on disk should now match TestBlock.
-	if block, err := GetBlock(TestHash); err != nil {
+	buf := make([]byte, BlockSize)
+	if size, err := GetBlock(TestHash, buf, nil); err != nil {
 		t.Errorf("GetBlock: %v", err)
-	} else if bytes.Compare(block, TestBlock) != 0 {
-		t.Errorf("GetBlock returned: '%s'", string(block))
+	} else if bytes.Compare(buf[:size], TestBlock) != 0 {
+		t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
 	}
 }
 
@@ -290,12 +295,13 @@ func TestPutBlockTouchFails(t *testing.T) {
 		t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
 			oldMtime, newMtime)
 	}
-	result, err := vols[1].Get(TestHash)
+	buf := make([]byte, BlockSize)
+	n, err := vols[1].Get(TestHash, buf)
 	if err != nil {
 		t.Fatalf("vols[1]: %v", err)
 	}
-	if bytes.Compare(result, TestBlock) != 0 {
-		t.Errorf("new block does not match test block\nnew block = %v\n", result)
+	if bytes.Compare(buf[:n], TestBlock) != 0 {
+		t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
 	}
 }
 
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index a93b72c..8f547a4 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -19,6 +19,19 @@ type LoggingResponseWriter struct {
 	sentHdr      time.Time
 }
 
+// CloseNotify implements http.CloseNotifier by delegating to the
+// wrapped ResponseWriter when it supports close notification.
+func (loggingWriter *LoggingResponseWriter) CloseNotify() <-chan bool {
+	if wrapped, ok := loggingWriter.ResponseWriter.(http.CloseNotifier); ok {
+		return wrapped.CloseNotify()
+	}
+	// The wrapped writer cannot notify us. A nil channel never
+	// delivers a value, which still satisfies the interface: it
+	// makes no promise that a client disconnect will ever be
+	// reported.
+	return nil
+}
+
 // WriteHeader writes header to ResponseWriter
 func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
 	if loggingWriter.sentHdr == zeroTime {
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
new file mode 100644
index 0000000..aa88556
--- /dev/null
+++ b/services/keepstore/logging_router_test.go
@@ -0,0 +1,10 @@
+package main
+
+import (
+	"net/http"
+	"testing"
+)
+
+func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
+	http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
+}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 79a680d..d068b2a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -153,20 +153,18 @@ func (v *S3Volume) Check() error {
 	return nil
 }
 
-func (v *S3Volume) Get(loc string) ([]byte, error) {
+func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
 	rdr, err := v.Bucket.GetReader(loc)
 	if err != nil {
-		return nil, v.translateError(err)
+		return 0, v.translateError(err)
 	}
 	defer rdr.Close()
-	buf := bufs.Get(BlockSize)
 	n, err := io.ReadFull(rdr, buf)
 	switch err {
 	case nil, io.EOF, io.ErrUnexpectedEOF:
-		return buf[:n], nil
+		return n, nil
 	default:
-		bufs.Put(buf)
-		return nil, v.translateError(err)
+		return 0, v.translateError(err)
 	}
 }
 
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index ac94061..d111cae 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -290,26 +290,27 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 	expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
 	// Verify Locator1 to be un/deleted as expected
-	data, _ := GetBlock(testData.Locator1)
+	buf := make([]byte, BlockSize)
+	size, err := GetBlock(testData.Locator1, buf, nil)
 	if testData.ExpectLocator1 {
-		if len(data) == 0 {
+		if size == 0 || err != nil {
 			t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
 		}
 	} else {
-		if len(data) > 0 {
+		if size > 0 || err == nil {
 			t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
 		}
 	}
 
 	// Verify Locator2 to be un/deleted as expected
 	if testData.Locator1 != testData.Locator2 {
-		data, _ = GetBlock(testData.Locator2)
+		size, err = GetBlock(testData.Locator2, buf, nil)
 		if testData.ExpectLocator2 {
-			if len(data) == 0 {
+			if size == 0 || err != nil {
 				t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
 			}
 		} else {
-			if len(data) > 0 {
+			if size > 0 || err == nil {
 				t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
 			}
 		}
@@ -321,7 +322,8 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 	if testData.DifferentMtimes {
 		locatorFoundIn := 0
 		for _, volume := range KeepVM.AllReadable() {
-			if _, err := volume.Get(testData.Locator1); err == nil {
+			buf := make([]byte, BlockSize)
+			if _, err := volume.Get(testData.Locator1, buf); err == nil {
 				locatorFoundIn = locatorFoundIn + 1
 			}
 		}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 17da54f..8c7e9a4 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -10,17 +10,14 @@ import (
 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
 // etc.
 type Volume interface {
-	// Get a block. IFF the returned error is nil, the caller must
-	// put the returned slice back into the buffer pool when it's
-	// finished with it. (Otherwise, the buffer pool will be
-	// depleted and eventually -- when all available buffers are
-	// used and not returned -- operations will reach deadlock.)
+	// Get a block: copy the block data into buf, and return the
+	// number of bytes copied.
 	//
 	// loc is guaranteed to consist of 32 or more lowercase hex
 	// digits.
 	//
-	// Get should not verify the integrity of the returned data:
-	// it should just return whatever was found in its backing
+	// Get should not verify the integrity of the data: it should
+	// just return whatever was found in its backing
 	// store. (Integrity checking is the caller's responsibility.)
 	//
 	// If an error is encountered that prevents it from
@@ -36,10 +33,10 @@ type Volume interface {
 	// access log if the block is not found on any other volumes
 	// either).
 	//
-	// If the data in the backing store is bigger than BlockSize,
-	// Get is permitted to return an error without reading any of
-	// the data.
-	Get(loc string) ([]byte, error)
+	// If the data in the backing store is bigger than len(buf),
+	// which will not exceed BlockSize, then Get is permitted to
+	// return an error without reading any of the data.
+	Get(loc string, buf []byte) (int, error)
 
 	// Compare the given data with the stored data (i.e., what Get
 	// would return). If equal, return nil. If not, return
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 95166c2..105795c 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -89,14 +89,13 @@ func testGet(t TB, factory TestableVolumeFactory) {
 
 	v.PutRaw(TestHash, TestBlock)
 
-	buf, err := v.Get(TestHash)
+	buf := make([]byte, BlockSize)
+	n, err := v.Get(TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	bufs.Put(buf)
-
-	if bytes.Compare(buf, TestBlock) != 0 {
+	if bytes.Compare(buf[:n], TestBlock) != 0 {
-		t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
+		t.Errorf("expected %s, got %s", string(TestBlock), string(buf[:n]))
 	}
 }
@@ -107,7 +106,8 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
 
-	if _, err := v.Get(TestHash2); err == nil {
+	buf := make([]byte, BlockSize)
+	if _, err := v.Get(TestHash2, buf); err == nil {
 		t.Errorf("Expected error while getting non-existing block %v", TestHash2)
 	}
 }
@@ -208,24 +208,22 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
 	v.PutRaw(testHash, testDataA)
 
 	putErr := v.Put(testHash, testDataB)
-	buf, getErr := v.Get(testHash)
+	buf := make([]byte, BlockSize)
+	n, getErr := v.Get(testHash, buf)
 	if putErr == nil {
 		// Put must not return a nil error unless it has
 		// overwritten the existing data.
-		if bytes.Compare(buf, testDataB) != 0 {
-			t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
+		if bytes.Compare(buf[:n], testDataB) != 0 {
+			t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
 		}
 	} else {
 		// It is permissible for Put to fail, but it must
 		// leave us with either the original data, the new
 		// data, or nothing at all.
-		if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
-			t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
+		if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
+			t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
 		}
 	}
-	if getErr == nil {
-		bufs.Put(buf)
-	}
 }
 
 // Put and get multiple blocks
@@ -253,34 +251,32 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
 		t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
 	}
 
-	data, err := v.Get(TestHash)
+	data := make([]byte, BlockSize)
+	n, err := v.Get(TestHash, data)
 	if err != nil {
 		t.Error(err)
 	} else {
-		if bytes.Compare(data, TestBlock) != 0 {
-			t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+		if bytes.Compare(data[:n], TestBlock) != 0 {
+			t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
 		}
-		bufs.Put(data)
 	}
 
-	data, err = v.Get(TestHash2)
+	n, err = v.Get(TestHash2, data)
 	if err != nil {
 		t.Error(err)
 	} else {
-		if bytes.Compare(data, TestBlock2) != 0 {
-			t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+		if bytes.Compare(data[:n], TestBlock2) != 0 {
+			t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
 		}
-		bufs.Put(data)
 	}
 
-	data, err = v.Get(TestHash3)
+	n, err = v.Get(TestHash3, data)
 	if err != nil {
 		t.Error(err)
 	} else {
-		if bytes.Compare(data, TestBlock3) != 0 {
-			t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+		if bytes.Compare(data[:n], TestBlock3) != 0 {
+			t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock3)
 		}
-		bufs.Put(data)
 	}
 }
 
@@ -426,14 +422,12 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 	if err := v.Trash(TestHash); err != nil {
 		t.Error(err)
 	}
-	data, err := v.Get(TestHash)
+	data := make([]byte, BlockSize)
+	n, err := v.Get(TestHash, data)
 	if err != nil {
 		t.Error(err)
-	} else {
-		if bytes.Compare(data, TestBlock) != 0 {
-			t.Errorf("Got data %+q, expected %+q", data, TestBlock)
-		}
-		bufs.Put(data)
+	} else if bytes.Compare(data[:n], TestBlock) != 0 {
+		t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
 	}
 }
 
@@ -455,7 +449,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 	if err := v.Trash(TestHash); err != nil {
 		t.Error(err)
 	}
-	if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+	data := make([]byte, BlockSize)
+	if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
 		t.Errorf("os.IsNotExist(%v) should have been true", err)
 	}
 }
@@ -514,9 +509,10 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	}
 
 	v.PutRaw(TestHash, TestBlock)
+	buf := make([]byte, BlockSize)
 
 	// Get from read-only volume should succeed
-	_, err := v.Get(TestHash)
+	_, err := v.Get(TestHash, buf)
 	if err != nil {
 		t.Errorf("got err %v, expected nil", err)
 	}
@@ -526,7 +522,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
 	if err == nil {
 		t.Errorf("Expected error when putting block in a read-only volume")
 	}
-	_, err = v.Get(TestHash2)
+	_, err = v.Get(TestHash2, buf)
 	if err == nil {
 		t.Errorf("Expected error when getting block whose put in read-only volume failed")
 	}
@@ -561,45 +557,45 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
 	v.PutRaw(TestHash3, TestBlock3)
 
 	sem := make(chan int)
-	go func(sem chan int) {
-		buf, err := v.Get(TestHash)
+	go func() {
+		buf := make([]byte, BlockSize)
+		n, err := v.Get(TestHash, buf)
 		if err != nil {
 			t.Errorf("err1: %v", err)
 		}
-		bufs.Put(buf)
-		if bytes.Compare(buf, TestBlock) != 0 {
-			t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
+		if bytes.Compare(buf[:n], TestBlock) != 0 {
+			t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
 		}
 		sem <- 1
-	}(sem)
+	}()
 
-	go func(sem chan int) {
-		buf, err := v.Get(TestHash2)
+	go func() {
+		buf := make([]byte, BlockSize)
+		n, err := v.Get(TestHash2, buf)
 		if err != nil {
 			t.Errorf("err2: %v", err)
 		}
-		bufs.Put(buf)
-		if bytes.Compare(buf, TestBlock2) != 0 {
-			t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
+		if bytes.Compare(buf[:n], TestBlock2) != 0 {
+			t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
 		}
 		sem <- 1
-	}(sem)
+	}()
 
-	go func(sem chan int) {
-		buf, err := v.Get(TestHash3)
+	go func() {
+		buf := make([]byte, BlockSize)
+		n, err := v.Get(TestHash3, buf)
 		if err != nil {
 			t.Errorf("err3: %v", err)
 		}
-		bufs.Put(buf)
-		if bytes.Compare(buf, TestBlock3) != 0 {
-			t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
+		if bytes.Compare(buf[:n], TestBlock3) != 0 {
+			t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
 		}
 		sem <- 1
-	}(sem)
+	}()
 
 	// Wait for all goroutines to finish
-	for done := 0; done < 3; {
-		done += <-sem
+	for done := 0; done < 3; done++ {
+		<-sem
 	}
 }
 
@@ -639,36 +635,34 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
 	}(sem)
 
 	// Wait for all goroutines to finish
-	for done := 0; done < 3; {
-		done += <-sem
+	for done := 0; done < 3; done++ {
+		<-sem
 	}
 
 	// Double check that we actually wrote the blocks we expected to write.
-	buf, err := v.Get(TestHash)
+	buf := make([]byte, BlockSize)
+	n, err := v.Get(TestHash, buf)
 	if err != nil {
 		t.Errorf("Get #1: %v", err)
 	}
-	bufs.Put(buf)
-	if bytes.Compare(buf, TestBlock) != 0 {
-		t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
+	if bytes.Compare(buf[:n], TestBlock) != 0 {
+		t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
 	}
 
-	buf, err = v.Get(TestHash2)
+	n, err = v.Get(TestHash2, buf)
 	if err != nil {
 		t.Errorf("Get #2: %v", err)
 	}
-	bufs.Put(buf)
-	if bytes.Compare(buf, TestBlock2) != 0 {
-		t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
+	if bytes.Compare(buf[:n], TestBlock2) != 0 {
+		t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
 	}
 
-	buf, err = v.Get(TestHash3)
+	n, err = v.Get(TestHash3, buf)
 	if err != nil {
 		t.Errorf("Get #3: %v", err)
 	}
-	bufs.Put(buf)
-	if bytes.Compare(buf, TestBlock3) != 0 {
-		t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
+	if bytes.Compare(buf[:n], TestBlock3) != 0 {
+		t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
 	}
 }
 
@@ -689,14 +683,13 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	rdata, err := v.Get(hash)
+	buf := make([]byte, BlockSize)
+	n, err := v.Get(hash, buf)
 	if err != nil {
 		t.Error(err)
-	} else {
-		defer bufs.Put(rdata)
 	}
-	if bytes.Compare(rdata, wdata) != 0 {
-		t.Error("rdata != wdata")
+	if bytes.Compare(buf[:n], wdata) != 0 {
+		t.Errorf("buf %+q != wdata %+q", buf[:n], wdata)
 	}
 }
 
@@ -717,14 +710,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	v.PutRaw(TestHash, TestBlock)
 	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-	buf, err := v.Get(TestHash)
+	buf := make([]byte, BlockSize)
+	n, err := v.Get(TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
-	if bytes.Compare(buf, TestBlock) != 0 {
-		t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+	if bytes.Compare(buf[:n], TestBlock) != 0 {
+		t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
 	}
-	bufs.Put(buf)
 
 	// Trash
 	err = v.Trash(TestHash)
@@ -737,7 +730,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 			t.Error(err)
 		}
 	} else {
-		_, err = v.Get(TestHash)
+		_, err = v.Get(TestHash, buf)
 		if err == nil || !os.IsNotExist(err) {
 			t.Errorf("os.IsNotExist(%v) should have been true", err)
 		}
@@ -750,14 +743,13 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	}
 
 	// Get the block - after trash and untrash sequence
-	buf, err = v.Get(TestHash)
+	n, err = v.Get(TestHash, buf)
 	if err != nil {
 		t.Fatal(err)
 	}
-	if bytes.Compare(buf, TestBlock) != 0 {
-		t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+	if bytes.Compare(buf[:n], TestBlock) != 0 {
+		t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
 	}
-	bufs.Put(buf)
 }
 
 func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
@@ -768,14 +760,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	}(trashLifetime)
 
 	checkGet := func() error {
-		buf, err := v.Get(TestHash)
+		buf := make([]byte, BlockSize)
+		n, err := v.Get(TestHash, buf)
 		if err != nil {
 			return err
 		}
-		if bytes.Compare(buf, TestBlock) != 0 {
-			t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
+		if bytes.Compare(buf[:n], TestBlock) != 0 {
+			t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
 		}
-		bufs.Put(buf)
 		return nil
 	}
 
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index e8a5a33..5671b8d 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -113,17 +113,16 @@ func (v *MockVolume) Compare(loc string, buf []byte) error {
 	}
 }
 
-func (v *MockVolume) Get(loc string) ([]byte, error) {
+func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
 	v.gotCall("Get")
 	<-v.Gate
 	if v.Bad {
-		return nil, errors.New("Bad volume")
+		return 0, errors.New("Bad volume")
 	} else if block, ok := v.Store[loc]; ok {
-		buf := bufs.Get(len(block))
-		copy(buf, block)
-		return buf, nil
+		copy(buf[:len(block)], block)
+		return len(block), nil
 	}
-	return nil, os.ErrNotExist
+	return 0, os.ErrNotExist
 }
 
 func (v *MockVolume) Put(loc string, block []byte) error {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 996068c..edec048 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -181,26 +181,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 	return stat, err
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
 	path := v.blockPath(loc)
 	stat, err := v.stat(path)
 	if err != nil {
-		return nil, v.translateError(err)
+		return 0, v.translateError(err)
+	}
+	if stat.Size() > int64(len(buf)) {
+		return 0, TooLongError
 	}
-	buf := bufs.Get(int(stat.Size()))
+	var read int
+	size := int(stat.Size())
 	err = v.getFunc(path, func(rdr io.Reader) error {
-		_, err = io.ReadFull(rdr, buf)
+		read, err = io.ReadFull(rdr, buf[:size])
 		return err
 	})
-	if err != nil {
-		bufs.Put(buf)
-		return nil, err
-	}
-	return buf, nil
+	return read, err
 }
 
 // Compare returns nil if Get(loc) would return the same content as
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 0775e89..c95538b 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -106,12 +106,13 @@ func TestGetNotFound(t *testing.T) {
 	defer v.Teardown()
 	v.Put(TestHash, TestBlock)
 
-	buf, err := v.Get(TestHash2)
+	buf := make([]byte, BlockSize)
+	n, err := v.Get(TestHash2, buf)
 	switch {
 	case os.IsNotExist(err):
 		break
 	case err == nil:
-		t.Errorf("Read should have failed, returned %s", string(buf))
+		t.Errorf("Read should have failed, returned %+q", buf[:n])
 	default:
 		t.Errorf("Read expected ErrNotExist, got: %s", err)
 	}
@@ -151,7 +152,8 @@ func TestUnixVolumeReadonly(t *testing.T) {
 
 	v.PutRaw(TestHash, TestBlock)
 
-	_, err := v.Get(TestHash)
+	buf := make([]byte, BlockSize)
+	_, err := v.Get(TestHash, buf)
 	if err != nil {
 		t.Errorf("got err %v, expected nil", err)
 	}

commit fba69e99ccafb4c956e688e8fef3d2f71100ed95
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Apr 29 10:02:39 2016 -0400

    9068: Drop PUT requests if the client disconnects before we get a buffer.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 043ab69..a188c47 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -93,6 +93,36 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	resp.Write(block)
 }
 
+var errClientDisconnected = fmt.Errorf("client disconnected")
+
+// getBufferForResponseWriter obtains a bufSize-byte buffer from the
+// pool. If resp implements http.CloseNotifier and reports that the
+// client went away before a buffer arrives, it stops waiting and
+// returns errClientDisconnected instead.
+func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte, error) {
+	var clientGone <-chan bool
+	if notifier, ok := resp.(http.CloseNotifier); ok {
+		clientGone = notifier.CloseNotify()
+	}
+	pending := make(chan []byte)
+	go func() {
+		pending <- bufs.Get(bufSize)
+		close(pending)
+	}()
+	select {
+	case <-clientGone:
+		// The Get call above is still in progress. Wait for
+		// it in the background and hand the buffer straight
+		// back to the pool once it arrives.
+		go func() {
+			bufs.Put(<-pending)
+		}()
+		return nil, errClientDisconnected
+	case got := <-pending:
+		return got, nil
+	}
+}
+
 // PutBlockHandler is a HandleFunc to address Put block requests.
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	hash := mux.Vars(req)["hash"]
@@ -116,8 +146,13 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	buf := bufs.Get(int(req.ContentLength))
-	_, err := io.ReadFull(req.Body, buf)
+	buf, err := getBufferForResponseWriter(resp, int(req.ContentLength))
+	if err != nil {
+		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
+		return
+	}
+
+	_, err = io.ReadFull(req.Body, buf)
 	if err != nil {
 		http.Error(resp, err.Error(), 500)
 		bufs.Put(buf)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list