[ARVADOS] created: 8f05c08e5836f9fda631c8afe287bfd0bc4feffc
git at public.curoverse.com
git at public.curoverse.com
Thu Sep 3 01:19:11 EDT 2015
at 8f05c08e5836f9fda631c8afe287bfd0bc4feffc (commit)
commit 8f05c08e5836f9fda631c8afe287bfd0bc4feffc
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Sep 3 01:19:05 2015 -0400
7121: Return DiskHashError instead of CollisionError from Compare() where appropriate.
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 2ffa8fa..ccf8987 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -4,6 +4,7 @@ package main
import (
"bytes"
+ "crypto/md5"
"fmt"
"io"
"io/ioutil"
@@ -86,6 +87,44 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
return stat, err
}
+// Compute the MD5 digest of data found on disk (possibly including
+// some already read into buf1 and buf2 and some still readable from
+// rdr). If all data is read successfully, return DiskHashError or
+// CollisionError depending on whether it matches expectMD5. If
+// an error occurs while reading, return that error.
+func (v *UnixVolume) collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) error {
+ outcome := make(chan error)
+ data := make(chan []byte, 1)
+ go func() {
+ h := md5.New()
+ for b := range data {
+ h.Write(b)
+ }
+ if fmt.Sprintf("%x", h.Sum(nil)) == expectMD5 {
+ outcome <- CollisionError
+ } else {
+ outcome <- DiskHashError
+ }
+ }()
+ data <- buf1
+ if buf2 != nil {
+ data <- buf2
+ }
+ buf := make([]byte, 1 << 20)
+ var err error
+ for rdr != nil && err == nil {
+ var n int
+ n, err = rdr.Read(buf)
+ data <- buf[:n]
+ }
+ close(data)
+ if err != nil {
+ <-outcome
+ return err
+ }
+ return <-outcome
+}
+
// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
@@ -111,12 +150,7 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
// Compare returns nil if Get(loc) would return the same content as
// cmp. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
-//
-// TODO(TC): Before returning CollisionError, compute the MD5 digest
-// of the data on disk (i.e., known-to-be-equal data in cmp +
-// remaining data on disk) and return DiskHashError instead of
-// CollisionError if it doesn't equal loc[:32].
-func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
@@ -126,6 +160,7 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
if int64(bufLen) > stat.Size() {
bufLen = int(stat.Size())
}
+ cmp := expect
buf := make([]byte, bufLen)
return v.getFunc(path, func(rdr io.Reader) error {
// Loop invariants: all data read so far matched what
@@ -134,17 +169,13 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
// reader.
for {
n, err := rdr.Read(buf)
- if n > len(cmp) {
- // file on disk is too long
- return CollisionError
- } else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
- return CollisionError
+ if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+ return v.collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
}
cmp = cmp[n:]
if err == io.EOF {
if len(cmp) != 0 {
- // file on disk is too short
- return CollisionError
+ return v.collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:0], nil)
}
return nil
} else if err != nil {
commit 8ac1cdc7230ca1e9603856e3d2071ca1f86e7262
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Sep 3 00:54:52 2015 -0400
7121: Replace Get(loc,true) with CompareAndTouch(). Add Compare method to Volume, UnixVolume, MockVolume.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a656ecf..a9bf91e 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -231,6 +231,9 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
uri: "/" + TEST_HASH,
request_body: TEST_BLOCK,
})
+ defer func(orig bool) {
+ never_delete = orig
+ }(never_delete)
never_delete = false
IssueRequest(
&RequestTester{
@@ -246,10 +249,13 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
}
for _, e := range []expect{
{0, "Get", 0},
+ {0, "Compare", 0},
{0, "Touch", 0},
{0, "Put", 0},
{0, "Delete", 0},
- {1, "Get", 1},
+ {1, "Get", 0},
+ {1, "Compare", 1},
+ {1, "Touch", 1},
{1, "Put", 1},
{1, "Delete", 1},
} {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a86bb6a..8cc7d8b 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -8,7 +8,6 @@ package main
// StatusHandler (GET /status.json)
import (
- "bytes"
"container/list"
"crypto/md5"
"encoding/json"
@@ -74,7 +73,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
}
}
- block, err := GetBlock(mux.Vars(req)["hash"], false)
+ block, err := GetBlock(mux.Vars(req)["hash"])
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
@@ -442,10 +441,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// which volume to check for fetching blocks, storing blocks, etc.
// ==============================
-// GetBlock fetches and returns the block identified by "hash". If
-// the update_timestamp argument is true, GetBlock also updates the
-// block's file modification time (for the sake of PutBlock, which
-// must update the file's timestamp when the block already exists).
+// GetBlock fetches and returns the block identified by "hash".
//
// On success, GetBlock returns a byte slice with the block data, and
// a nil error.
@@ -456,22 +452,11 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// DiskHashError.
//
-func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
+func GetBlock(hash string) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
- var vols []Volume
- if update_timestamp {
- // Pointless to find the block on an unwritable volume
- // because Touch() will fail -- this is as good as
- // "not found" for purposes of callers who need to
- // update_timestamp.
- vols = KeepVM.AllWritable()
- } else {
- vols = KeepVM.AllReadable()
- }
-
- for _, vol := range vols {
+ for _, vol := range KeepVM.AllReadable() {
buf, err := vol.Get(hash)
if err != nil {
// IsNotExist is an expected error and may be
@@ -500,15 +485,6 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
vol, hash)
}
- if update_timestamp {
- if err := vol.Touch(hash); err != nil {
- error_to_caller = GenericError
- log.Printf("%s: Touch %s failed: %s",
- vol, hash, error_to_caller)
- bufs.Put(buf)
- continue
- }
- }
return buf, nil
}
return nil, error_to_caller
@@ -548,21 +524,11 @@ func PutBlock(block []byte, hash string) error {
return RequestHashError
}
- // If we already have a block on disk under this identifier, return
- // success (but check for MD5 collisions). While fetching the block,
- // update its timestamp.
- // The only errors that GetBlock can return are DiskHashError and NotFoundError.
- // In either case, we want to write our new (good) block to disk,
- // so there is nothing special to do if err != nil.
- //
- if oldblock, err := GetBlock(hash, true); err == nil {
- defer bufs.Put(oldblock)
- if bytes.Compare(block, oldblock) == 0 {
- // The block already exists; return success.
- return nil
- } else {
- return CollisionError
- }
+ // If we already have this data, it's intact on disk, and we
+ // can update its timestamp, return success. If we have
+ // different data with the same hash, return failure.
+ if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ return err
}
// Choose a Keep volume to write to.
@@ -603,6 +569,35 @@ func PutBlock(block []byte, hash string) error {
}
}
+// CompareAndTouch returns nil if one of the volumes already has the
+// given content and it successfully updates the relevant block's
+// modification time in order to protect it from premature garbage
+// collection.
+func CompareAndTouch(hash string, buf []byte) error {
+ var bestErr error = NotFoundError
+ for _, vol := range KeepVM.AllWritable() {
+ if err := vol.Compare(hash, buf); err == CollisionError {
+ // Stop if we have a block with same hash but
+ // different content. (It will be impossible
+ // to tell which one is wanted if we have
+ // both, so there's no point writing it even
+ // on a different volume.)
+ return err
+ } else if err != nil {
+ // Couldn't find, couldn't open, etc.: try next volume.
+ continue
+ }
+ if err := vol.Touch(hash); err != nil {
+ log.Printf("%s: Touch %s failed: %s", vol, hash, err)
+ bestErr = err
+ continue
+ }
+ // Compare and Touch both worked --> done.
+ return nil
+ }
+ return bestErr
+}
+
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
// IsValidLocator
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index e01b013..b89925f 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -60,7 +60,7 @@ func TestGetBlock(t *testing.T) {
}
// Check that GetBlock returns success.
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TEST_HASH)
if err != nil {
t.Errorf("GetBlock error: %s", err)
}
@@ -80,7 +80,7 @@ func TestGetBlockMissing(t *testing.T) {
defer KeepVM.Close()
// Check that GetBlock returns failure.
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TEST_HASH)
if err != NotFoundError {
t.Errorf("Expected NotFoundError, got %v", result)
}
@@ -101,7 +101,7 @@ func TestGetBlockCorrupt(t *testing.T) {
vols[0].Put(TEST_HASH, BAD_BLOCK)
// Check that GetBlock returns failure.
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TEST_HASH)
if err != DiskHashError {
t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
}
@@ -156,7 +156,7 @@ func TestPutBlockOneVol(t *testing.T) {
t.Fatalf("PutBlock: %v", err)
}
- result, err := GetBlock(TEST_HASH, false)
+ result, err := GetBlock(TEST_HASH)
if err != nil {
t.Fatalf("GetBlock: %v", err)
}
@@ -185,7 +185,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
}
// Confirm that GetBlock fails to return anything.
- if result, err := GetBlock(TEST_HASH, false); err != NotFoundError {
+ if result, err := GetBlock(TEST_HASH); err != NotFoundError {
t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
string(result), err)
}
@@ -210,7 +210,7 @@ func TestPutBlockCorrupt(t *testing.T) {
}
// The block on disk should now match TEST_BLOCK.
- if block, err := GetBlock(TEST_HASH, false); err != nil {
+ if block, err := GetBlock(TEST_HASH); err != nil {
t.Errorf("GetBlock: %v", err)
} else if bytes.Compare(block, TEST_BLOCK) != 0 {
t.Errorf("GetBlock returned: '%s'", string(block))
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 40b291e..a626d9b 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -290,7 +290,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
- data, _ := GetBlock(testData.Locator1, false)
+ data, _ := GetBlock(testData.Locator1)
if testData.ExpectLocator1 {
if len(data) == 0 {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -303,7 +303,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- data, _ = GetBlock(testData.Locator2, false)
+ data, _ = GetBlock(testData.Locator2)
if testData.ExpectLocator2 {
if len(data) == 0 {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 64fea34..d3616d0 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -15,7 +15,12 @@ type Volume interface {
// put the returned slice back into the buffer pool when it's
// finished with it.
Get(loc string) ([]byte, error)
- Put(loc string, block []byte) error
+ // Confirm Get() would return buf. If so, return nil. If not,
+ // return CollisionError or DiskHashError (depending on
+ // whether the data on disk matches the expected hash), or
+ // whatever error was encountered opening/reading the file.
+ Compare(loc string, data []byte) error
+ Put(loc string, data []byte) error
Touch(loc string) error
Mtime(loc string) (time.Time, error)
IndexTo(prefix string, writer io.Writer) error
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index c5a7491..cbf6fb8 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -1,6 +1,8 @@
package main
import (
+ "bytes"
+ "crypto/md5"
"errors"
"fmt"
"io"
@@ -71,6 +73,24 @@ func (v *MockVolume) gotCall(method string) {
}
}
+func (v *MockVolume) Compare(loc string, buf []byte) error {
+ v.gotCall("Compare")
+ <-v.Gate
+ if v.Bad {
+ return errors.New("Bad volume")
+ } else if block, ok := v.Store[loc]; ok {
+ if fmt.Sprintf("%x", md5.Sum(block)) != loc {
+ return DiskHashError
+ }
+ if bytes.Compare(buf, block) != 0 {
+ return CollisionError
+ }
+ return nil
+ } else {
+ return NotFoundError
+ }
+}
+
func (v *MockVolume) Get(loc string) ([]byte, error) {
v.gotCall("Get")
<-v.Gate
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index a7ad6f9..2ffa8fa 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -3,6 +3,7 @@
package main
import (
+ "bytes"
"fmt"
"io"
"io/ioutil"
@@ -57,35 +58,49 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
}
}
+// Open the given file, apply the serialize lock if enabled, and call
+// the given function if and when the file is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+ f, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
+ return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+ stat, err := os.Stat(path)
+ if err == nil {
+ if stat.Size() < 0 {
+ err = os.ErrInvalid
+ } else if stat.Size() > BLOCKSIZE {
+ err = TooLongError
+ }
+ }
+ return stat, err
+}
+
// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
path := v.blockPath(loc)
- stat, err := os.Stat(path)
+ stat, err := v.stat(path)
if err != nil {
return nil, err
}
- if stat.Size() < 0 {
- return nil, os.ErrInvalid
- } else if stat.Size() == 0 {
- return bufs.Get(0), nil
- } else if stat.Size() > BLOCKSIZE {
- return nil, TooLongError
- }
- f, err := os.Open(path)
- if err != nil {
- return nil, err
- }
- defer f.Close()
buf := bufs.Get(int(stat.Size()))
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
- }
- _, err = io.ReadFull(f, buf)
+ err = v.getFunc(path, func(rdr io.Reader) error {
+ _, err = io.ReadFull(rdr, buf)
+ return err
+ })
if err != nil {
bufs.Put(buf)
return nil, err
@@ -93,6 +108,52 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
return buf, nil
}
+// Compare returns nil if Get(loc) would return the same content as
+// cmp. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+//
+// TODO(TC): Before returning CollisionError, compute the MD5 digest
+// of the data on disk (i.e., known-to-be-equal data in cmp +
+// remaining data on disk) and return DiskHashError instead of
+// CollisionError if it doesn't equal loc[:32].
+func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+ path := v.blockPath(loc)
+ stat, err := v.stat(path)
+ if err != nil {
+ return err
+ }
+ bufLen := 1 << 20
+ if int64(bufLen) > stat.Size() {
+ bufLen = int(stat.Size())
+ }
+ buf := make([]byte, bufLen)
+ return v.getFunc(path, func(rdr io.Reader) error {
+ // Loop invariants: all data read so far matched what
+ // we expected, and the first N bytes of cmp are
+ // expected to equal the next N bytes read from
+ // reader.
+ for {
+ n, err := rdr.Read(buf)
+ if n > len(cmp) {
+ // file on disk is too long
+ return CollisionError
+ } else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
+ return CollisionError
+ }
+ cmp = cmp[n:]
+ if err == io.EOF {
+ if len(cmp) != 0 {
+ // file on disk is too short
+ return CollisionError
+ }
+ return nil
+ } else if err != nil {
+ return err
+ }
+ }
+ })
+}
+
// Put stores a block of data identified by the locator string
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
commit a3a31e3831d2e0b0ec2018405969d5ae55a9642a
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Sep 2 23:19:23 2015 -0400
7121: Add test case to demonstrate deadlock.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 8be4710..a656ecf 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -807,6 +807,39 @@ func ExpectBody(
}
}
+// See #7121
+func TestPutNeedsOnlyOneBuffer(t *testing.T) {
+ defer teardown()
+ KeepVM = MakeTestVolumeManager(1)
+ defer KeepVM.Close()
+
+ defer func(orig *bufferPool) {
+ bufs = orig
+ }(bufs)
+ bufs = newBufferPool(1, BLOCKSIZE)
+
+ ok := make(chan struct{})
+ go func() {
+ for i := 0; i < 2; i++ {
+ response := IssueRequest(
+ &RequestTester{
+ method: "PUT",
+ uri: "/" + TEST_HASH,
+ request_body: TEST_BLOCK,
+ })
+ ExpectStatusCode(t,
+ "TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
+ }
+ ok <- struct{}{}
+ }()
+
+ select {
+ case <-ok:
+ case <-time.After(time.Second):
+ t.Fatal("PUT deadlocks with maxBuffers==1")
+ }
+}
+
// Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
// leak.
func TestPutHandlerNoBufferleak(t *testing.T) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list