[ARVADOS] updated: 577bd728e410d6a9b57b11578264b61220a50d76
git at public.curoverse.com
git at public.curoverse.com
Thu Sep 11 14:58:04 EDT 2014
Summary of changes:
services/keepstore/handler_test.go | 13 +++--
services/keepstore/handlers.go | 24 ++++++---
services/keepstore/keepstore.go | 3 +-
services/keepstore/pull_list/pull_list.go | 81 -------------------------------
4 files changed, 26 insertions(+), 95 deletions(-)
delete mode 100644 services/keepstore/pull_list/pull_list.go
via 577bd728e410d6a9b57b11578264b61220a50d76 (commit)
from 0bf498cd8a240f70517d44e3dab0fa000468531d (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 577bd728e410d6a9b57b11578264b61220a50d76
Author: Tim Pierce <twp at curoverse.com>
Date: Thu Sep 11 14:44:48 2014 -0400
3705: replace pullmgr with a BlockWorkList
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 34e711f..e246b5e 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -663,11 +663,14 @@ func TestPullHandler(t *testing.T) {
// The Keep pull manager should have received one good list with 3
// requests on it.
- var saved_pull_list = pullmgr.GetList()
- if len(saved_pull_list) != 3 {
- t.Errorf(
- "saved_pull_list: expected 3 elements, got %d\nsaved_pull_list = %v",
- len(saved_pull_list), saved_pull_list)
+ var output_list = make([]PullRequest, 3)
+ for i := 0; i < 3; i++ {
+ item := <-pullmgr.NextItem
+ if pr, ok := item.Value.(PullRequest); ok {
+ output_list[i] = pr
+ } else {
+ t.Errorf("item %v could not be parsed as a PullRequest", item)
+ }
}
}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 84fa6a6..8095207 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -10,10 +10,10 @@ package main
import (
"bufio"
"bytes"
+ "container/list"
"crypto/md5"
"encoding/json"
"fmt"
- "git.curoverse.com/arvados.git/services/keepstore/pull_list"
"github.com/gorilla/mux"
"io"
"log"
@@ -436,6 +436,11 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
If the JSON unmarshalling fails, return 400 Bad Request.
*/
+type PullRequest struct {
+ Locator string `json:"locator"`
+ Servers []string `json:"servers"`
+}
+
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
api_token := GetApiToken(req)
@@ -446,9 +451,9 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
}
// Parse the request body.
- var plist []pull_list.PullRequest
+ var pr []PullRequest
r := json.NewDecoder(req.Body)
- if err := r.Decode(&plist); err != nil {
+ if err := r.Decode(&pr); err != nil {
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
return
@@ -457,15 +462,20 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
// We have a properly formatted pull list sent from the data
// manager. Report success and send the list to the pull list
// manager for further handling.
- log.Printf("%s %s: received %v\n", req.Method, req.URL, plist)
+ log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(
- fmt.Sprintf("Received %d pull requests\n", len(plist))))
+ fmt.Sprintf("Received %d pull requests\n", len(pr))))
+
+ plist := list.New()
+ for _, p := range pr {
+ plist.PushBack(p)
+ }
if pullmgr == nil {
- pullmgr = pull_list.NewManager()
+ pullmgr = NewBlockWorkList()
}
- pullmgr.SetList(plist)
+ pullmgr.ReplaceList(plist)
}
// ==============================
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index e5bd3bf..06054f5 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -4,7 +4,6 @@ import (
"bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/services/keepstore/pull_list"
"io/ioutil"
"log"
"net"
@@ -97,7 +96,7 @@ var KeepVM VolumeManager
// keepstore servers in order to increase data replication) with
// atomic update methods that are safe to use from multiple
// goroutines.
-var pullmgr *pull_list.Manager
+var pullmgr *BlockWorkList
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
diff --git a/services/keepstore/pull_list/pull_list.go b/services/keepstore/pull_list/pull_list.go
deleted file mode 100644
index 71d7821..0000000
--- a/services/keepstore/pull_list/pull_list.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package pull_list
-
-/* The pull_list package manages a list of pull requests sent
- by Data Manager.
-
- The interface is:
-
- pull_list.NewManager() creates and returns a pull_list.Manager. A
- listener runs in a goroutine, waiting for new requests on its input
- channels.
-
- pull_list.SetList() assigns a new pull list to the manager. Any
- existing list is discarded.
-
- pull_list.GetList() reports the manager's current pull list.
-
- pull_list.Close() shuts down the pull list manager.
-*/
-
-type PullRequest struct {
- Locator string
- Servers []string
-}
-
-type Manager struct {
- setlist chan []PullRequest // input channel for setting new lists
- getlist chan []PullRequest // output channel for getting existing list
-}
-
-// NewManager returns a new Manager object. It launches a goroutine that
-// waits for pull requests.
-//
-func NewManager() *Manager {
- r := Manager{
- make(chan []PullRequest),
- make(chan []PullRequest),
- }
- go r.listen()
- return &r
-}
-
-// SetList sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
-//
-func (r *Manager) SetList(pr []PullRequest) {
- r.setlist <- pr
-}
-
-// GetList reports the contents of the current pull list.
-func (r *Manager) GetList() []PullRequest {
- return <-r.getlist
-}
-
-// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
-//
-func (r *Manager) Close() {
- close(r.setlist)
-}
-
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-func (r *Manager) listen() {
- var current []PullRequest
- for {
- select {
- case p, ok := <-r.setlist:
- if ok {
- current = p
- } else {
- // The input channel is closed; time to shut down
- close(r.getlist)
- return
- }
- case r.getlist <- current:
- // no-op
- }
- }
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list