[ARVADOS] created: 666eb6b6a02deacf2157248d01615c4c7539394c

git at public.curoverse.com git at public.curoverse.com
Wed May 6 13:29:27 EDT 2015


        at  666eb6b6a02deacf2157248d01615c4c7539394c (commit)


commit 666eb6b6a02deacf2157248d01615c4c7539394c
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed May 6 12:56:34 2015 -0400

    5745: Serialize writes and data reads, but allow concurrent requests
    to do read-only non-data operations (like finding existing blocks and
    checking free disk space) which are likely to be cached by the OS and
    therefore not involve any disk activity.
    
    Also:
    * Serialize Touch and Delete.
    * Make sure to close and delete tempfiles on write errors.
    * Update comments.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index d355e92..4b6a6a7 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -40,35 +40,19 @@ func MakeRESTRouter() *mux.Router {
 
 	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
 	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
-	// For IndexHandler we support:
-	//   /index           - returns all locators
-	//   /index/{prefix}  - returns all locators that begin with {prefix}
-	//      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
-	//      If {prefix} is the empty string, return an index of all locators
-	//      (so /index and /index/ behave identically)
-	//      A client may supply a full 32-digit locator string, in which
-	//      case the server will return an index with either zero or one
-	//      entries. This usage allows a client to check whether a block is
-	//      present, and its size and upload time, without retrieving the
-	//      entire block.
-	//
+	// List all blocks stored here. Privileged client only.
 	rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
-	rest.HandleFunc(
-		`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+	// List blocks stored here whose hash has the given prefix.
+	// Privileged client only.
+	rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+	// List volumes: path, device number, bytes used/avail.
 	rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
-	// The PullHandler and TrashHandler process "PUT /pull" and "PUT
-	// /trash" requests from Data Manager.  These requests instruct
-	// Keep to replicate or delete blocks; see
-	// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
-	// for more details.
-	//
-	// Each handler parses the JSON list of block management requests
-	// in the message body, and replaces any existing pull queue or
-	// trash queue with their contentes.
-	//
+	// Replace the current pull queue.
 	rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
+	// Replace the current trash queue.
 	rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
 	// Any request which does not match any of these routes gets
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index c6cb00d..71e577f 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -129,7 +129,11 @@ func (vs *volumeSet) Set(value string) error {
 	if _, err := os.Stat(value); err != nil {
 		return err
 	}
-	*vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+	*vs = append(*vs, &UnixVolume{
+		root:      value,
+		serialize: flagSerializeIO,
+		readonly:  flagReadonly,
+	})
 	return nil
 }
 
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 3b7c993..b33f56e 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -10,98 +10,17 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 )
 
-// IORequests are encapsulated Get or Put requests.  They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
-	KeepGet IOMethod = iota
-	KeepPut
-)
-
-type IORequest struct {
-	method IOMethod
-	loc    string
-	data   []byte
-	reply  chan *IOResponse
-}
-
-type IOResponse struct {
-	data []byte
-	err  error
-}
-
-// A UnixVolume has the following properties:
-//
-//   root
-//       the path to the volume's root directory
-//   queue
-//       A channel of IORequests. If non-nil, all I/O requests for
-//       this volume should be queued on this channel; the result
-//       will be delivered on the IOResponse channel supplied in the
-//       request.
-//
+// A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-	root     string // path to this volume
-	queue    chan *IORequest
-	readonly bool
-}
-
-func (v *UnixVolume) IOHandler() {
-	for req := range v.queue {
-		var result IOResponse
-		switch req.method {
-		case KeepGet:
-			result.data, result.err = v.Read(req.loc)
-		case KeepPut:
-			result.err = v.Write(req.loc, req.data)
-		}
-		req.reply <- &result
-	}
-}
-
-func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
-	v := &UnixVolume{
-		root:     root,
-		queue:    nil,
-		readonly: readonly,
-	}
-	if serialize {
-		v.queue = make(chan *IORequest)
-		go v.IOHandler()
-	}
-	return v
-}
-
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
-	if v.queue == nil {
-		return v.Read(loc)
-	}
-	reply := make(chan *IOResponse)
-	v.queue <- &IORequest{KeepGet, loc, nil, reply}
-	response := <-reply
-	return response.data, response.err
-}
-
-func (v *UnixVolume) Put(loc string, block []byte) error {
-	if v.readonly {
-		return MethodDisabledError
-	}
-	if v.queue == nil {
-		return v.Write(loc, block)
-	}
-	reply := make(chan *IOResponse)
-	v.queue <- &IORequest{KeepPut, loc, block, reply}
-	response := <-reply
-	return response.err
+	root      string // path to the volume's root directory
+	serialize bool
+	readonly  bool
+	mutex     sync.Mutex
 }
 
 func (v *UnixVolume) Touch(loc string) error {
@@ -114,6 +33,10 @@ func (v *UnixVolume) Touch(loc string) error {
 		return err
 	}
 	defer f.Close()
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
 	if e := lockfile(f); e != nil {
 		return e
 	}
@@ -132,28 +55,32 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 	}
 }
 
-// Read retrieves a block identified by the locator string "loc", and
+// Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError.  It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
-	buf, err := ioutil.ReadFile(v.blockPath(loc))
+// If the block could not be found, opened, or read, Get returns a nil
+// slice and whatever non-nil error was returned by Stat or ReadFile.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+	path := v.blockPath(loc)
+	if _, err := os.Stat(path); err != nil {
+		return nil, err
+	}
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
+	buf, err := ioutil.ReadFile(path)
 	return buf, err
 }
 
-// Write stores a block of data identified by the locator string
+// Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+	if v.readonly {
+		return MethodDisabledError
+	}
 	if v.IsFull() {
 		return FullError
 	}
@@ -171,8 +98,14 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
 	}
 	bpath := v.blockPath(loc)
 
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
 	if _, err := tmpfile.Write(block); err != nil {
 		log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+		tmpfile.Close()
+		os.Remove(tmpfile.Name())
 		return err
 	}
 	if err := tmpfile.Close(); err != nil {
@@ -276,6 +209,10 @@ func (v *UnixVolume) Delete(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
 	p := v.blockPath(loc)
 	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
 	if err != nil {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 6b39f8f..1320d31 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -15,19 +15,20 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
 	if err != nil {
 		t.Fatal(err)
 	}
-	return MakeUnixVolume(d, serialize, readonly)
+	return &UnixVolume{
+		root:      d,
+		serialize: serialize,
+		readonly:  readonly,
+	}
 }
 
 func _teardown(v *UnixVolume) {
-	if v.queue != nil {
-		close(v.queue)
-	}
 	os.RemoveAll(v.root)
 }
 
-// store writes a Keep block directly into a UnixVolume, for testing
-// UnixVolume methods.
-//
+// _store writes a Keep block directly into a UnixVolume, bypassing
+// the overhead and safeguards of Put(). Useful for storing bogus data
+// and isolating unit tests from Put() behavior.
 func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
 	blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
 	if err := os.MkdirAll(blockdir, 0755); err != nil {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list