[ARVADOS] updated: eca0d3d0a6938e08dde6067c4565754c45f5ebb7

git at public.curoverse.com git at public.curoverse.com
Wed Apr 23 16:14:19 EDT 2014


Summary of changes:
 services/keep/src/keep/keep.go      |  175 +++++--------------------------
 services/keep/src/keep/keep_test.go |   34 ++++---
 services/keep/src/keep/volume.go    |  195 ++++++++++++++++++++++++++++++++++-
 3 files changed, 238 insertions(+), 166 deletions(-)

       via  eca0d3d0a6938e08dde6067c4565754c45f5ebb7 (commit)
       via  b66ad0b212a80c117b5b344a715331962d0bfe2d (commit)
      from  8202cb5f1682588e35dcf90519f53d74c06d4aa6 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit eca0d3d0a6938e08dde6067c4565754c45f5ebb7
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Apr 22 17:19:43 2014 -0400

    Made KeepVolumes a slice of Volume objects, not strings.
    
    Moved more Unix-specific code into volume.go as part of the process.

diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index e4a2675..8b8ae17 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -14,12 +14,9 @@ import (
 	"log"
 	"net/http"
 	"os"
-	"path/filepath"
 	"regexp"
-	"strconv"
 	"strings"
 	"syscall"
-	"time"
 )
 
 // ======================
@@ -40,7 +37,8 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
 
 var PROC_MOUNTS = "/proc/mounts"
 
-var KeepVolumes []string
+// KeepVolumes is a slice of volumes on which blocks can be stored.
+var KeepVolumes []Volume
 
 // ==========
 // Error types.
@@ -107,11 +105,11 @@ func main() {
 	}
 
 	// Check that the specified volumes actually exist.
-	KeepVolumes = []string(nil)
+	KeepVolumes = []Volume(nil)
 	for _, v := range keepvols {
 		if _, err := os.Stat(v); err == nil {
 			log.Println("adding Keep volume:", v)
-			KeepVolumes = append(KeepVolumes, v)
+			KeepVolumes = append(KeepVolumes, &UnixVolume{v})
 		} else {
 			log.Printf("bad Keep volume: %s\n", err)
 		}
@@ -226,7 +224,10 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
 func IndexHandler(w http.ResponseWriter, req *http.Request) {
 	prefix := mux.Vars(req)["prefix"]
 
-	index := IndexLocators(prefix)
+	var index string
+	for _, vol := range KeepVolumes {
+		index = index + vol.Index(prefix)
+	}
 	w.Write([]byte(index))
 }
 
@@ -273,7 +274,7 @@ func GetNodeStatus() *NodeStatus {
 
 	st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
 	for i, vol := range KeepVolumes {
-		st.Volumes[i] = GetVolumeStatus(vol)
+		st.Volumes[i] = vol.Status()
 	}
 	return st
 }
@@ -305,66 +306,10 @@ func GetVolumeStatus(volume string) *VolumeStatus {
 	return &VolumeStatus{volume, devnum, free, used}
 }
 
-// IndexLocators
-//     Returns a string containing a list of locator ids found on this
-//     Keep server.  If {prefix} is given, return only those locator
-//     ids that begin with the given prefix string.
-//
-//     The return string consists of a sequence of newline-separated
-//     strings in the format
-//
-//         locator+size modification-time
-//
-//     e.g.:
-//
-//         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-//         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-//         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
-func IndexLocators(prefix string) string {
-	var output string
-	for _, vol := range KeepVolumes {
-		filepath.Walk(vol,
-			func(path string, info os.FileInfo, err error) error {
-				// This WalkFunc inspects each path in the volume
-				// and prints an index line for all files that begin
-				// with prefix.
-				if err != nil {
-					log.Printf("IndexHandler: %s: walking to %s: %s",
-						vol, path, err)
-					return nil
-				}
-				locator := filepath.Base(path)
-				// Skip directories that do not match prefix.
-				// We know there is nothing interesting inside.
-				if info.IsDir() &&
-					!strings.HasPrefix(locator, prefix) &&
-					!strings.HasPrefix(prefix, locator) {
-					return filepath.SkipDir
-				}
-				// Skip any file that is not apparently a locator, e.g. .meta files
-				if is_valid, err := IsValidLocator(locator); err != nil {
-					return err
-				} else if !is_valid {
-					return nil
-				}
-				// Print filenames beginning with prefix
-				if !info.IsDir() && strings.HasPrefix(locator, prefix) {
-					output = output + fmt.Sprintf(
-						"%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
-				}
-				return nil
-			})
-	}
-
-	return output
-}
-
 func GetBlock(hash string) ([]byte, error) {
 	// Attempt to read the requested hash from a keep volume.
 	for _, vol := range KeepVolumes {
-		uv := UnixVolume{vol}
-		if buf, err := uv.Read(hash); err != nil {
+		if buf, err := vol.Read(hash); err != nil {
 			// IsNotExist is an expected error and may be ignored.
 			// (If all volumes report IsNotExist, we return a NotFoundError)
 			// A CorruptError should be returned immediately.
@@ -438,39 +383,16 @@ func PutBlock(block []byte, hash string) error {
 	// Store the block on the first available Keep volume.
 	allFull := true
 	for _, vol := range KeepVolumes {
-		if IsFull(vol) {
-			continue
-		}
-		allFull = false
-		blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
-		if err := os.MkdirAll(blockDir, 0755); err != nil {
-			log.Printf("%s: could not create directory %s: %s",
-				hash, blockDir, err)
-			continue
-		}
-
-		tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
-		if tmperr != nil {
-			log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
-			continue
-		}
-		blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
-
-		if _, err := tmpfile.Write(block); err != nil {
-			log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
-			continue
+		err := vol.Write(hash, block)
+		if err == nil {
+			return nil // success!
 		}
-		if err := tmpfile.Close(); err != nil {
-			log.Printf("closing %s: %s\n", tmpfile.Name(), err)
-			os.Remove(tmpfile.Name())
-			continue
+		if err != FullError {
+			// The volume is not full but the write did not succeed.
+			// Report the error and continue trying.
+			allFull = false
+			log.Printf("%s: Write(%s): %s\n", vol, hash, err)
 		}
-		if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
-			log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
-			os.Remove(tmpfile.Name())
-			continue
-		}
-		return nil
 	}
 
 	if allFull {
@@ -482,53 +404,6 @@ func PutBlock(block []byte, hash string) error {
 	}
 }
 
-func IsFull(volume string) (isFull bool) {
-	fullSymlink := volume + "/full"
-
-	// Check if the volume has been marked as full in the last hour.
-	if link, err := os.Readlink(fullSymlink); err == nil {
-		if ts, err := strconv.Atoi(link); err == nil {
-			fulltime := time.Unix(int64(ts), 0)
-			if time.Since(fulltime).Hours() < 1.0 {
-				return true
-			}
-		}
-	}
-
-	if avail, err := FreeDiskSpace(volume); err == nil {
-		isFull = avail < MIN_FREE_KILOBYTES
-	} else {
-		log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
-		isFull = false
-	}
-
-	// If the volume is full, timestamp it.
-	if isFull {
-		now := fmt.Sprintf("%d", time.Now().Unix())
-		os.Symlink(now, fullSymlink)
-	}
-	return
-}
-
-// FreeDiskSpace(volume)
-//     Returns the amount of available disk space on VOLUME,
-//     as a number of 1k blocks.
-//
-//     TODO(twp): consider integrating this better with
-//     VolumeStatus (e.g. keep a NodeStatus object up-to-date
-//     periodically and use it as the source of info)
-//
-func FreeDiskSpace(volume string) (free uint64, err error) {
-	var fs syscall.Statfs_t
-	err = syscall.Statfs(volume, &fs)
-	if err == nil {
-		// Statfs output is not guaranteed to measure free
-		// space in terms of 1K blocks.
-		free = fs.Bavail * uint64(fs.Bsize) / 1024
-	}
-	return
-}
-
 // ReadAtMost
 //     Reads bytes repeatedly from an io.Reader until either
 //     encountering EOF, or the maxbytes byte limit has been reached.
diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go
index 97fa1c7..ce5a41c 100644
--- a/services/keep/src/keep/keep_test.go
+++ b/services/keep/src/keep/keep_test.go
@@ -37,6 +37,9 @@ var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
 //           - use an interface to mock ioutil.TempFile with a File
 //             object that always returns an error on write
 //
+// TODO(twp): Make these tests less dependent on being able to access
+// the UnixVolume root field.
+//
 // ========================================
 // GetBlock tests.
 // ========================================
@@ -136,7 +139,7 @@ func TestPutBlockOneVol(t *testing.T) {
 
 	// Create two test Keep volumes, but cripple one of them.
 	KeepVolumes = setup(t, 2)
-	os.Chmod(KeepVolumes[0], 000)
+	os.Chmod(KeepVolumes[0].(*UnixVolume).root, 000)
 
 	// Check that PutBlock stores the data as expected.
 	if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
@@ -237,12 +240,12 @@ func TestFindKeepVolumes(t *testing.T) {
 	defer teardown()
 
 	// Initialize two keep volumes.
-	var tempVols []string = setup(t, 2)
+	var tempVols []Volume = setup(t, 2)
 
 	// Set up a bogus PROC_MOUNTS file.
 	if f, err := ioutil.TempFile("", "keeptest"); err == nil {
 		for _, vol := range tempVols {
-			fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
+			fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol.(*UnixVolume).root))
 		}
 		f.Close()
 		PROC_MOUNTS = f.Name()
@@ -254,9 +257,10 @@ func TestFindKeepVolumes(t *testing.T) {
 				len(tempVols), len(resultVols))
 		}
 		for i := range tempVols {
-			if tempVols[i] != resultVols[i] {
+			tempVolRoot := tempVols[i].(*UnixVolume).root
+			if tempVolRoot != resultVols[i] {
 				t.Errorf("FindKeepVolumes returned %s, expected %s\n",
-					resultVols[i], tempVols[i])
+					tempVolRoot, tempVols[i])
 			}
 		}
 
@@ -305,7 +309,7 @@ func TestIndex(t *testing.T) {
 	store(t, KeepVolumes[0], TEST_HASH+".meta", []byte("metadata"))
 	store(t, KeepVolumes[1], TEST_HASH_2+".meta", []byte("metadata"))
 
-	index := IndexLocators("")
+	index := KeepVolumes[0].Index("") + KeepVolumes[1].Index("")
 	expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
 		TEST_HASH_3 + `\+\d+ \d+\n` +
 		TEST_HASH_2 + `\+\d+ \d+\n$`
@@ -337,7 +341,7 @@ func TestNodeStatus(t *testing.T) {
 	for i, vol := range KeepVolumes {
 		volinfo := st.Volumes[i]
 		mtp := volinfo.MountPoint
-		if mtp != vol {
+		if mtp != vol.(*UnixVolume).root {
 			t.Errorf("GetNodeStatus mount_point %s != KeepVolume %s", mtp, vol)
 		}
 		if volinfo.DeviceNum == 0 {
@@ -360,12 +364,13 @@ func TestNodeStatus(t *testing.T) {
 //     Create KeepVolumes for testing.
 //     Returns a slice of pathnames to temporary Keep volumes.
 //
-func setup(t *testing.T, num_volumes int) []string {
-	vols := make([]string, num_volumes)
+func setup(t *testing.T, num_volumes int) []Volume {
+	vols := make([]Volume, num_volumes)
 	for i := range vols {
 		if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
-			vols[i] = dir + "/keep"
-			os.Mkdir(vols[i], 0755)
+			root := dir + "/keep"
+			vols[i] = &UnixVolume{root}
+			os.Mkdir(root, 0755)
 		} else {
 			t.Fatal(err)
 		}
@@ -378,16 +383,17 @@ func setup(t *testing.T, num_volumes int) []string {
 //
 func teardown() {
 	for _, vol := range KeepVolumes {
-		os.RemoveAll(path.Dir(vol))
+		os.RemoveAll(path.Dir(vol.(*UnixVolume).root))
 	}
 	KeepVolumes = nil
 }
 
 // store
 //     Low-level code to write Keep blocks directly to disk for testing.
+//     Note: works only on UnixVolumes.
 //
-func store(t *testing.T, keepdir string, filename string, block []byte) {
-	blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
+func store(t *testing.T, vol Volume, filename string, block []byte) {
+	blockdir := fmt.Sprintf("%s/%s", vol.(*UnixVolume).root, filename[:3])
 	if err := os.MkdirAll(blockdir, 0755); err != nil {
 		t.Fatal(err)
 	}
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index 20113ff..745f984 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -3,15 +3,23 @@ package main
 import (
 	"crypto/md5"
 	"fmt"
+	"io/ioutil"
 	"log"
 	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
 )
 
 // A Volume is an interface that represents a Keep back-end volume.
 
 type Volume interface {
-	Read(locator string) ([]byte, error)
-	Write(locator string, block []byte) error
+	Read(loc string) ([]byte, error)
+	Write(loc string, block []byte) error
+	Index(prefix string) string
+	Status() *VolumeStatus
 }
 
 // A UnixVolume is a Volume that writes to a locally mounted disk.
@@ -19,12 +27,23 @@ type UnixVolume struct {
 	root string // path to this volume
 }
 
-func (v *UnixVolume) Read(locator string) ([]byte, error) {
+// Read retrieves a block identified by the locator string "loc", and
+// returns its contents as a byte slice.
+//
+// If the block could not be opened or read, Read returns a nil slice
+// and the error that was generated.
+//
+// If the block is present but its content hash does not match loc,
+// Read returns the block and a CorruptError.  It is the caller's
+// responsibility to decide what (if anything) to do with the
+// corrupted data block.
+//
+func (v *UnixVolume) Read(loc string) ([]byte, error) {
 	var f *os.File
 	var err error
 	var nread int
 
-	blockFilename := fmt.Sprintf("%s/%s/%s", v.root, locator[0:3], locator)
+	blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
 
 	f, err = os.Open(blockFilename)
 	if err != nil {
@@ -41,7 +60,7 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) {
 	// Double check the file checksum.
 	//
 	filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
-	if filehash != locator {
+	if filehash != loc {
 		// TODO(twp): this condition probably represents a bad disk and
 		// should raise major alarm bells for an administrator: e.g.
 		// they should be sent directly to an event manager at high
@@ -55,3 +74,169 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) {
 	// Success!
 	return buf[:nread], nil
 }
+
+// Write stores a block of data identified by the locator string
+// "loc".  It returns nil on success.  If the volume is full, it
+// returns a FullError.  If the write fails due to some other error,
+// that error is returned.
+//
+func (v *UnixVolume) Write(loc string, block []byte) error {
+	if v.IsFull() {
+		return FullError
+	}
+	blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
+	if err := os.MkdirAll(blockDir, 0755); err != nil {
+		log.Printf("%s: could not create directory %s: %s",
+			loc, blockDir, err)
+		return err
+	}
+
+	tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+	if tmperr != nil {
+		log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+		return tmperr
+	}
+	blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
+
+	if _, err := tmpfile.Write(block); err != nil {
+		log.Printf("%s: writing to %s: %s\n", v.root, blockFilename, err)
+		return err
+	}
+	if err := tmpfile.Close(); err != nil {
+		log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+		os.Remove(tmpfile.Name())
+		return err
+	}
+	if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+		log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+		os.Remove(tmpfile.Name())
+		return err
+	}
+	return nil
+}
+
+// Status returns a VolumeStatus struct describing the volume's
+// current state.
+//
+func (v *UnixVolume) Status() *VolumeStatus {
+	var fs syscall.Statfs_t
+	var devnum uint64
+
+	if fi, err := os.Stat(v.root); err == nil {
+		devnum = fi.Sys().(*syscall.Stat_t).Dev
+	} else {
+		log.Printf("%s: os.Stat: %s\n", v.root, err)
+		return nil
+	}
+
+	err := syscall.Statfs(v.root, &fs)
+	if err != nil {
+		log.Printf("%s: statfs: %s\n", v.root, err)
+		return nil
+	}
+	// These calculations match the way df calculates disk usage:
+	// "free" space is measured by fs.Bavail, but "used" space
+	// uses fs.Blocks - fs.Bfree.
+	free := fs.Bavail * uint64(fs.Bsize)
+	used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
+	return &VolumeStatus{v.root, devnum, free, used}
+}
+
+// Index returns a list of blocks found on this volume which begin with
+// the specified prefix.
+//
+// The return value is a multiline string (separated by
+// newlines). Each line is in the format
+//
+//     locator+size modification-time
+//
+// e.g.:
+//
+//     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+//     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+//     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func (v *UnixVolume) Index(prefix string) (output string) {
+	filepath.Walk(v.root,
+		func(path string, info os.FileInfo, err error) error {
+			// This WalkFunc inspects each path in the volume
+			// and prints an index line for all files that begin
+			// with prefix.
+			if err != nil {
+				log.Printf("IndexHandler: %s: walking to %s: %s",
+					v.root, path, err)
+				return nil
+			}
+			locator := filepath.Base(path)
+			// Skip directories that do not match prefix.
+			// We know there is nothing interesting inside.
+			if info.IsDir() &&
+				!strings.HasPrefix(locator, prefix) &&
+				!strings.HasPrefix(prefix, locator) {
+				return filepath.SkipDir
+			}
+			// Skip any file that is not apparently a locator, e.g. .meta files
+			if is_valid, err := IsValidLocator(locator); err != nil {
+				return err
+			} else if !is_valid {
+				return nil
+			}
+			// Print filenames beginning with prefix
+			if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+				output = output + fmt.Sprintf(
+					"%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+			}
+			return nil
+		})
+
+	return
+}
+
+// IsFull returns true if the volume was marked full within the last
+// hour, or if its free space is now less than MIN_FREE_KILOBYTES.
+//
+func (v *UnixVolume) IsFull() (isFull bool) {
+	fullSymlink := v.root + "/full"
+
+	// Check if the volume has been marked as full in the last hour.
+	if link, err := os.Readlink(fullSymlink); err == nil {
+		if ts, err := strconv.Atoi(link); err == nil {
+			fulltime := time.Unix(int64(ts), 0)
+			if time.Since(fulltime).Hours() < 1.0 {
+				return true
+			}
+		}
+	}
+
+	if avail, err := v.FreeDiskSpace(); err == nil {
+		isFull = avail < MIN_FREE_KILOBYTES
+	} else {
+		log.Printf("%s: FreeDiskSpace: %s\n", v.root, err)
+		isFull = false
+	}
+
+	// If the volume is full, timestamp it.
+	if isFull {
+		now := fmt.Sprintf("%d", time.Now().Unix())
+		os.Symlink(now, fullSymlink)
+	}
+	return
+}
+
+// FreeDiskSpace returns the number of unused 1k blocks available on
+// the volume.
+//
+//     TODO(twp): consider integrating this better with VolumeStatus
+//     (e.g. keep a NodeStatus object up-to-date periodically and use
+//     it as the source of info)
+//
+func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+	var fs syscall.Statfs_t
+	err = syscall.Statfs(v.root, &fs)
+	if err == nil {
+		// Statfs output is not guaranteed to measure free
+		// space in terms of 1K blocks.
+		free = fs.Bavail * uint64(fs.Bsize) / 1024
+	}
+	return
+}

commit b66ad0b212a80c117b5b344a715331962d0bfe2d
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Apr 22 11:20:02 2014 -0400

    Bug fix: GetBlock must report a CorruptError immediately.
    
    Refs #2620.

diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index f13de44..e4a2675 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -365,12 +365,18 @@ func GetBlock(hash string) ([]byte, error) {
 	for _, vol := range KeepVolumes {
 		uv := UnixVolume{vol}
 		if buf, err := uv.Read(hash); err != nil {
-			if os.IsNotExist(err) {
-				// IsNotExist is an expected error.
+			// IsNotExist is an expected error and may be ignored.
+			// (If all volumes report IsNotExist, we return a NotFoundError)
+			// A CorruptError should be returned immediately.
+			// Any other errors should be logged but we continue trying to
+			// read.
+			switch {
+			case os.IsNotExist(err):
 				continue
-			} else {
+			case err == CorruptError:
+				return nil, err
+			default:
 				log.Printf("GetBlock: reading %s: %s\n", hash, err)
-				return buf, err
 			}
 		} else {
 			// Success!

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list