[ARVADOS] created: 978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e

git at public.curoverse.com git at public.curoverse.com
Thu May 7 13:10:19 EDT 2015


        at  978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e (commit)


commit 978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 7 12:38:12 2015 -0400

    5748: Return the real decoder error for unparseable trash and pull requests.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5b65cd8..6823ad0 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -636,7 +636,7 @@ func TestPullHandler(t *testing.T) {
 			"Invalid pull request from the data manager",
 			RequestTester{"/pull", data_manager_token, "PUT", bad_json},
 			http.StatusBadRequest,
-			"Bad Request\n",
+			"",
 		},
 	}
 
@@ -740,7 +740,7 @@ func TestTrashHandler(t *testing.T) {
 			"Invalid trash list from the data manager",
 			RequestTester{"/trash", data_manager_token, "PUT", bad_json},
 			http.StatusBadRequest,
-			"Bad Request\n",
+			"",
 		},
 	}
 
@@ -798,7 +798,7 @@ func ExpectBody(
 	testname string,
 	expected_body string,
 	response *httptest.ResponseRecorder) {
-	if response.Body.String() != expected_body {
+	if expected_body != "" && response.Body.String() != expected_body {
 		t.Errorf("%s: expected response body '%s', got %+v",
 			testname, expected_body, response)
 	}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f134186..496a29b 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -409,7 +409,7 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
 	var pr []PullRequest
 	r := json.NewDecoder(req.Body)
 	if err := r.Decode(&pr); err != nil {
-		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		http.Error(resp, err.Error(), BadRequestError.HTTPCode)
 		return
 	}
 
@@ -443,7 +443,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 	var trash []TrashRequest
 	r := json.NewDecoder(req.Body)
 	if err := r.Decode(&trash); err != nil {
-		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		http.Error(resp, err.Error(), BadRequestError.HTTPCode)
 		return
 	}
 

commit 80fe29b48bea0e10078831103280f7f3ae46eee2
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 7 10:40:44 2015 -0400

    5748: Use a buffer pool instead of calling runtime.GC() during each GET.

diff --git a/services/crunchstat/.gitignore b/services/crunchstat/.gitignore
new file mode 100644
index 0000000..c26270a
--- /dev/null
+++ b/services/crunchstat/.gitignore
@@ -0,0 +1 @@
+crunchstat
diff --git a/services/keepproxy/.gitignore b/services/keepproxy/.gitignore
new file mode 100644
index 0000000..a4c8ad9
--- /dev/null
+++ b/services/keepproxy/.gitignore
@@ -0,0 +1 @@
+keepproxy
diff --git a/services/keepstore/.gitignore b/services/keepstore/.gitignore
new file mode 100644
index 0000000..c195c4a
--- /dev/null
+++ b/services/keepstore/.gitignore
@@ -0,0 +1 @@
+keepstore
diff --git a/services/keepstore/bufferpool.go b/services/keepstore/bufferpool.go
new file mode 100644
index 0000000..a87f5ac
--- /dev/null
+++ b/services/keepstore/bufferpool.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+	"sync"
+)
+
+type bufferPool struct {
+	// limiter has a "true" placeholder for each in-use buffer.
+	limiter chan bool
+	// Pool has unused buffers.
+	sync.Pool
+}
+
+func newBufferPool(count int, bufSize int) *bufferPool {
+	p := bufferPool{}
+	p.New = func() interface{} {
+		return make([]byte, bufSize)
+	}
+	p.limiter = make(chan bool, count)
+	return &p
+}
+
+func (p *bufferPool) Get(size int) []byte {
+	p.limiter <- true
+	buf := p.Pool.Get().([]byte)
+	if cap(buf) < size {
+		panic("assertion failed: bufferPool Get with size>max")
+	}
+	return buf[:size]
+}
+
+func (p *bufferPool) Put(buf []byte) {
+	p.Pool.Put(buf)
+	<-p.limiter
+}
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
new file mode 100644
index 0000000..4161c4e
--- /dev/null
+++ b/services/keepstore/bufferpool_test.go
@@ -0,0 +1,77 @@
+package main
+
+import (
+	. "gopkg.in/check.v1"
+	"testing"
+	"time"
+)
+
+// Gocheck boilerplate
+func TestBufferPool(t *testing.T) {
+	TestingT(t)
+}
+var _ = Suite(&BufferPoolSuite{})
+type BufferPoolSuite struct {}
+
+func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
+	bufs := newBufferPool(2, 10)
+	b1 := bufs.Get(1)
+	bufs.Get(2)
+	bufs.Put(b1)
+	b3 := bufs.Get(3)
+	c.Check(len(b3), Equals, 3)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolUnderLimit(c *C) {
+	bufs := newBufferPool(3, 10)
+	b1 := bufs.Get(10)
+	bufs.Get(10)
+	testBufferPoolRace(c, bufs, b1, "Get")
+}
+
+func (s *BufferPoolSuite) TestBufferPoolAtLimit(c *C) {
+	bufs := newBufferPool(2, 10)
+	b1 := bufs.Get(10)
+	bufs.Get(10)
+	testBufferPoolRace(c, bufs, b1, "Put")
+}
+
+func testBufferPoolRace(c *C, bufs *bufferPool, unused []byte, expectWin string) {
+	race := make(chan string, 2)
+	go func() {
+		bufs.Get(10)
+		time.Sleep(time.Millisecond)
+		race <- "Get"
+	}()
+	go func() {
+		time.Sleep(10*time.Millisecond)
+		bufs.Put(unused)
+		race <- "Put"
+	}()
+	c.Check(<-race, Equals, expectWin)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolReuse(c *C) {
+	bufs := newBufferPool(2, 10)
+	bufs.Get(10)
+	last := bufs.Get(10)
+	// The buffer pool is allowed to throw away unused buffers
+	// (e.g., during sync.Pool's garbage collection hook, in
+	// the current implementation). However, if unused buffers are
+	// getting thrown away and reallocated more than {arbitrary
+	// frequency threshold} during a busy loop, it's not acting
+	// much like a buffer pool.
+	allocs := 1000
+	reuses := 0
+	for i := 0; i < allocs; i++ {
+		bufs.Put(last)
+		next := bufs.Get(10)
+		copy(last, []byte("last"))
+		copy(next, []byte("next"))
+		if last[0] == 'n' {
+			reuses++
+		}
+		last = next
+	}
+	c.Check(reuses > allocs * 95/100, Equals, true)
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 75b56eb..f134186 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -112,13 +112,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 
 	block, err := GetBlock(hash, false)
-
-	// Garbage collect after each GET. Fixes #2865.
-	// TODO(twp): review Keep memory usage and see if there's
-	// a better way to do this than blindly garbage collecting
-	// after every block.
-	defer runtime.GC()
-
+	defer bufs.Put(block)
 	if err != nil {
 		// This type assertion is safe because the only errors
 		// GetBlock can return are DiskHashError or NotFoundError.
@@ -126,11 +120,9 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
-	_, err = resp.Write(block)
-
-	return
+	resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+	resp.Header().Set("Content-Type", "application/octet-stream")
+	resp.Write(block)
 }
 
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
@@ -159,17 +151,17 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	buf := make([]byte, req.ContentLength)
-	nread, err := io.ReadFull(req.Body, buf)
+	buf := bufs.Get(int(req.ContentLength))
+	_, err := io.ReadFull(req.Body, buf)
 	if err != nil {
 		http.Error(resp, err.Error(), 500)
-		return
-	} else if int64(nread) < req.ContentLength {
-		http.Error(resp, "request truncated", 500)
+		bufs.Put(buf)
 		return
 	}
 
 	err = PutBlock(buf, hash)
+	bufs.Put(buf)
+
 	if err != nil {
 		ke := err.(*KeepError)
 		http.Error(resp, ke.Error(), ke.HTTPCode)
@@ -178,7 +170,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 
 	// Success; add a size hint, sign the locator if possible, and
 	// return it to the client.
-	return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
+	return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
 	api_token := GetApiToken(req)
 	if PermissionSecret != nil && api_token != "" {
 		expiry := time.Now().Add(blob_signature_ttl)
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 71e577f..66c275c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -56,6 +56,9 @@ var data_manager_token string
 // actually deleting anything.
 var never_delete = false
 
+var maxBuffers = 128
+var bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
 // ==========
 // Error types.
 //
@@ -276,9 +279,19 @@ func main() {
 		"pid",
 		"",
 		"Path to write pid file")
+	flag.IntVar(
+		&maxBuffers,
+		"max-buffers",
+		maxBuffers,
+		fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
 
 	flag.Parse()
 
+	if maxBuffers < 0 {
+		log.Fatal("-max-buffers must be greater than zero.")
+	}
+	bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
 	if len(volumes) == 0 {
 		if volumes.Discover() == 0 {
 			log.Fatal("No volumes found.")
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 66e0810..2615019 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -65,7 +65,9 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
 	if v.Bad {
 		return nil, errors.New("Bad volume")
 	} else if block, ok := v.Store[loc]; ok {
-		return block, nil
+		buf := bufs.Get(len(block))
+		copy(buf, block)
+		return buf, nil
 	}
 	return nil, os.ErrNotExist
 }
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index bcf57c1..4325e8f 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -63,15 +63,33 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 // slice and whatever non-nil error was returned by Stat or ReadFile.
 func (v *UnixVolume) Get(loc string) ([]byte, error) {
 	path := v.blockPath(loc)
-	if _, err := os.Stat(path); err != nil {
+	stat, err := os.Stat(path)
+	if err != nil {
+		return nil, err
+	}
+	if stat.Size() < 0 {
+		return nil, os.ErrInvalid
+	} else if stat.Size() == 0 {
+		return []byte{}, nil
+	} else if stat.Size() > BLOCKSIZE {
+		return nil, TooLongError
+	}
+	f, err := os.Open(path)
+	if err != nil {
 		return nil, err
 	}
+	defer f.Close()
+	buf := bufs.Get(int(stat.Size()))
 	if v.serialize {
 		v.mutex.Lock()
 		defer v.mutex.Unlock()
 	}
-	buf, err := ioutil.ReadFile(path)
-	return buf, err
+	_, err = io.ReadFull(f, buf)
+	if err != nil {
+		bufs.Put(buf)
+		return nil, err
+	}
+	return buf, nil
 }
 
 // Put stores a block of data identified by the locator string

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list