[ARVADOS] updated: e6c90436e715fd6da88782cbe3f72bd6c2443036

git at public.curoverse.com git at public.curoverse.com
Thu May 7 12:50:09 EDT 2015


Summary of changes:
 services/crunchstat/crunchstat.go                  |  8 +++++---
 services/keepstore/pull_worker_integration_test.go | 19 +++++++++++++++++--
 services/keepstore/pull_worker_test.go             | 17 +++++++++--------
 3 files changed, 31 insertions(+), 13 deletions(-)

  discards  867263b6513f899803847c6dacb6ea2e4d4df4b9 (commit)
  discards  666eb6b6a02deacf2157248d01615c4c7539394c (commit)
       via  e6c90436e715fd6da88782cbe3f72bd6c2443036 (commit)
       via  cc303a0ef797c1c752b4fd86e48e2c84fc7d96ca (commit)
       via  200f7004f921a68ec40b407dfe31f1db95e98fb9 (commit)
       via  ae6b514a98d89ce0bdfd47edd4508c42761cb049 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the history of the old revision is not a strict subset of the
history of the new revision.  This situation occurs when you push a change
with --force and generate a repository containing something like this:

 * -- * -- B -- O -- O -- O (867263b6513f899803847c6dacb6ea2e4d4df4b9)
            \
             N -- N -- N (e6c90436e715fd6da88782cbe3f72bd6c2443036)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit e6c90436e715fd6da88782cbe3f72bd6c2443036
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 7 12:48:48 2015 -0400

    5745: Fix test order dependency (restore mocked methods), tidy up test data.

diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go
index 7a930d5..762abff 100644
--- a/services/keepstore/pull_worker_integration_test.go
+++ b/services/keepstore/pull_worker_integration_test.go
@@ -1,9 +1,12 @@
 package main
 
 import (
+	"bytes"
+	"errors"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"io"
 	"net/http"
 	"os"
 	"strings"
@@ -104,6 +107,7 @@ func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
 func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
 
 	// Override PutContent to mock PutBlock functionality
+	defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
 	PutContent = func(content []byte, locator string) (err error) {
 		if string(content) != testData.Content {
 			t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
@@ -111,16 +115,27 @@ func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pull
 		return
 	}
 
+	// Override GetContent to mock keepclient Get functionality
+	defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
+	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
+		reader io.ReadCloser, contentLength int64, url string, err error) {
+		if testData.GetError != "" {
+			return nil, 0, "", errors.New(testData.GetError)
+		}
+		rdr := &ClosingBuffer{bytes.NewBufferString(testData.Content)}
+		return rdr, int64(len(testData.Content)), "", nil
+	}
+
 	keepClient.Arvados.ApiToken = GenerateRandomApiToken()
 	err := PullItemAndProcess(pullRequest, keepClient.Arvados.ApiToken, keepClient)
 
 	if len(testData.GetError) > 0 {
 		if (err == nil) || (!strings.Contains(err.Error(), testData.GetError)) {
-			t.Errorf("Got error %v", err)
+			t.Errorf("Got error %v, expected %v", err, testData.GetError)
 		}
 	} else {
 		if err != nil {
-			t.Errorf("Got error %v", err)
+			t.Errorf("Got error %v, expected nil", err)
 		}
 	}
 }
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 124c9b8..0833bc6 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -56,14 +56,13 @@ func RunTestPullWorker(c *C) {
 
 var first_pull_list = []byte(`[
 		{
-			"locator":"locator1",
+			"locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
 			"servers":[
 				"server_1",
 				"server_2"
 		 	]
-		},
-    {
-			"locator":"locator2",
+		},{
+			"locator":"37b51d194a7513e45b56f6524f2d51f2+3",
 			"servers":[
 				"server_3"
 		 	]
@@ -72,10 +71,10 @@ var first_pull_list = []byte(`[
 
 var second_pull_list = []byte(`[
 		{
-			"locator":"locator3",
+			"locator":"73feffa4b7f6bb68e44cf984c85f6e88+3",
 			"servers":[
 				"server_1",
-        "server_2"
+				"server_2"
 		 	]
 		}
 	]`)
@@ -244,6 +243,7 @@ func performTest(testData PullWorkerTestData, c *C) {
 	testPullLists[testData.name] = testData.response_body
 
 	// Override GetContent to mock keepclient Get functionality
+	defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
 	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
 		reader io.ReadCloser, contentLength int64, url string, err error) {
 
@@ -262,6 +262,7 @@ func performTest(testData PullWorkerTestData, c *C) {
 	}
 
 	// Override PutContent to mock PutBlock functionality
+	defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
 	PutContent = func(content []byte, locator string) (err error) {
 		if testData.put_error {
 			err = errors.New("Error putting data")
@@ -274,8 +275,8 @@ func performTest(testData PullWorkerTestData, c *C) {
 	}
 
 	response := IssueRequest(&testData.req)
-	c.Assert(testData.response_code, Equals, response.Code)
-	c.Assert(testData.response_body, Equals, response.Body.String())
+	c.Assert(response.Code, Equals, testData.response_code)
+	c.Assert(response.Body.String(), Equals, testData.response_body)
 
 	expectWorkerChannelEmpty(c, pullq.NextItem)
 

commit cc303a0ef797c1c752b4fd86e48e2c84fc7d96ca
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed May 6 12:56:34 2015 -0400

    5745: Serialize writes and data reads, but allow concurrent requests
    to do read-only non-data operations (like finding existing blocks and
    checking free disk space) which are likely to be cached by the OS and
    therefore not involve any disk activity.
    
    Also:
    * Serialize Touch and Delete.
    * Make sure to close and delete tempfiles on write errors.
    * Update comments.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 6492045..75b56eb 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -40,35 +40,19 @@ func MakeRESTRouter() *mux.Router {
 
 	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
 	rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
-	// For IndexHandler we support:
-	//   /index           - returns all locators
-	//   /index/{prefix}  - returns all locators that begin with {prefix}
-	//      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
-	//      If {prefix} is the empty string, return an index of all locators
-	//      (so /index and /index/ behave identically)
-	//      A client may supply a full 32-digit locator string, in which
-	//      case the server will return an index with either zero or one
-	//      entries. This usage allows a client to check whether a block is
-	//      present, and its size and upload time, without retrieving the
-	//      entire block.
-	//
+	// List all blocks stored here. Privileged client only.
 	rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
-	rest.HandleFunc(
-		`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+	// List blocks stored here whose hash has the given prefix.
+	// Privileged client only.
+	rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+	// List volumes: path, device number, bytes used/avail.
 	rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
-	// The PullHandler and TrashHandler process "PUT /pull" and "PUT
-	// /trash" requests from Data Manager.  These requests instruct
-	// Keep to replicate or delete blocks; see
-	// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
-	// for more details.
-	//
-	// Each handler parses the JSON list of block management requests
-	// in the message body, and replaces any existing pull queue or
-	// trash queue with their contentes.
-	//
+	// Replace the current pull queue.
 	rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
+	// Replace the current trash queue.
 	rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
 	// Any request which does not match any of these routes gets
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index c6cb00d..71e577f 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -129,7 +129,11 @@ func (vs *volumeSet) Set(value string) error {
 	if _, err := os.Stat(value); err != nil {
 		return err
 	}
-	*vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+	*vs = append(*vs, &UnixVolume{
+		root:      value,
+		serialize: flagSerializeIO,
+		readonly:  flagReadonly,
+	})
 	return nil
 }
 
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 8d23d11..bcf57c1 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -11,98 +11,17 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 )
 
-// IORequests are encapsulated Get or Put requests.  They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
-	KeepGet IOMethod = iota
-	KeepPut
-)
-
-type IORequest struct {
-	method IOMethod
-	loc    string
-	data   []byte
-	reply  chan *IOResponse
-}
-
-type IOResponse struct {
-	data []byte
-	err  error
-}
-
-// A UnixVolume has the following properties:
-//
-//   root
-//       the path to the volume's root directory
-//   queue
-//       A channel of IORequests. If non-nil, all I/O requests for
-//       this volume should be queued on this channel; the result
-//       will be delivered on the IOResponse channel supplied in the
-//       request.
-//
+// A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-	root     string // path to this volume
-	queue    chan *IORequest
-	readonly bool
-}
-
-func (v *UnixVolume) IOHandler() {
-	for req := range v.queue {
-		var result IOResponse
-		switch req.method {
-		case KeepGet:
-			result.data, result.err = v.Read(req.loc)
-		case KeepPut:
-			result.err = v.Write(req.loc, req.data)
-		}
-		req.reply <- &result
-	}
-}
-
-func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
-	v := &UnixVolume{
-		root:     root,
-		queue:    nil,
-		readonly: readonly,
-	}
-	if serialize {
-		v.queue = make(chan *IORequest)
-		go v.IOHandler()
-	}
-	return v
-}
-
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
-	if v.queue == nil {
-		return v.Read(loc)
-	}
-	reply := make(chan *IOResponse)
-	v.queue <- &IORequest{KeepGet, loc, nil, reply}
-	response := <-reply
-	return response.data, response.err
-}
-
-func (v *UnixVolume) Put(loc string, block []byte) error {
-	if v.readonly {
-		return MethodDisabledError
-	}
-	if v.queue == nil {
-		return v.Write(loc, block)
-	}
-	reply := make(chan *IOResponse)
-	v.queue <- &IORequest{KeepPut, loc, block, reply}
-	response := <-reply
-	return response.err
+	root      string // path to the volume's root directory
+	serialize bool
+	readonly  bool
+	mutex     sync.Mutex
 }
 
 func (v *UnixVolume) Touch(loc string) error {
@@ -115,6 +34,10 @@ func (v *UnixVolume) Touch(loc string) error {
 		return err
 	}
 	defer f.Close()
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
 	if e := lockfile(f); e != nil {
 		return e
 	}
@@ -133,28 +56,32 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 	}
 }
 
-// Read retrieves a block identified by the locator string "loc", and
+// Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError.  It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
-	buf, err := ioutil.ReadFile(v.blockPath(loc))
+// If the block could not be found, opened, or read, Get returns a nil
+// slice and whatever non-nil error was returned by Stat or ReadFile.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+	path := v.blockPath(loc)
+	if _, err := os.Stat(path); err != nil {
+		return nil, err
+	}
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
+	buf, err := ioutil.ReadFile(path)
 	return buf, err
 }
 
-// Write stores a block of data identified by the locator string
+// Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+	if v.readonly {
+		return MethodDisabledError
+	}
 	if v.IsFull() {
 		return FullError
 	}
@@ -172,8 +99,14 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
 	}
 	bpath := v.blockPath(loc)
 
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
 	if _, err := tmpfile.Write(block); err != nil {
 		log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+		tmpfile.Close()
+		os.Remove(tmpfile.Name())
 		return err
 	}
 	if err := tmpfile.Close(); err != nil {
@@ -270,6 +203,10 @@ func (v *UnixVolume) Delete(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
+	if v.serialize {
+		v.mutex.Lock()
+		defer v.mutex.Unlock()
+	}
 	p := v.blockPath(loc)
 	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
 	if err != nil {
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 6b39f8f..1320d31 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -15,19 +15,20 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
 	if err != nil {
 		t.Fatal(err)
 	}
-	return MakeUnixVolume(d, serialize, readonly)
+	return &UnixVolume{
+		root:      d,
+		serialize: serialize,
+		readonly:  readonly,
+	}
 }
 
 func _teardown(v *UnixVolume) {
-	if v.queue != nil {
-		close(v.queue)
-	}
 	os.RemoveAll(v.root)
 }
 
-// store writes a Keep block directly into a UnixVolume, for testing
-// UnixVolume methods.
-//
+// _store writes a Keep block directly into a UnixVolume, bypassing
+// the overhead and safeguards of Put(). Useful for storing bogus data
+// and isolating unit tests from Put() behavior.
 func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
 	blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
 	if err := os.MkdirAll(blockdir, 0755); err != nil {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list