[ARVADOS] created: 666eb6b6a02deacf2157248d01615c4c7539394c
git at public.curoverse.com
git at public.curoverse.com
Wed May 6 13:29:27 EDT 2015
at 666eb6b6a02deacf2157248d01615c4c7539394c (commit)
commit 666eb6b6a02deacf2157248d01615c4c7539394c
Author: Tom Clegg <tom at curoverse.com>
Date: Wed May 6 12:56:34 2015 -0400
5745: Serialize writes and data reads, but allow concurrent requests
to do read-only non-data operations (like finding existing blocks and
checking free disk space), whose results are likely to be cached by the
OS and which are therefore unlikely to involve any disk activity.
Also:
* Serialize Touch and Delete.
* Make sure to close and delete tempfiles on write errors.
* Update comments.
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index d355e92..4b6a6a7 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -40,35 +40,19 @@ func MakeRESTRouter() *mux.Router {
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
- // For IndexHandler we support:
- // /index - returns all locators
- // /index/{prefix} - returns all locators that begin with {prefix}
- // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
- // If {prefix} is the empty string, return an index of all locators
- // (so /index and /index/ behave identically)
- // A client may supply a full 32-digit locator string, in which
- // case the server will return an index with either zero or one
- // entries. This usage allows a client to check whether a block is
- // present, and its size and upload time, without retrieving the
- // entire block.
- //
+ // List all blocks stored here. Privileged client only.
rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ // List blocks stored here whose hash has the given prefix.
+ // Privileged client only.
+ rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+ // List volumes: path, device number, bytes used/avail.
rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
- // The PullHandler and TrashHandler process "PUT /pull" and "PUT
- // /trash" requests from Data Manager. These requests instruct
- // Keep to replicate or delete blocks; see
- // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
- // for more details.
- //
- // Each handler parses the JSON list of block management requests
- // in the message body, and replaces any existing pull queue or
- // trash queue with their contentes.
- //
+ // Replace the current pull queue.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
+ // Replace the current trash queue.
rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
// Any request which does not match any of these routes gets
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index c6cb00d..71e577f 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -129,7 +129,11 @@ func (vs *volumeSet) Set(value string) error {
if _, err := os.Stat(value); err != nil {
return err
}
- *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+ *vs = append(*vs, &UnixVolume{
+ root: value,
+ serialize: flagSerializeIO,
+ readonly: flagReadonly,
+ })
return nil
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 3b7c993..b33f56e 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -10,98 +10,17 @@ import (
"path/filepath"
"strconv"
"strings"
+ "sync"
"syscall"
"time"
)
-// IORequests are encapsulated Get or Put requests. They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
- KeepGet IOMethod = iota
- KeepPut
-)
-
-type IORequest struct {
- method IOMethod
- loc string
- data []byte
- reply chan *IOResponse
-}
-
-type IOResponse struct {
- data []byte
- err error
-}
-
-// A UnixVolume has the following properties:
-//
-// root
-// the path to the volume's root directory
-// queue
-// A channel of IORequests. If non-nil, all I/O requests for
-// this volume should be queued on this channel; the result
-// will be delivered on the IOResponse channel supplied in the
-// request.
-//
+// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- root string // path to this volume
- queue chan *IORequest
- readonly bool
-}
-
-func (v *UnixVolume) IOHandler() {
- for req := range v.queue {
- var result IOResponse
- switch req.method {
- case KeepGet:
- result.data, result.err = v.Read(req.loc)
- case KeepPut:
- result.err = v.Write(req.loc, req.data)
- }
- req.reply <- &result
- }
-}
-
-func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
- v := &UnixVolume{
- root: root,
- queue: nil,
- readonly: readonly,
- }
- if serialize {
- v.queue = make(chan *IORequest)
- go v.IOHandler()
- }
- return v
-}
-
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
- if v.queue == nil {
- return v.Read(loc)
- }
- reply := make(chan *IOResponse)
- v.queue <- &IORequest{KeepGet, loc, nil, reply}
- response := <-reply
- return response.data, response.err
-}
-
-func (v *UnixVolume) Put(loc string, block []byte) error {
- if v.readonly {
- return MethodDisabledError
- }
- if v.queue == nil {
- return v.Write(loc, block)
- }
- reply := make(chan *IOResponse)
- v.queue <- &IORequest{KeepPut, loc, block, reply}
- response := <-reply
- return response.err
+ root string // path to the volume's root directory
+ serialize bool
+ readonly bool
+ mutex sync.Mutex
}
func (v *UnixVolume) Touch(loc string) error {
@@ -114,6 +33,10 @@ func (v *UnixVolume) Touch(loc string) error {
return err
}
defer f.Close()
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
if e := lockfile(f); e != nil {
return e
}
@@ -132,28 +55,32 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
}
}
-// Read retrieves a block identified by the locator string "loc", and
+// Get retrieves a block identified by the locator string "loc", and
// returns its contents as a byte slice.
//
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError. It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
- buf, err := ioutil.ReadFile(v.blockPath(loc))
+// If the block could not be found, opened, or read, Get returns a nil
+// slice and whatever non-nil error was returned by Stat or ReadFile.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+ path := v.blockPath(loc)
+ if _, err := os.Stat(path); err != nil {
+ return nil, err
+ }
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
+ buf, err := ioutil.ReadFile(path)
return buf, err
}
-// Write stores a block of data identified by the locator string
+// Put stores a block of data identified by the locator string
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
if v.IsFull() {
return FullError
}
@@ -171,8 +98,14 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
}
bpath := v.blockPath(loc)
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+ tmpfile.Close()
+ os.Remove(tmpfile.Name())
return err
}
if err := tmpfile.Close(); err != nil {
@@ -276,6 +209,10 @@ func (v *UnixVolume) Delete(loc string) error {
if v.readonly {
return MethodDisabledError
}
+ if v.serialize {
+ v.mutex.Lock()
+ defer v.mutex.Unlock()
+ }
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 6b39f8f..1320d31 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -15,19 +15,20 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
if err != nil {
t.Fatal(err)
}
- return MakeUnixVolume(d, serialize, readonly)
+ return &UnixVolume{
+ root: d,
+ serialize: serialize,
+ readonly: readonly,
+ }
}
func _teardown(v *UnixVolume) {
- if v.queue != nil {
- close(v.queue)
- }
os.RemoveAll(v.root)
}
-// store writes a Keep block directly into a UnixVolume, for testing
-// UnixVolume methods.
-//
+// _store writes a Keep block directly into a UnixVolume, bypassing
+// the overhead and safeguards of Put(). Useful for storing bogus data
+// and isolating unit tests from Put() behavior.
func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
if err := os.MkdirAll(blockdir, 0755); err != nil {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list