[ARVADOS] updated: 57b46f9b292e561739fde3ea098c366db8f4bc23

git at public.curoverse.com git at public.curoverse.com
Wed Apr 30 13:49:27 EDT 2014


Summary of changes:
 services/keep/src/keep/keep.go      |   65 +++++++++++++++-----------
 services/keep/src/keep/keep_test.go |   86 ++++++++++++++++++++++-------------
 services/keep/src/keep/volume.go    |   56 +++++++++++++++++++++++
 3 files changed, 147 insertions(+), 60 deletions(-)

       via  57b46f9b292e561739fde3ea098c366db8f4bc23 (commit)
      from  30f834eef8916c0d613c098a4897ac932a2e0b37 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 57b46f9b292e561739fde3ea098c366db8f4bc23
Author: Tim Pierce <twp at curoverse.com>
Date:   Wed Apr 30 13:49:08 2014 -0400

    Replaced KeepVolumes with a VolumeManager interface.
    
    A VolumeManager interface may be used to set policy for writing to a set
    of Keep volumes. The sole implementation at present is RRVolumeManager,
    which implements simple round-robin scheduling; other implementations
    could include a manager which selects the least-loaded disk first, the
    fastest one, the one with the fewest pending writes, etc. etc.
    
    Refs #2620

diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index c619f92..e621955 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -37,8 +37,8 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
 
 var PROC_MOUNTS = "/proc/mounts"
 
-// KeepVolumes is a slice of volumes on which blocks can be stored.
-var KeepVolumes []Volume
+// KeepVM is the VolumeManager that maintains the list of available volumes.
+var KeepVM VolumeManager
 
 // ==========
 // Error types.
@@ -108,21 +108,24 @@ func main() {
 	}
 
 	// Check that the specified volumes actually exist.
-	KeepVolumes = []Volume(nil)
+	var goodvols []Volume = nil
 	for _, v := range keepvols {
 		if _, err := os.Stat(v); err == nil {
 			log.Println("adding Keep volume:", v)
 			newvol := MakeUnixVolume(v, serialize_io)
-			KeepVolumes = append(KeepVolumes, &newvol)
+			goodvols = append(goodvols, &newvol)
 		} else {
 			log.Printf("bad Keep volume: %s\n", err)
 		}
 	}
 
-	if len(KeepVolumes) == 0 {
+	if len(goodvols) == 0 {
 		log.Fatal("could not find any keep volumes")
 	}
 
+	// Start a round-robin VolumeManager with the volumes we have found.
+	KeepVM = MakeRRVolumeManager(goodvols)
+
 	// Set up REST handlers.
 	//
 	// Start with a router that will route each URL path to an
@@ -229,7 +232,7 @@ func IndexHandler(w http.ResponseWriter, req *http.Request) {
 	prefix := mux.Vars(req)["prefix"]
 
 	var index string
-	for _, vol := range KeepVolumes {
+	for _, vol := range KeepVM.Volumes() {
 		index = index + vol.Index(prefix)
 	}
 	w.Write([]byte(index))
@@ -276,8 +279,8 @@ func StatusHandler(w http.ResponseWriter, req *http.Request) {
 func GetNodeStatus() *NodeStatus {
 	st := new(NodeStatus)
 
-	st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
-	for i, vol := range KeepVolumes {
+	st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
+	for i, vol := range KeepVM.Volumes() {
 		st.Volumes[i] = vol.Status()
 	}
 	return st
@@ -312,7 +315,7 @@ func GetVolumeStatus(volume string) *VolumeStatus {
 
 func GetBlock(hash string) ([]byte, error) {
 	// Attempt to read the requested hash from a keep volume.
-	for _, vol := range KeepVolumes {
+	for _, vol := range KeepVM.Volumes() {
 		if buf, err := vol.Get(hash); err != nil {
 			// IsNotExist is an expected error and may be ignored.
 			// (If all volumes report IsNotExist, we return a NotFoundError)
@@ -395,27 +398,33 @@ func PutBlock(block []byte, hash string) error {
 		}
 	}
 
-	// Store the block on the first available Keep volume.
-	allFull := true
-	for _, vol := range KeepVolumes {
-		err := vol.Put(hash, block)
-		if err == nil {
-			return nil // success!
-		}
-		if err != FullError {
-			// The volume is not full but the write did not succeed.
-			// Report the error and continue trying.
-			allFull = false
-			log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+	// Choose a Keep volume to write to.
+	// If this volume fails, try all of the volumes in order.
+	vol := KeepVM.Choose()
+	if err := vol.Put(hash, block); err == nil {
+		return nil // success!
+	} else {
+		allFull := true
+		for _, vol := range KeepVM.Volumes() {
+			err := vol.Put(hash, block)
+			if err == nil {
+				return nil // success!
+			}
+			if err != FullError {
+				// The volume is not full but the write did not succeed.
+				// Report the error and continue trying.
+				allFull = false
+				log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+			}
 		}
-	}
 
-	if allFull {
-		log.Printf("all Keep volumes full")
-		return FullError
-	} else {
-		log.Printf("all Keep volumes failed")
-		return GenericError
+		if allFull {
+			log.Printf("all Keep volumes full")
+			return FullError
+		} else {
+			log.Printf("all Keep volumes failed")
+			return GenericError
+		}
 	}
 }
 
diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go
index 5b7c3c7..30d103d 100644
--- a/services/keep/src/keep/keep_test.go
+++ b/services/keep/src/keep/keep_test.go
@@ -48,8 +48,11 @@ func TestGetBlock(t *testing.T) {
 	defer teardown()
 
 	// Prepare two test Keep volumes. Our block is stored on the second volume.
-	KeepVolumes = setup(2)
-	if err := KeepVolumes[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
+
+	vols := KeepVM.Volumes()
+	if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
 		t.Error(err)
 	}
 
@@ -70,7 +73,8 @@ func TestGetBlockMissing(t *testing.T) {
 	defer teardown()
 
 	// Create two empty test Keep volumes.
-	KeepVolumes = setup(2)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
 
 	// Check that GetBlock returns failure.
 	result, err := GetBlock(TEST_HASH)
@@ -87,8 +91,11 @@ func TestGetBlockCorrupt(t *testing.T) {
 	defer teardown()
 
 	// Create two test Keep volumes and store a corrupt block in one.
-	KeepVolumes = setup(2)
-	KeepVolumes[0].Put(TEST_HASH, BAD_BLOCK)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
+
+	vols := KeepVM.Volumes()
+	vols[0].Put(TEST_HASH, BAD_BLOCK)
 
 	// Check that GetBlock returns failure.
 	result, err := GetBlock(TEST_HASH)
@@ -108,16 +115,18 @@ func TestPutBlockOK(t *testing.T) {
 	defer teardown()
 
 	// Create two test Keep volumes.
-	KeepVolumes = setup(2)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
 
 	// Check that PutBlock stores the data as expected.
 	if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
 		t.Fatalf("PutBlock: %v", err)
 	}
 
-	result, err := KeepVolumes[0].Get(TEST_HASH)
+	vols := KeepVM.Volumes()
+	result, err := vols[0].Get(TEST_HASH)
 	if err != nil {
-		t.Fatalf("KeepVolumes[0].Get returned error: %v", err)
+		t.Fatalf("Volume #0 Get returned error: %v", err)
 	}
 	if string(result) != string(TEST_BLOCK) {
 		t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
@@ -133,8 +142,11 @@ func TestPutBlockOneVol(t *testing.T) {
 	defer teardown()
 
 	// Create two test Keep volumes, but cripple one of them.
-	KeepVolumes = setup(2)
-	KeepVolumes[0].(*MockVolume).Bad = true
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
+
+	vols := KeepVM.Volumes()
+	vols[0].(*MockVolume).Bad = true
 
 	// Check that PutBlock stores the data as expected.
 	if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
@@ -160,7 +172,8 @@ func TestPutBlockMD5Fail(t *testing.T) {
 	defer teardown()
 
 	// Create two test Keep volumes.
-	KeepVolumes = setup(2)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
 
 	// Check that PutBlock returns the expected error when the hash does
 	// not match the block.
@@ -183,10 +196,12 @@ func TestPutBlockCorrupt(t *testing.T) {
 	defer teardown()
 
 	// Create two test Keep volumes.
-	KeepVolumes = setup(2)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
 
 	// Store a corrupted block under TEST_HASH.
-	KeepVolumes[0].Put(TEST_HASH, BAD_BLOCK)
+	vols := KeepVM.Volumes()
+	vols[0].Put(TEST_HASH, BAD_BLOCK)
 	if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
 		t.Errorf("PutBlock: %v", err)
 	}
@@ -212,7 +227,8 @@ func TestPutBlockCollision(t *testing.T) {
 	var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
 
 	// Prepare two test Keep volumes.
-	KeepVolumes = setup(2)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
 
 	// Store one block, then attempt to store the other. Confirm that
 	// PutBlock reported a CollisionError.
@@ -314,14 +330,17 @@ func TestIndex(t *testing.T) {
 	// Set up Keep volumes and populate them.
 	// Include multiple blocks on different volumes, and
 	// some metadata files.
-	KeepVolumes = setup(2)
-	KeepVolumes[0].Put(TEST_HASH, TEST_BLOCK)
-	KeepVolumes[1].Put(TEST_HASH_2, TEST_BLOCK_2)
-	KeepVolumes[0].Put(TEST_HASH_3, TEST_BLOCK_3)
-	KeepVolumes[0].Put(TEST_HASH+".meta", []byte("metadata"))
-	KeepVolumes[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
-
-	index := KeepVolumes[0].Index("") + KeepVolumes[1].Index("")
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
+
+	vols := KeepVM.Volumes()
+	vols[0].Put(TEST_HASH, TEST_BLOCK)
+	vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
+	vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
+	vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
+	vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+
+	index := vols[0].Index("") + vols[1].Index("")
 	expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
 		TEST_HASH_3 + `\+\d+ \d+\n` +
 		TEST_HASH_2 + `\+\d+ \d+\n$`
@@ -346,13 +365,16 @@ func TestNodeStatus(t *testing.T) {
 	defer teardown()
 
 	// Set up test Keep volumes with some blocks.
-	KeepVolumes = setup(2)
-	KeepVolumes[0].Put(TEST_HASH, TEST_BLOCK)
-	KeepVolumes[1].Put(TEST_HASH_2, TEST_BLOCK_2)
+	KeepVM = MakeTestVolumeManager(2)
+	defer func() { KeepVM.Quit() }()
+
+	vols := KeepVM.Volumes()
+	vols[0].Put(TEST_HASH, TEST_BLOCK)
+	vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
 
 	// Get node status and make a basic sanity check.
 	st := GetNodeStatus()
-	for i := range KeepVolumes {
+	for i := range vols {
 		volinfo := st.Volumes[i]
 		mtp := volinfo.MountPoint
 		if mtp != "/bogo" {
@@ -374,21 +396,21 @@ func TestNodeStatus(t *testing.T) {
 // Helper functions for unit tests.
 // ========================================
 
-// setup
-//     Create KeepVolumes for testing.
-//     Returns a slice of pathnames to temporary Keep volumes.
+// MakeTestVolumeManager
+//     Creates and returns an RRVolumeManager with the specified number
+//     of MockVolumes.
 //
-func setup(num_volumes int) []Volume {
+func MakeTestVolumeManager(num_volumes int) VolumeManager {
 	vols := make([]Volume, num_volumes)
 	for i := range vols {
 		vols[i] = CreateMockVolume()
 	}
-	return vols
+	return MakeRRVolumeManager(vols)
 }
 
 // teardown
 //     Cleanup to perform after each test.
 //
 func teardown() {
-	KeepVolumes = nil
+	KeepVM = nil
 }
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index dd4245a..fffc815 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -71,3 +71,59 @@ func (v *MockVolume) Status() *VolumeStatus {
 func (v *MockVolume) String() string {
 	return "[MockVolume]"
 }
+
+// A VolumeManager manages a collection of volumes.
+//
+// - Volumes() returns a slice of available Volumes.
+// - Choose() returns a Volume suitable for writing to.
+// - Quit() instructs the VolumeManager to shut down gracefully.
+//
+type VolumeManager interface {
+	Volumes() []Volume
+	Choose() Volume
+	Quit()
+}
+
+type RRVolumeManager struct {
+	volumes   []Volume
+	nextwrite chan Volume
+	quit      chan int
+}
+
+func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
+	// Create a new RRVolumeManager struct with the specified volumes,
+	// and with new nextwrite and quit channels.
+	// The quit channel is buffered with a capacity of 1 so that
+	// another routine may write to it without blocking.
+	vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
+
+	// This goroutine implements round-robin volume selection.
+	// It sends each available Volume in turn to the nextwrite
+	// channel, until receiving a notification on the quit channel
+	// that it should terminate.
+	go func() {
+		var i int = 0
+		for {
+			select {
+			case <-vm.quit:
+				return
+			case vm.nextwrite <- vm.volumes[i]:
+				i = (i + 1) % len(vm.volumes)
+			}
+		}
+	}()
+
+	return vm
+}
+
+func (vm *RRVolumeManager) Volumes() []Volume {
+	return vm.volumes
+}
+
+func (vm *RRVolumeManager) Choose() Volume {
+	return <-vm.nextwrite
+}
+
+func (vm *RRVolumeManager) Quit() {
+	vm.quit <- 1
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list