[ARVADOS] updated: 799fabc9d0da8a165fe60a6c10dd207e281c5b5d

git at public.curoverse.com git at public.curoverse.com
Thu Sep 3 13:44:23 EDT 2015


Summary of changes:
 services/keepstore/collision.go        | 49 ++++++++++++++++++++
 services/keepstore/collision_test.go   | 45 +++++++++++++++++++
 services/keepstore/volume_unix.go      | 45 +------------------
 services/keepstore/volume_unix_test.go | 82 ++++++++++++++++++++++++++++++++++
 4 files changed, 178 insertions(+), 43 deletions(-)
 create mode 100644 services/keepstore/collision.go
 create mode 100644 services/keepstore/collision_test.go

  discards  8f05c08e5836f9fda631c8afe287bfd0bc4feffc (commit)
  discards  8ac1cdc7230ca1e9603856e3d2071ca1f86e7262 (commit)
       via  799fabc9d0da8a165fe60a6c10dd207e281c5b5d (commit)
       via  343d330c4115d2844adfd5b7cea92d6f05d53c30 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (8f05c08e5836f9fda631c8afe287bfd0bc4feffc)
            \
             N -- N -- N (799fabc9d0da8a165fe60a6c10dd207e281c5b5d)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions in the
N branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 799fabc9d0da8a165fe60a6c10dd207e281c5b5d
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Sep 3 13:42:32 2015 -0400

    7121: Return DiskHashError instead of CollisionError from Compare() where appropriate.

diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go
new file mode 100644
index 0000000..210286a
--- /dev/null
+++ b/services/keepstore/collision.go
@@ -0,0 +1,49 @@
+package main
+
+import (
+	"crypto/md5"
+	"fmt"
+	"io"
+)
+
+// Compute the MD5 digest of a data block (consisting of buf1 + buf2 +
+// all bytes readable from rdr). If all data is read successfully,
+// return DiskHashError or CollisionError depending on whether it
+// matches expectMD5. If an error occurs while reading, return that
+// error.
+//
+// "content has expected MD5" is called a collision because this
+// function is used in cases where we have another block in hand with
+// the given MD5 but different content.
+func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) error {
+	outcome := make(chan error)
+	data := make(chan []byte, 1)
+	go func() {
+		h := md5.New()
+		for b := range data {
+			h.Write(b)
+		}
+		if fmt.Sprintf("%x", h.Sum(nil)) == expectMD5 {
+			outcome <- CollisionError
+		} else {
+			outcome <- DiskHashError
+		}
+	}()
+	data <- buf1
+	if buf2 != nil {
+		data <- buf2
+	}
+	var err error
+	for rdr != nil && err == nil {
+		buf := make([]byte, 1 << 18)
+		var n int
+		n, err = rdr.Read(buf)
+		data <- buf[:n]
+	}
+	close(data)
+	if rdr != nil && err != io.EOF {
+		<-outcome
+		return err
+	}
+	return <-outcome
+}
diff --git a/services/keepstore/collision_test.go b/services/keepstore/collision_test.go
new file mode 100644
index 0000000..e6cfd16
--- /dev/null
+++ b/services/keepstore/collision_test.go
@@ -0,0 +1,45 @@
+package main
+
+import (
+	"bytes"
+	"testing"
+	"testing/iotest"
+
+	check "gopkg.in/check.v1"
+)
+
+// Test is the single "go test" entry point for gocheck: it runs every
+// suite registered via check.Suite, such as CollisionSuite below.
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+// CollisionSuite holds the gocheck tests for collisionOrCorrupt.
+type CollisionSuite struct{}
+
+var _ = check.Suite(&CollisionSuite{})
+
+func (s *CollisionSuite) TestCollisionOrCorrupt(c *check.C) {
+	fooMD5 := "acbd18db4cc2f85cedef654fccc4a4d8"
+
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o'}, bytes.NewBufferString("o")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, nil, bytes.NewBufferString("oo")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o', 'o'}, nil),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, nil, []byte{}, bytes.NewBufferString("foo")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, nil, nil, iotest.NewReadLogger("foo: ", iotest.DataErrReader(iotest.OneByteReader(bytes.NewBufferString("foo"))))),
+		check.Equals, CollisionError)
+
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("bar")),
+		check.Equals, DiskHashError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, nil, nil),
+		check.Equals, DiskHashError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, bytes.NewBufferString("")),
+		check.Equals, DiskHashError)
+
+	c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, iotest.TimeoutReader(iotest.OneByteReader(bytes.NewBufferString("foo")))),
+		check.Equals, iotest.ErrTimeout)
+}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 2ffa8fa..368ddc5 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -1,5 +1,3 @@
-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
 package main
 
 import (
@@ -111,12 +109,7 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 // Compare returns nil if Get(loc) would return the same content as
 // cmp. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-//
-// TODO(TC): Before returning CollisionError, compute the MD5 digest
-// of the data on disk (i.e., known-to-be-equal data in cmp +
-// remaining data on disk) and return DiskHashError instead of
-// CollisionError if it doesn't equal loc[:32].
-func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
 	path := v.blockPath(loc)
 	stat, err := v.stat(path)
 	if err != nil {
@@ -126,6 +119,7 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
 	if int64(bufLen) > stat.Size() {
 		bufLen = int(stat.Size())
 	}
+	cmp := expect
 	buf := make([]byte, bufLen)
 	return v.getFunc(path, func(rdr io.Reader) error {
 		// Loop invariants: all data read so far matched what
@@ -134,17 +128,13 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
 		// reader.
 		for {
 			n, err := rdr.Read(buf)
-			if n > len(cmp) {
-				// file on disk is too long
-				return CollisionError
-			} else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
-				return CollisionError
+			if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+				return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
 			}
 			cmp = cmp[n:]
 			if err == io.EOF {
 				if len(cmp) != 0 {
-					// file on disk is too short
-					return CollisionError
+					return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
 				}
 				return nil
 			} else if err != nil {

commit 343d330c4115d2844adfd5b7cea92d6f05d53c30
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Sep 3 00:54:52 2015 -0400

    7121: Replace Get(loc,true) with CompareAndTouch(). Add Compare method to Volume, UnixVolume, MockVolume.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a656ecf..a9bf91e 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -231,6 +231,9 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 			uri:          "/" + TEST_HASH,
 			request_body: TEST_BLOCK,
 		})
+	defer func(orig bool) {
+		never_delete = orig
+	}(never_delete)
 	never_delete = false
 	IssueRequest(
 		&RequestTester{
@@ -246,10 +249,13 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 	}
 	for _, e := range []expect{
 		{0, "Get", 0},
+		{0, "Compare", 0},
 		{0, "Touch", 0},
 		{0, "Put", 0},
 		{0, "Delete", 0},
-		{1, "Get", 1},
+		{1, "Get", 0},
+		{1, "Compare", 1},
+		{1, "Touch", 1},
 		{1, "Put", 1},
 		{1, "Delete", 1},
 	} {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a86bb6a..8cc7d8b 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -8,7 +8,6 @@ package main
 // StatusHandler   (GET /status.json)
 
 import (
-	"bytes"
 	"container/list"
 	"crypto/md5"
 	"encoding/json"
@@ -74,7 +73,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		}
 	}
 
-	block, err := GetBlock(mux.Vars(req)["hash"], false)
+	block, err := GetBlock(mux.Vars(req)["hash"])
 	if err != nil {
 		// This type assertion is safe because the only errors
 		// GetBlock can return are DiskHashError or NotFoundError.
@@ -442,10 +441,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // which volume to check for fetching blocks, storing blocks, etc.
 
 // ==============================
-// GetBlock fetches and returns the block identified by "hash".  If
-// the update_timestamp argument is true, GetBlock also updates the
-// block's file modification time (for the sake of PutBlock, which
-// must update the file's timestamp when the block already exists).
+// GetBlock fetches and returns the block identified by "hash".
 //
 // On success, GetBlock returns a byte slice with the block data, and
 // a nil error.
@@ -456,22 +452,11 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 
-func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
+func GetBlock(hash string) ([]byte, error) {
 	// Attempt to read the requested hash from a keep volume.
 	error_to_caller := NotFoundError
 
-	var vols []Volume
-	if update_timestamp {
-		// Pointless to find the block on an unwritable volume
-		// because Touch() will fail -- this is as good as
-		// "not found" for purposes of callers who need to
-		// update_timestamp.
-		vols = KeepVM.AllWritable()
-	} else {
-		vols = KeepVM.AllReadable()
-	}
-
-	for _, vol := range vols {
+	for _, vol := range KeepVM.AllReadable() {
 		buf, err := vol.Get(hash)
 		if err != nil {
 			// IsNotExist is an expected error and may be
@@ -500,15 +485,6 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
 			log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
 				vol, hash)
 		}
-		if update_timestamp {
-			if err := vol.Touch(hash); err != nil {
-				error_to_caller = GenericError
-				log.Printf("%s: Touch %s failed: %s",
-					vol, hash, error_to_caller)
-				bufs.Put(buf)
-				continue
-			}
-		}
 		return buf, nil
 	}
 	return nil, error_to_caller
@@ -548,21 +524,11 @@ func PutBlock(block []byte, hash string) error {
 		return RequestHashError
 	}
 
-	// If we already have a block on disk under this identifier, return
-	// success (but check for MD5 collisions).  While fetching the block,
-	// update its timestamp.
-	// The only errors that GetBlock can return are DiskHashError and NotFoundError.
-	// In either case, we want to write our new (good) block to disk,
-	// so there is nothing special to do if err != nil.
-	//
-	if oldblock, err := GetBlock(hash, true); err == nil {
-		defer bufs.Put(oldblock)
-		if bytes.Compare(block, oldblock) == 0 {
-			// The block already exists; return success.
-			return nil
-		} else {
-			return CollisionError
-		}
+	// If we already have this data, it's intact on disk, and we
+	// can update its timestamp, return success. If we have
+	// different data with the same hash, return failure.
+	if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+		return err
 	}
 
 	// Choose a Keep volume to write to.
@@ -603,6 +569,35 @@ func PutBlock(block []byte, hash string) error {
 	}
 }
 
+// CompareAndTouch looks for a writable volume that already holds
+// exactly buf under the given hash, and updates that block's
+// modification time so it is protected from premature garbage
+// collection. It returns nil as soon as one volume passes both the
+// Compare and the Touch.
+//
+// A CollisionError from any volume ends the search immediately. Any
+// other Compare error (not found, unreadable, corrupt, ...) just moves
+// on to the next volume. If no volume succeeds, the result is the
+// error from the most recent failed Touch, or NotFoundError if no
+// Touch was attempted.
+func CompareAndTouch(hash string, buf []byte) error {
+	var result error = NotFoundError
+	for _, vol := range KeepVM.AllWritable() {
+		switch cmpErr := vol.Compare(hash, buf); {
+		case cmpErr == CollisionError:
+			// Same hash, different content: writing another
+			// copy anywhere (even on a different volume) would
+			// leave two candidates and no way to tell which one
+			// is wanted.
+			return cmpErr
+		case cmpErr != nil:
+			continue
+		}
+		touchErr := vol.Touch(hash)
+		if touchErr == nil {
+			return nil
+		}
+		log.Printf("%s: Touch %s failed: %s", vol, hash, touchErr)
+		result = touchErr
+	}
+	return result
+}
+
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IsValidLocator
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index e01b013..b89925f 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -60,7 +60,7 @@ func TestGetBlock(t *testing.T) {
 	}
 
 	// Check that GetBlock returns success.
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != nil {
 		t.Errorf("GetBlock error: %s", err)
 	}
@@ -80,7 +80,7 @@ func TestGetBlockMissing(t *testing.T) {
 	defer KeepVM.Close()
 
 	// Check that GetBlock returns failure.
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != NotFoundError {
 		t.Errorf("Expected NotFoundError, got %v", result)
 	}
@@ -101,7 +101,7 @@ func TestGetBlockCorrupt(t *testing.T) {
 	vols[0].Put(TEST_HASH, BAD_BLOCK)
 
 	// Check that GetBlock returns failure.
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != DiskHashError {
 		t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
 	}
@@ -156,7 +156,7 @@ func TestPutBlockOneVol(t *testing.T) {
 		t.Fatalf("PutBlock: %v", err)
 	}
 
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != nil {
 		t.Fatalf("GetBlock: %v", err)
 	}
@@ -185,7 +185,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 	}
 
 	// Confirm that GetBlock fails to return anything.
-	if result, err := GetBlock(TEST_HASH, false); err != NotFoundError {
+	if result, err := GetBlock(TEST_HASH); err != NotFoundError {
 		t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
 			string(result), err)
 	}
@@ -210,7 +210,7 @@ func TestPutBlockCorrupt(t *testing.T) {
 	}
 
 	// The block on disk should now match TEST_BLOCK.
-	if block, err := GetBlock(TEST_HASH, false); err != nil {
+	if block, err := GetBlock(TEST_HASH); err != nil {
 		t.Errorf("GetBlock: %v", err)
 	} else if bytes.Compare(block, TEST_BLOCK) != 0 {
 		t.Errorf("GetBlock returned: '%s'", string(block))
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 40b291e..a626d9b 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -290,7 +290,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 	expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
 	// Verify Locator1 to be un/deleted as expected
-	data, _ := GetBlock(testData.Locator1, false)
+	data, _ := GetBlock(testData.Locator1)
 	if testData.ExpectLocator1 {
 		if len(data) == 0 {
 			t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -303,7 +303,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
 	// Verify Locator2 to be un/deleted as expected
 	if testData.Locator1 != testData.Locator2 {
-		data, _ = GetBlock(testData.Locator2, false)
+		data, _ = GetBlock(testData.Locator2)
 		if testData.ExpectLocator2 {
 			if len(data) == 0 {
 				t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 64fea34..d3616d0 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -15,7 +15,12 @@ type Volume interface {
 	// put the returned slice back into the buffer pool when it's
 	// finished with it.
 	Get(loc string) ([]byte, error)
-	Put(loc string, block []byte) error
+	// Confirm Get() would return buf. If so, return nil. If not,
+	// return CollisionError or DiskHashError (depending on
+	// whether the data on disk matches the expected hash), or
+	// whatever error was encountered opening/reading the file.
+	Compare(loc string, data []byte) error
+	Put(loc string, data []byte) error
 	Touch(loc string) error
 	Mtime(loc string) (time.Time, error)
 	IndexTo(prefix string, writer io.Writer) error
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index c5a7491..cbf6fb8 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"bytes"
+	"crypto/md5"
 	"errors"
 	"fmt"
 	"io"
@@ -71,6 +73,24 @@ func (v *MockVolume) gotCall(method string) {
 	}
 }
 
+// Compare records the call, waits on v.Gate, and then reports how the
+// stored block for loc relates to buf. The result is an error if the
+// volume is marked Bad, NotFoundError if nothing is stored, and
+// DiskHashError if the stored data does not hash to loc. It is
+// CollisionError if the stored data differs from buf, and nil
+// otherwise.
+func (v *MockVolume) Compare(loc string, buf []byte) error {
+	v.gotCall("Compare")
+	<-v.Gate
+	if v.Bad {
+		return errors.New("Bad volume")
+	}
+	stored, found := v.Store[loc]
+	switch {
+	case !found:
+		return NotFoundError
+	case fmt.Sprintf("%x", md5.Sum(stored)) != loc:
+		return DiskHashError
+	case !bytes.Equal(buf, stored):
+		return CollisionError
+	}
+	return nil
+}
+
 func (v *MockVolume) Get(loc string) ([]byte, error) {
 	v.gotCall("Get")
 	<-v.Gate
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index a7ad6f9..2ffa8fa 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -3,6 +3,7 @@
 package main
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -57,35 +58,49 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 	}
 }
 
+// getFunc opens the file at path and passes it to fn, returning fn's
+// result. If the file cannot be opened, fn is not called and the open
+// error is returned. When v.serialize is set, v.mutex is acquired
+// after the file has been opened and is held while fn runs. On
+// return the mutex is released before the file is closed.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+	f, err := os.Open(path)
+	if err == nil {
+		defer f.Close()
+		if v.serialize {
+			v.mutex.Lock()
+			defer v.mutex.Unlock()
+		}
+		err = fn(f)
+	}
+	return err
+}
+
+// stat wraps os.Stat with sanity checks on the reported size. A
+// negative size yields os.ErrInvalid, and a size above BLOCKSIZE
+// yields TooLongError. The FileInfo from os.Stat is returned
+// alongside the error in every case.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+	fi, err := os.Stat(path)
+	if err != nil {
+		return fi, err
+	}
+	switch size := fi.Size(); {
+	case size < 0:
+		return fi, os.ErrInvalid
+	case size > BLOCKSIZE:
+		return fi, TooLongError
+	}
+	return fi, nil
+}
+
 // Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
 func (v *UnixVolume) Get(loc string) ([]byte, error) {
 	path := v.blockPath(loc)
-	stat, err := os.Stat(path)
+	stat, err := v.stat(path)
 	if err != nil {
 		return nil, err
 	}
-	if stat.Size() < 0 {
-		return nil, os.ErrInvalid
-	} else if stat.Size() == 0 {
-		return bufs.Get(0), nil
-	} else if stat.Size() > BLOCKSIZE {
-		return nil, TooLongError
-	}
-	f, err := os.Open(path)
-	if err != nil {
-		return nil, err
-	}
-	defer f.Close()
 	buf := bufs.Get(int(stat.Size()))
-	if v.serialize {
-		v.mutex.Lock()
-		defer v.mutex.Unlock()
-	}
-	_, err = io.ReadFull(f, buf)
+	err = v.getFunc(path, func(rdr io.Reader) error {
+		_, err = io.ReadFull(rdr, buf)
+		return err
+	})
 	if err != nil {
 		bufs.Put(buf)
 		return nil, err
@@ -93,6 +108,52 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 	return buf, nil
 }
 
+// Compare returns nil if Get(loc) would return the same content as
+// cmp. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory: the file is read in chunks
+// of at most 1 MiB rather than all at once.
+//
+// It returns any error from v.stat (including its size sanity checks)
+// or from opening/reading the file. It returns CollisionError if the
+// file content differs from cmp in any way, including length.
+//
+// TODO(TC): Before returning CollisionError, compute the MD5 digest
+// of the data on disk (i.e., known-to-be-equal data in cmp +
+// remaining data on disk) and return DiskHashError instead of
+// CollisionError if it doesn't equal loc[:32].
+func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+	path := v.blockPath(loc)
+	stat, err := v.stat(path)
+	if err != nil {
+		return err
+	}
+	bufLen := 1 << 20
+	if int64(bufLen) > stat.Size() {
+		bufLen = int(stat.Size())
+	}
+	if bufLen < 1 {
+		// The file is empty. io.Reader allows Read into a
+		// zero-length buffer to return (0, nil), and *os.File
+		// does exactly that without ever reporting io.EOF. The
+		// loop below would spin forever, so read at least one
+		// byte at a time.
+		bufLen = 1
+	}
+	buf := make([]byte, bufLen)
+	return v.getFunc(path, func(rdr io.Reader) error {
+		// Loop invariants: all data read so far matched what
+		// we expected, and the first N bytes of cmp are
+		// expected to equal the next N bytes read from
+		// reader.
+		for {
+			n, err := rdr.Read(buf)
+			if n > len(cmp) {
+				// file on disk is too long
+				return CollisionError
+			} else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
+				return CollisionError
+			}
+			cmp = cmp[n:]
+			if err == io.EOF {
+				if len(cmp) != 0 {
+					// file on disk is too short
+					return CollisionError
+				}
+				return nil
+			} else if err != nil {
+				return err
+			}
+		}
+	})
+}
+
 // Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index ebb8421..6ccc865 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -2,7 +2,9 @@ package main
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"regexp"
@@ -385,3 +387,83 @@ func TestNodeStatus(t *testing.T) {
 		t.Errorf("uninitialized bytes_used in %v", volinfo)
 	}
 }
+
+// TestUnixVolumeGetFuncWorkerError checks that getFunc hands back
+// whatever error the worker func returns.
+func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
+	v := TempUnixVolume(t, false, false)
+	defer _teardown(v)
+
+	v.Put(TEST_HASH, TEST_BLOCK)
+	wantErr := errors.New("Mock error")
+	worker := func(io.Reader) error { return wantErr }
+	if gotErr := v.getFunc(v.blockPath(TEST_HASH), worker); gotErr != wantErr {
+		t.Errorf("Got %v, expected %v", gotErr, wantErr)
+	}
+}
+
+// TestUnixVolumeGetFuncFileError checks that getFunc reports an error,
+// and never invokes the worker func, when the block file cannot be
+// opened. Nothing has been stored, so the file does not exist.
+func TestUnixVolumeGetFuncFileError(t *testing.T) {
+	v := TempUnixVolume(t, false, false)
+	defer _teardown(v)
+
+	called := false
+	worker := func(io.Reader) error {
+		called = true
+		return nil
+	}
+	if err := v.getFunc(v.blockPath(TEST_HASH), worker); err == nil {
+		t.Errorf("Expected error opening non-existent file")
+	}
+	if called {
+		t.Errorf("Worker func should not have been called")
+	}
+}
+
+// TestUnixVolumeGetFuncWorkerWaitsOnMutex checks that, on a volume
+// with serialize enabled (assumed to be TempUnixVolume's first flag),
+// getFunc does not call the worker func until it holds v.mutex.
+func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
+	v := TempUnixVolume(t, true, false)
+	defer _teardown(v)
+
+	// The block file must exist. Otherwise getFunc fails in
+	// os.Open before it ever reaches the lock or the worker func,
+	// and the test checks nothing. Store it before taking the
+	// mutex, because Put may need the same lock.
+	if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
+		t.Fatalf("Put: %v", err)
+	}
+
+	v.mutex.Lock()
+	locked := true
+	go func() {
+		// TODO(TC): Don't rely on Sleep. Mock the mutex instead?
+		time.Sleep(10 * time.Millisecond)
+		locked = false
+		v.mutex.Unlock()
+	}()
+	funcCalled := false
+	err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+		funcCalled = true
+		if locked {
+			t.Errorf("Worker func called before serialize lock was obtained")
+		}
+		return nil
+	})
+	if err != nil {
+		t.Errorf("getFunc: %v", err)
+	}
+	if !funcCalled {
+		t.Errorf("Worker func was not called")
+	}
+}
+
+// TestUnixVolumeCompare exercises UnixVolume.Compare against matching
+// data, different-but-intact data, data that does not match its own
+// hash, and an unreadable block file.
+func TestUnixVolumeCompare(t *testing.T) {
+	v := TempUnixVolume(t, false, false)
+	defer _teardown(v)
+
+	v.Put(TEST_HASH, TEST_BLOCK)
+	err := v.Compare(TEST_HASH, TEST_BLOCK)
+	if err != nil {
+		t.Errorf("Got err %v, expected nil", err)
+	}
+
+	err = v.Compare(TEST_HASH, []byte("baddata"))
+	if err != CollisionError {
+		t.Errorf("Got err %v, expected %v", err, CollisionError)
+	}
+
+	_store(t, v, TEST_HASH, []byte("baddata"))
+	err = v.Compare(TEST_HASH, TEST_BLOCK)
+	if err != DiskHashError {
+		t.Errorf("Got err %v, expected %v", err, DiskHashError)
+	}
+
+	// root bypasses file permission checks, so the "unreadable
+	// file" case below cannot be produced when running as root.
+	if os.Getuid() == 0 {
+		t.Skip("running as root: cannot make block file unreadable")
+	}
+	p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+	if err := os.Chmod(p, 000); err != nil {
+		t.Fatalf("Chmod %s: %v", p, err)
+	}
+	err = v.Compare(TEST_HASH, TEST_BLOCK)
+	if err == nil || !strings.Contains(err.Error(), "permission denied") {
+		t.Errorf("Got err %v, expected %q", err, "permission denied")
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list