[ARVADOS] updated: 4eba2cb5d7d6a5d76b9ce4d3f535a1de911400cd
git at public.curoverse.com
git at public.curoverse.com
Tue Apr 8 00:06:30 EDT 2014
Summary of changes:
services/keep/keep.go | 64 +++++++++++++++++++++++++++++--------------
services/keep/keep_test.go | 36 ++++++++++++++++++++----
2 files changed, 73 insertions(+), 27 deletions(-)
via 4eba2cb5d7d6a5d76b9ce4d3f535a1de911400cd (commit)
from bc4eea1e54651c507b8add0f5edc80968673c6fc (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 4eba2cb5d7d6a5d76b9ce4d3f535a1de911400cd
Author: Tim Pierce <twp at curoverse.com>
Date: Tue Apr 8 00:04:29 2014 -0400
Check for MD5 collisions and corrupt blocks. (refs #2292)
diff --git a/services/keep/keep.go b/services/keep/keep.go
index 4534860..5083e69 100644
--- a/services/keep/keep.go
+++ b/services/keep/keep.go
@@ -2,6 +2,7 @@ package main
import (
"bufio"
+ "bytes"
"crypto/md5"
"errors"
"fmt"
@@ -34,6 +35,15 @@ type KeepError struct {
Err error
}
+const (
+ ErrCollision = 400
+ ErrMD5Fail = 401
+ ErrCorrupt = 402
+ ErrNotFound = 404
+ ErrOther = 500
+ ErrFull = 503
+)
+
func (e *KeepError) Error() string {
return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
}
@@ -172,7 +182,7 @@ func GetBlock(hash string) ([]byte, error) {
//
log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
vol, blockFilename, filehash)
- continue
+ return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
}
// Success!
@@ -180,7 +190,7 @@ func GetBlock(hash string) ([]byte, error) {
}
log.Printf("%s: not found on any volumes, giving up\n", hash)
- return buf, &KeepError{404, errors.New("not found: " + hash)}
+ return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
}
/* PutBlock(block, hash)
@@ -195,15 +205,18 @@ func GetBlock(hash string) ([]byte, error) {
On success, PutBlock returns nil.
On failure, it returns a KeepError with one of the following codes:
+ 400 Collision
+ A different block with the same hash already exists on this
+ Keep server.
401 MD5Fail
- -- The MD5 hash of the BLOCK does not match the argument HASH.
+ The MD5 hash of the BLOCK does not match the argument HASH.
503 Full
- -- There was not enough space left in any Keep volume to store
- the object.
+ There was not enough space left in any Keep volume to store
+ the object.
500 Fail
- -- The object could not be stored for some other reason (e.g.
- all writes failed). The text of the error message should
- provide as much detail as possible.
+ The object could not be stored for some other reason (e.g.
+ all writes failed). The text of the error message should
+ provide as much detail as possible.
*/
func PutBlock(block []byte, hash string) error {
@@ -211,9 +224,25 @@ func PutBlock(block []byte, hash string) error {
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return &KeepError{401, errors.New("MD5Fail")}
+ return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
}
+ // If we already have a block on disk under this identifier, return
+ // success (but check for MD5 collisions, which may signify on-disk corruption).
+ if oldblock, err := GetBlock(hash); err == nil {
+ if bytes.Compare(block, oldblock) == 0 {
+ return nil
+ } else {
+ return &KeepError{ErrCollision, errors.New("Collision")}
+ }
+ } else {
+ ke := err.(*KeepError)
+ if ke.HTTPCode == ErrCorrupt {
+ return &KeepError{ErrCollision, errors.New("Collision")}
+ }
+ }
+
+ // Store the block on the first available Keep volume.
allFull := true
for _, vol := range KeepVolumes {
if IsFull(vol) {
@@ -228,18 +257,11 @@ func PutBlock(block []byte, hash string) error {
}
blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
+
f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
- // if the block already exists, just return success.
- // TODO(twp): should we check here whether the file on disk
- // matches the file we were asked to store?
- if os.IsExist(err) {
- return nil
- } else {
- // Open failed for some other reason.
- log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
- continue
- }
+ log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
+ continue
}
if _, err := f.Write(block); err == nil {
@@ -253,10 +275,10 @@ func PutBlock(block []byte, hash string) error {
if allFull {
log.Printf("all Keep volumes full")
- return &KeepError{503, errors.New("Full")}
+ return &KeepError{ErrFull, errors.New("Full")}
} else {
log.Printf("all Keep volumes failed")
- return &KeepError{500, errors.New("Fail")}
+ return &KeepError{ErrOther, errors.New("Fail")}
}
}
diff --git a/services/keep/keep_test.go b/services/keep/keep_test.go
index bbff19d..cb5e89c 100644
--- a/services/keep/keep_test.go
+++ b/services/keep/keep_test.go
@@ -129,11 +129,11 @@ func TestPutBlockOneVol(t *testing.T) {
}
}
-// TestPutBlockCorrupt
+// TestPutBlockMD5Fail
// Check that PutBlock returns an error if passed a block and hash that
// do not match.
//
-func TestPutBlockCorrupt(t *testing.T) {
+func TestPutBlockMD5Fail(t *testing.T) {
defer teardown()
// Create two test Keep volumes.
@@ -145,7 +145,7 @@ func TestPutBlockCorrupt(t *testing.T) {
t.Error("PutBlock succeeded despite a block mismatch")
} else {
ke := err.(*KeepError)
- if ke.HTTPCode != 401 || ke.Err.Error() != "MD5Fail" {
+ if ke.HTTPCode != ErrMD5Fail {
t.Errorf("PutBlock returned the wrong error (%v)", ke)
}
}
@@ -157,6 +157,32 @@ func TestPutBlockCorrupt(t *testing.T) {
}
}
+// TestPutBlockCollision
+// PutBlock must report a 400 Collision error when asked to store a block
+// when a different block exists on disk under the same identifier.
+//
+func TestPutBlockCollision(t *testing.T) {
+ defer teardown()
+
+ // Create two test Keep volumes.
+ KeepVolumes = setup(t, 2)
+
+ // Store a corrupted block under TEST_HASH.
+ store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
+
+ // Attempting to put TEST_BLOCK should produce a 400 Collision error.
+ if err := PutBlock(TEST_BLOCK, TEST_HASH); err == nil {
+ t.Error("Expected PutBlock error, but no error returned")
+ } else {
+ ke := err.(*KeepError)
+ if ke.HTTPCode != ErrCollision {
+ t.Errorf("Expected 400 Collision error, got %v", ke)
+ }
+ }
+
+ KeepVolumes = nil
+}
+
// TestFindKeepVolumes
// Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
// directories at the top level.
@@ -251,7 +277,7 @@ func teardown() {
// store
// Low-level code to write Keep blocks directly to disk for testing.
//
-func store(t *testing.T, keepdir string, filename string, block []byte) error {
+func store(t *testing.T, keepdir string, filename string, block []byte) {
blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
if err := os.MkdirAll(blockdir, 0755); err != nil {
t.Fatal(err)
@@ -264,6 +290,4 @@ func store(t *testing.T, keepdir string, filename string, block []byte) error {
} else {
t.Fatal(err)
}
-
- return nil
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list