[ARVADOS] updated: 10c7a157c03405a6986fdd8924d7f45a71803875

git at public.curoverse.com git at public.curoverse.com
Thu May 7 17:52:57 EDT 2015


Summary of changes:
 services/keepstore/bufferpool.go      | 13 +++++++--
 services/keepstore/bufferpool_test.go | 10 ++++++-
 services/keepstore/handlers.go        |  2 +-
 services/keepstore/keepstore.go       | 51 +++++++++++++++++++++--------------
 services/keepstore/volume.go          |  3 +++
 services/keepstore/volume_unix.go     |  2 +-
 6 files changed, 56 insertions(+), 25 deletions(-)

  discards  649ff5c9fe2976b06a17243a7b1a9e581a31e6dc (commit)
  discards  d38f970aa47a158ca840c981906d835e86b91d9f (commit)
       via  10c7a157c03405a6986fdd8924d7f45a71803875 (commit)
       via  8522f4fd87725ade1f6d9878f50b85530b383baf (commit)
       via  1003636e14221bfd43cf29c1c81e1e6bc31f7445 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not an ancestor of the new revision, so the old
history is not a subset of the new history.  This situation occurs when you
force-push (git push --force) a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (649ff5c9fe2976b06a17243a7b1a9e581a31e6dc)
            \
             N -- N -- N (10c7a157c03405a6986fdd8924d7f45a71803875)

When this happens, we assume that you have already received alert emails for
all of the O revisions, so here we report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 10c7a157c03405a6986fdd8924d7f45a71803875
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 7 17:55:12 2015 -0400

    5748: Check for errors when writing pidfile. Keep it locked while running.
    
    * Handle SIGINT (the same way as SIGTERM)
    
    * Improve startup/shutdown logs

diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index c0b8aec..175897c 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -202,7 +202,8 @@ func (vs *volumeSet) Discover() int {
 // permission arguments).
 
 func main() {
-	log.Println("Keep started: pid", os.Getpid())
+	log.Println("keepstore starting, pid", os.Getpid())
+	defer log.Println("keepstore exiting, pid", os.Getpid())
 
 	var (
 		data_manager_token_file string
@@ -278,7 +279,7 @@ func main() {
 		&pidfile,
 		"pid",
 		"",
-		"Path to write pid file")
+		"Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
 	flag.IntVar(
 		&maxBuffers,
 		"max-buffers",
@@ -292,6 +293,31 @@ func main() {
 	}
 	bufs = newBufferPool(maxBuffers, BLOCKSIZE)
 
+	if pidfile != "" {
+		f, err := os.OpenFile(pidfile, os.O_RDWR | os.O_CREATE, 0777)
+		if err != nil {
+			log.Fatalf("open pidfile (%s): %s", pidfile, err)
+		}
+		err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX | syscall.LOCK_NB)
+		if err != nil {
+			log.Fatalf("flock pidfile (%s): %s", pidfile, err)
+		}
+		err = f.Truncate(0)
+		if err != nil {
+			log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+		}
+		_, err = fmt.Fprint(f, os.Getpid())
+		if err != nil {
+			log.Fatalf("write pidfile (%s): %s", pidfile, err)
+		}
+		err = f.Sync()
+		if err != nil {
+			log.Fatalf("sync pidfile (%s): %s", pidfile, err)
+		}
+		defer f.Close()
+		defer os.Remove(pidfile)
+	}
+
 	if len(volumes) == 0 {
 		if volumes.Discover() == 0 {
 			log.Fatal("No volumes found.")
@@ -374,24 +400,9 @@ func main() {
 		listener.Close()
 	}(term)
 	signal.Notify(term, syscall.SIGTERM)
+	signal.Notify(term, syscall.SIGINT)
 
-	if pidfile != "" {
-		f, err := os.Create(pidfile)
-		if err == nil {
-			fmt.Fprint(f, os.Getpid())
-			f.Close()
-		} else {
-			log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
-		}
-	}
-
-	// Start listening for requests.
+	log.Println("listening at", listen)
 	srv := &http.Server{Addr: listen}
 	srv.Serve(listener)
-
-	log.Println("shutting down")
-
-	if pidfile != "" {
-		os.Remove(pidfile)
-	}
 }

commit 8522f4fd87725ade1f6d9878f50b85530b383baf
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 7 12:38:12 2015 -0400

    5748: Return the real decoder error for unparseable trash and pull requests.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5b65cd8..6823ad0 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -636,7 +636,7 @@ func TestPullHandler(t *testing.T) {
 			"Invalid pull request from the data manager",
 			RequestTester{"/pull", data_manager_token, "PUT", bad_json},
 			http.StatusBadRequest,
-			"Bad Request\n",
+			"",
 		},
 	}
 
@@ -740,7 +740,7 @@ func TestTrashHandler(t *testing.T) {
 			"Invalid trash list from the data manager",
 			RequestTester{"/trash", data_manager_token, "PUT", bad_json},
 			http.StatusBadRequest,
-			"Bad Request\n",
+			"",
 		},
 	}
 
@@ -798,7 +798,7 @@ func ExpectBody(
 	testname string,
 	expected_body string,
 	response *httptest.ResponseRecorder) {
-	if response.Body.String() != expected_body {
+	if expected_body != "" && response.Body.String() != expected_body {
 		t.Errorf("%s: expected response body '%s', got %+v",
 			testname, expected_body, response)
 	}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index e0e3c61..cf5dfca 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -409,7 +409,7 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
 	var pr []PullRequest
 	r := json.NewDecoder(req.Body)
 	if err := r.Decode(&pr); err != nil {
-		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		http.Error(resp, err.Error(), BadRequestError.HTTPCode)
 		return
 	}
 
@@ -443,7 +443,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
 	var trash []TrashRequest
 	r := json.NewDecoder(req.Body)
 	if err := r.Decode(&trash); err != nil {
-		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
+		http.Error(resp, err.Error(), BadRequestError.HTTPCode)
 		return
 	}
 

commit 1003636e14221bfd43cf29c1c81e1e6bc31f7445
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 7 10:40:44 2015 -0400

    5748: Use a buffer pool instead of calling runtime.GC() during each GET.

diff --git a/services/crunchstat/.gitignore b/services/crunchstat/.gitignore
new file mode 100644
index 0000000..c26270a
--- /dev/null
+++ b/services/crunchstat/.gitignore
@@ -0,0 +1 @@
+crunchstat
diff --git a/services/keepproxy/.gitignore b/services/keepproxy/.gitignore
new file mode 100644
index 0000000..a4c8ad9
--- /dev/null
+++ b/services/keepproxy/.gitignore
@@ -0,0 +1 @@
+keepproxy
diff --git a/services/keepstore/.gitignore b/services/keepstore/.gitignore
new file mode 100644
index 0000000..c195c4a
--- /dev/null
+++ b/services/keepstore/.gitignore
@@ -0,0 +1 @@
+keepstore
diff --git a/services/keepstore/bufferpool.go b/services/keepstore/bufferpool.go
new file mode 100644
index 0000000..373bfc7
--- /dev/null
+++ b/services/keepstore/bufferpool.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+	"log"
+	"sync"
+	"time"
+)
+
+type bufferPool struct {
+	// limiter has a "true" placeholder for each in-use buffer.
+	limiter chan bool
+	// Pool has unused buffers.
+	sync.Pool
+}
+
+func newBufferPool(count int, bufSize int) *bufferPool {
+	p := bufferPool{}
+	p.New = func() interface{} {
+		return make([]byte, bufSize)
+	}
+	p.limiter = make(chan bool, count)
+	return &p
+}
+
+func (p *bufferPool) Get(size int) []byte {
+	select {
+	case p.limiter <- true:
+	default:
+		t0 := time.Now()
+		log.Printf("reached max buffers (%d), waiting", cap(p.limiter))
+		p.limiter <- true
+		log.Printf("waited %v for a buffer", time.Since(t0))
+	}
+	buf := p.Pool.Get().([]byte)
+	if cap(buf) < size {
+		log.Fatalf("bufferPool Get(size=%d) but max=%d", size, cap(buf))
+	}
+	return buf[:size]
+}
+
+func (p *bufferPool) Put(buf []byte) {
+	p.Pool.Put(buf)
+	<-p.limiter
+}
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
new file mode 100644
index 0000000..b2f63b1
--- /dev/null
+++ b/services/keepstore/bufferpool_test.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+	. "gopkg.in/check.v1"
+	"testing"
+	"time"
+)
+
+// Gocheck boilerplate
+func TestBufferPool(t *testing.T) {
+	TestingT(t)
+}
+var _ = Suite(&BufferPoolSuite{})
+type BufferPoolSuite struct {}
+
+// Initialize a default-sized buffer pool for the benefit of test
+// suites that don't run main().
+func init() {
+	bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
+	bufs := newBufferPool(2, 10)
+	b1 := bufs.Get(1)
+	bufs.Get(2)
+	bufs.Put(b1)
+	b3 := bufs.Get(3)
+	c.Check(len(b3), Equals, 3)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolUnderLimit(c *C) {
+	bufs := newBufferPool(3, 10)
+	b1 := bufs.Get(10)
+	bufs.Get(10)
+	testBufferPoolRace(c, bufs, b1, "Get")
+}
+
+func (s *BufferPoolSuite) TestBufferPoolAtLimit(c *C) {
+	bufs := newBufferPool(2, 10)
+	b1 := bufs.Get(10)
+	bufs.Get(10)
+	testBufferPoolRace(c, bufs, b1, "Put")
+}
+
+func testBufferPoolRace(c *C, bufs *bufferPool, unused []byte, expectWin string) {
+	race := make(chan string)
+	go func() {
+		bufs.Get(10)
+		time.Sleep(time.Millisecond)
+		race <- "Get"
+	}()
+	go func() {
+		time.Sleep(10*time.Millisecond)
+		bufs.Put(unused)
+		race <- "Put"
+	}()
+	c.Check(<-race, Equals, expectWin)
+	c.Check(<-race, Not(Equals), expectWin)
+	close(race)
+}
+
+func (s *BufferPoolSuite) TestBufferPoolReuse(c *C) {
+	bufs := newBufferPool(2, 10)
+	bufs.Get(10)
+	last := bufs.Get(10)
+	// The buffer pool is allowed to throw away unused buffers
+	// (e.g., during sync.Pool's garbage collection hook, in the
+	// the current implementation). However, if unused buffers are
+	// getting thrown away and reallocated more than {arbitrary
+	// frequency threshold} during a busy loop, it's not acting
+	// much like a buffer pool.
+	allocs := 1000
+	reuses := 0
+	for i := 0; i < allocs; i++ {
+		bufs.Put(last)
+		next := bufs.Get(10)
+		copy(last, []byte("last"))
+		copy(next, []byte("next"))
+		if last[0] == 'n' {
+			reuses++
+		}
+		last = next
+	}
+	c.Check(reuses > allocs * 95/100, Equals, true)
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 75b56eb..e0e3c61 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -112,25 +112,17 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 
 	block, err := GetBlock(hash, false)
-
-	// Garbage collect after each GET. Fixes #2865.
-	// TODO(twp): review Keep memory usage and see if there's
-	// a better way to do this than blindly garbage collecting
-	// after every block.
-	defer runtime.GC()
-
 	if err != nil {
 		// This type assertion is safe because the only errors
 		// GetBlock can return are DiskHashError or NotFoundError.
 		http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
 		return
 	}
+	defer bufs.Put(block)
 
-	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
-
-	_, err = resp.Write(block)
-
-	return
+	resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
+	resp.Header().Set("Content-Type", "application/octet-stream")
+	resp.Write(block)
 }
 
 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
@@ -159,17 +151,17 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	buf := make([]byte, req.ContentLength)
-	nread, err := io.ReadFull(req.Body, buf)
+	buf := bufs.Get(int(req.ContentLength))
+	_, err := io.ReadFull(req.Body, buf)
 	if err != nil {
 		http.Error(resp, err.Error(), 500)
-		return
-	} else if int64(nread) < req.ContentLength {
-		http.Error(resp, "request truncated", 500)
+		bufs.Put(buf)
 		return
 	}
 
 	err = PutBlock(buf, hash)
+	bufs.Put(buf)
+
 	if err != nil {
 		ke := err.(*KeepError)
 		http.Error(resp, ke.Error(), ke.HTTPCode)
@@ -178,7 +170,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 
 	// Success; add a size hint, sign the locator if possible, and
 	// return it to the client.
-	return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
+	return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
 	api_token := GetApiToken(req)
 	if PermissionSecret != nil && api_token != "" {
 		expiry := time.Now().Add(blob_signature_ttl)
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 71e577f..c0b8aec 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -56,6 +56,9 @@ var data_manager_token string
 // actually deleting anything.
 var never_delete = false
 
+var maxBuffers = 128
+var bufs *bufferPool
+
 // ==========
 // Error types.
 //
@@ -276,9 +279,19 @@ func main() {
 		"pid",
 		"",
 		"Path to write pid file")
+	flag.IntVar(
+		&maxBuffers,
+		"max-buffers",
+		maxBuffers,
+		fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
 
 	flag.Parse()
 
+	if maxBuffers < 0 {
+		log.Fatal("-max-buffers must be greater than zero.")
+	}
+	bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+
 	if len(volumes) == 0 {
 		if volumes.Discover() == 0 {
 			log.Fatal("No volumes found.")
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 1b52949..64fea34 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -11,6 +11,9 @@ import (
 )
 
 type Volume interface {
+	// Get a block. IFF the returned error is nil, the caller must
+	// put the returned slice back into the buffer pool when it's
+	// finished with it.
 	Get(loc string) ([]byte, error)
 	Put(loc string, block []byte) error
 	Touch(loc string) error
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 66e0810..2615019 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -65,7 +65,9 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
 	if v.Bad {
 		return nil, errors.New("Bad volume")
 	} else if block, ok := v.Store[loc]; ok {
-		return block, nil
+		buf := bufs.Get(len(block))
+		copy(buf, block)
+		return buf, nil
 	}
 	return nil, os.ErrNotExist
 }
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index bcf57c1..61a98b5 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -63,15 +63,33 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 // slice and whatever non-nil error was returned by Stat or ReadFile.
 func (v *UnixVolume) Get(loc string) ([]byte, error) {
 	path := v.blockPath(loc)
-	if _, err := os.Stat(path); err != nil {
+	stat, err := os.Stat(path)
+	if err != nil {
+		return nil, err
+	}
+	if stat.Size() < 0 {
+		return nil, os.ErrInvalid
+	} else if stat.Size() == 0 {
+		return bufs.Get(0), nil
+	} else if stat.Size() > BLOCKSIZE {
+		return nil, TooLongError
+	}
+	f, err := os.Open(path)
+	if err != nil {
 		return nil, err
 	}
+	defer f.Close()
+	buf := bufs.Get(int(stat.Size()))
 	if v.serialize {
 		v.mutex.Lock()
 		defer v.mutex.Unlock()
 	}
-	buf, err := ioutil.ReadFile(path)
-	return buf, err
+	_, err = io.ReadFull(f, buf)
+	if err != nil {
+		bufs.Put(buf)
+		return nil, err
+	}
+	return buf, nil
 }
 
 // Put stores a block of data identified by the locator string

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list