[ARVADOS] updated: 4eba2cb5d7d6a5d76b9ce4d3f535a1de911400cd

git at public.curoverse.com git at public.curoverse.com
Tue Apr 8 00:06:30 EDT 2014


Summary of changes:
 services/keep/keep.go      |   64 +++++++++++++++++++++++++++++--------------
 services/keep/keep_test.go |   36 ++++++++++++++++++++----
 2 files changed, 73 insertions(+), 27 deletions(-)

       via  4eba2cb5d7d6a5d76b9ce4d3f535a1de911400cd (commit)
      from  bc4eea1e54651c507b8add0f5edc80968673c6fc (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 4eba2cb5d7d6a5d76b9ce4d3f535a1de911400cd
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Apr 8 00:04:29 2014 -0400

    Check for MD5 collisions and corrupt blocks. (refs #2292)

diff --git a/services/keep/keep.go b/services/keep/keep.go
index 4534860..5083e69 100644
--- a/services/keep/keep.go
+++ b/services/keep/keep.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bufio"
+	"bytes"
 	"crypto/md5"
 	"errors"
 	"fmt"
@@ -34,6 +35,15 @@ type KeepError struct {
 	Err      error
 }
 
+const (
+	ErrCollision = 400
+	ErrMD5Fail   = 401
+	ErrCorrupt   = 402
+	ErrNotFound  = 404
+	ErrOther     = 500
+	ErrFull      = 503
+)
+
 func (e *KeepError) Error() string {
 	return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
 }
@@ -172,7 +182,7 @@ func GetBlock(hash string) ([]byte, error) {
 			//
 			log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
 				vol, blockFilename, filehash)
-			continue
+			return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
 		}
 
 		// Success!
@@ -180,7 +190,7 @@ func GetBlock(hash string) ([]byte, error) {
 	}
 
 	log.Printf("%s: not found on any volumes, giving up\n", hash)
-	return buf, &KeepError{404, errors.New("not found: " + hash)}
+	return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
 }
 
 /* PutBlock(block, hash)
@@ -195,15 +205,18 @@ func GetBlock(hash string) ([]byte, error) {
    On success, PutBlock returns nil.
    On failure, it returns a KeepError with one of the following codes:
 
+   400 Collision
+          A different block with the same hash already exists on this
+          Keep server.
    401 MD5Fail
-         -- The MD5 hash of the BLOCK does not match the argument HASH.
+          The MD5 hash of the BLOCK does not match the argument HASH.
    503 Full
-         -- There was not enough space left in any Keep volume to store
-            the object.
+          There was not enough space left in any Keep volume to store
+          the object.
    500 Fail
-         -- The object could not be stored for some other reason (e.g.
-            all writes failed). The text of the error message should
-            provide as much detail as possible.
+          The object could not be stored for some other reason (e.g.
+          all writes failed). The text of the error message should
+          provide as much detail as possible.
 */
 
 func PutBlock(block []byte, hash string) error {
@@ -211,9 +224,25 @@ func PutBlock(block []byte, hash string) error {
 	blockhash := fmt.Sprintf("%x", md5.Sum(block))
 	if blockhash != hash {
 		log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-		return &KeepError{401, errors.New("MD5Fail")}
+		return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
 	}
 
+	// If we already have a block on disk under this identifier, return
+	// success (but check for MD5 collisions, which may signify on-disk corruption).
+	if oldblock, err := GetBlock(hash); err == nil {
+		if bytes.Compare(block, oldblock) == 0 {
+			return nil
+		} else {
+			return &KeepError{ErrCollision, errors.New("Collision")}
+		}
+	} else {
+		ke := err.(*KeepError)
+		if ke.HTTPCode == ErrCorrupt {
+			return &KeepError{ErrCollision, errors.New("Collision")}
+		}
+	}
+
+	// Store the block on the first available Keep volume.
 	allFull := true
 	for _, vol := range KeepVolumes {
 		if IsFull(vol) {
@@ -228,18 +257,11 @@ func PutBlock(block []byte, hash string) error {
 		}
 
 		blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
+
 		f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
 		if err != nil {
-			// if the block already exists, just return success.
-			// TODO(twp): should we check here whether the file on disk
-			// matches the file we were asked to store?
-			if os.IsExist(err) {
-				return nil
-			} else {
-				// Open failed for some other reason.
-				log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
-				continue
-			}
+			log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
+			continue
 		}
 
 		if _, err := f.Write(block); err == nil {
@@ -253,10 +275,10 @@ func PutBlock(block []byte, hash string) error {
 
 	if allFull {
 		log.Printf("all Keep volumes full")
-		return &KeepError{503, errors.New("Full")}
+		return &KeepError{ErrFull, errors.New("Full")}
 	} else {
 		log.Printf("all Keep volumes failed")
-		return &KeepError{500, errors.New("Fail")}
+		return &KeepError{ErrOther, errors.New("Fail")}
 	}
 }
 
diff --git a/services/keep/keep_test.go b/services/keep/keep_test.go
index bbff19d..cb5e89c 100644
--- a/services/keep/keep_test.go
+++ b/services/keep/keep_test.go
@@ -129,11 +129,11 @@ func TestPutBlockOneVol(t *testing.T) {
 	}
 }
 
-// TestPutBlockCorrupt
+// TestPutBlockMD5Fail
 //     Check that PutBlock returns an error if passed a block and hash that
 //     do not match.
 //
-func TestPutBlockCorrupt(t *testing.T) {
+func TestPutBlockMD5Fail(t *testing.T) {
 	defer teardown()
 
 	// Create two test Keep volumes.
@@ -145,7 +145,7 @@ func TestPutBlockCorrupt(t *testing.T) {
 		t.Error("PutBlock succeeded despite a block mismatch")
 	} else {
 		ke := err.(*KeepError)
-		if ke.HTTPCode != 401 || ke.Err.Error() != "MD5Fail" {
+		if ke.HTTPCode != ErrMD5Fail {
 			t.Errorf("PutBlock returned the wrong error (%v)", ke)
 		}
 	}
@@ -157,6 +157,32 @@ func TestPutBlockCorrupt(t *testing.T) {
 	}
 }
 
+// TestPutBlockCollision
+//     PutBlock must report a 400 Collision error when asked to store a block
+//     when a different block exists on disk under the same identifier.
+//
+func TestPutBlockCollision(t *testing.T) {
+	defer teardown()
+
+	// Create two test Keep volumes.
+	KeepVolumes = setup(t, 2)
+
+	// Store a corrupted block under TEST_HASH.
+	store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
+
+	// Attempting to put TEST_BLOCK should produce a 400 Collision error.
+	if err := PutBlock(TEST_BLOCK, TEST_HASH); err == nil {
+		t.Error("Expected PutBlock error, but no error returned")
+	} else {
+		ke := err.(*KeepError)
+		if ke.HTTPCode != ErrCollision {
+			t.Errorf("Expected 400 Collision error, got %v", ke)
+		}
+	}
+
+	KeepVolumes = nil
+}
+
 // TestFindKeepVolumes
 //     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
 //     directories at the top level.
@@ -251,7 +277,7 @@ func teardown() {
 // store
 //     Low-level code to write Keep blocks directly to disk for testing.
 //
-func store(t *testing.T, keepdir string, filename string, block []byte) error {
+func store(t *testing.T, keepdir string, filename string, block []byte) {
 	blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
 	if err := os.MkdirAll(blockdir, 0755); err != nil {
 		t.Fatal(err)
@@ -264,6 +290,4 @@ func store(t *testing.T, keepdir string, filename string, block []byte) error {
 	} else {
 		t.Fatal(err)
 	}
-
-	return nil
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list