[ARVADOS] updated: 20fec88ce889c3aa0eb020361f4001c362438283
git at public.curoverse.com
git at public.curoverse.com
Tue Aug 26 21:19:54 EDT 2014
Summary of changes:
services/keepstore/handler_test.go | 8 +--
services/keepstore/handlers.go | 22 ++++-----
services/keepstore/keepstore.go | 12 ++---
services/keepstore/pull_list/pull_list.go | 81 +++++++++++++++++++++++++++++++
services/keepstore/replicator/replica.go | 80 ------------------------------
5 files changed, 102 insertions(+), 101 deletions(-)
create mode 100644 services/keepstore/pull_list/pull_list.go
delete mode 100644 services/keepstore/replicator/replica.go
via 20fec88ce889c3aa0eb020361f4001c362438283 (commit)
from 6c29e76287d9df651a0bc54d7c0a7a4611862ada (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 20fec88ce889c3aa0eb020361f4001c362438283
Author: Tim Pierce <twp at curoverse.com>
Date: Tue Aug 26 21:18:51 2014 -0400
3414: rename replicator to pull_list
Renamed replicator.Replicator to pull_list.Manager.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 9d60312..34e711f 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -586,12 +586,12 @@ func TestDeleteHandler(t *testing.T) {
// 4. Invalid pull request from the data manager
// (expected result: 400 Bad Request)
//
-// Test that in the end, the replicator received a good pull list with
+// Test that in the end, the pull manager received a good pull list with
// the expected number of requests.
//
// TODO(twp): test concurrency: launch 100 goroutines to update the
// pull list simultaneously. Make sure that none of them return 400
-// Bad Request and that replica.GetList() returns a valid list.
+// Bad Request and that pullmgr.GetList() returns a valid list.
//
func TestPullHandler(t *testing.T) {
defer teardown()
@@ -661,9 +661,9 @@ func TestPullHandler(t *testing.T) {
ExpectBody(t, tst.name, tst.response_body, response)
}
- // The Keep replicator should have received one good list with 3
+ // The Keep pull manager should have received one good list with 3
// requests on it.
- var saved_pull_list = replica.GetList()
+ var saved_pull_list = pullmgr.GetList()
if len(saved_pull_list) != 3 {
t.Errorf(
"saved_pull_list: expected 3 elements, got %d\nsaved_pull_list = %v",
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a488e37..2528fd6 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -13,7 +13,7 @@ import (
"crypto/md5"
"encoding/json"
"fmt"
- "git.curoverse.com/arvados.git/services/keepstore/replicator"
+ "git.curoverse.com/arvados.git/services/keepstore/pull_list"
"github.com/gorilla/mux"
"io"
"log"
@@ -61,7 +61,7 @@ func MakeRESTRouter() *mux.Router {
// The PullHandler processes "PUT /pull" commands from Data Manager.
// It parses the JSON list of pull requests in the request body, and
- // delivers them to the PullBlocks goroutine for replication.
+ // delivers them to the pull list manager for replication.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
// Any request which does not match any of these routes gets
@@ -446,26 +446,26 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
}
// Parse the request body.
- var pull_list []replicator.PullRequest
+ var plist []pull_list.PullRequest
r := json.NewDecoder(req.Body)
- if err := r.Decode(&pull_list); err != nil {
+ if err := r.Decode(&plist); err != nil {
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
return
}
// We have a properly formatted pull list sent from the data
- // manager. Report success and send the list to the keep
- // replicator for further handling.
- log.Printf("%s %s: received %s\n", req.Method, req.URL, pull_list)
+ // manager. Report success and send the list to the pull list
+ // manager for further handling.
+ log.Printf("%s %s: received %s\n", req.Method, req.URL, plist)
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(
- fmt.Sprintf("Received %d pull requests\n", len(pull_list))))
+ fmt.Sprintf("Received %d pull requests\n", len(plist))))
- if replica == nil {
- replica = replicator.New()
+ if pullmgr == nil {
+ pullmgr = pull_list.NewManager()
}
- replica.SetList(pull_list)
+ pullmgr.SetList(plist)
}
// ==============================
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index f1e7aa9..79ce968 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,7 +4,7 @@ import (
"bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/services/keepstore/replicator"
+ "git.curoverse.com/arvados.git/services/keepstore/pull_list"
"io/ioutil"
"log"
"net"
@@ -92,11 +92,11 @@ func (e *KeepError) Error() string {
// Initialized by the --volumes flag (or by FindKeepVolumes).
var KeepVM VolumeManager
-// The KeepReplica is responsible for pulling blocks from other
-// Keep servers to ensure replication. When Keep receives a new
-// "pull list" from Data Manager, KeepReplica is responsible for
-// fetching blocks on the list.
-var replica *replicator.Replicator
+// The pull list manager is responsible for pulling blocks from other
+// Keep servers to ensure replication. When Keep receives a new "pull
+// list" from Data Manager, the pull manager is responsible for fetching
+// blocks on the list.
+var pullmgr *pull_list.Manager
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
diff --git a/services/keepstore/pull_list/pull_list.go b/services/keepstore/pull_list/pull_list.go
new file mode 100644
index 0000000..71d7821
--- /dev/null
+++ b/services/keepstore/pull_list/pull_list.go
@@ -0,0 +1,81 @@
+package pull_list
+
+/* The pull_list package manages a list of pull requests sent
+ by Data Manager.
+
+ The interface is:
+
+ pull_list.NewManager() creates and returns a pull_list.Manager. A
+ listener runs in a goroutine, waiting for new requests on its input
+ channels.
+
+ pull_list.SetList() assigns a new pull list to the manager. Any
+ existing list is discarded.
+
+ pull_list.GetList() reports the manager's current pull list.
+
+ pull_list.Close() shuts down the pull list manager.
+*/
+
+type PullRequest struct {
+ Locator string
+ Servers []string
+}
+
+type Manager struct {
+ setlist chan []PullRequest // input channel for setting new lists
+ getlist chan []PullRequest // output channel for getting existing list
+}
+
+// NewManager returns a new Manager object. It launches a goroutine that
+// waits for pull requests.
+//
+func NewManager() *Manager {
+ r := Manager{
+ make(chan []PullRequest),
+ make(chan []PullRequest),
+ }
+ go r.listen()
+ return &r
+}
+
+// SetList sends a new list of pull requests to the manager goroutine.
+// The manager will discard any outstanding pull list and begin
+// working on the new list.
+//
+func (r *Manager) SetList(pr []PullRequest) {
+ r.setlist <- pr
+}
+
+// GetList reports the contents of the current pull list.
+func (r *Manager) GetList() []PullRequest {
+ return <-r.getlist
+}
+
+// Close shuts down the manager and terminates the goroutine, which
+// completes any pull request in progress and abandons any pending
+// requests.
+//
+func (r *Manager) Close() {
+ close(r.setlist)
+}
+
+// listen is run in a goroutine. It reads new pull lists from its
+// input queue until the queue is closed.
+func (r *Manager) listen() {
+ var current []PullRequest
+ for {
+ select {
+ case p, ok := <-r.setlist:
+ if ok {
+ current = p
+ } else {
+ // The input channel is closed; time to shut down
+ close(r.getlist)
+ return
+ }
+ case r.getlist <- current:
+ // no-op
+ }
+ }
+}
diff --git a/services/keepstore/replicator/replica.go b/services/keepstore/replicator/replica.go
deleted file mode 100644
index e314b86..0000000
--- a/services/keepstore/replicator/replica.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package replicator
-
-/* The Keep replicator package fulfills replication pull requests sent
- by Data Manager.
-
- The interface is:
-
- replicator.New() launches a replication goroutine and returns the
- new Replicator object.
-
- replicator.SetList() assigns a new pull list to the goroutine. Any
- existing list is discarded.
-
- replicator.GetList() reports the goroutine's current pull list.
-
- replicator.Close() shuts down the replicator.
-*/
-
-type PullRequest struct {
- Locator string
- Servers []string
-}
-
-type Replicator struct {
- queue chan []PullRequest
- dump chan []PullRequest
-}
-
-// New returns a new Replicator object. It launches a goroutine that
-// waits for pull requests.
-//
-func New() *Replicator {
- r := Replicator{
- make(chan []PullRequest),
- make(chan []PullRequest),
- }
- go r.listen()
- return &r
-}
-
-// SetList sends a new list of pull requests to the replicator goroutine.
-// The replicator will discard any outstanding pull list and begin
-// working on the new list.
-//
-func (r *Replicator) SetList(pr []PullRequest) {
- r.queue <- pr
-}
-
-// GetList reports the contents of the current pull list.
-func (r *Replicator) GetList() []PullRequest {
- return <-r.dump
-}
-
-// Close shuts down the replicator and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
-//
-func (r *Replicator) Close() {
- close(r.queue)
-}
-
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-func (r *Replicator) listen() {
- var current []PullRequest
- for {
- select {
- case p, ok := <-r.queue:
- if ok {
- current = p
- } else {
- // The input channel is closed; time to shut down
- close(r.dump)
- return
- }
- case r.dump <- current:
- // no-op
- }
- }
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list