[ARVADOS] updated: 41caa50724d03189c5b4104c52bd5253974cf535
git at public.curoverse.com
git at public.curoverse.com
Thu Apr 16 12:14:53 EDT 2015
Summary of changes:
services/api/app/models/user.rb | 2 +-
services/api/test/unit/user_test.rb | 3 ++
services/keepstore/handler_test.go | 40 +++++++++++++++++++++++++
services/keepstore/handlers.go | 2 +-
services/keepstore/keepstore.go | 4 +--
services/keepstore/keepstore_test.go | 10 ++-----
services/keepstore/volume.go | 3 ++
services/keepstore/volume_test.go | 54 ++++++++++++++++++++++++----------
services/keepstore/volume_unix_test.go | 33 +++++++++++++++++++++
9 files changed, 125 insertions(+), 26 deletions(-)
discards 9d1daf120d6f2f86fcfd54fb88c2dd5af1b59521 (commit)
via 41caa50724d03189c5b4104c52bd5253974cf535 (commit)
via 397191893819083925600a61e2f355a3b6513354 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (9d1daf120d6f2f86fcfd54fb88c2dd5af1b59521)
            \
             N -- N -- N (41caa50724d03189c5b4104c52bd5253974cf535)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 41caa50724d03189c5b4104c52bd5253974cf535
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Apr 16 12:16:53 2015 -0400
3145: Add -readonly flag. Accept multiple -volume arguments.

* Leave -volumes (and accept comma-separated dir lists) for backward
  compatibility.
* Detect read-only filesystems when using /proc/mounts to discover
  Keep volumes.
* VolumeManagers know whether volumes are writable, so it's not
  necessary to wait in a serialize queue just to find out a volume
  isn't writable.
* Simplify RRVolumeManager: use atomic.AddUint32() instead of channels
  and goroutines.
* -serialize (like -readonly) now affects only the volumes that come
  after it in the argument list (and automatically discovered
  volumes). This makes it possible to have a mix of serialized and
  non-serialized volumes.
* Fix some test cases that just skipped the test (instead of failing)
  when encountering an error during test setup.
* Move MockVolume code from volume.go to volume_test.go.
* Clean up some over-indented code blocks, see
  https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow
* Remove some stating-the-obvious/redundant comments and duplicate log
  messages.
diff --git a/docker/keep/run-keep.in b/docker/keep/run-keep.in
index a0b4cb0..385f0e6 100755
--- a/docker/keep/run-keep.in
+++ b/docker/keep/run-keep.in
@@ -8,4 +8,4 @@ else
permission_args=""
fi
-exec keepstore $permission_args -listen=":25107" -volumes="/keep-data"
+exec keepstore $permission_args -listen=":25107" -volume="/keep-data"
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index b9502f0..51fe43f 100644
--- a/sdk/python/tests/run_test_server.py
+++ b/sdk/python/tests/run_test_server.py
@@ -256,7 +256,7 @@ def _start_keep(n, keep_args):
keep0 = tempfile.mkdtemp()
port = find_available_port()
keep_cmd = ["keepstore",
- "-volumes={}".format(keep0),
+ "-volume={}".format(keep0),
"-listen=:{}".format(port),
"-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 9ad77bf..1765ddf 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -43,9 +43,9 @@ func TestGetHandler(t *testing.T) {
// Prepare two test Keep volumes. Our block is stored on the second volume.
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
if err := vols[0].Put(TEST_HASH, TEST_BLOCK); err != nil {
t.Error(err)
}
@@ -151,7 +151,7 @@ func TestPutHandler(t *testing.T) {
// Prepare two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
// --------------
// No server key.
@@ -218,6 +218,46 @@ func TestPutHandler(t *testing.T) {
TEST_HASH_PUT_RESPONSE, response)
}
+func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
+ defer teardown()
+ data_manager_token = "fake-data-manager-token"
+ vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+ vols[0].Readonly = true
+ KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+ defer KeepVM.Close()
+ IssueRequest(
+ &RequestTester{
+ method: "PUT",
+ uri: "/"+TEST_HASH,
+ request_body: TEST_BLOCK,
+ })
+ IssueRequest(
+ &RequestTester{
+ method: "DELETE",
+ uri: "/"+TEST_HASH,
+ request_body: TEST_BLOCK,
+ api_token: data_manager_token,
+ })
+ type expect struct {
+ volnum int
+ method string
+ callcount int
+ }
+ for _, e := range []expect{
+ {0, "Get", 0},
+ {0, "Touch", 0},
+ {0, "Put", 0},
+ {0, "Delete", 0},
+ {1, "Get", 1},
+ {1, "Put", 1},
+ {1, "Delete", 1},
+ } {
+ if calls := vols[e.volnum].CallCount(e.method); calls != e.callcount {
+ t.Errorf("Got %d %s() on vol %d, expect %d", calls, e.method, e.volnum, e.callcount)
+ }
+ }
+}
+
// Test /index requests:
// - unauthenticated /index request
// - unauthenticated /index/prefix request
@@ -236,9 +276,9 @@ func TestIndexHandler(t *testing.T) {
// Include multiple blocks on different volumes, and
// some metadata files (which should be omitted from index listings)
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
@@ -395,9 +435,9 @@ func TestDeleteHandler(t *testing.T) {
// Include multiple blocks on different volumes, and
// some metadata files (which should be omitted from index listings)
KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Quit()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
// Explicitly set the permission_ttl to 0 for these
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f120f05..9e178fb 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -8,7 +8,6 @@ package main
// StatusHandler (GET /status.json)
import (
- "bufio"
"bytes"
"container/list"
"crypto/md5"
@@ -83,38 +82,6 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// FindKeepVolumes scans all mounted volumes on the system for Keep
-// volumes, and returns a list of matching paths.
-//
-// A device is assumed to be a Keep volume if it is a normal or tmpfs
-// volume and has a "/keep" directory directly underneath the mount
-// point.
-//
-func FindKeepVolumes() []string {
- vols := make([]string, 0)
-
- if f, err := os.Open(PROC_MOUNTS); err != nil {
- log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
- } else {
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- dev, mount := args[0], args[1]
- if mount != "/" &&
- (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
- keep := mount + "/keep"
- if st, err := os.Stat(keep); err == nil && st.IsDir() {
- vols = append(vols, keep)
- }
- }
- }
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- }
- return vols
-}
-
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
hash := mux.Vars(req)["hash"]
@@ -235,7 +202,7 @@ func IndexHandler(resp http.ResponseWriter, req *http.Request) {
prefix := mux.Vars(req)["prefix"]
var index string
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllReadable() {
index = index + vol.Index(prefix)
}
resp.Write([]byte(index))
@@ -282,8 +249,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
func GetNodeStatus() *NodeStatus {
st := new(NodeStatus)
- st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
- for i, vol := range KeepVM.Volumes() {
+ st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
+ for i, vol := range KeepVM.AllReadable() {
st.Volumes[i] = vol.Status()
}
return st
@@ -358,14 +325,14 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- // Delete copies of this block from all available volumes. Report
- // how many blocks were successfully and unsuccessfully
- // deleted.
+ // Delete copies of this block from all available volumes.
+ // Report how many blocks were successfully deleted, and how
+ // many were found on writable volumes but not deleted.
var result struct {
Deleted int `json:"copies_deleted"`
Failed int `json:"copies_failed"`
}
- for _, vol := range KeepVM.Volumes() {
+ for _, vol := range KeepVM.AllWritable() {
if err := vol.Delete(hash); err == nil {
result.Deleted++
} else if os.IsNotExist(err) {
@@ -528,52 +495,54 @@ func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
// Attempt to read the requested hash from a keep volume.
error_to_caller := NotFoundError
- for _, vol := range KeepVM.Volumes() {
- if buf, err := vol.Get(hash); err != nil {
- // IsNotExist is an expected error and may be ignored.
- // (If all volumes report IsNotExist, we return a NotFoundError)
- // All other errors should be logged but we continue trying to
- // read.
- switch {
- case os.IsNotExist(err):
- continue
- default:
+ var vols []Volume
+ if update_timestamp {
+ // Pointless to find the block on an unwritable volume
+ // because Touch() will fail -- this is as good as
+ // "not found" for purposes of callers who need to
+ // update_timestamp.
+ vols = KeepVM.AllWritable()
+ } else {
+ vols = KeepVM.AllReadable()
+ }
+
+ for _, vol := range vols {
+ buf, err := vol.Get(hash)
+ if err != nil {
+ // IsNotExist is an expected error and may be
+ // ignored. All other errors are logged. In
+ // any case we continue trying to read other
+ // volumes. If all volumes report IsNotExist,
+ // we return a NotFoundError.
+ if !os.IsNotExist(err) {
log.Printf("GetBlock: reading %s: %s\n", hash, err)
}
- } else {
- // Double check the file checksum.
- //
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
- if filehash != hash {
- // TODO(twp): this condition probably represents a bad disk and
- // should raise major alarm bells for an administrator: e.g.
- // they should be sent directly to an event manager at high
- // priority or logged as urgent problems.
- //
- log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
- vol, hash, filehash)
- error_to_caller = DiskHashError
- } else {
- // Success!
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
- vol, hash)
- }
- // Update the timestamp if the caller requested.
- // If we could not update the timestamp, continue looking on
- // other volumes.
- if update_timestamp {
- if vol.Touch(hash) != nil {
- continue
- }
- }
- return buf, nil
+ continue
+ }
+ // Check the file checksum.
+ //
+ filehash := fmt.Sprintf("%x", md5.Sum(buf))
+ if filehash != hash {
+ // TODO: Try harder to tell a sysadmin about
+ // this.
+ log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
+ vol, hash, filehash)
+ error_to_caller = DiskHashError
+ continue
+ }
+ if error_to_caller == DiskHashError {
+ log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
+ vol, hash)
+ }
+ if update_timestamp {
+ if err := vol.Touch(hash); err != nil {
+ error_to_caller = GenericError
+ log.Printf("%s: Touch %s failed: %s",
+ vol, hash, error_to_caller)
+ continue
}
}
- }
-
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch, no good copy found\n", hash)
+ return buf, nil
}
return nil, error_to_caller
}
@@ -630,31 +599,39 @@ func PutBlock(block []byte, hash string) error {
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
- vol := KeepVM.Choose()
- if err := vol.Put(hash, block); err == nil {
- return nil // success!
- } else {
- allFull := true
- for _, vol := range KeepVM.Volumes() {
- err := vol.Put(hash, block)
- if err == nil {
- return nil // success!
- }
- if err != FullError {
- // The volume is not full but the write did not succeed.
- // Report the error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
- }
+ if vol := KeepVM.NextWritable(); vol != nil {
+ if err := vol.Put(hash, block); err == nil {
+ return nil // success!
}
+ }
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
- } else {
- log.Printf("all Keep volumes failed")
- return GenericError
+ writables := KeepVM.AllWritable()
+ if len(writables) == 0 {
+ log.Print("No writable volumes.")
+ return FullError
+ }
+
+ allFull := true
+ for _, vol := range writables {
+ err := vol.Put(hash, block)
+ if err == nil {
+ return nil // success!
}
+ if err != FullError {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull = false
+ log.Printf("%s: Write(%s): %s\n", vol, hash, err)
+ }
+ }
+
+ if allFull {
+ log.Print("All volumes are full.")
+ return FullError
+ } else {
+ // Already logged the non-full errors.
+ return GenericError
}
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index c1371dc..5333625 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -1,7 +1,9 @@
package main
import (
+ "bufio"
"bytes"
+ "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -103,6 +105,88 @@ var KeepVM VolumeManager
var pullq *WorkQueue
var trashq *WorkQueue
+var (
+ flagSerializeIO bool
+ flagReadonly bool
+)
+type volumeSet []Volume
+
+func (vs *volumeSet) Set(value string) error {
+ if dirs := strings.Split(value, ","); len(dirs) > 1 {
+ log.Print("DEPRECATED: using comma-separated volume list.")
+ for _, dir := range dirs {
+ if err := vs.Set(dir); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ if len(value) == 0 || value[0] != '/' {
+ return errors.New("Invalid volume: must begin with '/'.")
+ }
+ if _, err := os.Stat(value); err != nil {
+ return err
+ }
+ *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+ return nil
+}
+
+func (vs *volumeSet) String() string {
+ s := "["
+ for i, v := range *vs {
+ if i > 0 {
+ s = s + " "
+ }
+ s = s + v.String()
+ }
+ return s + "]"
+}
+
+// Discover adds a volume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *volumeSet) Discover() int {
+ added := 0
+ f, err := os.Open(PROC_MOUNTS)
+ if err != nil {
+ log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
+ }
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ args := strings.Fields(scanner.Text())
+ if err := scanner.Err(); err != nil {
+ log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
+ }
+ dev, mount := args[0], args[1]
+ if mount == "/" {
+ continue
+ }
+ if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+ continue
+ }
+ keepdir := mount + "/keep"
+ if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+ continue
+ }
+ // Set the -readonly flag (but only for this volume)
+ // if the filesystem is mounted readonly.
+ flagReadonlyWas := flagReadonly
+ for _, fsopt := range strings.Split(args[3], ",") {
+ if fsopt == "ro" {
+ flagReadonly = true
+ break
+ }
+ if fsopt == "rw" {
+ break
+ }
+ }
+ vs.Set(keepdir)
+ flagReadonly = flagReadonlyWas
+ added++
+ }
+ return added
+}
+
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
// of command line flags (identifying Keep volumes and initializing
@@ -111,32 +195,12 @@ var trashq *WorkQueue
func main() {
log.Println("Keep started: pid", os.Getpid())
- // Parse command-line flags:
- //
- // -listen=ipaddr:port
- // Interface on which to listen for requests. Use :port without
- // an ipaddr to listen on all network interfaces.
- // Examples:
- // -listen=127.0.0.1:4949
- // -listen=10.0.1.24:8000
- // -listen=:25107 (to listen to port 25107 on all interfaces)
- //
- // -volumes
- // A comma-separated list of directories to use as Keep volumes.
- // Example:
- // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
- //
- // If -volumes is empty or is not present, Keep will select volumes
- // by looking at currently mounted filesystems for /keep top-level
- // directories.
-
var (
data_manager_token_file string
listen string
permission_key_file string
permission_ttl_sec int
- serialize_io bool
- volumearg string
+ volumes volumeSet
pidfile string
)
flag.StringVar(
@@ -154,9 +218,7 @@ func main() {
&listen,
"listen",
DEFAULT_ADDR,
- "Interface on which to listen for requests, in the format "+
- "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
- "to listen on all network interfaces.")
+ "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
flag.BoolVar(
&never_delete,
"never-delete",
@@ -176,20 +238,23 @@ func main() {
"Expiration time (in seconds) for newly generated permission "+
"signatures.")
flag.BoolVar(
- &serialize_io,
+ &flagSerializeIO,
"serialize",
false,
- "If set, all read and write operations on local Keep volumes will "+
- "be serialized.")
- flag.StringVar(
- &volumearg,
+ "Serialize read and write operations on the following volumes.")
+ flag.BoolVar(
+ &flagReadonly,
+ "readonly",
+ false,
+ "Do not write, delete, or touch anything on the following volumes.")
+ flag.Var(
+ &volumes,
"volumes",
- "",
- "Comma-separated list of directories to use for Keep volumes, "+
- "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
- "supplied, Keep will scan mounted filesystems for volumes "+
- "with a /keep top-level directory.")
-
+ "Deprecated synonym for -volume.")
+ flag.Var(
+ &volumes,
+ "volume",
+ "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
flag.StringVar(
&pidfile,
"pid",
@@ -198,31 +263,14 @@ func main() {
flag.Parse()
- // Look for local keep volumes.
- var keepvols []string
- if volumearg == "" {
- // TODO(twp): decide whether this is desirable default behavior.
- // In production we may want to require the admin to specify
- // Keep volumes explicitly.
- keepvols = FindKeepVolumes()
- } else {
- keepvols = strings.Split(volumearg, ",")
- }
-
- // Check that the specified volumes actually exist.
- var goodvols []Volume = nil
- for _, v := range keepvols {
- if _, err := os.Stat(v); err == nil {
- log.Println("adding Keep volume:", v)
- newvol := MakeUnixVolume(v, serialize_io)
- goodvols = append(goodvols, &newvol)
- } else {
- log.Printf("bad Keep volume: %s\n", err)
+ if len(volumes) == 0 {
+ if volumes.Discover() == 0 {
+ log.Fatal("No volumes found.")
}
}
- if len(goodvols) == 0 {
- log.Fatal("could not find any keep volumes")
+ for _, v := range volumes {
+ log.Printf("Using volume %v (writable=%v)", v, v.Writable())
}
// Initialize data manager token and permission key.
@@ -261,7 +309,7 @@ func main() {
}
// Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(goodvols)
+ KeepVM = MakeRRVolumeManager(volumes)
// Tell the built-in HTTP server to direct all requests to the REST router.
loggingRouter := MakeLoggingRESTRouter()
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 686f502..a6e29f4 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -52,9 +52,9 @@ func TestGetBlock(t *testing.T) {
// Prepare two test Keep volumes. Our block is stored on the second volume.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
t.Error(err)
}
@@ -77,7 +77,7 @@ func TestGetBlockMissing(t *testing.T) {
// Create two empty test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Check that GetBlock returns failure.
result, err := GetBlock(TEST_HASH, false)
@@ -95,9 +95,9 @@ func TestGetBlockCorrupt(t *testing.T) {
// Create two test Keep volumes and store a corrupt block in one.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
vols[0].Put(TEST_HASH, BAD_BLOCK)
// Check that GetBlock returns failure.
@@ -119,15 +119,15 @@ func TestPutBlockOK(t *testing.T) {
// Create two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Fatalf("PutBlock: %v", err)
}
- vols := KeepVM.Volumes()
- result, err := vols[0].Get(TEST_HASH)
+ vols := KeepVM.AllReadable()
+ result, err := vols[1].Get(TEST_HASH)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
@@ -146,9 +146,9 @@ func TestPutBlockOneVol(t *testing.T) {
// Create two test Keep volumes, but cripple one of them.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
@@ -176,7 +176,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
// Create two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Check that PutBlock returns the expected error when the hash does
// not match the block.
@@ -200,10 +200,10 @@ func TestPutBlockCorrupt(t *testing.T) {
// Create two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Store a corrupted block under TEST_HASH.
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
vols[0].Put(TEST_HASH, BAD_BLOCK)
if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
t.Errorf("PutBlock: %v", err)
@@ -231,7 +231,7 @@ func TestPutBlockCollision(t *testing.T) {
// Prepare two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
@@ -254,8 +254,8 @@ func TestPutBlockTouchFails(t *testing.T) {
// Prepare two test Keep volumes.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
- vols := KeepVM.Volumes()
+ defer KeepVM.Close()
+ vols := KeepVM.AllWritable()
// Store a block and then make the underlying volume bad,
// so a subsequent attempt to update the file timestamp
@@ -293,29 +293,16 @@ func TestPutBlockTouchFails(t *testing.T) {
}
}
-// ========================================
-// FindKeepVolumes tests.
-// ========================================
-
-// TestFindKeepVolumes
-// Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
-// directories at the top level.
-//
-func TestFindKeepVolumes(t *testing.T) {
- var tempVols [2]string
+func TestDiscoverTmpfs(t *testing.T) {
+ var tempVols [4]string
var err error
- defer func() {
- for _, path := range tempVols {
- os.RemoveAll(path)
- }
- }()
-
- // Create two directories suitable for using as keep volumes.
+ // Create some directories suitable for using as keep volumes.
for i := range tempVols {
if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
t.Fatal(err)
}
+ defer os.RemoveAll(tempVols[i])
tempVols[i] = tempVols[i] + "/keep"
if err = os.Mkdir(tempVols[i], 0755); err != nil {
t.Fatal(err)
@@ -323,53 +310,69 @@ func TestFindKeepVolumes(t *testing.T) {
}
// Set up a bogus PROC_MOUNTS file.
- if f, err := ioutil.TempFile("", "keeptest"); err == nil {
- for _, vol := range tempVols {
- fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
+ f, err := ioutil.TempFile("", "keeptest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.Remove(f.Name())
+ for i, vol := range tempVols {
+ // Add readonly mount points at odd indexes.
+ var opts string
+ switch i % 2 {
+ case 0:
+ opts = "rw,nosuid,nodev,noexec"
+ case 1:
+ opts = "nosuid,nodev,noexec,ro"
}
- f.Close()
- PROC_MOUNTS = f.Name()
-
- // Check that FindKeepVolumes finds the temp volumes.
- resultVols := FindKeepVolumes()
- if len(tempVols) != len(resultVols) {
- t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
- len(tempVols), len(resultVols))
+ fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
+ }
+ f.Close()
+ PROC_MOUNTS = f.Name()
+
+ var resultVols volumeSet
+ added := resultVols.Discover()
+
+ if added != len(resultVols) {
+ t.Errorf("Discover returned %d, but added %d volumes",
+ added, len(resultVols))
+ }
+ if added != len(tempVols) {
+ t.Errorf("Discover returned %d but we set up %d volumes",
+ added, len(tempVols))
+ }
+ for i, tmpdir := range tempVols {
+ if tmpdir != resultVols[i].(*UnixVolume).root {
+ t.Errorf("Discover returned %s, expected %s\n",
+ resultVols[i].(*UnixVolume).root, tmpdir)
}
- for i := range tempVols {
- if tempVols[i] != resultVols[i] {
- t.Errorf("FindKeepVolumes returned %s, expected %s\n",
- resultVols[i], tempVols[i])
- }
+ if expectReadonly := i % 2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+ t.Errorf("Discover added %s with readonly=%v, should be %v",
+ tmpdir, !expectReadonly, expectReadonly)
}
-
- os.Remove(f.Name())
}
}
-// TestFindKeepVolumesFail
-// When no Keep volumes are present, FindKeepVolumes returns an empty slice.
-//
-func TestFindKeepVolumesFail(t *testing.T) {
+func TestDiscoverNone(t *testing.T) {
defer teardown()
// Set up a bogus PROC_MOUNTS file with no Keep vols.
- if f, err := ioutil.TempFile("", "keeptest"); err == nil {
- fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
- fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
- fmt.Fprintln(f, "proc /proc proc opts 0 0")
- fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
- fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
- f.Close()
- PROC_MOUNTS = f.Name()
-
- // Check that FindKeepVolumes returns an empty array.
- resultVols := FindKeepVolumes()
- if len(resultVols) != 0 {
- t.Fatalf("FindKeepVolumes returned %v", resultVols)
- }
-
- os.Remove(PROC_MOUNTS)
+ f, err := ioutil.TempFile("", "keeptest")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.Remove(f.Name())
+ fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
+ fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
+ fmt.Fprintln(f, "proc /proc proc opts 0 0")
+ fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
+ fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
+ f.Close()
+ PROC_MOUNTS = f.Name()
+
+ var resultVols volumeSet
+ added := resultVols.Discover()
+ if added != 0 || len(resultVols) != 0 {
+ t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
}
}
@@ -382,9 +385,9 @@ func TestIndex(t *testing.T) {
// Include multiple blocks on different volumes, and
// some metadata files.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
@@ -420,9 +423,9 @@ func TestNodeStatus(t *testing.T) {
// Set up test Keep volumes with some blocks.
KeepVM = MakeTestVolumeManager(2)
- defer func() { KeepVM.Quit() }()
+ defer KeepVM.Close()
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllReadable()
vols[0].Put(TEST_HASH, TEST_BLOCK)
vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
@@ -450,10 +453,8 @@ func TestNodeStatus(t *testing.T) {
// Helper functions for unit tests.
// ========================================
-// MakeTestVolumeManager
-// Creates and returns a RRVolumeManager with the specified number
-// of MockVolumes.
-//
+// MakeTestVolumeManager returns a RRVolumeManager with the specified
+// number of MockVolumes.
func MakeTestVolumeManager(num_volumes int) VolumeManager {
vols := make([]Volume, num_volumes)
for i := range vols {
@@ -462,9 +463,7 @@ func MakeTestVolumeManager(num_volumes int) VolumeManager {
return MakeRRVolumeManager(vols)
}
-// teardown
-// Cleanup to perform after each test.
-//
+// teardown cleans up after each test.
func teardown() {
data_manager_token = ""
enforce_permissions = false
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 72b9a46..ca26912 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -29,15 +29,14 @@ func RunTrashWorker(trashq *WorkQueue) {
*/
func TrashItem(trashRequest TrashRequest) (err error) {
// Verify if the block is to be deleted based on its Mtime
- for _, volume := range KeepVM.Volumes() {
+ for _, volume := range KeepVM.AllWritable() {
mtime, err := volume.Mtime(trashRequest.Locator)
- if err == nil {
- if trashRequest.BlockMtime == mtime.Unix() {
- currentTime := time.Now().Unix()
- if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
- err = volume.Delete(trashRequest.Locator)
- }
- }
+ if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+ continue
+ }
+ currentTime := time.Now().Unix()
+ if time.Duration(currentTime-trashRequest.BlockMtime)*time.Second >= permission_ttl {
+ err = volume.Delete(trashRequest.Locator)
}
}
return
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 3031c25..af4040a 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -187,9 +187,10 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
// Create Keep Volumes
KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
// Put test content
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
if testData.CreateData {
vols[0].Put(testData.Locator1, testData.Block1)
vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
@@ -263,7 +264,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
if (testData.ExpectLocator1) &&
(testData.Locator1 == testData.Locator2) {
locatorFoundIn := 0
- for _, volume := range KeepVM.Volumes() {
+ for _, volume := range KeepVM.AllReadable() {
if _, err := volume.Get(testData.Locator1); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
@@ -276,5 +277,4 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
// Done
permission_ttl = actual_permission_ttl
trashq.Close()
- KeepVM.Quit()
}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index e7683ee..0f9fcff 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -5,10 +5,7 @@
package main
import (
- "errors"
- "fmt"
- "os"
- "strings"
+ "sync/atomic"
"time"
)
@@ -21,163 +18,59 @@ type Volume interface {
Delete(loc string) error
Status() *VolumeStatus
String() string
+ Writable() bool
}
-// MockVolumes are Volumes used to test the Keep front end.
-//
-// If the Bad field is true, this volume should return an error
-// on all writes and puts.
-//
-// The Touchable field signifies whether the Touch method will
-// succeed. Defaults to true. Note that Bad and Touchable are
-// independent: a MockVolume may be set up so that Put fails but Touch
-// works or vice versa.
-//
-// TODO(twp): rename Bad to something more descriptive, e.g. Writable,
-// and make sure that the tests that rely on it are testing the right
-// thing. We may need to simulate Writable, Touchable and Corrupt
-// volumes in different ways.
-//
-type MockVolume struct {
- Store map[string][]byte
- Timestamps map[string]time.Time
- Bad bool
- Touchable bool
-}
-
-func CreateMockVolume() *MockVolume {
- return &MockVolume{
- Store: make(map[string][]byte),
- Timestamps: make(map[string]time.Time),
- Bad: false,
- Touchable: true,
- }
-}
-
-func (v *MockVolume) Get(loc string) ([]byte, error) {
- if v.Bad {
- return nil, errors.New("Bad volume")
- } else if block, ok := v.Store[loc]; ok {
- return block, nil
- }
- return nil, os.ErrNotExist
+ // A VolumeManager tells callers which volumes can read, which volumes
+ // can write, and on which volume the next write should be attempted.
+ type VolumeManager interface {
+ // AllReadable returns all volumes.
+ AllReadable() []Volume
+ // AllWritable returns all volumes that aren't known to be in
+ // a read-only state. (There is no guarantee that a write to
+ // one will succeed, though.)
+ AllWritable() []Volume
+ // NextWritable returns the volume where the next new block
+ // should be written. A VolumeManager can select a volume in
+ // order to distribute activity across spindles, fill up disks
+ // with more free space, etc. RRVolumeManager returns nil when
+ // it has no writable volumes.
+ NextWritable() Volume
+ // Close shuts down the volume manager cleanly.
+ Close()
}
-func (v *MockVolume) Put(loc string, block []byte) error {
- if v.Bad {
- return errors.New("Bad volume")
- }
- v.Store[loc] = block
- return v.Touch(loc)
+ // RRVolumeManager is a VolumeManager that cycles through its writable
+ // volumes, advancing an atomic counter on every NextWritable call.
+ type RRVolumeManager struct {
+ readables []Volume // every volume given to MakeRRVolumeManager
+ writables []Volume // the subset whose Writable() returned true
+ counter uint32 // bumped atomically by NextWritable; indexes writables modulo its length
}
-func (v *MockVolume) Touch(loc string) error {
- if v.Touchable {
- v.Timestamps[loc] = time.Now()
- return nil
+func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
+ vm := &RRVolumeManager{}
+ for _, v := range volumes {
+ vm.readables = append(vm.readables, v)
+ if v.Writable() {
+ vm.writables = append(vm.writables, v)
+ }
}
- return errors.New("Touch failed")
+ return vm
}
-func (v *MockVolume) Mtime(loc string) (time.Time, error) {
- var mtime time.Time
- var err error
- if v.Bad {
- err = errors.New("Bad volume")
- } else if t, ok := v.Timestamps[loc]; ok {
- mtime = t
- } else {
- err = os.ErrNotExist
- }
- return mtime, err
+func (vm *RRVolumeManager) AllReadable() []Volume {
+ return vm.readables
}
-func (v *MockVolume) Index(prefix string) string {
- var result string
- for loc, block := range v.Store {
- if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
- result = result + fmt.Sprintf("%s+%d %d\n",
- loc, len(block), 123456789)
- }
- }
- return result
+func (vm *RRVolumeManager) AllWritable() []Volume {
+ return vm.writables
}
-func (v *MockVolume) Delete(loc string) error {
- if _, ok := v.Store[loc]; ok {
- if time.Since(v.Timestamps[loc]) < permission_ttl {
- return nil
- }
- delete(v.Store, loc)
+func (vm *RRVolumeManager) NextWritable() Volume {
+ if len(vm.writables) == 0 {
return nil
}
- return os.ErrNotExist
-}
-
-func (v *MockVolume) Status() *VolumeStatus {
- var used uint64
- for _, block := range v.Store {
- used = used + uint64(len(block))
- }
- return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
-}
-
-func (v *MockVolume) String() string {
- return "[MockVolume]"
-}
-
-// A VolumeManager manages a collection of volumes.
-//
-// - Volumes is a slice of available Volumes.
-// - Choose() returns a Volume suitable for writing to.
-// - Quit() instructs the VolumeManager to shut down gracefully.
-//
-type VolumeManager interface {
- Volumes() []Volume
- Choose() Volume
- Quit()
-}
-
-type RRVolumeManager struct {
- volumes []Volume
- nextwrite chan Volume
- quit chan int
-}
-
-func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
- // Create a new VolumeManager struct with the specified volumes,
- // and with new Nextwrite and Quit channels.
- // The Quit channel is buffered with a capacity of 1 so that
- // another routine may write to it without blocking.
- vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
-
- // This goroutine implements round-robin volume selection.
- // It sends each available Volume in turn to the Nextwrite
- // channel, until receiving a notification on the Quit channel
- // that it should terminate.
- go func() {
- var i int = 0
- for {
- select {
- case <-vm.quit:
- return
- case vm.nextwrite <- vm.volumes[i]:
- i = (i + 1) % len(vm.volumes)
- }
- }
- }()
-
- return vm
-}
-
-func (vm *RRVolumeManager) Volumes() []Volume {
- return vm.volumes
-}
-
-func (vm *RRVolumeManager) Choose() Volume {
- return <-vm.nextwrite
+ i := atomic.AddUint32(&vm.counter, 1)
+ return vm.writables[i % uint32(len(vm.writables))]
}
-func (vm *RRVolumeManager) Quit() {
- vm.quit <- 1
+func (vm *RRVolumeManager) Close() {
}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
new file mode 100644
index 0000000..e93bb03
--- /dev/null
+++ b/services/keepstore/volume_test.go
@@ -0,0 +1,151 @@
+package main
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+ "time"
+)
+
+ // MockVolumes are test doubles for Volumes, used to test handlers.
+ type MockVolume struct {
+ Store map[string][]byte // block data keyed by locator
+ Timestamps map[string]time.Time // time of the last successful Touch, keyed by locator
+ // Bad volumes return an error from Get, Put, and Mtime.
+ Bad bool
+ // Touchable volumes' Touch() method succeeds (unless the
+ // volume is Readonly) and records the current time.
+ Touchable bool
+ // Readonly volumes return an error for Put, Delete, and
+ // Touch.
+ Readonly bool
+ called map[string]int // number of calls recorded per method name
+ mutex sync.Mutex // guards called
+ }
+
+ // CreateMockVolume builds an empty MockVolume that is Touchable and
+ // neither Bad nor Readonly.
+ func CreateMockVolume() *MockVolume {
+ 	vol := new(MockVolume)
+ 	vol.Store = map[string][]byte{}
+ 	vol.Timestamps = map[string]time.Time{}
+ 	// Bad and Readonly keep their zero value (false).
+ 	vol.Touchable = true
+ 	vol.called = make(map[string]int)
+ 	return vol
+ }
+
+ // CallCount reports how many calls to the named method have been
+ // recorded; a method that was never called yields 0.
+ func (v *MockVolume) CallCount(method string) int {
+ 	v.mutex.Lock()
+ 	defer v.mutex.Unlock()
+ 	// A missing key reads as the zero value, which is the count we want.
+ 	return v.called[method]
+ }
+
+ // gotCall records one more call to the named method.
+ func (v *MockVolume) gotCall(method string) {
+ 	v.mutex.Lock()
+ 	defer v.mutex.Unlock()
+ 	// Incrementing a missing key starts from zero, so no existence
+ 	// check is needed.
+ 	v.called[method]++
+ }
+
+ // Get records the call and returns the block stored under loc. It
+ // fails with a "Bad volume" error on a Bad volume, and with
+ // os.ErrNotExist when loc is not in Store.
+ func (v *MockVolume) Get(loc string) ([]byte, error) {
+ 	v.gotCall("Get")
+ 	if v.Bad {
+ 		return nil, errors.New("Bad volume")
+ 	}
+ 	data, found := v.Store[loc]
+ 	if !found {
+ 		return nil, os.ErrNotExist
+ 	}
+ 	return data, nil
+ }
+
+ // Put records the call, stores block under loc and then touches it.
+ // A Bad volume is rejected before a Readonly one. The result of
+ // Touch is returned as-is, so Put can report a failure even though
+ // the block has already been stored.
+ func (v *MockVolume) Put(loc string, block []byte) error {
+ 	v.gotCall("Put")
+ 	switch {
+ 	case v.Bad:
+ 		return errors.New("Bad volume")
+ 	case v.Readonly:
+ 		return MethodDisabledError
+ 	}
+ 	v.Store[loc] = block
+ 	return v.Touch(loc)
+ }
+
+ // Touch records the call and stamps loc with the current time. It
+ // returns MethodDisabledError on a Readonly volume and a "Touch
+ // failed" error when the volume is not Touchable.
+ func (v *MockVolume) Touch(loc string) error {
+ 	v.gotCall("Touch")
+ 	switch {
+ 	case v.Readonly:
+ 		return MethodDisabledError
+ 	case !v.Touchable:
+ 		return errors.New("Touch failed")
+ 	}
+ 	v.Timestamps[loc] = time.Now()
+ 	return nil
+ }
+
+func (v *MockVolume) Mtime(loc string) (time.Time, error) {
+ v.gotCall("Mtime")
+ var mtime time.Time
+ var err error
+ if v.Bad {
+ err = errors.New("Bad volume")
+ } else if t, ok := v.Timestamps[loc]; ok {
+ mtime = t
+ } else {
+ err = os.ErrNotExist
+ }
+ return mtime, err
+}
+
+ // Index records the call and returns one "locator+size timestamp"
+ // line for every stored block whose key is a valid locator starting
+ // with prefix. The timestamp is a fixed placeholder, and lines come
+ // out in map iteration order.
+ func (v *MockVolume) Index(prefix string) string {
+ 	v.gotCall("Index")
+ 	var listing string
+ 	for loc, block := range v.Store {
+ 		if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
+ 			continue
+ 		}
+ 		listing += fmt.Sprintf("%s+%d %d\n",
+ 			loc, len(block), 123456789)
+ 	}
+ 	return listing
+ }
+
+ // Delete records the call and removes loc from Store. It returns
+ // MethodDisabledError on a Readonly volume and os.ErrNotExist when
+ // loc is not stored.
+ func (v *MockVolume) Delete(loc string) error {
+ 	v.gotCall("Delete")
+ 	if v.Readonly {
+ 		return MethodDisabledError
+ 	}
+ 	if _, found := v.Store[loc]; !found {
+ 		return os.ErrNotExist
+ 	}
+ 	// A block touched within permission_ttl is kept, but the call
+ 	// still reports success.
+ 	if time.Since(v.Timestamps[loc]) >= permission_ttl {
+ 		delete(v.Store, loc)
+ 	}
+ 	return nil
+ }
+
+ // Status returns a canned VolumeStatus whose last two values are
+ // derived from the total size of the stored blocks.
+ // NOTE(review): the VolumeStatus field names are not visible here;
+ // confirm which positional value is which.
+ func (v *MockVolume) Status() *VolumeStatus {
+ 	var total uint64
+ 	for _, data := range v.Store {
+ 		total += uint64(len(data))
+ 	}
+ 	remaining := 1000000 - total
+ 	return &VolumeStatus{"/bogo", 123, remaining, total}
+ }
+
+ // String returns a fixed label identifying this as a mock volume.
+ func (v *MockVolume) String() string {
+ return "[MockVolume]"
+ }
+
+ // Writable reports whether the volume is not marked Readonly.
+ func (v *MockVolume) Writable() bool {
+ return !v.Readonly
+ }
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 84877c0..20bc9c5 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -50,8 +50,9 @@ type IOResponse struct {
// request.
//
type UnixVolume struct {
- root string // path to this volume
- queue chan *IORequest
+ root string // path to this volume
+ queue chan *IORequest
+ readonly bool
}
func (v *UnixVolume) IOHandler() {
@@ -67,14 +68,17 @@ func (v *UnixVolume) IOHandler() {
}
}
-func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
+func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
+ v := &UnixVolume{
+ root: root,
+ queue: nil,
+ readonly: readonly,
+ }
if serialize {
- v = UnixVolume{root, make(chan *IORequest)}
+ v.queue =make(chan *IORequest)
go v.IOHandler()
- } else {
- v = UnixVolume{root, nil}
}
- return
+ return v
}
func (v *UnixVolume) Get(loc string) ([]byte, error) {
@@ -88,6 +92,9 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
}
func (v *UnixVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
if v.queue == nil {
return v.Write(loc, block)
}
@@ -98,6 +105,9 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
}
func (v *UnixVolume) Touch(loc string) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
@@ -263,6 +273,9 @@ func (v *UnixVolume) Delete(loc string) error {
// Delete() will read the correct up-to-date timestamp and choose not to
// delete the file.
+ if v.readonly {
+ return MethodDisabledError
+ }
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
@@ -350,6 +363,10 @@ func (v *UnixVolume) String() string {
return fmt.Sprintf("[UnixVolume %s]", v.root)
}
+ // Writable reports whether the volume is not marked readonly.
+ func (v *UnixVolume) Writable() bool {
+ return !v.readonly
+ }
+
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func lockfile(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 7a10fc5..6b39f8f 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -10,15 +10,15 @@ import (
"time"
)
-func TempUnixVolume(t *testing.T, serialize bool) UnixVolume {
+func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
d, err := ioutil.TempDir("", "volume_test")
if err != nil {
t.Fatal(err)
}
- return MakeUnixVolume(d, serialize)
+ return MakeUnixVolume(d, serialize, readonly)
}
-func _teardown(v UnixVolume) {
+func _teardown(v *UnixVolume) {
if v.queue != nil {
close(v.queue)
}
@@ -28,7 +28,7 @@ func _teardown(v UnixVolume) {
// store writes a Keep block directly into a UnixVolume, for testing
// UnixVolume methods.
//
-func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
+func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
if err := os.MkdirAll(blockdir, 0755); err != nil {
t.Fatal(err)
@@ -44,7 +44,7 @@ func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
}
func TestGet(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
@@ -58,7 +58,7 @@ func TestGet(t *testing.T) {
}
func TestGetNotFound(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
@@ -74,7 +74,7 @@ func TestGetNotFound(t *testing.T) {
}
func TestPut(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
err := v.Put(TEST_HASH, TEST_BLOCK)
@@ -91,7 +91,7 @@ func TestPut(t *testing.T) {
}
func TestPutBadVolume(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
os.Chmod(v.root, 000)
@@ -101,11 +101,44 @@ func TestPutBadVolume(t *testing.T) {
}
}
+ // TestUnixVolumeReadonly checks that once a volume is marked readonly,
+ // Get still succeeds while Put, Touch and Delete are refused with
+ // MethodDisabledError.
+ func TestUnixVolumeReadonly(t *testing.T) {
+ 	v := TempUnixVolume(t, false, false)
+ 	defer _teardown(v)
+
+ 	// First write something before marking readonly
+ 	err := v.Put(TEST_HASH, TEST_BLOCK)
+ 	if err != nil {
+ 		// Errorf, not Error: the message contains a %v verb.
+ 		t.Errorf("got err %v, expected nil", err)
+ 	}
+
+ 	v.readonly = true
+
+ 	_, err = v.Get(TEST_HASH)
+ 	if err != nil {
+ 		t.Errorf("got err %v, expected nil", err)
+ 	}
+
+ 	err = v.Put(TEST_HASH, TEST_BLOCK)
+ 	if err != MethodDisabledError {
+ 		t.Errorf("got err %v, expected MethodDisabledError", err)
+ 	}
+
+ 	err = v.Touch(TEST_HASH)
+ 	if err != MethodDisabledError {
+ 		t.Errorf("got err %v, expected MethodDisabledError", err)
+ 	}
+
+ 	err = v.Delete(TEST_HASH)
+ 	if err != MethodDisabledError {
+ 		t.Errorf("got err %v, expected MethodDisabledError", err)
+ 	}
+ }
+
// TestPutTouch
// Test that when applying PUT to a block that already exists,
// the block's modification time is updated.
func TestPutTouch(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
@@ -165,7 +198,7 @@ func TestPutTouch(t *testing.T) {
//
func TestGetSerialized(t *testing.T) {
// Create a volume with I/O serialization enabled.
- v := TempUnixVolume(t, true)
+ v := TempUnixVolume(t, true, false)
defer _teardown(v)
_store(t, v, TEST_HASH, TEST_BLOCK)
@@ -214,7 +247,7 @@ func TestGetSerialized(t *testing.T) {
func TestPutSerialized(t *testing.T) {
// Create a volume with I/O serialization enabled.
- v := TempUnixVolume(t, true)
+ v := TempUnixVolume(t, true, false)
defer _teardown(v)
sem := make(chan int)
@@ -274,7 +307,7 @@ func TestPutSerialized(t *testing.T) {
}
func TestIsFull(t *testing.T) {
- v := TempUnixVolume(t, false)
+ v := TempUnixVolume(t, false, false)
defer _teardown(v)
full_path := v.root + "/full"
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list