[ARVADOS] updated: 947b6db31486b4926100517047e4c02b6b4d5d05

git at public.curoverse.com git at public.curoverse.com
Mon Sep 7 16:43:30 EDT 2015


Summary of changes:
 doc/README.textile                                 |   2 +-
 doc/_includes/_alert-incomplete.liquid             |   2 +-
 doc/_includes/_alert_stub.liquid                   |   2 +-
 doc/_includes/_navbar_top.liquid                   |   2 +-
 doc/index.html.liquid                              |   8 +-
 doc/install/install-api-server.html.textile.liquid |   2 +-
 doc/install/install-docker.html.textile.liquid     |   2 +-
 doc/sdk/go/index.html.textile.liquid               |   4 +-
 .../firstpipeline.html.textile.liquid              |   2 +-
 .../examples/crunch-examples.html.textile.liquid   |  16 +-
 .../getting_started/community.html.textile.liquid  |   4 +-
 ...rial-gatk-variantfiltration.html.textile.liquid |   2 +-
 .../tutorials/intro-crunch.html.textile.liquid     |   2 +-
 .../running-external-program.html.textile.liquid   |   6 +-
 .../tutorial-keep-mount.html.textile.liquid        |   2 +-
 sdk/cli/bin/arv                                    |   9 +-
 services/arv-git-httpd/gitolite_test.go            |  93 ++++++++++++
 services/arv-git-httpd/integration_test.go         | 133 +++++++++++++++++
 services/arv-git-httpd/server_test.go              | 161 ++++-----------------
 19 files changed, 285 insertions(+), 169 deletions(-)
 create mode 100644 services/arv-git-httpd/gitolite_test.go
 create mode 100644 services/arv-git-httpd/integration_test.go

  discards  94e1489066f1038f0b4bc95a45af2bedc4b54be4 (commit)
  discards  799fabc9d0da8a165fe60a6c10dd207e281c5b5d (commit)
  discards  343d330c4115d2844adfd5b7cea92d6f05d53c30 (commit)
  discards  a3a31e3831d2e0b0ec2018405969d5ae55a9642a (commit)
       via  947b6db31486b4926100517047e4c02b6b4d5d05 (commit)
       via  8674798427a9d9e20e1586fe783aa5af6712ebb6 (commit)
       via  7930d7abaabf2fd1f3432eca10f26b821e0ef94f (commit)
       via  a3cda4b279d7446b55caab3a7ca8aef5960204cc (commit)
       via  eb73e21144a3b6460d7550583860b25fba3fffc8 (commit)
       via  ac1c1f563b53d22629e36e7d67de028abe55f1df (commit)
       via  0969c534741180e61ac318ffe799ac3512fc19d8 (commit)
       via  fb82c3a9ab31a55b708e61b183b30bd640f38c79 (commit)
       via  b7355341718c6d177b809bedbadc5a4fc4652de6 (commit)
       via  bef5ead515cbb990a461cad98dc01d631fcb3c36 (commit)
       via  7bf0bc0355fde5829644df8557990cd2353d92b6 (commit)
       via  b4151bcf5010bf6c8f26c819eb569cd58442cd1c (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (94e1489066f1038f0b4bc95a45af2bedc4b54be4)
            \
             N -- N -- N (947b6db31486b4926100517047e4c02b6b4d5d05)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so here we report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 947b6db31486b4926100517047e4c02b6b4d5d05
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Sep 3 15:15:32 2015 -0400

    7121: Test mutex usage with a mock instead of time.Sleep.

diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 3dfdce2..53cf7be 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -14,6 +14,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 )
@@ -132,10 +133,14 @@ func (vs *volumeSet) Set(value string) error {
 	if _, err := os.Stat(value); err != nil {
 		return err
 	}
+	var locker sync.Locker
+	if flagSerializeIO {
+		locker = &sync.Mutex{}
+	}
 	*vs = append(*vs, &UnixVolume{
-		root:      value,
-		serialize: flagSerializeIO,
-		readonly:  flagReadonly,
+		root:     value,
+		locker:   locker,
+		readonly: flagReadonly,
 	})
 	return nil
 }
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 368ddc5..f91861a 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -18,10 +18,12 @@ import (
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-	root      string // path to the volume's root directory
-	serialize bool
-	readonly  bool
-	mutex     sync.Mutex
+	// path to the volume's root directory
+	root string
+	// something to lock during IO, typically a sync.Mutex (or nil
+	// to skip locking)
+	locker   sync.Locker
+	readonly bool
 }
 
 func (v *UnixVolume) Touch(loc string) error {
@@ -34,9 +36,9 @@ func (v *UnixVolume) Touch(loc string) error {
 		return err
 	}
 	defer f.Close()
-	if v.serialize {
-		v.mutex.Lock()
-		defer v.mutex.Unlock()
+	if v.locker != nil {
+		v.locker.Lock()
+		defer v.locker.Unlock()
 	}
 	if e := lockfile(f); e != nil {
 		return e
@@ -56,17 +58,17 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 	}
 }
 
-// Open the given file, apply the serialize lock if enabled, and call
-// the given function if and when the file is ready to read.
+// Open the given file, lock the "serialize" locker if enabled, and
+// call the given function if and when the file is ready to read.
 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
 	f, err := os.Open(path)
 	if err != nil {
 		return err
 	}
 	defer f.Close()
-	if v.serialize {
-		v.mutex.Lock()
-		defer v.mutex.Unlock()
+	if v.locker != nil {
+		v.locker.Lock()
+		defer v.locker.Unlock()
 	}
 	return fn(f)
 }
@@ -169,9 +171,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 	}
 	bpath := v.blockPath(loc)
 
-	if v.serialize {
-		v.mutex.Lock()
-		defer v.mutex.Unlock()
+	if v.locker != nil {
+		v.locker.Lock()
+		defer v.locker.Unlock()
 	}
 	if _, err := tmpfile.Write(block); err != nil {
 		log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -298,9 +300,9 @@ func (v *UnixVolume) Delete(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
-	if v.serialize {
-		v.mutex.Lock()
-		defer v.mutex.Unlock()
+	if v.locker != nil {
+		v.locker.Lock()
+		defer v.locker.Unlock()
 	}
 	p := v.blockPath(loc)
 	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 6ccc865..08ca31c 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -10,6 +10,7 @@ import (
 	"regexp"
 	"sort"
 	"strings"
+	"sync"
 	"syscall"
 	"testing"
 	"time"
@@ -20,10 +21,14 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
 	if err != nil {
 		t.Fatal(err)
 	}
+	var locker sync.Locker
+	if serialize {
+		locker = &sync.Mutex{}
+	}
 	return &UnixVolume{
-		root:      d,
-		serialize: serialize,
-		readonly:  readonly,
+		root:     d,
+		locker:   locker,
+		readonly: readonly,
 	}
 }
 
@@ -420,23 +425,38 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) {
 }
 
 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
-	v := TempUnixVolume(t, true, false)
+	v := TempUnixVolume(t, false, false)
 	defer _teardown(v)
 
-	v.mutex.Lock()
-	locked := true
-	go func() {
-		// TODO(TC): Don't rely on Sleep. Mock the mutex instead?
-		time.Sleep(10 * time.Millisecond)
-		locked = false
-		v.mutex.Unlock()
-	}()
-	v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
-		if locked {
-			t.Errorf("Worker func called before serialize lock was obtained")
-		}
+	v.Put(TEST_HASH, TEST_BLOCK)
+
+	mtx := NewMockMutex()
+	v.locker = mtx
+
+	funcCalled := make(chan struct{})
+	go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+		funcCalled <- struct{}{}
 		return nil
 	})
+	select {
+	case mtx.AllowLock <- struct{}{}:
+	case <-funcCalled:
+		t.Fatal("Function was called before mutex was acquired")
+	case <-time.After(5 * time.Second):
+		t.Fatal("Timed out before mutex was acquired")
+	}
+	select {
+	case <-funcCalled:
+	case mtx.AllowUnlock <- struct{}{}:
+		t.Fatal("Mutex was released before function was called")
+	case <-time.After(5 * time.Second):
+		t.Fatal("Timed out waiting for funcCalled")
+	}
+	select {
+	case mtx.AllowUnlock <- struct{}{}:
+	case <-time.After(5 * time.Second):
+		t.Fatal("Timed out waiting for getFunc() to release mutex")
+	}
 }
 
 func TestUnixVolumeCompare(t *testing.T) {

commit 8674798427a9d9e20e1586fe783aa5af6712ebb6
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Sep 3 13:42:32 2015 -0400

    7121: Return DiskHashError instead of CollisionError from Compare() where appropriate.

diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go
new file mode 100644
index 0000000..210286a
--- /dev/null
+++ b/services/keepstore/collision.go
@@ -0,0 +1,49 @@
+package main
+
+import (
+	"crypto/md5"
+	"fmt"
+	"io"
+)
+
+// Compute the MD5 digest of a data block (consisting of buf1 + buf2 +
+// all bytes readable from rdr). If all data is read successfully,
+// return DiskHashError or CollisionError depending on whether it
+// matches expectMD5. If an error occurs while reading, return that
+// error.
+//
+// "content has expected MD5" is called a collision because this
+// function is used in cases where we have another block in hand with
+// the given MD5 but different content.
+func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) error {
+	outcome := make(chan error)
+	data := make(chan []byte, 1)
+	go func() {
+		h := md5.New()
+		for b := range data {
+			h.Write(b)
+		}
+		if fmt.Sprintf("%x", h.Sum(nil)) == expectMD5 {
+			outcome <- CollisionError
+		} else {
+			outcome <- DiskHashError
+		}
+	}()
+	data <- buf1
+	if buf2 != nil {
+		data <- buf2
+	}
+	var err error
+	for rdr != nil && err == nil {
+		buf := make([]byte, 1 << 18)
+		var n int
+		n, err = rdr.Read(buf)
+		data <- buf[:n]
+	}
+	close(data)
+	if rdr != nil && err != io.EOF {
+		<-outcome
+		return err
+	}
+	return <-outcome
+}
diff --git a/services/keepstore/collision_test.go b/services/keepstore/collision_test.go
new file mode 100644
index 0000000..e6cfd16
--- /dev/null
+++ b/services/keepstore/collision_test.go
@@ -0,0 +1,45 @@
+package main
+
+import (
+	"bytes"
+	"testing"
+	"testing/iotest"
+
+	check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+var _ = check.Suite(&CollisionSuite{})
+
+type CollisionSuite struct{}
+
+func (s *CollisionSuite) TestCollisionOrCorrupt(c *check.C) {
+	fooMD5 := "acbd18db4cc2f85cedef654fccc4a4d8"
+
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o'}, bytes.NewBufferString("o")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, nil, bytes.NewBufferString("oo")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f'}, []byte{'o', 'o'}, nil),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, nil, []byte{}, bytes.NewBufferString("foo")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("")),
+		check.Equals, CollisionError)
+	c.Check(collisionOrCorrupt(fooMD5, nil, nil, iotest.NewReadLogger("foo: ", iotest.DataErrReader(iotest.OneByteReader(bytes.NewBufferString("foo"))))),
+		check.Equals, CollisionError)
+
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o', 'o'}, nil, bytes.NewBufferString("bar")),
+		check.Equals, DiskHashError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{'f', 'o'}, nil, nil),
+		check.Equals, DiskHashError)
+	c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, bytes.NewBufferString("")),
+		check.Equals, DiskHashError)
+
+	c.Check(collisionOrCorrupt(fooMD5, []byte{}, nil, iotest.TimeoutReader(iotest.OneByteReader(bytes.NewBufferString("foo")))),
+		check.Equals, iotest.ErrTimeout)
+}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 2ffa8fa..368ddc5 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -1,5 +1,3 @@
-// A UnixVolume is a Volume backed by a locally mounted disk.
-//
 package main
 
 import (
@@ -111,12 +109,7 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 // Compare returns nil if Get(loc) would return the same content as
 // cmp. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-//
-// TODO(TC): Before returning CollisionError, compute the MD5 digest
-// of the data on disk (i.e., known-to-be-equal data in cmp +
-// remaining data on disk) and return DiskHashError instead of
-// CollisionError if it doesn't equal loc[:32].
-func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+func (v *UnixVolume) Compare(loc string, expect []byte) error {
 	path := v.blockPath(loc)
 	stat, err := v.stat(path)
 	if err != nil {
@@ -126,6 +119,7 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
 	if int64(bufLen) > stat.Size() {
 		bufLen = int(stat.Size())
 	}
+	cmp := expect
 	buf := make([]byte, bufLen)
 	return v.getFunc(path, func(rdr io.Reader) error {
 		// Loop invariants: all data read so far matched what
@@ -134,17 +128,13 @@ func (v *UnixVolume) Compare(loc string, cmp []byte) error {
 		// reader.
 		for {
 			n, err := rdr.Read(buf)
-			if n > len(cmp) {
-				// file on disk is too long
-				return CollisionError
-			} else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
-				return CollisionError
+			if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+				return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
 			}
 			cmp = cmp[n:]
 			if err == io.EOF {
 				if len(cmp) != 0 {
-					// file on disk is too short
-					return CollisionError
+					return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
 				}
 				return nil
 			} else if err != nil {

commit 7930d7abaabf2fd1f3432eca10f26b821e0ef94f
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Sep 3 00:54:52 2015 -0400

    7121: Replace Get(loc,true) with CompareAndTouch(). Add Compare method to Volume, UnixVolume, MockVolume.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a656ecf..a9bf91e 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -231,6 +231,9 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 			uri:          "/" + TEST_HASH,
 			request_body: TEST_BLOCK,
 		})
+	defer func(orig bool) {
+		never_delete = orig
+	}(never_delete)
 	never_delete = false
 	IssueRequest(
 		&RequestTester{
@@ -246,10 +249,13 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
 	}
 	for _, e := range []expect{
 		{0, "Get", 0},
+		{0, "Compare", 0},
 		{0, "Touch", 0},
 		{0, "Put", 0},
 		{0, "Delete", 0},
-		{1, "Get", 1},
+		{1, "Get", 0},
+		{1, "Compare", 1},
+		{1, "Touch", 1},
 		{1, "Put", 1},
 		{1, "Delete", 1},
 	} {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a86bb6a..8cc7d8b 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -8,7 +8,6 @@ package main
 // StatusHandler   (GET /status.json)
 
 import (
-	"bytes"
 	"container/list"
 	"crypto/md5"
 	"encoding/json"
@@ -74,7 +73,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		}
 	}
 
-	block, err := GetBlock(mux.Vars(req)["hash"], false)
+	block, err := GetBlock(mux.Vars(req)["hash"])
 	if err != nil {
 		// This type assertion is safe because the only errors
 		// GetBlock can return are DiskHashError or NotFoundError.
@@ -442,10 +441,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // which volume to check for fetching blocks, storing blocks, etc.
 
 // ==============================
-// GetBlock fetches and returns the block identified by "hash".  If
-// the update_timestamp argument is true, GetBlock also updates the
-// block's file modification time (for the sake of PutBlock, which
-// must update the file's timestamp when the block already exists).
+// GetBlock fetches and returns the block identified by "hash".
 //
 // On success, GetBlock returns a byte slice with the block data, and
 // a nil error.
@@ -456,22 +452,11 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 
-func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
+func GetBlock(hash string) ([]byte, error) {
 	// Attempt to read the requested hash from a keep volume.
 	error_to_caller := NotFoundError
 
-	var vols []Volume
-	if update_timestamp {
-		// Pointless to find the block on an unwritable volume
-		// because Touch() will fail -- this is as good as
-		// "not found" for purposes of callers who need to
-		// update_timestamp.
-		vols = KeepVM.AllWritable()
-	} else {
-		vols = KeepVM.AllReadable()
-	}
-
-	for _, vol := range vols {
+	for _, vol := range KeepVM.AllReadable() {
 		buf, err := vol.Get(hash)
 		if err != nil {
 			// IsNotExist is an expected error and may be
@@ -500,15 +485,6 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
 			log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
 				vol, hash)
 		}
-		if update_timestamp {
-			if err := vol.Touch(hash); err != nil {
-				error_to_caller = GenericError
-				log.Printf("%s: Touch %s failed: %s",
-					vol, hash, error_to_caller)
-				bufs.Put(buf)
-				continue
-			}
-		}
 		return buf, nil
 	}
 	return nil, error_to_caller
@@ -548,21 +524,11 @@ func PutBlock(block []byte, hash string) error {
 		return RequestHashError
 	}
 
-	// If we already have a block on disk under this identifier, return
-	// success (but check for MD5 collisions).  While fetching the block,
-	// update its timestamp.
-	// The only errors that GetBlock can return are DiskHashError and NotFoundError.
-	// In either case, we want to write our new (good) block to disk,
-	// so there is nothing special to do if err != nil.
-	//
-	if oldblock, err := GetBlock(hash, true); err == nil {
-		defer bufs.Put(oldblock)
-		if bytes.Compare(block, oldblock) == 0 {
-			// The block already exists; return success.
-			return nil
-		} else {
-			return CollisionError
-		}
+	// If we already have this data, it's intact on disk, and we
+	// can update its timestamp, return success. If we have
+	// different data with the same hash, return failure.
+	if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+		return err
 	}
 
 	// Choose a Keep volume to write to.
@@ -603,6 +569,35 @@ func PutBlock(block []byte, hash string) error {
 	}
 }
 
+// CompareAndTouch returns nil if one of the volumes already has the
+// given content and it successfully updates the relevant block's
+// modification time in order to protect it from premature garbage
+// collection.
+func CompareAndTouch(hash string, buf []byte) error {
+	var bestErr error = NotFoundError
+	for _, vol := range KeepVM.AllWritable() {
+		if err := vol.Compare(hash, buf); err == CollisionError {
+			// Stop if we have a block with same hash but
+			// different content. (It will be impossible
+			// to tell which one is wanted if we have
+			// both, so there's no point writing it even
+			// on a different volume.)
+			return err
+		} else if err != nil {
+			// Couldn't find, couldn't open, etc.: try next volume.
+			continue
+		}
+		if err := vol.Touch(hash); err != nil {
+			log.Printf("%s: Touch %s failed: %s", vol, hash, err)
+			bestErr = err
+			continue
+		}
+		// Compare and Touch both worked --> done.
+		return nil
+	}
+	return bestErr
+}
+
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IsValidLocator
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index e01b013..b89925f 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -60,7 +60,7 @@ func TestGetBlock(t *testing.T) {
 	}
 
 	// Check that GetBlock returns success.
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != nil {
 		t.Errorf("GetBlock error: %s", err)
 	}
@@ -80,7 +80,7 @@ func TestGetBlockMissing(t *testing.T) {
 	defer KeepVM.Close()
 
 	// Check that GetBlock returns failure.
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != NotFoundError {
 		t.Errorf("Expected NotFoundError, got %v", result)
 	}
@@ -101,7 +101,7 @@ func TestGetBlockCorrupt(t *testing.T) {
 	vols[0].Put(TEST_HASH, BAD_BLOCK)
 
 	// Check that GetBlock returns failure.
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != DiskHashError {
 		t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
 	}
@@ -156,7 +156,7 @@ func TestPutBlockOneVol(t *testing.T) {
 		t.Fatalf("PutBlock: %v", err)
 	}
 
-	result, err := GetBlock(TEST_HASH, false)
+	result, err := GetBlock(TEST_HASH)
 	if err != nil {
 		t.Fatalf("GetBlock: %v", err)
 	}
@@ -185,7 +185,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
 	}
 
 	// Confirm that GetBlock fails to return anything.
-	if result, err := GetBlock(TEST_HASH, false); err != NotFoundError {
+	if result, err := GetBlock(TEST_HASH); err != NotFoundError {
 		t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
 			string(result), err)
 	}
@@ -210,7 +210,7 @@ func TestPutBlockCorrupt(t *testing.T) {
 	}
 
 	// The block on disk should now match TEST_BLOCK.
-	if block, err := GetBlock(TEST_HASH, false); err != nil {
+	if block, err := GetBlock(TEST_HASH); err != nil {
 		t.Errorf("GetBlock: %v", err)
 	} else if bytes.Compare(block, TEST_BLOCK) != 0 {
 		t.Errorf("GetBlock returned: '%s'", string(block))
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 40b291e..a626d9b 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -290,7 +290,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 	expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
 	// Verify Locator1 to be un/deleted as expected
-	data, _ := GetBlock(testData.Locator1, false)
+	data, _ := GetBlock(testData.Locator1)
 	if testData.ExpectLocator1 {
 		if len(data) == 0 {
 			t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
@@ -303,7 +303,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 
 	// Verify Locator2 to be un/deleted as expected
 	if testData.Locator1 != testData.Locator2 {
-		data, _ = GetBlock(testData.Locator2, false)
+		data, _ = GetBlock(testData.Locator2)
 		if testData.ExpectLocator2 {
 			if len(data) == 0 {
 				t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 64fea34..d3616d0 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -15,7 +15,12 @@ type Volume interface {
 	// put the returned slice back into the buffer pool when it's
 	// finished with it.
 	Get(loc string) ([]byte, error)
-	Put(loc string, block []byte) error
+	// Confirm Get() would return buf. If so, return nil. If not,
+	// return CollisionError or DiskHashError (depending on
+	// whether the data on disk matches the expected hash), or
+	// whatever error was encountered opening/reading the file.
+	Compare(loc string, data []byte) error
+	Put(loc string, data []byte) error
 	Touch(loc string) error
 	Mtime(loc string) (time.Time, error)
 	IndexTo(prefix string, writer io.Writer) error
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index c5a7491..cbf6fb8 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -1,6 +1,8 @@
 package main
 
 import (
+	"bytes"
+	"crypto/md5"
 	"errors"
 	"fmt"
 	"io"
@@ -71,6 +73,24 @@ func (v *MockVolume) gotCall(method string) {
 	}
 }
 
+func (v *MockVolume) Compare(loc string, buf []byte) error {
+	v.gotCall("Compare")
+	<-v.Gate
+	if v.Bad {
+		return errors.New("Bad volume")
+	} else if block, ok := v.Store[loc]; ok {
+		if fmt.Sprintf("%x", md5.Sum(block)) != loc {
+			return DiskHashError
+		}
+		if bytes.Compare(buf, block) != 0 {
+			return CollisionError
+		}
+		return nil
+	} else {
+		return NotFoundError
+	}
+}
+
 func (v *MockVolume) Get(loc string) ([]byte, error) {
 	v.gotCall("Get")
 	<-v.Gate
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index a7ad6f9..2ffa8fa 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -3,6 +3,7 @@
 package main
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -57,35 +58,49 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 	}
 }
 
+// Open the given file, apply the serialize lock if enabled, and call
+// the given function if and when the file is ready to read.
+func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+	f, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
+	return fn(f)
+}
+
+// stat is os.Stat() with some extra sanity checks.
+func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+	stat, err := os.Stat(path)
+	if err == nil {
+		if stat.Size() < 0 {
+			err = os.ErrInvalid
+		} else if stat.Size() > BLOCKSIZE {
+			err = TooLongError
+		}
+	}
+	return stat, err
+}
+
 // Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be found, opened, or read, Get returns a nil
-// slice and whatever non-nil error was returned by Stat or ReadFile.
+// Get returns a nil buffer IFF it returns a non-nil error.
 func (v *UnixVolume) Get(loc string) ([]byte, error) {
 	path := v.blockPath(loc)
-	stat, err := os.Stat(path)
+	stat, err := v.stat(path)
 	if err != nil {
 		return nil, err
 	}
-	if stat.Size() < 0 {
-		return nil, os.ErrInvalid
-	} else if stat.Size() == 0 {
-		return bufs.Get(0), nil
-	} else if stat.Size() > BLOCKSIZE {
-		return nil, TooLongError
-	}
-	f, err := os.Open(path)
-	if err != nil {
-		return nil, err
-	}
-	defer f.Close()
 	buf := bufs.Get(int(stat.Size()))
-	if v.serialize {
-		v.mutex.Lock()
-		defer v.mutex.Unlock()
-	}
-	_, err = io.ReadFull(f, buf)
+	err = v.getFunc(path, func(rdr io.Reader) error {
+		_, err = io.ReadFull(rdr, buf)
+		return err
+	})
 	if err != nil {
 		bufs.Put(buf)
 		return nil, err
@@ -93,6 +108,52 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 	return buf, nil
 }
 
+// Compare returns nil if Get(loc) would return the same content as
+// cmp. It is functionally equivalent to Get() followed by
+// bytes.Compare(), but uses less memory.
+//
+// TODO(TC): Before returning CollisionError, compute the MD5 digest
+// of the data on disk (i.e., known-to-be-equal data in cmp +
+// remaining data on disk) and return DiskHashError instead of
+// CollisionError if it doesn't equal loc[:32].
+func (v *UnixVolume) Compare(loc string, cmp []byte) error {
+	path := v.blockPath(loc)
+	stat, err := v.stat(path)
+	if err != nil {
+		return err
+	}
+	bufLen := 1 << 20
+	if int64(bufLen) > stat.Size() {
+		bufLen = int(stat.Size())
+	}
+	buf := make([]byte, bufLen)
+	return v.getFunc(path, func(rdr io.Reader) error {
+		// Loop invariants: all data read so far matched what
+		// we expected, and the first N bytes of cmp are
+		// expected to equal the next N bytes read from
+		// reader.
+		for {
+			n, err := rdr.Read(buf)
+			if n > len(cmp) {
+				// file on disk is too long
+				return CollisionError
+			} else if n > 0 && bytes.Compare(cmp[:n], buf[:n]) != 0 {
+				return CollisionError
+			}
+			cmp = cmp[n:]
+			if err == io.EOF {
+				if len(cmp) != 0 {
+					// file on disk is too short
+					return CollisionError
+				}
+				return nil
+			} else if err != nil {
+				return err
+			}
+		}
+	})
+}
+
 // Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index ebb8421..6ccc865 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -2,7 +2,9 @@ package main
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"regexp"
@@ -385,3 +387,83 @@ func TestNodeStatus(t *testing.T) {
 		t.Errorf("uninitialized bytes_used in %v", volinfo)
 	}
 }
+
+func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
+	v := TempUnixVolume(t, false, false)
+	defer _teardown(v)
+
+	v.Put(TEST_HASH, TEST_BLOCK)
+	mockErr := errors.New("Mock error")
+	err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+		return mockErr
+	})
+	if err != mockErr {
+		t.Errorf("Got %v, expected %v", err, mockErr)
+	}
+}
+
+func TestUnixVolumeGetFuncFileError(t *testing.T) {
+	v := TempUnixVolume(t, false, false)
+	defer _teardown(v)
+
+	funcCalled := false
+	err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+		funcCalled = true
+		return nil
+	})
+	if err == nil {
+		t.Errorf("Expected error opening non-existent file")
+	}
+	if funcCalled {
+		t.Errorf("Worker func should not have been called")
+	}
+}
+
+func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
+	v := TempUnixVolume(t, true, false)
+	defer _teardown(v)
+
+	v.mutex.Lock()
+	locked := true
+	go func() {
+		// TODO(TC): Don't rely on Sleep. Mock the mutex instead?
+		time.Sleep(10 * time.Millisecond)
+		locked = false
+		v.mutex.Unlock()
+	}()
+	v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+		if locked {
+			t.Errorf("Worker func called before serialize lock was obtained")
+		}
+		return nil
+	})
+}
+
+func TestUnixVolumeCompare(t *testing.T) {
+	v := TempUnixVolume(t, false, false)
+	defer _teardown(v)
+
+	v.Put(TEST_HASH, TEST_BLOCK)
+	err := v.Compare(TEST_HASH, TEST_BLOCK)
+	if err != nil {
+		t.Errorf("Got err %q, expected nil", err)
+	}
+
+	err = v.Compare(TEST_HASH, []byte("baddata"))
+	if err != CollisionError {
+		t.Errorf("Got err %q, expected %q", err, CollisionError)
+	}
+
+	_store(t, v, TEST_HASH, []byte("baddata"))
+	err = v.Compare(TEST_HASH, TEST_BLOCK)
+	if err != DiskHashError {
+		t.Errorf("Got err %q, expected %q", err, DiskHashError)
+	}
+
+	p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+	os.Chmod(p, 000)
+	err = v.Compare(TEST_HASH, TEST_BLOCK)
+	if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
+		t.Errorf("Got err %q, expected %q", err, "permission denied")
+	}
+}

commit a3cda4b279d7446b55caab3a7ca8aef5960204cc
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Sep 2 23:19:23 2015 -0400

    7121: Add test case to demonstrate deadlock.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 8be4710..a656ecf 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -807,6 +807,39 @@ func ExpectBody(
 	}
 }
 
+// See #7121
+func TestPutNeedsOnlyOneBuffer(t *testing.T) {
+	defer teardown()
+	KeepVM = MakeTestVolumeManager(1)
+	defer KeepVM.Close()
+
+	defer func(orig *bufferPool) {
+		bufs = orig
+	}(bufs)
+	bufs = newBufferPool(1, BLOCKSIZE)
+
+	ok := make(chan struct{})
+	go func() {
+		for i := 0; i < 2; i++ {
+			response := IssueRequest(
+				&RequestTester{
+					method:       "PUT",
+					uri:          "/" + TEST_HASH,
+					request_body: TEST_BLOCK,
+				})
+			ExpectStatusCode(t,
+				"TestPutNeedsOnlyOneBuffer", http.StatusOK, response)
+		}
+		ok <- struct{}{}
+	}()
+
+	select {
+	case <-ok:
+	case <-time.After(time.Second):
+		t.Fatal("PUT deadlocks with maxBuffers==1")
+	}
+}
+
 // Invoke the PutBlockHandler a bunch of times to test for bufferpool resource
 // leak.
 func TestPutHandlerNoBufferleak(t *testing.T) {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list