[ARVADOS] updated: dbce533271fca5a106ab8a00ad2621177445131f
git at public.curoverse.com
git at public.curoverse.com
Thu Apr 24 18:07:00 EDT 2014
Summary of changes:
services/keep/src/keep/keep.go | 13 ++-
services/keep/src/keep/keep_test.go | 2 +-
services/keep/src/keep/volume.go | 83 +++++++++++++++++-
services/keep/src/keep/volume_test.go | 149 +++++++++++++++++++++++++++++----
4 files changed, 223 insertions(+), 24 deletions(-)
via dbce533271fca5a106ab8a00ad2621177445131f (commit)
from 78b5bfcb1fa832e4460cfaa41f7815ce1d07a9a4 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit dbce533271fca5a106ab8a00ad2621177445131f
Author: Tim Pierce <twp at curoverse.com>
Date: Thu Apr 24 18:02:49 2014 -0400
Added -serialize-io flag.
Added IORequest and IOResponse types for communicating I/O
requests over channels with a goroutine.
New IOHandler method on UnixVolume. IOHandler receives requests on a
command channel, handles them, and delivers responses. Whenever a
UnixVolume is created with a non-nil queue, an IOHandler must be started
to handle its requests.
UnixVolume methods Get and Put now handle external I/O requests. These
methods serialize I/O requests if serialization is enabled for that
volume; otherwise they call Read and Write directly.
New unit tests: TestGetSerialized and TestPutSerialized.
Refs #2620.
diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index 1a70a3f..9e0dfec 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -87,10 +87,13 @@ func main() {
// directories.
var listen, volumearg string
+ var serialize_io bool
flag.StringVar(&listen, "listen", DEFAULT_ADDR,
"interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
flag.StringVar(&volumearg, "volumes", "",
"Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
+ flag.BoolVar(&serialize_io, "serialize-io", false,
+ "If set, all read and write operations on local Keep volumes will be serialized.")
flag.Parse()
// Look for local keep volumes.
@@ -109,7 +112,11 @@ func main() {
for _, v := range keepvols {
if _, err := os.Stat(v); err == nil {
log.Println("adding Keep volume:", v)
- KeepVolumes = append(KeepVolumes, &UnixVolume{v})
+ newvol := &UnixVolume{v, nil}
+ if serialize_io {
+ newvol.queue = make(chan *IORequest)
+ }
+ KeepVolumes = append(KeepVolumes, newvol)
} else {
log.Printf("bad Keep volume: %s\n", err)
}
@@ -309,7 +316,7 @@ func GetVolumeStatus(volume string) *VolumeStatus {
func GetBlock(hash string) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
for _, vol := range KeepVolumes {
- if buf, err := vol.Read(hash); err != nil {
+ if buf, err := vol.Get(hash); err != nil {
// IsNotExist is an expected error and may be ignored.
// (If all volumes report IsNotExist, we return a NotFoundError)
// A CorruptError should be returned immediately.
@@ -394,7 +401,7 @@ func PutBlock(block []byte, hash string) error {
// Store the block on the first available Keep volume.
allFull := true
for _, vol := range KeepVolumes {
- err := vol.Write(hash, block)
+ err := vol.Put(hash, block)
if err == nil {
return nil // success!
}
diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go
index d1048f7..6c82920 100644
--- a/services/keep/src/keep/keep_test.go
+++ b/services/keep/src/keep/keep_test.go
@@ -369,7 +369,7 @@ func setup(t *testing.T, num_volumes int) []Volume {
for i := range vols {
if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
root := dir + "/keep"
- vols[i] = &UnixVolume{root}
+ vols[i] = &UnixVolume{root, nil}
os.Mkdir(root, 0755)
} else {
t.Fatal(err)
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index 9cd6231..425fcfd 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -19,19 +19,88 @@ import (
)
type Volume interface {
- Read(loc string) ([]byte, error)
- Write(loc string, block []byte) error
+ Get(loc string) ([]byte, error)
+ Put(loc string, block []byte) error
Index(prefix string) string
Status() *VolumeStatus
String() string
}
+// IORequests are encapsulated requests to perform I/O on a Keep volume.
+// When running in serialized mode, the Keep front end sends IORequests
+// on a channel to an IOHandler, which handles them one at a time and
+// returns an IOResponse.
+//
+// IOMethod identifies which operation an IORequest asks for.
+type IOMethod int
+
+const (
+ KeepGet IOMethod = iota // handled by calling Read on the volume
+ KeepPut // handled by calling Write on the volume
+)
+
+// An IORequest describes a single Get or Put operation to be carried
+// out by a volume's IOHandler.
+type IORequest struct {
+ method IOMethod // operation to perform: KeepGet or KeepPut
+ loc string // locator passed to Read or Write
+ data []byte // block to write for KeepPut; Get sends nil here
+ reply chan *IOResponse // channel on which IOHandler sends the result
+}
+
+// An IOResponse carries the outcome of one IORequest back to the
+// requester.
+type IOResponse struct {
+ data []byte // block returned by Read; left unset for KeepPut
+ err error // error returned by Read or Write, if any
+}
+
+// A UnixVolume is configured with:
+//
+// * root: the path to the volume's root directory
+// * queue: if non-nil, all I/O requests for this volume should be queued
+// on this channel. The response will be delivered on the IOResponse
+// channel included in the request.
+//
type UnixVolume struct {
- root string // path to this volume
+ root string // path to this volume
+ queue chan *IORequest
}
-func (v *UnixVolume) String() string {
- return fmt.Sprintf("[UnixVolume %s]", v.root)
+// IOHandler receives IORequests from v.queue one at a time, performs
+// the requested Read or Write on this volume, and sends the result on
+// the request's reply channel. It returns when v.queue is closed.
+func (v *UnixVolume) IOHandler() {
+ for req := range v.queue {
+ var result IOResponse
+ switch req.method {
+ case KeepGet:
+ result.data, result.err = v.Read(req.loc)
+ case KeepPut:
+ result.err = v.Write(req.loc, req.data)
+ }
+ // A request with any other method falls through the switch and
+ // receives a zero-valued IOResponse.
+ req.reply <- &result
+ }
+}
+
+// MakeUnixVolume returns a UnixVolume with the given root directory
+// and request queue. If queue is non-nil, it also starts an IOHandler
+// goroutine that services requests sent on that queue.
+func MakeUnixVolume(root string, queue chan *IORequest) UnixVolume {
+ v := UnixVolume{root, queue}
+ if queue != nil {
+ // The handler runs against this local v while the caller gets a
+ // copy; both refer to the same queue channel.
+ go v.IOHandler()
+ }
+ return v
+}
+
+// Get returns the block identified by loc. If the volume has no
+// queue, it calls Read directly; otherwise it sends a KeepGet request
+// on v.queue and waits for the response on a per-request reply channel.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+ if v.queue == nil {
+ return v.Read(loc)
+ }
+ reply := make(chan *IOResponse)
+ // Both channels are unbuffered in the code shown here, so this blocks
+ // until something receives from v.queue and replies.
+ v.queue <- &IORequest{KeepGet, loc, nil, reply}
+ response := <-reply
+ return response.data, response.err
+}
+
+// Put stores block under the locator loc. If the volume has no queue,
+// it calls Write directly; otherwise it sends a KeepPut request on
+// v.queue and waits for the response on a per-request reply channel.
+func (v *UnixVolume) Put(loc string, block []byte) error {
+ if v.queue == nil {
+ return v.Write(loc, block)
+ }
+ reply := make(chan *IOResponse)
+ // Blocks until something receives from v.queue and replies.
+ v.queue <- &IORequest{KeepPut, loc, block, reply}
+ response := <-reply
+ return response.err
}
// Read retrieves a block identified by the locator string "loc", and
@@ -230,3 +299,7 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
}
return
}
+
+// String returns a short description of the volume that includes its
+// root directory.
+func (v *UnixVolume) String() string {
+ return fmt.Sprintf("[UnixVolume %s]", v.root)
+}
diff --git a/services/keep/src/keep/volume_test.go b/services/keep/src/keep/volume_test.go
index 9248627..e1c628e 100644
--- a/services/keep/src/keep/volume_test.go
+++ b/services/keep/src/keep/volume_test.go
@@ -9,15 +9,18 @@ import (
"time"
)
-func TempUnixVolume(t *testing.T) UnixVolume {
+func TempUnixVolume(t *testing.T, queue chan *IORequest) UnixVolume {
d, err := ioutil.TempDir("", "volume_test")
if err != nil {
t.Fatal(err)
}
- return UnixVolume{d}
+ return MakeUnixVolume(d, queue)
}
+// _teardown closes the volume's request queue, if it has one, and
+// removes the volume's root directory.
func _teardown(v UnixVolume) {
+ if v.queue != nil {
+ close(v.queue)
+ }
os.RemoveAll(v.root)
}
@@ -39,12 +42,12 @@ func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
}
}
-func TestRead(t *testing.T) {
- v := TempUnixVolume(t)
+func TestGet(t *testing.T) {
+ v := TempUnixVolume(t, nil)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
- buf, err := v.Read(TEST_HASH)
+ buf, err := v.Get(TEST_HASH)
if err != nil {
t.Error(err)
}
@@ -53,12 +56,12 @@ func TestRead(t *testing.T) {
}
}
-func TestReadNotFound(t *testing.T) {
- v := TempUnixVolume(t)
+func TestGetNotFound(t *testing.T) {
+ v := TempUnixVolume(t, nil)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
- buf, err := v.Read(TEST_HASH_2)
+ buf, err := v.Get(TEST_HASH_2)
switch {
case os.IsNotExist(err):
break
@@ -69,11 +72,11 @@ func TestReadNotFound(t *testing.T) {
}
}
-func TestWrite(t *testing.T) {
- v := TempUnixVolume(t)
+func TestPut(t *testing.T) {
+ v := TempUnixVolume(t, nil)
defer _teardown(v)
- err := v.Write(TEST_HASH, TEST_BLOCK)
+ err := v.Put(TEST_HASH, TEST_BLOCK)
if err != nil {
t.Error(err)
}
@@ -86,19 +89,135 @@ func TestWrite(t *testing.T) {
}
}
-func TestWriteBadVolume(t *testing.T) {
- v := TempUnixVolume(t)
+func TestPutBadVolume(t *testing.T) {
+ v := TempUnixVolume(t, nil)
defer _teardown(v)
os.Chmod(v.root, 000)
- err := v.Write(TEST_HASH, TEST_BLOCK)
+ err := v.Put(TEST_HASH, TEST_BLOCK)
if err == nil {
t.Error("Write should have failed")
}
}
+// Serialization tests.
+//
+// TODO(twp): a proper test of I/O serialization requires that
+// a second request start while the first one is still executing.
+// Doing this correctly requires some tricky synchronization.
+// For now we'll just launch a bunch of requests in goroutines
+// and demonstrate that they return accurate results.
+//
+// TestGetSerialized stores three blocks, then fetches them with three
+// concurrent Get calls on a volume whose I/O goes through a request
+// queue, and checks that each call returns the matching block.
+func TestGetSerialized(t *testing.T) {
+ v := TempUnixVolume(t, make(chan *IORequest))
+ defer _teardown(v)
+
+ _store(t, v, TEST_HASH, TEST_BLOCK)
+ _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
+ _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
+
+ // Each goroutine sends 1 on sem when its Get has been checked.
+ sem := make(chan int)
+ go func(sem chan int) {
+ buf, err := v.Get(TEST_HASH)
+ if err != nil {
+ t.Errorf("err1: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ buf, err := v.Get(TEST_HASH_2)
+ if err != nil {
+ t.Errorf("err2: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ buf, err := v.Get(TEST_HASH_3)
+ if err != nil {
+ t.Errorf("err3: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+ t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
+ }
+ sem <- 1
+ }(sem)
+
+ // Wait for all three goroutines to finish. Returning after only two
+ // would let the deferred _teardown close v.queue while the third Get
+ // may still be sending on it.
+ for done := 0; done < 3; {
+ done += <-sem
+ }
+}
+
+// TestPutSerialized writes three blocks with three concurrent Put
+// calls on a volume whose I/O goes through a request queue, then reads
+// each block back with Get and checks its contents.
+func TestPutSerialized(t *testing.T) {
+ v := TempUnixVolume(t, make(chan *IORequest))
+ defer _teardown(v)
+
+ // Each goroutine sends 1 on sem when its Put has returned.
+ sem := make(chan int)
+ go func(sem chan int) {
+ err := v.Put(TEST_HASH, TEST_BLOCK)
+ if err != nil {
+ t.Errorf("err1: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
+ if err != nil {
+ t.Errorf("err2: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ go func(sem chan int) {
+ err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
+ if err != nil {
+ t.Errorf("err3: %v", err)
+ }
+ sem <- 1
+ }(sem)
+
+ // Wait for all three goroutines to finish, so that every Put has
+ // completed before the blocks are read back below and before the
+ // deferred _teardown closes v.queue.
+ for done := 0; done < 3; {
+ done += <-sem
+ }
+
+ // Double check that we actually wrote the blocks we expected to write.
+ buf, err := v.Get(TEST_HASH)
+ if err != nil {
+ t.Errorf("Get #1: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK) != 0 {
+ t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
+ }
+
+ buf, err = v.Get(TEST_HASH_2)
+ if err != nil {
+ t.Errorf("Get #2: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+ t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
+ }
+
+ buf, err = v.Get(TEST_HASH_3)
+ if err != nil {
+ t.Errorf("Get #3: %v", err)
+ }
+ if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+ t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+ }
+}
+
func TestIsFull(t *testing.T) {
- v := TempUnixVolume(t)
+ v := TempUnixVolume(t, nil)
defer _teardown(v)
full_path := v.root + "/full"
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list