[ARVADOS] updated: 1b5e165e9d4e803844be8c2c41eea76e92d3887e
Git user
git at public.curoverse.com
Fri Apr 29 12:57:15 EDT 2016
Summary of changes:
services/keepstore/handler_test.go | 47 +++++++++++++++++++++++++++++++
services/keepstore/logging_router.go | 13 +++++++++
services/keepstore/logging_router_test.go | 10 +++++++
3 files changed, 70 insertions(+)
create mode 100644 services/keepstore/logging_router_test.go
discards 27b3d991cccca7b20762e55d6aff7c8f409485b9 (commit)
via 1b5e165e9d4e803844be8c2c41eea76e92d3887e (commit)
via a33399926a8a3491ebb5341615b65dd088bbc275 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (27b3d991cccca7b20762e55d6aff7c8f409485b9)
            \
             N -- N -- N (1b5e165e9d4e803844be8c2c41eea76e92d3887e)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 1b5e165e9d4e803844be8c2c41eea76e92d3887e
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Apr 29 12:57:09 2016 -0400
9068: Do not use coverage tools when using non-default test flags ({gostuff}_test=...)
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 53df93c..629ce97 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -493,22 +493,23 @@ do_test_once() {
# before trying "go test". Otherwise, coverage-reporting
# mode makes Go show the wrong line numbers when reporting
# compilation errors.
+ go get -t "git.curoverse.com/arvados.git/$1" || return 1
if [[ -n "${testargs[$1]}" ]]
then
# "go test -check.vv giturl" doesn't work, but this
# does:
- cd "$WORKSPACE/$1" && \
- go get -t "git.curoverse.com/arvados.git/$1" && \
- go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
+ cd "$WORKSPACE/$1" && go test ${short:+-short} ${coverflags[@]} ${testargs[$1]}
else
# The above form gets verbose even when testargs is
# empty, so use this form in such cases:
- go get -t "git.curoverse.com/arvados.git/$1" && \
- go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
+ go test ${short:+-short} ${coverflags[@]} "git.curoverse.com/arvados.git/$1"
fi
result="$?"
- go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
- rm "$WORKSPACE/tmp/.$covername.tmp"
+ if [[ -f "$WORKSPACE/tmp/.$covername.tmp" ]]
+ then
+ go tool cover -html="$WORKSPACE/tmp/.$covername.tmp" -o "$WORKSPACE/tmp/$covername.html"
+ rm "$WORKSPACE/tmp/.$covername.tmp"
+ fi
elif [[ "$2" == "pip" ]]
then
# $3 can name a path directory for us to use, including trailing
commit a33399926a8a3491ebb5341615b65dd088bbc275
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Apr 29 12:55:24 2016 -0400
9068: Move buffer allocation from volumes to GetBlockHandler.
This makes the Volume interface more idiomatic: Get() accepts a buffer
to read into, and returns a number of bytes read, much like the Read()
method of an io.Reader.
It also makes it possible for GetBlockHandler to notice, while waiting
for a buffer, that the client has disconnected: In this case, it
releases the network socket and never asks any volumes to do any work.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index f08cebf..a6b98bd 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -139,11 +139,11 @@ func (v *AzureBlobVolume) Check() error {
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
var deadline time.Time
haveDeadline := false
- buf, err := v.get(loc)
- for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
+ size, err := v.get(loc, buf)
+ for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
// Seeing a brand new empty block probably means we're
// in a race with CreateBlob, which under the hood
// (apparently) does "CreateEmpty" and "CommitData"
@@ -163,34 +163,32 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
} else if time.Now().After(deadline) {
break
}
- bufs.Put(buf)
time.Sleep(azureWriteRacePollTime)
- buf, err = v.get(loc)
+ size, err = v.get(loc, buf)
}
if haveDeadline {
- log.Printf("Race ended with len(buf)==%d", len(buf))
+ log.Printf("Race ended with size==%d", size)
}
- return buf, err
+ return size, err
}
-func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
- expectSize := BlockSize
+func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+ expectSize := len(buf)
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
if err != nil {
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
- return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+ return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
}
expectSize = int(props.ContentLength)
}
- buf := bufs.Get(expectSize)
if expectSize == 0 {
- return buf, nil
+ return 0, nil
}
// We'll update this actualSize if/when we get the last piece.
@@ -235,11 +233,10 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
wg.Wait()
for _, err := range errors {
if err != nil {
- bufs.Put(buf)
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
}
- return buf[:actualSize], nil
+ return actualSize, nil
}
// Compare the given data with existing stored data.
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 439b402..e3c0e27 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -425,13 +425,12 @@ func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
if err != nil {
t.Error(err)
}
- gotData, err := v.Get(hash)
+ gotData := make([]byte, len(data))
+ gotLen, err := v.Get(hash, gotData)
if err != nil {
t.Error(err)
}
gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
- gotLen := len(gotData)
- bufs.Put(gotData)
if gotLen != size {
t.Error("length mismatch: got %d != %d", gotLen, size)
}
@@ -477,11 +476,10 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
// Wait for the stub's Put to create the empty blob
v.azHandler.race <- continuePut
go func() {
- buf, err := v.Get(TestHash)
+ buf := make([]byte, len(TestBlock))
+ _, err := v.Get(TestHash, buf)
if err != nil {
t.Error(err)
- } else {
- bufs.Put(buf)
}
close(allDone)
}()
@@ -521,15 +519,15 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
allDone := make(chan struct{})
go func() {
defer close(allDone)
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Error(err)
return
}
- if len(buf) != 0 {
- t.Errorf("Got %+q, expected empty buf", buf)
+ if n != 0 {
+ t.Errorf("Got %+q, expected empty buf", buf[:n])
}
- bufs.Put(buf)
}()
select {
case <-allDone:
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a7675fb..fe9a3d3 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -561,7 +561,8 @@ func TestDeleteHandler(t *testing.T) {
expectedDc, responseDc)
}
// Confirm the block has been deleted
- _, err := vols[0].Get(TestHash)
+ buf := make([]byte, BlockSize)
+ _, err := vols[0].Get(TestHash, buf)
var blockDeleted = os.IsNotExist(err)
if !blockDeleted {
t.Error("superuserExistingBlockReq: block not deleted")
@@ -585,7 +586,7 @@ func TestDeleteHandler(t *testing.T) {
expectedDc, responseDc)
}
// Confirm the block has NOT been deleted.
- _, err = vols[0].Get(TestHash)
+ _, err = vols[0].Get(TestHash, buf)
if err != nil {
t.Errorf("testing delete on new block: %s\n", err)
}
@@ -913,6 +914,53 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
}
}
+type notifyingResponseRecorder struct {
+ *httptest.ResponseRecorder
+ closer chan bool
+}
+
+func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
+ return r.closer
+}
+
+func TestGetHandlerClientDisconnect(t *testing.T) {
+ defer func(was bool) {
+ enforcePermissions = was
+ }(enforcePermissions)
+ enforcePermissions = false
+
+ defer func(orig *bufferPool) {
+ bufs = orig
+ }(bufs)
+ bufs = newBufferPool(1, BlockSize)
+ defer bufs.Put(bufs.Get(BlockSize))
+
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
+ if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+ t.Error(err)
+ }
+
+ resp := &notifyingResponseRecorder{
+ ResponseRecorder: httptest.NewRecorder(),
+ closer: make(chan bool, 1),
+ }
+ if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
+ t.Fatal("notifyingResponseRecorder is broken")
+ }
+ // If anyone asks, the client has disconnected.
+ resp.closer <- true
+ req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+ MakeLoggingRESTRouter().ServeHTTP(resp, req)
+ ExpectStatusCode(t, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+ for i, v := range KeepVM.AllWritable() {
+ if calls := v.(*MockVolume).called["GET"]; calls != 0 {
+ t.Errorf("volume %d got %d calls, expected 0", i, calls)
+ }
+ }
+}
+
// Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
// leak.
func TestGetHandlerNoBufferleak(t *testing.T) {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a188c47..80291f2 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -79,22 +79,35 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
}
}
- block, err := GetBlock(mux.Vars(req)["hash"])
+ // TODO: Probe volumes to check whether the block _might_
+ // exist. Some volumes/types could support a quick existence
+ // check without causing other operations to suffer. If all
+ // volumes support that, and assure us the block definitely
+ // isn't here, we can return 404 now instead of waiting for a
+ // buffer.
+
+ buf, err := getBufferForResponseWriter(resp, BlockSize)
if err != nil {
- // This type assertion is safe because the only errors
- // GetBlock can return are DiskHashError or NotFoundError.
- http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
+ http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
- defer bufs.Put(block)
+ defer bufs.Put(buf)
- resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+ if err != nil {
+ code := http.StatusInternalServerError
+ if err, ok := err.(*KeepError); ok {
+ code = err.HTTPCode
+ }
+ http.Error(resp, err.Error(), code)
+ return
+ }
+
+ resp.Header().Set("Content-Length", strconv.Itoa(size))
resp.Header().Set("Content-Type", "application/octet-stream")
- resp.Write(block)
+ resp.Write(buf[:size])
}
-var errClientDisconnected = fmt.Errorf("client disconnected")
-
// Get a buffer from the pool -- but give up and return a non-nil
// error if resp implements http.CloseNotifier and tells us that the
// client has disconnected before we get a buffer.
@@ -119,7 +132,7 @@ func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte,
// return it to the pool.
bufs.Put(<-bufReady)
}()
- return nil, errClientDisconnected
+ return nil, ErrClientDisconnect
}
}
@@ -516,7 +529,6 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
}
}
-// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
// Once the handler has determined that system policy permits the
@@ -527,24 +539,21 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
// should be the only part of the code that cares about which volume a
// block is stored on, so it should be responsible for figuring out
// which volume to check for fetching blocks, storing blocks, etc.
-// ==============================
-// GetBlock fetches and returns the block identified by "hash".
-//
-// On success, GetBlock returns a byte slice with the block data, and
-// a nil error.
+// GetBlock fetches the block identified by "hash" into the provided
+// buf, and returns the data size.
//
// If the block cannot be found on any volume, returns NotFoundError.
//
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
-func GetBlock(hash string) ([]byte, error) {
+func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
for _, vol := range KeepVM.AllReadable() {
- buf, err := vol.Get(hash)
+ size, err := vol.Get(hash, buf)
if err != nil {
// IsNotExist is an expected error and may be
// ignored. All other errors are logged. In
@@ -558,23 +567,22 @@ func GetBlock(hash string) ([]byte, error) {
}
// Check the file checksum.
//
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
log.Printf("%s: checksum mismatch for request %s (actual %s)",
vol, hash, filehash)
errorToCaller = DiskHashError
- bufs.Put(buf)
continue
}
if errorToCaller == DiskHashError {
log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
vol, hash)
}
- return buf, nil
+ return size, nil
}
- return nil, errorToCaller
+ return 0, errorToCaller
}
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
diff --git a/services/keepstore/handlers_with_generic_volume_test.go b/services/keepstore/handlers_with_generic_volume_test.go
index c5349d3..dda7edc 100644
--- a/services/keepstore/handlers_with_generic_volume_test.go
+++ b/services/keepstore/handlers_with_generic_volume_test.go
@@ -45,12 +45,13 @@ func testGetBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
testableVolumes[1].PutRaw(testHash, testBlock)
// Get should pass
- buf, err := GetBlock(testHash)
+ buf := make([]byte, len(testBlock))
+ n, err := GetBlock(testHash, buf, nil)
if err != nil {
t.Fatalf("Error while getting block %s", err)
}
- if bytes.Compare(buf, testBlock) != 0 {
- t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, testBlock)
+ if bytes.Compare(buf[:n], testBlock) != 0 {
+ t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf[:n], testBlock)
}
}
@@ -64,9 +65,10 @@ func testPutRawBadDataGetBlock(t TB, factory TestableVolumeManagerFactory,
testableVolumes[1].PutRaw(testHash, badData)
// Get should fail
- _, err := GetBlock(testHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testHash, buf, nil)
if err == nil {
- t.Fatalf("Expected error while getting corrupt block %v", testHash)
+ t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
}
}
@@ -85,11 +87,12 @@ func testPutBlock(t TB, factory TestableVolumeManagerFactory, testHash string, t
}
// Check that PutBlock stored the data as expected
- buf, err := GetBlock(testHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
- } else if bytes.Compare(buf, testBlock) != 0 {
- t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+ } else if bytes.Compare(buf[:size], testBlock) != 0 {
+ t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
}
}
@@ -109,10 +112,11 @@ func testPutBlockCorrupt(t TB, factory TestableVolumeManagerFactory,
// Put succeeded and overwrote the badData in one volume,
// and Get should return the testBlock now, ignoring the bad data.
- buf, err := GetBlock(testHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
- } else if bytes.Compare(buf, testBlock) != 0 {
- t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf)
+ } else if bytes.Compare(buf[:size], testBlock) != 0 {
+ t.Errorf("Get response incorrect. Expected %q; found %q", testBlock, buf[:size])
}
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index b17cc79..6117177 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -90,6 +90,7 @@ var (
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
+ ErrClientDisconnect = &KeepError{503, "Client disconnected"}
)
func (e *KeepError) Error() string {
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 2a1c3d2..c0adbc0 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -66,12 +66,13 @@ func TestGetBlock(t *testing.T) {
}
// Check that GetBlock returns success.
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != nil {
t.Errorf("GetBlock error: %s", err)
}
- if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
- t.Errorf("expected %s, got %s", TestBlock, result)
+ if bytes.Compare(buf[:size], TestBlock) != 0 {
+ t.Errorf("got %v, expected %v", buf[:size], TestBlock)
}
}
@@ -86,9 +87,10 @@ func TestGetBlockMissing(t *testing.T) {
defer KeepVM.Close()
// Check that GetBlock returns failure.
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != NotFoundError {
- t.Errorf("Expected NotFoundError, got %v", result)
+ t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
}
}
@@ -107,9 +109,10 @@ func TestGetBlockCorrupt(t *testing.T) {
vols[0].Put(TestHash, BadBlock)
// Check that GetBlock returns failure.
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != DiskHashError {
- t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
+ t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
}
}
@@ -133,13 +136,14 @@ func TestPutBlockOK(t *testing.T) {
}
vols := KeepVM.AllReadable()
- result, err := vols[1].Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := vols[1].Get(TestHash, buf)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
- if string(result) != string(TestBlock) {
+ if string(buf[:n]) != string(TestBlock) {
t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
- string(TestBlock), string(result))
+ string(TestBlock), string(buf[:n]))
}
}
@@ -162,14 +166,14 @@ func TestPutBlockOneVol(t *testing.T) {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
- result, err := GetBlock(TestHash)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(TestHash, buf, nil)
if err != nil {
t.Fatalf("GetBlock: %v", err)
}
- if string(result) != string(TestBlock) {
- t.Error("PutBlock/GetBlock mismatch")
- t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
- string(TestBlock), string(result))
+ if bytes.Compare(buf[:size], TestBlock) != 0 {
+ t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
+ TestBlock, buf[:size])
}
}
@@ -191,7 +195,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
}
// Confirm that GetBlock fails to return anything.
- if result, err := GetBlock(TestHash); err != NotFoundError {
+ if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
string(result), err)
}
@@ -216,10 +220,11 @@ func TestPutBlockCorrupt(t *testing.T) {
}
// The block on disk should now match TestBlock.
- if block, err := GetBlock(TestHash); err != nil {
+ buf := make([]byte, BlockSize)
+ if size, err := GetBlock(TestHash, buf, nil); err != nil {
t.Errorf("GetBlock: %v", err)
- } else if bytes.Compare(block, TestBlock) != 0 {
- t.Errorf("GetBlock returned: '%s'", string(block))
+ } else if bytes.Compare(buf[:size], TestBlock) != 0 {
+ t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
}
}
@@ -290,12 +295,13 @@ func TestPutBlockTouchFails(t *testing.T) {
t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
oldMtime, newMtime)
}
- result, err := vols[1].Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := vols[1].Get(TestHash, buf)
if err != nil {
t.Fatalf("vols[1]: %v", err)
}
- if bytes.Compare(result, TestBlock) != 0 {
- t.Errorf("new block does not match test block\nnew block = %v\n", result)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
}
}
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 9edfb6e..73b8324 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -20,6 +20,19 @@ type LoggingResponseWriter struct {
sentHdr time.Time
}
+func (w *LoggingResponseWriter) CloseNotify() <-chan bool {
+ wrapped, ok := w.ResponseWriter.(http.CloseNotifier)
+ if !ok {
+ // If upstream doesn't implement CloseNotifier, we can
+ // satisfy the interface by returning a channel that
+ // never sends anything (the interface doesn't
+ // guarantee that anything will ever be sent on the
+ // channel even if the client disconnects).
+ return nil
+ }
+ return wrapped.CloseNotify()
+}
+
// WriteHeader writes header to ResponseWriter
func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
if loggingWriter.sentHdr == zeroTime {
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
new file mode 100644
index 0000000..aa88556
--- /dev/null
+++ b/services/keepstore/logging_router_test.go
@@ -0,0 +1,10 @@
+package main
+
+import (
+ "net/http"
+ "testing"
+)
+
+func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
+ http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
+}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 79a680d..d068b2a 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -153,20 +153,18 @@ func (v *S3Volume) Check() error {
return nil
}
-func (v *S3Volume) Get(loc string) ([]byte, error) {
+func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
rdr, err := v.Bucket.GetReader(loc)
if err != nil {
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
defer rdr.Close()
- buf := bufs.Get(BlockSize)
n, err := io.ReadFull(rdr, buf)
switch err {
case nil, io.EOF, io.ErrUnexpectedEOF:
- return buf[:n], nil
+ return n, nil
default:
- bufs.Put(buf)
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
}
}
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index ac94061..d111cae 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -290,26 +290,27 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
- data, _ := GetBlock(testData.Locator1)
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(testData.Locator1, buf, nil)
if testData.ExpectLocator1 {
- if len(data) == 0 {
+ if size == 0 || err != nil {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
}
} else {
- if len(data) > 0 {
+ if size > 0 || err == nil {
t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
}
}
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- data, _ = GetBlock(testData.Locator2)
+ size, err = GetBlock(testData.Locator2, buf, nil)
if testData.ExpectLocator2 {
- if len(data) == 0 {
+ if size == 0 || err != nil {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
}
} else {
- if len(data) > 0 {
+ if size > 0 || err == nil {
t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
}
}
@@ -321,7 +322,8 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
if testData.DifferentMtimes {
locatorFoundIn := 0
for _, volume := range KeepVM.AllReadable() {
- if _, err := volume.Get(testData.Locator1); err == nil {
+ buf := make([]byte, BlockSize)
+ if _, err := volume.Get(testData.Locator1, buf); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 17da54f..8c7e9a4 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -10,17 +10,14 @@ import (
// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
// etc.
type Volume interface {
- // Get a block. IFF the returned error is nil, the caller must
- // put the returned slice back into the buffer pool when it's
- // finished with it. (Otherwise, the buffer pool will be
- // depleted and eventually -- when all available buffers are
- // used and not returned -- operations will reach deadlock.)
+ // Get a block: copy the block data into buf, and return the
+ // number of bytes copied.
//
// loc is guaranteed to consist of 32 or more lowercase hex
// digits.
//
- // Get should not verify the integrity of the returned data:
- // it should just return whatever was found in its backing
+ // Get should not verify the integrity of the data: it should
+ // just return whatever was found in its backing
// store. (Integrity checking is the caller's responsibility.)
//
// If an error is encountered that prevents it from
@@ -36,10 +33,10 @@ type Volume interface {
// access log if the block is not found on any other volumes
// either).
//
- // If the data in the backing store is bigger than BlockSize,
- // Get is permitted to return an error without reading any of
- // the data.
- Get(loc string) ([]byte, error)
+ // If the data in the backing store is bigger than len(buf),
+ // which will not exceed BlockSize, then Get is permitted to
+ // return an error without reading any of the data.
+ Get(loc string, buf []byte) (int, error)
// Compare the given data with the stored data (i.e., what Get
// would return). If equal, return nil. If not, return
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 95166c2..105795c 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -89,14 +89,13 @@ func testGet(t TB, factory TestableVolumeFactory) {
v.PutRaw(TestHash, TestBlock)
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Fatal(err)
}
- bufs.Put(buf)
-
- if bytes.Compare(buf, TestBlock) != 0 {
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
t.Errorf("expected %s, got %s", string(TestBlock), string(buf))
}
}
@@ -107,7 +106,8 @@ func testGetNoSuchBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- if _, err := v.Get(TestHash2); err == nil {
+ buf := make([]byte, BlockSize)
+ if _, err := v.Get(TestHash2, buf); err == nil {
t.Errorf("Expected error while getting non-existing block %v", TestHash2)
}
}
@@ -208,24 +208,22 @@ func testPutBlockWithDifferentContent(t TB, factory TestableVolumeFactory, testH
v.PutRaw(testHash, testDataA)
putErr := v.Put(testHash, testDataB)
- buf, getErr := v.Get(testHash)
+ buf := make([]byte, BlockSize)
+ n, getErr := v.Get(testHash, buf)
if putErr == nil {
// Put must not return a nil error unless it has
// overwritten the existing data.
- if bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, testDataB)
+ if bytes.Compare(buf[:n], testDataB) != 0 {
+ t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf[:n], testDataB)
}
} else {
// It is permissible for Put to fail, but it must
// leave us with either the original data, the new
// data, or nothing at all.
- if getErr == nil && bytes.Compare(buf, testDataA) != 0 && bytes.Compare(buf, testDataB) != 0 {
- t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, testDataA, testDataB)
+ if getErr == nil && bytes.Compare(buf[:n], testDataA) != 0 && bytes.Compare(buf[:n], testDataB) != 0 {
+ t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf[:n], testDataA, testDataB)
}
}
- if getErr == nil {
- bufs.Put(buf)
- }
}
// Put and get multiple blocks
@@ -253,34 +251,32 @@ func testPutMultipleBlocks(t TB, factory TestableVolumeFactory) {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
}
- data, err := v.Get(TestHash)
+ data := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, data)
if err != nil {
t.Error(err)
} else {
- if bytes.Compare(data, TestBlock) != 0 {
- t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+ if bytes.Compare(data[:n], TestBlock) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock)
}
- bufs.Put(data)
}
- data, err = v.Get(TestHash2)
+ n, err = v.Get(TestHash2, data)
if err != nil {
t.Error(err)
} else {
- if bytes.Compare(data, TestBlock2) != 0 {
- t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+ if bytes.Compare(data[:n], TestBlock2) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data[:n], TestBlock2)
}
- bufs.Put(data)
}
- data, err = v.Get(TestHash3)
+ n, err = v.Get(TestHash3, data)
if err != nil {
t.Error(err)
} else {
- if bytes.Compare(data, TestBlock3) != 0 {
- t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+ if bytes.Compare(data[:n], TestBlock3) != 0 {
+ t.Errorf("Block present, but to %+q, expected %+q", data[:n], TestBlock3)
}
- bufs.Put(data)
}
}
@@ -426,14 +422,12 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
- data, err := v.Get(TestHash)
+ data := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, data)
if err != nil {
t.Error(err)
- } else {
- if bytes.Compare(data, TestBlock) != 0 {
- t.Errorf("Got data %+q, expected %+q", data, TestBlock)
- }
- bufs.Put(data)
+ } else if bytes.Compare(data[:n], TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", data[:n], TestBlock)
}
}
@@ -455,7 +449,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
- if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+ data := make([]byte, BlockSize)
+ if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
@@ -514,9 +509,10 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
}
v.PutRaw(TestHash, TestBlock)
+ buf := make([]byte, BlockSize)
// Get from read-only volume should succeed
- _, err := v.Get(TestHash)
+ _, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
@@ -526,7 +522,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
- _, err = v.Get(TestHash2)
+ _, err = v.Get(TestHash2, buf)
if err == nil {
t.Errorf("Expected error when getting block whose put in read-only volume failed")
}
@@ -561,45 +557,45 @@ func testGetConcurrent(t TB, factory TestableVolumeFactory) {
v.PutRaw(TestHash3, TestBlock3)
sem := make(chan int)
- go func(sem chan int) {
- buf, err := v.Get(TestHash)
+ go func() {
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("err1: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf))
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock), string(buf[:n]))
}
sem <- 1
- }(sem)
+ }()
- go func(sem chan int) {
- buf, err := v.Get(TestHash2)
+ go func() {
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash2, buf)
if err != nil {
t.Errorf("err2: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock2) != 0 {
- t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf))
+ if bytes.Compare(buf[:n], TestBlock2) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock2), string(buf[:n]))
}
sem <- 1
- }(sem)
+ }()
- go func(sem chan int) {
- buf, err := v.Get(TestHash3)
+ go func() {
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash3, buf)
if err != nil {
t.Errorf("err3: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock3) != 0 {
- t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf))
+ if bytes.Compare(buf[:n], TestBlock3) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TestBlock3), string(buf[:n]))
}
sem <- 1
- }(sem)
+ }()
// Wait for all goroutines to finish
- for done := 0; done < 3; {
- done += <-sem
+ for done := 0; done < 3; done++ {
+ <-sem
}
}
@@ -639,36 +635,34 @@ func testPutConcurrent(t TB, factory TestableVolumeFactory) {
}(sem)
// Wait for all goroutines to finish
- for done := 0; done < 3; {
- done += <-sem
+ for done := 0; done < 3; done++ {
+ <-sem
}
// Double check that we actually wrote the blocks we expected to write.
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("Get #1: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf))
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
}
- buf, err = v.Get(TestHash2)
+ n, err = v.Get(TestHash2, buf)
if err != nil {
t.Errorf("Get #2: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock2) != 0 {
- t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf))
+ if bytes.Compare(buf[:n], TestBlock2) != 0 {
+ t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
}
- buf, err = v.Get(TestHash3)
+ n, err = v.Get(TestHash3, buf)
if err != nil {
t.Errorf("Get #3: %v", err)
}
- bufs.Put(buf)
- if bytes.Compare(buf, TestBlock3) != 0 {
- t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf))
+ if bytes.Compare(buf[:n], TestBlock3) != 0 {
+ t.Errorf("Get #3: expected %s, got %s", string(TestBlock3), string(buf[:n]))
}
}
@@ -689,14 +683,13 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
if err != nil {
t.Fatal(err)
}
- rdata, err := v.Get(hash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(hash, buf)
if err != nil {
t.Error(err)
- } else {
- defer bufs.Put(rdata)
}
- if bytes.Compare(rdata, wdata) != 0 {
- t.Error("rdata != wdata")
+ if bytes.Compare(buf[:n], wdata) != 0 {
+ t.Error("buf %+q != wdata %+q", buf[:n], wdata)
}
}
@@ -717,14 +710,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
v.PutRaw(TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
t.Fatal(err)
}
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
}
- bufs.Put(buf)
// Trash
err = v.Trash(TestHash)
@@ -737,7 +730,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
t.Error(err)
}
} else {
- _, err = v.Get(TestHash)
+ _, err = v.Get(TestHash, buf)
if err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
@@ -750,14 +743,13 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
}
// Get the block - after trash and untrash sequence
- buf, err = v.Get(TestHash)
+ n, err = v.Get(TestHash, buf)
if err != nil {
t.Fatal(err)
}
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", buf[:n], TestBlock)
}
- bufs.Put(buf)
}
func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
@@ -768,14 +760,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
}(trashLifetime)
checkGet := func() error {
- buf, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash, buf)
if err != nil {
return err
}
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
+ if bytes.Compare(buf[:n], TestBlock) != 0 {
+ t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
}
- bufs.Put(buf)
return nil
}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index e8a5a33..5671b8d 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -113,17 +113,16 @@ func (v *MockVolume) Compare(loc string, buf []byte) error {
}
}
-func (v *MockVolume) Get(loc string) ([]byte, error) {
+func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
v.gotCall("Get")
<-v.Gate
if v.Bad {
- return nil, errors.New("Bad volume")
+ return 0, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
- buf := bufs.Get(len(block))
- copy(buf, block)
- return buf, nil
+ copy(buf[:len(block)], block)
+ return len(block), nil
}
- return nil, os.ErrNotExist
+ return 0, os.ErrNotExist
}
func (v *MockVolume) Put(loc string, block []byte) error {
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 996068c..edec048 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -181,26 +181,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
return stat, err
}
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
- return nil, v.translateError(err)
+ return 0, v.translateError(err)
+ }
+ if stat.Size() > int64(len(buf)) {
+ return 0, TooLongError
}
- buf := bufs.Get(int(stat.Size()))
+ var read int
+ size := int(stat.Size())
err = v.getFunc(path, func(rdr io.Reader) error {
- _, err = io.ReadFull(rdr, buf)
+ read, err = io.ReadFull(rdr, buf[:size])
return err
})
- if err != nil {
- bufs.Put(buf)
- return nil, err
- }
- return buf, nil
+ return read, err
}
// Compare returns nil if Get(loc) would return the same content as
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 0775e89..c95538b 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -106,12 +106,13 @@ func TestGetNotFound(t *testing.T) {
defer v.Teardown()
v.Put(TestHash, TestBlock)
- buf, err := v.Get(TestHash2)
+ buf := make([]byte, BlockSize)
+ n, err := v.Get(TestHash2, buf)
switch {
case os.IsNotExist(err):
break
case err == nil:
- t.Errorf("Read should have failed, returned %s", string(buf))
+ t.Errorf("Read should have failed, returned %+q", buf[:n])
default:
t.Errorf("Read expected ErrNotExist, got: %s", err)
}
@@ -151,7 +152,8 @@ func TestUnixVolumeReadonly(t *testing.T) {
v.PutRaw(TestHash, TestBlock)
- _, err := v.Get(TestHash)
+ buf := make([]byte, BlockSize)
+ _, err := v.Get(TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list