[ARVADOS] updated: 649ff5c9fe2976b06a17243a7b1a9e581a31e6dc
git at public.curoverse.com
git at public.curoverse.com
Thu May 7 15:37:23 EDT 2015
Summary of changes:
discards 978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e (commit)
discards 80fe29b48bea0e10078831103280f7f3ae46eee2 (commit)
via 649ff5c9fe2976b06a17243a7b1a9e581a31e6dc (commit)
via d38f970aa47a158ca840c981906d835e86b91d9f (commit)
via fde21d34d011af2123668983c559632221390fd4 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (978afd40f92951aa2952ef4ad9ccbc07d2bf4a3e)
            \
             N -- N -- N (649ff5c9fe2976b06a17243a7b1a9e581a31e6dc)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 649ff5c9fe2976b06a17243a7b1a9e581a31e6dc
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 7 12:38:12 2015 -0400
5748: Return the real decoder error for unparseable trash and pull requests.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5b65cd8..6823ad0 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -636,7 +636,7 @@ func TestPullHandler(t *testing.T) {
"Invalid pull request from the data manager",
RequestTester{"/pull", data_manager_token, "PUT", bad_json},
http.StatusBadRequest,
- "Bad Request\n",
+ "",
},
}
@@ -740,7 +740,7 @@ func TestTrashHandler(t *testing.T) {
"Invalid trash list from the data manager",
RequestTester{"/trash", data_manager_token, "PUT", bad_json},
http.StatusBadRequest,
- "Bad Request\n",
+ "",
},
}
@@ -798,7 +798,7 @@ func ExpectBody(
testname string,
expected_body string,
response *httptest.ResponseRecorder) {
- if response.Body.String() != expected_body {
+ if expected_body != "" && response.Body.String() != expected_body {
t.Errorf("%s: expected response body '%s', got %+v",
testname, expected_body, response)
}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f134186..496a29b 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -409,7 +409,7 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
var pr []PullRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&pr); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
@@ -443,7 +443,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
var trash []TrashRequest
r := json.NewDecoder(req.Body)
if err := r.Decode(&trash); err != nil {
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+ http.Error(resp, err.Error(), BadRequestError.HTTPCode)
return
}
commit d38f970aa47a158ca840c981906d835e86b91d9f
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 7 10:40:44 2015 -0400
5748: Use a buffer pool instead of calling runtime.GC() during each GET.
diff --git a/services/crunchstat/.gitignore b/services/crunchstat/.gitignore
new file mode 100644
index 0000000..c26270a
--- /dev/null
+++ b/services/crunchstat/.gitignore
@@ -0,0 +1 @@
+crunchstat
diff --git a/services/keepproxy/.gitignore b/services/keepproxy/.gitignore
new file mode 100644
index 0000000..a4c8ad9
--- /dev/null
+++ b/services/keepproxy/.gitignore
@@ -0,0 +1 @@
+keepproxy
diff --git a/services/keepstore/.gitignore b/services/keepstore/.gitignore
new file mode 100644
index 0000000..c195c4a
--- /dev/null
+++ b/services/keepstore/.gitignore
@@ -0,0 +1 @@
+keepstore
diff --git a/services/keepstore/bufferpool.go b/services/keepstore/bufferpool.go
new file mode 100644
index 0000000..a87f5ac
--- /dev/null
+++ b/services/keepstore/bufferpool.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+ "sync"
+)
+
+type bufferPool struct {
+ // limiter has a "true" placeholder for each in-use buffer.
+ limiter chan bool
+ // Pool has unused buffers.
+ sync.Pool
+}
+
+func newBufferPool(count int, bufSize int) *bufferPool {
+ p := bufferPool{}
+ p.New = func() interface{} {
+ return make([]byte, bufSize)
+ }
+ p.limiter = make(chan bool, count)
+ return &p
+}
+
+func (p *bufferPool) Get(size int) []byte {
+ p.limiter <- true
+ buf := p.Pool.Get().([]byte)
+ if cap(buf) < size {
+ panic("assertion failed: bufferPool Get with size>max")
+ }
+ return buf[:size]
+}
+
+func (p *bufferPool) Put(buf []byte) {
+ p.Pool.Put(buf)
+ <-p.limiter
+}
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
new file mode 100644
index 0000000..4161c4e
--- /dev/null
+++ b/services/keepstore/bufferpool_test.go
@@ -0,0 +1,77 @@
+package main
+
+import (
+ . "gopkg.in/check.v1"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate
+func TestBufferPool(t *testing.T) {
+ TestingT(t)
+}
+var _ = Suite(&BufferPoolSuite{})
+type BufferPoolSuite struct {}
+
+func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(1)
+ bufs.Get(2)
+ bufs.Put(b1)
+ b3 := bufs.Get(3)
+ c.Check(len(b3), Equals, 3)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolUnderLimit(c *C) {
+ bufs := newBufferPool(3, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Get")
+}
+
+func (s *BufferPoolSuite) TestBufferPoolAtLimit(c *C) {
+ bufs := newBufferPool(2, 10)
+ b1 := bufs.Get(10)
+ bufs.Get(10)
+ testBufferPoolRace(c, bufs, b1, "Put")
+}
+
+func testBufferPoolRace(c *C, bufs *bufferPool, unused []byte, expectWin string) {
+ race := make(chan string, 2)
+ go func() {
+ bufs.Get(10)
+ time.Sleep(time.Millisecond)
+ race <- "Get"
+ }()
+ go func() {
+ time.Sleep(10*time.Millisecond)
+ bufs.Put(unused)
+ race <- "Put"
+ }()
+ c.Check(<-race, Equals, expectWin)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolReuse(c *C) {
+ bufs := newBufferPool(2, 10)
+ bufs.Get(10)
+ last := bufs.Get(10)
+ // The buffer pool is allowed to throw away unused buffers
+ // (e.g., during sync.Pool's garbage collection hook, in the
+ // the current implementation). However, if unused buffers are
+ // getting thrown away and reallocated more than {arbitrary
+ // frequency threshold} during a busy loop, it's not acting
+ // much like a buffer pool.
+ allocs := 1000
+ reuses := 0
+ for i := 0; i < allocs; i++ {
+ bufs.Put(last)
+ next := bufs.Get(10)
+ copy(last, []byte("last"))
+ copy(next, []byte("next"))
+ if last[0] == 'n' {
+ reuses++
+ }
+ last = next
+ }
+ c.Check(reuses > allocs * 95/100, Equals, true)
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 75b56eb..f134186 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -112,13 +112,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
}
block, err := GetBlock(hash, false)
-
- // Garbage collect after each GET. Fixes #2865.
- // TODO(twp): review Keep memory usage and see if there's
- // a better way to do this than blindly garbage collecting
- // after every block.
- defer runtime.GC()
-
+ defer bufs.Put(block)
if err != nil {
// This type assertion is safe because the only errors
// GetBlock can return are DiskHashError or NotFoundError.
@@ -126,11 +120,9 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
- _, err = resp.Write(block)
-
- return
+ resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+ resp.Header().Set("Content-Type", "application/octet-stream")
+ resp.Write(block)
}
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
@@ -159,17 +151,17 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- buf := make([]byte, req.ContentLength)
- nread, err := io.ReadFull(req.Body, buf)
+ buf := bufs.Get(int(req.ContentLength))
+ _, err := io.ReadFull(req.Body, buf)
if err != nil {
http.Error(resp, err.Error(), 500)
- return
- } else if int64(nread) < req.ContentLength {
- http.Error(resp, "request truncated", 500)
+ bufs.Put(buf)
return
}
err = PutBlock(buf, hash)
+ bufs.Put(buf)
+
if err != nil {
ke := err.(*KeepError)
http.Error(resp, ke.Error(), ke.HTTPCode)
@@ -178,7 +170,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
// Success; add a size hint, sign the locator if possible, and
// return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
+ return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
api_token := GetApiToken(req)
if PermissionSecret != nil && api_token != "" {
expiry := time.Now().Add(blob_signature_ttl)
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 71e577f..66c275c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -56,6 +56,9 @@ var data_manager_token string
// actually deleting anything.
var never_delete = false
+var maxBuffers = 128
+var bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
// ==========
// Error types.
//
@@ -276,9 +279,19 @@ func main() {
"pid",
"",
"Path to write pid file")
+ flag.IntVar(
+ &maxBuffers,
+ "max-buffers",
+ maxBuffers,
+ fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
flag.Parse()
+ if maxBuffers < 0 {
+ log.Fatal("-max-buffers must be greater than zero.")
+ }
+ bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
if len(volumes) == 0 {
if volumes.Discover() == 0 {
log.Fatal("No volumes found.")
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 66e0810..2615019 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -65,7 +65,9 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
if v.Bad {
return nil, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
- return block, nil
+ buf := bufs.Get(len(block))
+ copy(buf, block)
+ return buf, nil
}
return nil, os.ErrNotExist
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index bcf57c1..4325e8f 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -63,15 +63,33 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
// slice and whatever non-nil error was returned by Stat or ReadFile.
func (v *UnixVolume) Get(loc string) ([]byte, error) {
path := v.blockPath(loc)
- if _, err := os.Stat(path); err != nil {
+ stat, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+ if stat.Size() < 0 {
+ return nil, os.ErrInvalid
+ } else if stat.Size() == 0 {
+ return []byte{}, nil
+ } else if stat.Size() > BLOCKSIZE {
+ return nil, TooLongError
+ }
+ f, err := os.Open(path)
+ if err != nil {
return nil, err
}
+ defer f.Close()
+ buf := bufs.Get(int(stat.Size()))
if v.serialize {
v.mutex.Lock()
defer v.mutex.Unlock()
}
- buf, err := ioutil.ReadFile(path)
- return buf, err
+ _, err = io.ReadFull(f, buf)
+ if err != nil {
+ bufs.Put(buf)
+ return nil, err
+ }
+ return buf, nil
}
// Put stores a block of data identified by the locator string
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list