[ARVADOS] updated: 94e1489066f1038f0b4bc95a45af2bedc4b54be4
git at public.curoverse.com
git at public.curoverse.com
Thu Sep 3 15:18:44 EDT 2015
Summary of changes:
services/keepstore/keepstore.go | 11 +++++--
services/keepstore/volume_unix.go | 38 +++++++++++++------------
services/keepstore/volume_unix_test.go | 52 +++++++++++++++++++++++-----------
3 files changed, 64 insertions(+), 37 deletions(-)
via 94e1489066f1038f0b4bc95a45af2bedc4b54be4 (commit)
from 799fabc9d0da8a165fe60a6c10dd207e281c5b5d (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.
commit 94e1489066f1038f0b4bc95a45af2bedc4b54be4
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Sep 3 15:15:32 2015 -0400
7121: Test mutex usage with a mock instead of time.Sleep.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 3dfdce2..53cf7be 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -14,6 +14,7 @@ import (
"os"
"os/signal"
"strings"
+ "sync"
"syscall"
"time"
)
@@ -132,10 +133,14 @@ func (vs *volumeSet) Set(value string) error {
if _, err := os.Stat(value); err != nil {
return err
}
+ var locker sync.Locker
+ if flagSerializeIO {
+ locker = &sync.Mutex{}
+ }
*vs = append(*vs, &UnixVolume{
- root: value,
- serialize: flagSerializeIO,
- readonly: flagReadonly,
+ root: value,
+ locker: locker,
+ readonly: flagReadonly,
})
return nil
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 368ddc5..f91861a 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -18,10 +18,12 @@ import (
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to the volume's root directory
- serialize bool
- readonly bool
- mutex sync.Mutex
+ // path to the volume's root directory
+ root string
+ // something to lock during IO, typically a sync.Mutex (or nil
+ // to skip locking)
+ locker sync.Locker
+ readonly bool
}
func (v *UnixVolume) Touch(loc string) error {
@@ -34,9 +36,9 @@ func (v *UnixVolume) Touch(loc string) error {
return err
}
defer f.Close()
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if e := lockfile(f); e != nil {
return e
@@ -56,17 +58,17 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
}
}
-// Open the given file, apply the serialize lock if enabled, and call
-// the given function if and when the file is ready to read.
+// Open the given file, lock the "serialize" locker if enabled, and
+// call the given function if and when the file is ready to read.
func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
return fn(f)
}
@@ -169,9 +171,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
}
bpath := v.blockPath(loc)
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
@@ -298,9 +300,9 @@ func (v *UnixVolume) Delete(loc string) error {
if v.readonly {
return MethodDisabledError
}
- if v.serialize {
- v.mutex.Lock()
- defer v.mutex.Unlock()
+ if v.locker != nil {
+ v.locker.Lock()
+ defer v.locker.Unlock()
}
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 6ccc865..08ca31c 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -10,6 +10,7 @@ import (
"regexp"
"sort"
"strings"
+ "sync"
"syscall"
"testing"
"time"
@@ -20,10 +21,14 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
if err != nil {
t.Fatal(err)
}
+ var locker sync.Locker
+ if serialize {
+ locker = &sync.Mutex{}
+ }
return &UnixVolume{
- root: d,
- serialize: serialize,
- readonly: readonly,
+ root: d,
+ locker: locker,
+ readonly: readonly,
}
}
@@ -420,23 +425,38 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) {
}
func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
- v := TempUnixVolume(t, true, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
- v.mutex.Lock()
- locked := true
- go func() {
- // TODO(TC): Don't rely on Sleep. Mock the mutex instead?
- time.Sleep(10 * time.Millisecond)
- locked = false
- v.mutex.Unlock()
- }()
- v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
- if locked {
- t.Errorf("Worker func called before serialize lock was obtained")
- }
+ v.Put(TEST_HASH, TEST_BLOCK)
+
+ mtx := NewMockMutex()
+ v.locker = mtx
+
+ funcCalled := make(chan struct{})
+ go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+ funcCalled <- struct{}{}
return nil
})
+ select {
+ case mtx.AllowLock <- struct{}{}:
+ case <-funcCalled:
+ t.Fatal("Function was called before mutex was acquired")
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out before mutex was acquired")
+ }
+ select {
+ case <-funcCalled:
+ case mtx.AllowUnlock <- struct{}{}:
+ t.Fatal("Mutex was released before function was called")
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out waiting for funcCalled")
+ }
+ select {
+ case mtx.AllowUnlock <- struct{}{}:
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timed out waiting for getFunc() to release mutex")
+ }
}
func TestUnixVolumeCompare(t *testing.T) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list