[ARVADOS] updated: 57b46f9b292e561739fde3ea098c366db8f4bc23
git at public.curoverse.com
Wed Apr 30 13:49:27 EDT 2014
Summary of changes:
 services/keep/src/keep/keep.go      | 65 +++++++++++++++-----------
 services/keep/src/keep/keep_test.go | 86 ++++++++++++++++++++++-------------
 services/keep/src/keep/volume.go    | 56 +++++++++++++++++++++++
 3 files changed, 147 insertions(+), 60 deletions(-)
via 57b46f9b292e561739fde3ea098c366db8f4bc23 (commit)
from 30f834eef8916c0d613c098a4897ac932a2e0b37 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 57b46f9b292e561739fde3ea098c366db8f4bc23
Author: Tim Pierce <twp at curoverse.com>
Date: Wed Apr 30 13:49:08 2014 -0400
Replaced KeepVolumes with a VolumeManager interface.
A VolumeManager interface may be used to set policy for writing to a set
of Keep volumes. The sole implementation at present is RRVolumeManager,
which implements simple round-robin scheduling; other implementations
could include a manager that selects the least-loaded disk first, the
fastest one, the one with the fewest pending writes, and so on.
Refs #2620
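For readers following the API change, here is a minimal usage sketch (not part of
this commit) of how calling code is expected to interact with a VolumeManager. It
assumes it compiles inside the keep package, where Volume, MakeRRVolumeManager,
CreateMockVolume and GenericError are defined; the hash and data values are
placeholders.

    // Sketch only (not in this commit): exercises the VolumeManager interface.
    func exampleVolumeManagerUsage() error {
            // Two mock volumes, using the CreateMockVolume helper from the tests.
            vols := []Volume{CreateMockVolume(), CreateMockVolume()}
            vm := MakeRRVolumeManager(vols)
            defer vm.Quit() // stop the manager's round-robin goroutine

            // Choose() hands back the next volume in round-robin order.
            vol := vm.Choose()
            hash, block := "placeholderhash", []byte("placeholder data")
            if err := vol.Put(hash, block); err == nil {
                    return nil
            }
            // On failure, fall back to every volume the manager knows about.
            for _, v := range vm.Volumes() {
                    if err := v.Put(hash, block); err == nil {
                            return nil
                    }
            }
            return GenericError
    }

The deferred vm.Quit() call matters because MakeRRVolumeManager starts a goroutine
that would otherwise keep running after the manager is no longer needed.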
diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index c619f92..e621955 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -37,8 +37,8 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
var PROC_MOUNTS = "/proc/mounts"
-// KeepVolumes is a slice of volumes on which blocks can be stored.
-var KeepVolumes []Volume
+// The Keep VolumeManager maintains a list of available volumes.
+var KeepVM VolumeManager
// ==========
// Error types.
@@ -108,21 +108,24 @@ func main() {
}
// Check that the specified volumes actually exist.
- KeepVolumes = []Volume(nil)
+ var goodvols []Volume = nil
for _, v := range keepvols {
if _, err := os.Stat(v); err == nil {
log.Println("adding Keep volume:", v)
newvol := MakeUnixVolume(v, serialize_io)
- KeepVolumes = append(KeepVolumes, &newvol)
+ goodvols = append(goodvols, &newvol)
} else {
log.Printf("bad Keep volume: %s\n", err)
}
}
- if len(KeepVolumes) == 0 {
+ if len(goodvols) == 0 {
log.Fatal("could not find any keep volumes")
}
+ // Start a round-robin VolumeManager with the volumes we have found.
+ KeepVM = MakeRRVolumeManager(goodvols)
+
// Set up REST handlers.
//
// Start with a router that will route each URL path to an
@@ -229,7 +232,7 @@ func IndexHandler(w http.ResponseWriter, req *http.Request) {
prefix := mux.Vars(req)["prefix"]
var index string
- for _, vol := range KeepVolumes {
+ for _, vol := range KeepVM.Volumes() {
index = index + vol.Index(prefix)
}
w.Write([]byte(index))
@@ -276,8 +279,8 @@ func StatusHandler(w http.ResponseWriter, req *http.Request) {
func GetNodeStatus() *NodeStatus {
st := new(NodeStatus)
- st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
- for i, vol := range KeepVolumes {
+ st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
+ for i, vol := range KeepVM.Volumes() {
st.Volumes[i] = vol.Status()
}
return st
@@ -312,7 +315,7 @@ func GetVolumeStatus(volume string) *VolumeStatus {
func GetBlock(hash string) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
- for _, vol := range KeepVolumes {
+ for _, vol := range KeepVM.Volumes() {
if buf, err := vol.Get(hash); err != nil {
// IsNotExist is an expected error and may be ignored.
// (If all volumes report IsNotExist, we return a NotFoundError)
@@ -395,27 +398,33 @@ func PutBlock(block []byte, hash string) error {
}
}
- // Store the block on the first available Keep volume.
- allFull := true
- for _, vol := range KeepVolumes {
- err := vol.Put(hash, block)
- if err == nil {
- return nil // success!
- }
- if err != FullError {
- // The volume is not full but the write did not succeed.
- // Report the error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+ // Choose a Keep volume to write to.
+ // If this volume fails, try all of the volumes in order.
+ vol := KeepVM.Choose()
+ if err := vol.Put(hash, block); err == nil {
+ return nil // success!
+ } else {
+ allFull := true
+ for _, vol := range KeepVM.Volumes() {
+ err := vol.Put(hash, block)
+ if err == nil {
+ return nil // success!
+ }
+ if err != FullError {
+ // The volume is not full but the write did not succeed.
+ // Report the error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+ }
}
- }
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
- } else {
- log.Printf("all Keep volumes failed")
- return GenericError
+ if allFull {
+ log.Printf("all Keep volumes full")
+ return FullError
+ } else {
+ log.Printf("all Keep volumes failed")
+ return GenericError
+ }
}
}
diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go
index 5b7c3c7..30d103d 100644
--- a/services/keep/src/keep/keep_test.go
+++ b/services/keep/src/keep/keep_test.go
@@ -48,8 +48,11 @@ func TestGetBlock(t *testing.T) {
defer teardown()
// Prepare two test Keep volumes. Our block is stored on the second volume.
- KeepVolumes = setup(2)
- if err := KeepVolumes[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
t.Error(err)
}
@@ -70,7 +73,8 @@ func TestGetBlockMissing(t *testing.T) {
defer teardown()
// Create two empty test Keep volumes.
- KeepVolumes = setup(2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Check that GetBlock returns failure.
result, err := GetBlock(TEST_HASH)
@@ -87,8 +91,11 @@ func TestGetBlockCorrupt(t *testing.T) {
defer teardown()
// Create two test Keep volumes and store a corrupt block in one.
- KeepVolumes = setup(2)
- KeepVolumes[0].Put(TEST_HASH, BAD_BLOCK)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, BAD_BLOCK)
// Check that GetBlock returns failure.
result, err := GetBlock(TEST_HASH)
@@ -108,16 +115,18 @@ func TestPutBlockOK(t *testing.T) {
defer teardown()
// Create two test Keep volumes.
- KeepVolumes = setup(2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Check that PutBlock stores the data as expected.
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Fatalf("PutBlock: %v", err)
}
- result, err := KeepVolumes[0].Get(TEST_HASH)
+ vols := KeepVM.Volumes()
+ result, err := vols[0].Get(TEST_HASH)
if err != nil {
- t.Fatalf("KeepVolumes[0].Get returned error: %v", err)
+ t.Fatalf("Volume #0 Get returned error: %v", err)
}
if string(result) != string(TEST_BLOCK) {
t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
@@ -133,8 +142,11 @@ func TestPutBlockOneVol(t *testing.T) {
defer teardown()
// Create two test Keep volumes, but cripple one of them.
- KeepVolumes = setup(2)
- KeepVolumes[0].(*MockVolume).Bad = true
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
@@ -160,7 +172,8 @@ func TestPutBlockMD5Fail(t *testing.T) {
defer teardown()
// Create two test Keep volumes.
- KeepVolumes = setup(2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Check that PutBlock returns the expected error when the hash does
// not match the block.
@@ -183,10 +196,12 @@ func TestPutBlockCorrupt(t *testing.T) {
defer teardown()
// Create two test Keep volumes.
- KeepVolumes = setup(2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Store a corrupted block under TEST_HASH.
- KeepVolumes[0].Put(TEST_HASH, BAD_BLOCK)
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, BAD_BLOCK)
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Errorf("PutBlock: %v", err)
}
@@ -212,7 +227,8 @@ func TestPutBlockCollision(t *testing.T) {
var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
// Prepare two test Keep volumes.
- KeepVolumes = setup(2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
@@ -314,14 +330,17 @@ func TestIndex(t *testing.T) {
// Set up Keep volumes and populate them.
// Include multiple blocks on different volumes, and
// some metadata files.
- KeepVolumes = setup(2)
- KeepVolumes[0].Put(TEST_HASH, TEST_BLOCK)
- KeepVolumes[1].Put(TEST_HASH_2, TEST_BLOCK_2)
- KeepVolumes[0].Put(TEST_HASH_3, TEST_BLOCK_3)
- KeepVolumes[0].Put(TEST_HASH+".meta", []byte("metadata"))
- KeepVolumes[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
-
- index := KeepVolumes[0].Index("") + KeepVolumes[1].Index("")
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, TEST_BLOCK)
+ vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
+ vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
+ vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
+ vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
+
+ index := vols[0].Index("") + vols[1].Index("")
expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
TEST_HASH_3 + `\+\d+ \d+\n` +
TEST_HASH_2 + `\+\d+ \d+\n$`
@@ -346,13 +365,16 @@ func TestNodeStatus(t *testing.T) {
defer teardown()
// Set up test Keep volumes with some blocks.
- KeepVolumes = setup(2)
- KeepVolumes[0].Put(TEST_HASH, TEST_BLOCK)
- KeepVolumes[1].Put(TEST_HASH_2, TEST_BLOCK_2)
+ KeepVM = MakeTestVolumeManager(2)
+ defer func() { KeepVM.Quit() }()
+
+ vols := KeepVM.Volumes()
+ vols[0].Put(TEST_HASH, TEST_BLOCK)
+ vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
// Get node status and make a basic sanity check.
st := GetNodeStatus()
- for i := range KeepVolumes {
+ for i := range vols {
volinfo := st.Volumes[i]
mtp := volinfo.MountPoint
if mtp != "/bogo" {
@@ -374,21 +396,21 @@ func TestNodeStatus(t *testing.T) {
// Helper functions for unit tests.
// ========================================
-// setup
-// Create KeepVolumes for testing.
-// Returns a slice of pathnames to temporary Keep volumes.
+// MakeTestVolumeManager
+// Creates and returns a RRVolumeManager with the specified number
+// of MockVolumes.
//
-func setup(num_volumes int) []Volume {
+func MakeTestVolumeManager(num_volumes int) VolumeManager {
vols := make([]Volume, num_volumes)
for i := range vols {
vols[i] = CreateMockVolume()
}
- return vols
+ return MakeRRVolumeManager(vols)
}
// teardown
// Cleanup to perform after each test.
//
func teardown() {
- KeepVolumes = nil
+ KeepVM = nil
}
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index dd4245a..fffc815 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -71,3 +71,59 @@ func (v *MockVolume) Status() *VolumeStatus {
func (v *MockVolume) String() string {
return "[MockVolume]"
}
+
+// A VolumeManager manages a collection of volumes.
+//
+// - Volumes is a slice of available Volumes.
+// - Choose() returns a Volume suitable for writing to.
+// - Quit() instructs the VolumeManager to shut down gracefully.
+//
+type VolumeManager interface {
+ Volumes() []Volume
+ Choose() Volume
+ Quit()
+}
+
+type RRVolumeManager struct {
+ volumes []Volume
+ nextwrite chan Volume
+ quit chan int
+}
+
+func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
+ // Create a new VolumeManager struct with the specified volumes,
+ // and with new Nextwrite and Quit channels.
+ // The Quit channel is buffered with a capacity of 1 so that
+ // another routine may write to it without blocking.
+ vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
+
+ // This goroutine implements round-robin volume selection.
+ // It sends each available Volume in turn to the Nextwrite
+ // channel, until receiving a notification on the Quit channel
+ // that it should terminate.
+ go func() {
+ var i int = 0
+ for {
+ select {
+ case <-vm.quit:
+ return
+ case vm.nextwrite <- vm.volumes[i]:
+ i = (i + 1) % len(vm.volumes)
+ }
+ }
+ }()
+
+ return vm
+}
+
+func (vm *RRVolumeManager) Volumes() []Volume {
+ return vm.volumes
+}
+
+func (vm *RRVolumeManager) Choose() Volume {
+ return <-vm.nextwrite
+}
+
+func (vm *RRVolumeManager) Quit() {
+ vm.quit <- 1
+}
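As the commit message suggests, the VolumeManager interface leaves room for other
write-scheduling policies. Purely as an illustration (nothing like this exists in
the repository), a manager that prefers the volume with the fewest pending writes
might look roughly like the sketch below; it assumes it lives in the keep package
and that "sync" is imported.

    // LeastBusyVolumeManager is a hypothetical VolumeManager that picks the
    // volume with the fewest writes currently in flight. Sketch only.
    type LeastBusyVolumeManager struct {
            mutex   sync.Mutex
            volumes []Volume
            pending []int // pending[i] = in-flight writes on volumes[i]
    }

    func MakeLeastBusyVolumeManager(vols []Volume) *LeastBusyVolumeManager {
            return &LeastBusyVolumeManager{
                    volumes: vols,
                    pending: make([]int, len(vols)),
            }
    }

    func (vm *LeastBusyVolumeManager) Volumes() []Volume {
            return vm.volumes
    }

    // Choose returns the volume with the fewest pending writes. A real
    // implementation would also need a way for callers to report when a
    // write completes; that bookkeeping is omitted here.
    func (vm *LeastBusyVolumeManager) Choose() Volume {
            vm.mutex.Lock()
            defer vm.mutex.Unlock()
            best := 0
            for i := range vm.volumes {
                    if vm.pending[i] < vm.pending[best] {
                            best = i
                    }
            }
            vm.pending[best]++
            return vm.volumes[best]
    }

    func (vm *LeastBusyVolumeManager) Quit() {
            // Nothing to shut down: this manager starts no goroutines.
    }

Unlike RRVolumeManager, this sketch needs no background goroutine, so its Quit
method has nothing to stop; a production version would also need callers to report
completed writes so the pending counts go back down.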
-----------------------------------------------------------------------
hooks/post-receive