[ARVADOS] created: 978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e
git at public.curoverse.com
git at public.curoverse.com
Thu May 7 13:10:19 EDT 2015
at 978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e (commit)
commit 978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 7 12:38:12 2015 -0400
5748: Return the real decoder error for unparseable trash and pull requests.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5b65cd8..6823ad0 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -636,7 +636,7 @@ func TestPullHandler(t *testing.T) {
"Invalid pull request from the data manager",
RequestTester{"/pull", data_manager_token, "PUT", bad_json},
http.StatusBadRequest,
- "Bad Request\n",
+ "",
},
}
@@ -740,7 +740,7 @@ func TestTrashHandler(t *testing.T) {
"Invalid trash list from the data manager",
RequestTester{"/trash", data_manager_token, "PUT", bad_json},
http.StatusBadRequest,
- "Bad Request\n",
+ "",
},
}
@@ -798,7 +798,7 @@ func ExpectBody(
testname string,
expected_body string,
response *httptest.ResponseRecorder) {
- if response.Body.String() != expected_body {
+ if expected_body != "" && response.Body.String() != expected_body {
t.Errorf("%s: expected response body '%s', got %+v",
testname, expected_body, response)
}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f134186..496a29b 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -409,7 +409,7 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
var pr []PullRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&pr); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
@@ -443,7 +443,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
var trash []TrashRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&trash); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
commit 80fe29b48bea0e10078831103280f7f3ae46eee2
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 7 10:40:44 2015 -0400
5748: Use a buffer pool instead of calling runtime.GC() during each GET.
diff --git a/services/crunchstat/.gitignore b/services/crunchstat/.gitignore
new file mode 100644
index 0000000..c26270a
--- /dev/null
+++ b/services/crunchstat/.gitignore
@@ -0,0 +1 @@
+crunchstat
diff --git a/services/keepproxy/.gitignore b/services/keepproxy/.gitignore
new file mode 100644
index 0000000..a4c8ad9
--- /dev/null
+++ b/services/keepproxy/.gitignore
@@ -0,0 +1 @@
+keepproxy
diff --git a/services/keepstore/.gitignore b/services/keepstore/.gitignore
new file mode 100644
index 0000000..c195c4a
--- /dev/null
+++ b/services/keepstore/.gitignore
@@ -0,0 +1 @@
+keepstore
diff --git a/services/keepstore/bufferpool.go b/services/keepstore/bufferpool.go
new file mode 100644
index 0000000..a87f5ac
--- /dev/null
+++ b/services/keepstore/bufferpool.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ "sync"
+)
+
+type bufferPool struct {
+ // limiter has a "true" placeholder for each in-use buffer.
+ limiter chan bool
+ // Pool has unused buffers.
+ sync.Pool
+}
+
+func newBufferPool(count int, bufSize int) *bufferPool {
+ p := bufferPool{}
+ p.New = func() interface{} {
+ return make([]byte, bufSize)
+ }
+ p.limiter = make(chan bool, count)
+ return &p
+}
+
+func (p *bufferPool) Get(size int) []byte {
+ p.limiter <- true
+ buf := p.Pool.Get().([]byte)
+ if cap(buf) < size {
+ panic("assertion failed: bufferPool Get with size>max")
+ }
+ return buf[:size]
+}
+
+func (p *bufferPool) Put(buf []byte) {
+ p.Pool.Put(buf)
+ <-p.limiter
+}
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
new file mode 100644
index 0000000..4161c4e
--- /dev/null
+++ b/services/keepstore/bufferpool_test.go
@@ -0,0 +1,77 @@
+package main
+
+import (
+ . "gopkg.in/check.v1"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate
+func TestBufferPool(t *testing.T) {
+ TestingT(t)
+}
+var _ = Suite(&BufferPoolSuite{})
+type BufferPoolSuite struct {}
+
+func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(1)
+ bufs.Get(2)
+ bufs.Put(b1)
+ b3 := bufs.Get(3)
+ c.Check(len(b3), Equals, 3)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolUnderLimit(c *C) {
+ bufs := newBufferPool(3, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Get")
+}
+
+func (s *BufferPoolSuite) TestBufferPoolAtLimit(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Put")
+}
+
+func testBufferPoolRace(c *C, bufs *bufferPool, unused []byte, expectWin string) {
+ race := make(chan string, 2)
+ go func() {
+ bufs.Get(10)
+ time.Sleep(time.Millisecond)
+ race <- "Get"
+ }()
+ go func() {
+ time.Sleep(10*time.Millisecond)
+ bufs.Put(unused)
+ race <- "Put"
+ }()
+ c.Check(<-race, Equals, expectWin)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolReuse(c *C) {
+ bufs := newBufferPool(2, 10)
+ bufs.Get(10)
+ last := bufs.Get(10)
+ // The buffer pool is allowed to throw away unused buffers
+ // (e.g., during sync.Pool's garbage collection hook, in
+ // the current implementation). However, if unused buffers are
+ // getting thrown away and reallocated more than {arbitrary
+ // frequency threshold} during a busy loop, it's not acting
+ // much like a buffer pool.
+ allocs := 1000
+ reuses := 0
+ for i := 0; i < allocs; i++ {
+ bufs.Put(last)
+ next := bufs.Get(10)
+ copy(last, []byte("last"))
+ copy(next, []byte("next"))
+ if last[0] == 'n' {
+ reuses++
+ }
+ last = next
+ }
+ c.Check(reuses > allocs * 95/100, Equals, true)
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 75b56eb..f134186 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -112,13 +112,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
}
block, err := GetBlock(hash, false)
-
- // Garbage collect after each GET. Fixes #2865.
- // TODO(twp): review Keep memory usage and see if there's
- // a better way to do this than blindly garbage collecting
- // after every block.
- defer runtime.GC()
-
+ defer bufs.Put(block)
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
@@ -126,11 +120,9 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
- _, err = resp.Write(block)
-
- return
+ resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ resp.Header().Set("Content-Type", "application/octet-stream")
+ resp.Write(block)
}
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
@@ -159,17 +151,17 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- buf := make([]byte, req.ContentLength)
- nread, err := io.ReadFull(req.Body, buf)
+ buf := bufs.Get(int(req.ContentLength))
+ _, err := io.ReadFull(req.Body, buf)
if err != nil {
http.Error(resp, err.Error(), 500)
- return
- } else if int64(nread) < req.ContentLength {
- http.Error(resp, "request truncated", 500)
+ bufs.Put(buf)
return
}
err = PutBlock(buf, hash)
+ bufs.Put(buf)
+
if err != nil {
ke := err.(*KeepError)
http.Error(resp, ke.Error(), ke.HTTPCode)
@@ -178,7 +170,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
// Success; add a size hint, sign the locator if possible, and
// return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
+ return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
api_token := GetApiToken(req)
if PermissionSecret != nil && api_token != "" {
expiry := time.Now().Add(blob_signature_ttl)
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 71e577f..66c275c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -56,6 +56,9 @@ var data_manager_token string
// actually deleting anything.
var never_delete = false
+var maxBuffers = 128
+var bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
// ==========
// Error types.
//
@@ -276,9 +279,19 @@ func main() {
"pid",
"",
"Path to write pid file")
+ flag.IntVar(
+ &maxBuffers,
+ "max-buffers",
+ maxBuffers,
+ fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
flag.Parse()
+ if maxBuffers < 0 {
+ log.Fatal("-max-buffers must be greater than zero.")
+ }
+ bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
if len(volumes) == 0 {
if volumes.Discover() == 0 {
log.Fatal("No volumes found.")
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 66e0810..2615019 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -65,7 +65,9 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
if v.Bad {
return nil, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
- return block, nil
+ buf := bufs.Get(len(block))
+ copy(buf, block)
+ return buf, nil
}
return nil, os.ErrNotExist
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index bcf57c1..4325e8f 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -63,15 +63,33 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
// slice and whatever non-nil error was returned by Stat or ReadFile.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
path := v.blockPath(loc)
- if _, err := os.Stat(path); err != nil {
+ stat, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+ if stat.Size() < 0 {
+ return nil, os.ErrInvalid
+ } else if stat.Size() == 0 {
+ return []byte{}, nil
+ } else if stat.Size() > BLOCKSIZE {
+ return nil, TooLongError
+ }
+ f, err := os.Open(path)
+ if err != nil {
return nil, err
}
+ defer f.Close()
+ buf := bufs.Get(int(stat.Size()))
if v.serialize {
v.mutex.Lock()
defer v.mutex.Unlock()
}
- buf, err := ioutil.ReadFile(path)
- return buf, err
+ _, err = io.ReadFull(f, buf)
+ if err != nil {
+ bufs.Put(buf)
+ return nil, err
+ }
+ return buf, nil
}
// Put stores a block of data identified by the locator string
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list