[ARVADOS] updated: dbce533271fca5a106ab8a00ad2621177445131f

git at public.curoverse.com git at public.curoverse.com
Thu Apr 24 18:07:00 EDT 2014


Summary of changes:
 services/keep/src/keep/keep.go        |   13 ++-
 services/keep/src/keep/keep_test.go   |    2 +-
 services/keep/src/keep/volume.go      |   83 +++++++++++++++++-
 services/keep/src/keep/volume_test.go |  149 +++++++++++++++++++++++++++++----
 4 files changed, 223 insertions(+), 24 deletions(-)

       via  dbce533271fca5a106ab8a00ad2621177445131f (commit)
      from  78b5bfcb1fa832e4460cfaa41f7815ce1d07a9a4 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit dbce533271fca5a106ab8a00ad2621177445131f
Author: Tim Pierce <twp at curoverse.com>
Date:   Thu Apr 24 18:02:49 2014 -0400

    Added -serialize flag.
    
    Added IORequest and IOResponse types for communicating I/O
    requests over channels with a goroutine.
    
    New IOHandler method on UnixVolume. IOHandler receives requests on a
    command channel, handles them, and delivers responses. Whenever a
    UnixVolume is created with a non-nil queue, an IOHandler must be started
    to handle its requests.
    
    UnixVolume methods Get and Put now handle external I/O requests.  These
    methods serialize I/O requests if serialization is enabled for that
    volume; otherwise they call Read and Write directly.
    
    New unit tests: TestGetSerialized and TestPutSerialized.
    
    Refs #2620.

diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index 1a70a3f..9e0dfec 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -87,10 +87,13 @@ func main() {
 	//    directories.
 
 	var listen, volumearg string
+	var serialize_io bool
 	flag.StringVar(&listen, "listen", DEFAULT_ADDR,
 		"interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
 	flag.StringVar(&volumearg, "volumes", "",
 		"Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
+	flag.BoolVar(&serialize_io, "serialize-io", false,
+		"If set, all read and write operations on local Keep volumes will be serialized.")
 	flag.Parse()
 
 	// Look for local keep volumes.
@@ -109,7 +112,11 @@ func main() {
 	for _, v := range keepvols {
 		if _, err := os.Stat(v); err == nil {
 			log.Println("adding Keep volume:", v)
-			KeepVolumes = append(KeepVolumes, &UnixVolume{v})
+			newvol := &UnixVolume{v, nil}
+			if serialize_io {
+				newvol.queue = make(chan *IORequest)
+			}
+			KeepVolumes = append(KeepVolumes, newvol)
 		} else {
 			log.Printf("bad Keep volume: %s\n", err)
 		}
@@ -309,7 +316,7 @@ func GetVolumeStatus(volume string) *VolumeStatus {
 func GetBlock(hash string) ([]byte, error) {
 	// Attempt to read the requested hash from a keep volume.
 	for _, vol := range KeepVolumes {
-		if buf, err := vol.Read(hash); err != nil {
+		if buf, err := vol.Get(hash); err != nil {
 			// IsNotExist is an expected error and may be ignored.
 			// (If all volumes report IsNotExist, we return a NotFoundError)
 			// A CorruptError should be returned immediately.
@@ -394,7 +401,7 @@ func PutBlock(block []byte, hash string) error {
 	// Store the block on the first available Keep volume.
 	allFull := true
 	for _, vol := range KeepVolumes {
-		err := vol.Write(hash, block)
+		err := vol.Put(hash, block)
 		if err == nil {
 			return nil // success!
 		}
diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go
index d1048f7..6c82920 100644
--- a/services/keep/src/keep/keep_test.go
+++ b/services/keep/src/keep/keep_test.go
@@ -369,7 +369,7 @@ func setup(t *testing.T, num_volumes int) []Volume {
 	for i := range vols {
 		if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
 			root := dir + "/keep"
-			vols[i] = &UnixVolume{root}
+			vols[i] = &UnixVolume{root, nil}
 			os.Mkdir(root, 0755)
 		} else {
 			t.Fatal(err)
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index 9cd6231..425fcfd 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -19,19 +19,88 @@ import (
 )
 
 type Volume interface {
-	Read(loc string) ([]byte, error)
-	Write(loc string, block []byte) error
+	Get(loc string) ([]byte, error)
+	Put(loc string, block []byte) error
 	Index(prefix string) string
 	Status() *VolumeStatus
 	String() string
 }
 
+// IORequests are encapsulated requests to perform I/O on a Keep volume.
+// When running in serialized mode, the Keep front end sends IORequests
+// on a channel to an IORunner, which handles them one at a time and
+// returns an IOResponse.
+//
+type IOMethod int
+
+const (
+	KeepGet IOMethod = iota
+	KeepPut
+)
+
+type IORequest struct {
+	method IOMethod
+	loc    string
+	data   []byte
+	reply  chan *IOResponse
+}
+
+type IOResponse struct {
+	data []byte
+	err  error
+}
+
+// A UnixVolume is configured with:
+//
+// * root: the path to the volume's root directory
+// * queue: if non-nil, all I/O requests for this volume should be queued
+//   on this channel. The response will be delivered on the IOResponse
+//   channel included in the request.
+//
 type UnixVolume struct {
-	root string // path to this volume
+	root  string // path to this volume
+	queue chan *IORequest
 }
 
-func (v *UnixVolume) String() string {
-	return fmt.Sprintf("[UnixVolume %s]", v.root)
+func (v *UnixVolume) IOHandler() {
+	for req := range v.queue {
+		var result IOResponse
+		switch req.method {
+		case KeepGet:
+			result.data, result.err = v.Read(req.loc)
+		case KeepPut:
+			result.err = v.Write(req.loc, req.data)
+		}
+		req.reply <- &result
+	}
+}
+
+func MakeUnixVolume(root string, queue chan *IORequest) UnixVolume {
+	v := UnixVolume{root, queue}
+	if queue != nil {
+		go v.IOHandler()
+	}
+	return v
+}
+
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+	if v.queue == nil {
+		return v.Read(loc)
+	}
+	reply := make(chan *IOResponse)
+	v.queue <- &IORequest{KeepGet, loc, nil, reply}
+	response := <-reply
+	return response.data, response.err
+}
+
+func (v *UnixVolume) Put(loc string, block []byte) error {
+	if v.queue == nil {
+		return v.Write(loc, block)
+	}
+	reply := make(chan *IOResponse)
+	v.queue <- &IORequest{KeepPut, loc, block, reply}
+	response := <-reply
+	return response.err
 }
 
 // Read retrieves a block identified by the locator string "loc", and
@@ -230,3 +299,7 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 	}
 	return
 }
+
+func (v *UnixVolume) String() string {
+	return fmt.Sprintf("[UnixVolume %s]", v.root)
+}
diff --git a/services/keep/src/keep/volume_test.go b/services/keep/src/keep/volume_test.go
index 9248627..e1c628e 100644
--- a/services/keep/src/keep/volume_test.go
+++ b/services/keep/src/keep/volume_test.go
@@ -9,15 +9,18 @@ import (
 	"time"
 )
 
-func TempUnixVolume(t *testing.T) UnixVolume {
+func TempUnixVolume(t *testing.T, queue chan *IORequest) UnixVolume {
 	d, err := ioutil.TempDir("", "volume_test")
 	if err != nil {
 		t.Fatal(err)
 	}
-	return UnixVolume{d}
+	return MakeUnixVolume(d, queue)
 }
 
 func _teardown(v UnixVolume) {
+	if v.queue != nil {
+		close(v.queue)
+	}
 	os.RemoveAll(v.root)
 }
 
@@ -39,12 +42,12 @@ func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
 	}
 }
 
-func TestRead(t *testing.T) {
-	v := TempUnixVolume(t)
+func TestGet(t *testing.T) {
+	v := TempUnixVolume(t, nil)
 	defer _teardown(v)
 	_store(t, v, TEST_HASH, TEST_BLOCK)
 
-	buf, err := v.Read(TEST_HASH)
+	buf, err := v.Get(TEST_HASH)
 	if err != nil {
 		t.Error(err)
 	}
@@ -53,12 +56,12 @@ func TestRead(t *testing.T) {
 	}
 }
 
-func TestReadNotFound(t *testing.T) {
-	v := TempUnixVolume(t)
+func TestGetNotFound(t *testing.T) {
+	v := TempUnixVolume(t, nil)
 	defer _teardown(v)
 	_store(t, v, TEST_HASH, TEST_BLOCK)
 
-	buf, err := v.Read(TEST_HASH_2)
+	buf, err := v.Get(TEST_HASH_2)
 	switch {
 	case os.IsNotExist(err):
 		break
@@ -69,11 +72,11 @@ func TestReadNotFound(t *testing.T) {
 	}
 }
 
-func TestWrite(t *testing.T) {
-	v := TempUnixVolume(t)
+func TestPut(t *testing.T) {
+	v := TempUnixVolume(t, nil)
 	defer _teardown(v)
 
-	err := v.Write(TEST_HASH, TEST_BLOCK)
+	err := v.Put(TEST_HASH, TEST_BLOCK)
 	if err != nil {
 		t.Error(err)
 	}
@@ -86,19 +89,135 @@ func TestWrite(t *testing.T) {
 	}
 }
 
-func TestWriteBadVolume(t *testing.T) {
-	v := TempUnixVolume(t)
+func TestPutBadVolume(t *testing.T) {
+	v := TempUnixVolume(t, nil)
 	defer _teardown(v)
 
 	os.Chmod(v.root, 000)
-	err := v.Write(TEST_HASH, TEST_BLOCK)
+	err := v.Put(TEST_HASH, TEST_BLOCK)
 	if err == nil {
 		t.Error("Write should have failed")
 	}
 }
 
+// Serialization tests.
+//
+// TODO(twp): a proper test of I/O serialization requires that
+// a second request start while the first one is still executing.
+// Doing this correctly requires some tricky synchronization.
+// For now we'll just launch a bunch of requests in goroutines
+// and demonstrate that they return accurate results.
+//
+func TestGetSerialized(t *testing.T) {
+	v := TempUnixVolume(t, make(chan *IORequest))
+	defer _teardown(v)
+
+	_store(t, v, TEST_HASH, TEST_BLOCK)
+	_store(t, v, TEST_HASH_2, TEST_BLOCK_2)
+	_store(t, v, TEST_HASH_3, TEST_BLOCK_3)
+
+	sem := make(chan int)
+	go func(sem chan int) {
+		buf, err := v.Get(TEST_HASH)
+		if err != nil {
+			t.Errorf("err1: %v", err)
+		}
+		if bytes.Compare(buf, TEST_BLOCK) != 0 {
+			t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
+		}
+		sem <- 1
+	}(sem)
+
+	go func(sem chan int) {
+		buf, err := v.Get(TEST_HASH_2)
+		if err != nil {
+			t.Errorf("err2: %v", err)
+		}
+		if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+			t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
+		}
+		sem <- 1
+	}(sem)
+
+	go func(sem chan int) {
+		buf, err := v.Get(TEST_HASH_3)
+		if err != nil {
+			t.Errorf("err3: %v", err)
+		}
+		if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+			t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
+		}
+		sem <- 1
+	}(sem)
+
+	// Wait for all goroutines to finish
+	for done := 0; done < 2; {
+		done += <-sem
+	}
+}
+
+func TestPutSerialized(t *testing.T) {
+	v := TempUnixVolume(t, make(chan *IORequest))
+	defer _teardown(v)
+
+	sem := make(chan int)
+	go func(sem chan int) {
+		err := v.Put(TEST_HASH, TEST_BLOCK)
+		if err != nil {
+			t.Errorf("err1: %v", err)
+		}
+		sem <- 1
+	}(sem)
+
+	go func(sem chan int) {
+		err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
+		if err != nil {
+			t.Errorf("err2: %v", err)
+		}
+		sem <- 1
+	}(sem)
+
+	go func(sem chan int) {
+		err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
+		if err != nil {
+			t.Errorf("err3: %v", err)
+		}
+		sem <- 1
+	}(sem)
+
+	// Wait for all goroutines to finish
+	for done := 0; done < 2; {
+		done += <-sem
+	}
+
+	// Double check that we actually wrote the blocks we expected to write.
+	buf, err := v.Get(TEST_HASH)
+	if err != nil {
+		t.Errorf("Get #1: %v", err)
+	}
+	if bytes.Compare(buf, TEST_BLOCK) != 0 {
+		t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
+	}
+
+	buf, err = v.Get(TEST_HASH_2)
+	if err != nil {
+		t.Errorf("Get #2: %v", err)
+	}
+	if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+		t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
+	}
+
+	buf, err = v.Get(TEST_HASH_3)
+	if err != nil {
+		t.Errorf("Get #3: %v", err)
+	}
+	if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+		t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+	}
+}
+
 func TestIsFull(t *testing.T) {
-	v := TempUnixVolume(t)
+	v := TempUnixVolume(t, nil)
 	defer _teardown(v)
 
 	full_path := v.root + "/full"

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list