[ARVADOS] updated: eca0d3d0a6938e08dde6067c4565754c45f5ebb7
git at public.curoverse.com
git at public.curoverse.com
Wed Apr 23 16:14:19 EDT 2014
Summary of changes:
services/keep/src/keep/keep.go | 175 +++++--------------------------
services/keep/src/keep/keep_test.go | 34 ++++---
services/keep/src/keep/volume.go | 195 ++++++++++++++++++++++++++++++++++-
3 files changed, 238 insertions(+), 166 deletions(-)
via eca0d3d0a6938e08dde6067c4565754c45f5ebb7 (commit)
via b66ad0b212a80c117b5b344a715331962d0bfe2d (commit)
from 8202cb5f1682588e35dcf90519f53d74c06d4aa6 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit eca0d3d0a6938e08dde6067c4565754c45f5ebb7
Author: Tim Pierce <twp at curoverse.com>
Date: Tue Apr 22 17:19:43 2014 -0400
Made KeepVolumes a slice of Volume objects, not strings.
Moved more Unix-specific code into volume.go as part of the process.
diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index e4a2675..8b8ae17 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -14,12 +14,9 @@ import (
"log"
"net/http"
"os"
- "path/filepath"
"regexp"
- "strconv"
"strings"
"syscall"
- "time"
)
// ======================
@@ -40,7 +37,8 @@ const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
var PROC_MOUNTS = "/proc/mounts"
-var KeepVolumes []string
+// KeepVolumes is a slice of volumes on which blocks can be stored.
+var KeepVolumes []Volume
// ==========
// Error types.
@@ -107,11 +105,11 @@ func main() {
}
// Check that the specified volumes actually exist.
- KeepVolumes = []string(nil)
+ KeepVolumes = []Volume(nil)
for _, v := range keepvols {
if _, err := os.Stat(v); err == nil {
log.Println("adding Keep volume:", v)
- KeepVolumes = append(KeepVolumes, v)
+ KeepVolumes = append(KeepVolumes, &UnixVolume{v})
} else {
log.Printf("bad Keep volume: %s\n", err)
}
@@ -226,7 +224,10 @@ func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
func IndexHandler(w http.ResponseWriter, req *http.Request) {
prefix := mux.Vars(req)["prefix"]
- index := IndexLocators(prefix)
+ var index string
+ for _, vol := range KeepVolumes {
+ index = index + vol.Index(prefix)
+ }
w.Write([]byte(index))
}
@@ -273,7 +274,7 @@ func GetNodeStatus() *NodeStatus {
st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
for i, vol := range KeepVolumes {
- st.Volumes[i] = GetVolumeStatus(vol)
+ st.Volumes[i] = vol.Status()
}
return st
}
@@ -305,66 +306,10 @@ func GetVolumeStatus(volume string) *VolumeStatus {
return &VolumeStatus{volume, devnum, free, used}
}
-// IndexLocators
-// Returns a string containing a list of locator ids found on this
-// Keep server. If {prefix} is given, return only those locator
-// ids that begin with the given prefix string.
-//
-// The return string consists of a sequence of newline-separated
-// strings in the format
-//
-// locator+size modification-time
-//
-// e.g.:
-//
-// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
-//
-func IndexLocators(prefix string) string {
- var output string
- for _, vol := range KeepVolumes {
- filepath.Walk(vol,
- func(path string, info os.FileInfo, err error) error {
- // This WalkFunc inspects each path in the volume
- // and prints an index line for all files that begin
- // with prefix.
- if err != nil {
- log.Printf("IndexHandler: %s: walking to %s: %s",
- vol, path, err)
- return nil
- }
- locator := filepath.Base(path)
- // Skip directories that do not match prefix.
- // We know there is nothing interesting inside.
- if info.IsDir() &&
- !strings.HasPrefix(locator, prefix) &&
- !strings.HasPrefix(prefix, locator) {
- return filepath.SkipDir
- }
- // Skip any file that is not apparently a locator, e.g. .meta files
- if is_valid, err := IsValidLocator(locator); err != nil {
- return err
- } else if !is_valid {
- return nil
- }
- // Print filenames beginning with prefix
- if !info.IsDir() && strings.HasPrefix(locator, prefix) {
- output = output + fmt.Sprintf(
- "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
- }
- return nil
- })
- }
-
- return output
-}
-
func GetBlock(hash string) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
for _, vol := range KeepVolumes {
- uv := UnixVolume{vol}
- if buf, err := uv.Read(hash); err != nil {
+ if buf, err := vol.Read(hash); err != nil {
// IsNotExist is an expected error and may be ignored.
// (If all volumes report IsNotExist, we return a NotFoundError)
// A CorruptError should be returned immediately.
@@ -438,39 +383,16 @@ func PutBlock(block []byte, hash string) error {
// Store the block on the first available Keep volume.
allFull := true
for _, vol := range KeepVolumes {
- if IsFull(vol) {
- continue
- }
- allFull = false
- blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
- if err := os.MkdirAll(blockDir, 0755); err != nil {
- log.Printf("%s: could not create directory %s: %s",
- hash, blockDir, err)
- continue
- }
-
- tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
- if tmperr != nil {
- log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
- continue
- }
- blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
-
- if _, err := tmpfile.Write(block); err != nil {
- log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
- continue
+ err := vol.Write(hash, block)
+ if err == nil {
+ return nil // success!
}
- if err := tmpfile.Close(); err != nil {
- log.Printf("closing %s: %s\n", tmpfile.Name(), err)
- os.Remove(tmpfile.Name())
- continue
+ if err != FullError {
+ // The volume is not full but the write did not succeed.
+ // Report the error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
}
- if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
- log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
- os.Remove(tmpfile.Name())
- continue
- }
- return nil
}
if allFull {
@@ -482,53 +404,6 @@ func PutBlock(block []byte, hash string) error {
}
}
-func IsFull(volume string) (isFull bool) {
- fullSymlink := volume + "/full"
-
- // Check if the volume has been marked as full in the last hour.
- if link, err := os.Readlink(fullSymlink); err == nil {
- if ts, err := strconv.Atoi(link); err == nil {
- fulltime := time.Unix(int64(ts), 0)
- if time.Since(fulltime).Hours() < 1.0 {
- return true
- }
- }
- }
-
- if avail, err := FreeDiskSpace(volume); err == nil {
- isFull = avail < MIN_FREE_KILOBYTES
- } else {
- log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
- isFull = false
- }
-
- // If the volume is full, timestamp it.
- if isFull {
- now := fmt.Sprintf("%d", time.Now().Unix())
- os.Symlink(now, fullSymlink)
- }
- return
-}
-
-// FreeDiskSpace(volume)
-// Returns the amount of available disk space on VOLUME,
-// as a number of 1k blocks.
-//
-// TODO(twp): consider integrating this better with
-// VolumeStatus (e.g. keep a NodeStatus object up-to-date
-// periodically and use it as the source of info)
-//
-func FreeDiskSpace(volume string) (free uint64, err error) {
- var fs syscall.Statfs_t
- err = syscall.Statfs(volume, &fs)
- if err == nil {
- // Statfs output is not guaranteed to measure free
- // space in terms of 1K blocks.
- free = fs.Bavail * uint64(fs.Bsize) / 1024
- }
- return
-}
-
// ReadAtMost
// Reads bytes repeatedly from an io.Reader until either
// encountering EOF, or the maxbytes byte limit has been reached.
diff --git a/services/keep/src/keep/keep_test.go b/services/keep/src/keep/keep_test.go
index 97fa1c7..ce5a41c 100644
--- a/services/keep/src/keep/keep_test.go
+++ b/services/keep/src/keep/keep_test.go
@@ -37,6 +37,9 @@ var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
// - use an interface to mock ioutil.TempFile with a File
// object that always returns an error on write
//
+// TODO(twp): Make these tests less dependent on being able to access
+// the UnixVolume root field.
+//
// ========================================
// GetBlock tests.
// ========================================
@@ -136,7 +139,7 @@ func TestPutBlockOneVol(t *testing.T) {
// Create two test Keep volumes, but cripple one of them.
KeepVolumes = setup(t, 2)
- os.Chmod(KeepVolumes[0], 000)
+ os.Chmod(KeepVolumes[0].(*UnixVolume).root, 000)
// Check that PutBlock stores the data as expected.
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
@@ -237,12 +240,12 @@ func TestFindKeepVolumes(t *testing.T) {
defer teardown()
// Initialize two keep volumes.
- var tempVols []string = setup(t, 2)
+ var tempVols []Volume = setup(t, 2)
// Set up a bogus PROC_MOUNTS file.
if f, err := ioutil.TempFile("", "keeptest"); err == nil {
for _, vol := range tempVols {
- fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
+ fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol.(*UnixVolume).root))
}
f.Close()
PROC_MOUNTS = f.Name()
@@ -254,9 +257,10 @@ func TestFindKeepVolumes(t *testing.T) {
len(tempVols), len(resultVols))
}
for i := range tempVols {
- if tempVols[i] != resultVols[i] {
+ tempVolRoot := tempVols[i].(*UnixVolume).root
+ if tempVolRoot != resultVols[i] {
t.Errorf("FindKeepVolumes returned %s, expected %s\n",
- resultVols[i], tempVols[i])
+ tempVolRoot, tempVols[i])
}
}
@@ -305,7 +309,7 @@ func TestIndex(t *testing.T) {
store(t, KeepVolumes[0], TEST_HASH+".meta", []byte("metadata"))
store(t, KeepVolumes[1], TEST_HASH_2+".meta", []byte("metadata"))
- index := IndexLocators("")
+ index := KeepVolumes[0].Index("") + KeepVolumes[1].Index("")
expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
TEST_HASH_3 + `\+\d+ \d+\n` +
TEST_HASH_2 + `\+\d+ \d+\n$`
@@ -337,7 +341,7 @@ func TestNodeStatus(t *testing.T) {
for i, vol := range KeepVolumes {
volinfo := st.Volumes[i]
mtp := volinfo.MountPoint
- if mtp != vol {
+ if mtp != vol.(*UnixVolume).root {
t.Errorf("GetNodeStatus mount_point %s != KeepVolume %s", mtp, vol)
}
if volinfo.DeviceNum == 0 {
@@ -360,12 +364,13 @@ func TestNodeStatus(t *testing.T) {
// Create KeepVolumes for testing.
// Returns a slice of pathnames to temporary Keep volumes.
//
-func setup(t *testing.T, num_volumes int) []string {
- vols := make([]string, num_volumes)
+func setup(t *testing.T, num_volumes int) []Volume {
+ vols := make([]Volume, num_volumes)
for i := range vols {
if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
- vols[i] = dir + "/keep"
- os.Mkdir(vols[i], 0755)
+ root := dir + "/keep"
+ vols[i] = &UnixVolume{root}
+ os.Mkdir(root, 0755)
} else {
t.Fatal(err)
}
@@ -378,16 +383,17 @@ func setup(t *testing.T, num_volumes int) []string {
//
func teardown() {
for _, vol := range KeepVolumes {
- os.RemoveAll(path.Dir(vol))
+ os.RemoveAll(path.Dir(vol.(*UnixVolume).root))
}
KeepVolumes = nil
}
// store
// Low-level code to write Keep blocks directly to disk for testing.
+// Note: works only on UnixVolumes.
//
-func store(t *testing.T, keepdir string, filename string, block []byte) {
- blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
+func store(t *testing.T, vol Volume, filename string, block []byte) {
+ blockdir := fmt.Sprintf("%s/%s", vol.(*UnixVolume).root, filename[:3])
if err := os.MkdirAll(blockdir, 0755); err != nil {
t.Fatal(err)
}
diff --git a/services/keep/src/keep/volume.go b/services/keep/src/keep/volume.go
index 20113ff..745f984 100644
--- a/services/keep/src/keep/volume.go
+++ b/services/keep/src/keep/volume.go
@@ -3,15 +3,23 @@ package main
import (
"crypto/md5"
"fmt"
+ "io/ioutil"
"log"
"os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
)
// A Volume is an interface that represents a Keep back-end volume.
type Volume interface {
- Read(locator string) ([]byte, error)
- Write(locator string, block []byte) error
+ Read(loc string) ([]byte, error)
+ Write(loc string, block []byte) error
+ Index(prefix string) string
+ Status() *VolumeStatus
}
// A UnixVolume is a Volume that writes to a locally mounted disk.
@@ -19,12 +27,23 @@ type UnixVolume struct {
root string // path to this volume
}
-func (v *UnixVolume) Read(locator string) ([]byte, error) {
+// Read retrieves a block identified by the locator string "loc", and
+// returns its contents as a byte slice.
+//
+// If the block could not be opened or read, Read returns a nil slice
+// and the error that was generated.
+//
+// If the block is present but its content hash does not match loc,
+// Read returns the block and a CorruptError. It is the caller's
+// responsibility to decide what (if anything) to do with the
+// corrupted data block.
+//
+func (v *UnixVolume) Read(loc string) ([]byte, error) {
var f *os.File
var err error
var nread int
- blockFilename := fmt.Sprintf("%s/%s/%s", v.root, locator[0:3], locator)
+ blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
f, err = os.Open(blockFilename)
if err != nil {
@@ -41,7 +60,7 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) {
// Double check the file checksum.
//
filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
- if filehash != locator {
+ if filehash != loc {
// TODO(twp): this condition probably represents a bad disk and
// should raise major alarm bells for an administrator: e.g.
// they should be sent directly to an event manager at high
@@ -55,3 +74,169 @@ func (v *UnixVolume) Read(locator string) ([]byte, error) {
// Success!
return buf[:nread], nil
}
+
+// Write stores a block of data identified by the locator string
+// "loc". It returns nil on success. If the volume is full, it
+// returns a FullError. If the write fails due to some other error,
+// that error is returned.
+//
+func (v *UnixVolume) Write(loc string, block []byte) error {
+ if v.IsFull() {
+ return FullError
+ }
+ blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
+ if err := os.MkdirAll(blockDir, 0755); err != nil {
+ log.Printf("%s: could not create directory %s: %s",
+ loc, blockDir, err)
+ return err
+ }
+
+ tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
+ if tmperr != nil {
+ log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
+ return tmperr
+ }
+ blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
+
+ if _, err := tmpfile.Write(block); err != nil {
+ log.Printf("%s: writing to %s: %s\n", v.root, blockFilename, err)
+ return err
+ }
+ if err := tmpfile.Close(); err != nil {
+ log.Printf("closing %s: %s\n", tmpfile.Name(), err)
+ os.Remove(tmpfile.Name())
+ return err
+ }
+ if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
+ log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
+ os.Remove(tmpfile.Name())
+ return err
+ }
+ return nil
+}
+
+// Status returns a VolumeStatus struct describing the volume's
+// current state.
+//
+func (v *UnixVolume) Status() *VolumeStatus {
+ var fs syscall.Statfs_t
+ var devnum uint64
+
+ if fi, err := os.Stat(v.root); err == nil {
+ devnum = fi.Sys().(*syscall.Stat_t).Dev
+ } else {
+ log.Printf("%s: os.Stat: %s\n", v.root, err)
+ return nil
+ }
+
+ err := syscall.Statfs(v.root, &fs)
+ if err != nil {
+ log.Printf("%s: statfs: %s\n", v.root, err)
+ return nil
+ }
+ // These calculations match the way df calculates disk usage:
+ // "free" space is measured by fs.Bavail, but "used" space
+ // uses fs.Blocks - fs.Bfree.
+ free := fs.Bavail * uint64(fs.Bsize)
+ used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
+ return &VolumeStatus{v.root, devnum, free, used}
+}
+
+// Index returns a list of blocks found on this volume which begin with
+// the specified prefix.
+//
+// The return value is a multiline string (separated by
+// newlines). Each line is in the format
+//
+// locator+size modification-time
+//
+// e.g.:
+//
+// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+//
+func (v *UnixVolume) Index(prefix string) (output string) {
+ filepath.Walk(v.root,
+ func(path string, info os.FileInfo, err error) error {
+ // This WalkFunc inspects each path in the volume
+ // and prints an index line for all files that begin
+ // with prefix.
+ if err != nil {
+ log.Printf("IndexHandler: %s: walking to %s: %s",
+ v.root, path, err)
+ return nil
+ }
+ locator := filepath.Base(path)
+ // Skip directories that do not match prefix.
+ // We know there is nothing interesting inside.
+ if info.IsDir() &&
+ !strings.HasPrefix(locator, prefix) &&
+ !strings.HasPrefix(prefix, locator) {
+ return filepath.SkipDir
+ }
+ // Skip any file that is not apparently a locator, e.g. .meta files
+ if is_valid, err := IsValidLocator(locator); err != nil {
+ return err
+ } else if !is_valid {
+ return nil
+ }
+ // Print filenames beginning with prefix
+ if !info.IsDir() && strings.HasPrefix(locator, prefix) {
+ output = output + fmt.Sprintf(
+ "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
+ }
+ return nil
+ })
+
+ return
+}
+
+// IsFull returns true if the volume was marked full within the last
+// hour, or if its free space is less than MIN_FREE_KILOBYTES.
+//
+func (v *UnixVolume) IsFull() (isFull bool) {
+ fullSymlink := v.root + "/full"
+
+ // Check if the volume has been marked as full in the last hour.
+ if link, err := os.Readlink(fullSymlink); err == nil {
+ if ts, err := strconv.Atoi(link); err == nil {
+ fulltime := time.Unix(int64(ts), 0)
+ if time.Since(fulltime).Hours() < 1.0 {
+ return true
+ }
+ }
+ }
+
+ if avail, err := v.FreeDiskSpace(); err == nil {
+ isFull = avail < MIN_FREE_KILOBYTES
+ } else {
+ log.Printf("%s: FreeDiskSpace: %s\n", v.root, err)
+ isFull = false
+ }
+
+ // If the volume is full, timestamp it.
+ if isFull {
+ now := fmt.Sprintf("%d", time.Now().Unix())
+ os.Symlink(now, fullSymlink)
+ }
+ return
+}
+
+// FreeDiskSpace returns the number of unused 1k blocks available on
+// the volume.
+//
+// TODO(twp): consider integrating this better with VolumeStatus
+// (e.g. keep a NodeStatus object up-to-date periodically and use
+// it as the source of info)
+//
+func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+ var fs syscall.Statfs_t
+ err = syscall.Statfs(v.root, &fs)
+ if err == nil {
+ // Statfs output is not guaranteed to measure free
+ // space in terms of 1K blocks.
+ free = fs.Bavail * uint64(fs.Bsize) / 1024
+ }
+ return
+}
commit b66ad0b212a80c117b5b344a715331962d0bfe2d
Author: Tim Pierce <twp at curoverse.com>
Date: Tue Apr 22 11:20:02 2014 -0400
Bug fix: GetBlock must report a CorruptError immediately.
Refs #2620.
diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index f13de44..e4a2675 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -365,12 +365,18 @@ func GetBlock(hash string) ([]byte, error) {
for _, vol := range KeepVolumes {
uv := UnixVolume{vol}
if buf, err := uv.Read(hash); err != nil {
- if os.IsNotExist(err) {
- // IsNotExist is an expected error.
+ // IsNotExist is an expected error and may be ignored.
+ // (If all volumes report IsNotExist, we return a NotFoundError)
+ // A CorruptError should be returned immediately.
+ // Any other errors should be logged but we continue trying to
+ // read.
+ switch {
+ case os.IsNotExist(err):
continue
- } else {
+ case err == CorruptError:
+ return nil, err
+ default:
log.Printf("GetBlock: reading %s: %s\n", hash, err)
- return buf, err
}
} else {
// Success!
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list