[ARVADOS] created: eca9e074e76ea7e619af5d9b36f8fba428318c6f
git at public.curoverse.com
git at public.curoverse.com
Fri Sep 25 09:15:47 EDT 2015
at eca9e074e76ea7e619af5d9b36f8fba428318c6f (commit)
commit eca9e074e76ea7e619af5d9b36f8fba428318c6f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jul 15 11:32:33 2015 -0400
6611: Send pull lists to keepstore
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 28d558b..627e33d 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -129,8 +129,7 @@ func singlerun() {
&keepServerInfo,
replicationSummary.KeepBlocksNotInCollections)
- summary.WritePullLists(arvLogger, pullLists)
-
+ keep.SendPullLists(arvLogger, kc, pullLists)
keep.SendTrashLists(arvLogger, kc, trashLists)
// Log that we're finished. We force the recording, since go will
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 112823e..9327fcd 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -454,9 +454,15 @@ func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
}
+// Locator is a blockdigest.DigestWithSize that marshals to JSON as a
+// quoted string.
+type Locator blockdigest.DigestWithSize
+
+// MarshalJSON implements json.Marshaler by wrapping the String() form
+// of the underlying DigestWithSize in double quotes. No escaping is
+// applied to the text.
+func (l Locator) MarshalJSON() ([]byte, error) {
+	const quote = "\""
+	text := blockdigest.DigestWithSize(l).String()
+	return []byte(quote + text + quote), nil
+}
+
+// TrashRequest is one trash list entry: a block Locator and the block's
+// mtime as reported by the server whose trash list contains it.
type TrashRequest struct {
- Locator string `json:"locator"`
- BlockMtime int64 `json:"block_mtime"`
+ Locator Locator `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
}
+// TrashList is the trash list for one keep server.
type TrashList []TrashRequest
@@ -511,3 +517,63 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
<-rendezvous
}
}
+
+type (
+	// PullRequest is one pull list entry: a block Locator and the
+	// servers the receiving keep server should pull it from.
+	PullRequest struct {
+		Locator Locator  `json:"locator"`
+		Servers []string `json:"servers"`
+	}
+
+	// PullList is the pull list for one particular keep server.
+	PullList []PullRequest
+)
+
+func SendPullLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]PullList) {
+ count := 0
+ rendezvous := make(chan bool)
+
+ for url, v := range spl {
+ count += 1
+ log.Printf("Sending pull list to %v", url)
+
+ go (func(url string, v PullList) {
+ defer (func() {
+ rendezvous <- true
+ })()
+
+ pipeReader, pipeWriter := io.Pipe()
+ go (func() {
+ enc := json.NewEncoder(pipeWriter)
+ enc.Encode(v)
+ pipeWriter.Close()
+ })()
+
+ req, err := http.NewRequest("PUT", fmt.Sprintf("%s/pull", url), pipeReader)
+ if err != nil {
+ log.Printf("Error creating pull list request for %v error: %v", url, err.Error())
+ return
+ }
+
+ // Add api token header
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+
+ // Make the request
+ var resp *http.Response
+ if resp, err = kc.Client.Do(req); err != nil {
+ log.Printf("Error sending pull list to %v error: %v", url, err.Error())
+ return
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ log.Printf("Error sending pull list to %v error: %v", url, err.Error())
+ }
+
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ })(url, v)
+
+ }
+
+ for i := 0; i < count; i += 1 {
+ <-rendezvous
+ }
+}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
index 3a246a2..f317517 100644
--- a/services/datamanager/summary/pull_list.go
+++ b/services/datamanager/summary/pull_list.go
@@ -14,24 +14,9 @@ import (
"strings"
)
-type Locator blockdigest.DigestWithSize
-
-func (l Locator) MarshalJSON() ([]byte, error) {
- return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
-}
-
-// One entry in the Pull List
-type PullRequest struct {
- Locator Locator `json:"locator"`
- Servers []string `json:"servers"`
-}
-
-// The Pull List for a particular server
-type PullList []PullRequest
-
// PullListByLocator implements sort.Interface for PullList based on
// the Digest.
-type PullListByLocator PullList
+type PullListByLocator keep.PullList
func (a PullListByLocator) Len() int { return len(a) }
func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@@ -66,8 +51,8 @@ type PullServers struct {
func ComputePullServers(kc *keepclient.KeepClient,
keepServerInfo *keep.ReadServers,
blockToDesiredReplication map[blockdigest.DigestWithSize]int,
- underReplicated BlockSet) (m map[Locator]PullServers) {
- m = map[Locator]PullServers{}
+ underReplicated BlockSet) (m map[keep.Locator]PullServers) {
+ m = map[keep.Locator]PullServers{}
// We use CanonicalString to avoid filling memory with dupicate
// copies of the same string.
var cs CanonicalString
@@ -100,7 +85,7 @@ func ComputePullServers(kc *keepclient.KeepClient,
roots := keepclient.NewRootSorter(kc.LocalRoots(),
block.String()).GetSortedRoots()
- l := Locator(block)
+ l := keep.Locator(block)
m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
roots, numCopiesMissing)
}
@@ -148,8 +133,8 @@ func RemoveProtocolPrefix(url string) string {
}
// Produces a PullList for each keep server.
-func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
- spl = map[string]PullList{}
+func BuildPullLists(lps map[keep.Locator]PullServers) (spl map[string]keep.PullList) {
+ spl = map[string]keep.PullList{}
// We don't worry about canonicalizing our strings here, because we
// assume lps was created by ComputePullServers() which already
// canonicalized the strings for us.
@@ -157,10 +142,10 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
for _, destination := range pullServers.To {
pullList, pullListExists := spl[destination]
if !pullListExists {
- pullList = PullList{}
+ pullList = keep.PullList{}
}
spl[destination] = append(pullList,
- PullRequest{Locator: locator, Servers: pullServers.From})
+ keep.PullRequest{Locator: locator, Servers: pullServers.From})
}
}
return
@@ -172,7 +157,7 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
// This is just a hack for prototyping, it is not expected to be used
// in production.
func WritePullLists(arvLogger *logger.Logger,
- pullLists map[string]PullList) {
+ pullLists map[string]keep.PullList) {
r := strings.NewReplacer(":", ".")
for host, list := range pullLists {
filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
index efb40e2..bb2fb80 100644
--- a/services/datamanager/summary/trash_list.go
+++ b/services/datamanager/summary/trash_list.go
@@ -46,7 +46,7 @@ func BuildTrashLists(kc *keepclient.KeepClient,
_, writable := writableServers[srv]
if writable {
- m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+ m[srv] = append(m[srv], keep.TrashRequest{Locator: keep.Locator(block), BlockMtime: block_on_server.Mtime})
}
}
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 3dfdce2..33511bd 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -56,6 +56,9 @@ var data_manager_token string
// actually deleting anything.
var never_delete = true
+// If true (the default; see the -never-pull flag), pull lists are
+// accepted but not processed: PullItemAndProcess returns an error.
+var never_pull = true
+
var maxBuffers = 128
var bufs *bufferPool
@@ -233,8 +236,13 @@ func main() {
&never_delete,
"never-delete",
true,
- "If set, nothing will be deleted. HTTP 405 will be returned "+
- "for valid DELETE requests.")
+ "If true, nothing will be deleted. HTTP 405 will be returned "+
+ "for valid DELETE requests, and don't process trash lists.")
+ flag.BoolVar(
+ &never_pull,
+ "never-pull",
+ true,
+ "If true, don't process pull lists")
flag.StringVar(
&blob_signing_key_file,
"permission-key-file",
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 3d67cf2..20f3471 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -40,6 +40,11 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
Write to storage
*/
func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
+
+ if never_pull {
+ return errors.New(fmt.Sprintf("never_pull is true"))
+ }
+
keepClient.Arvados.ApiToken = token
service_roots := make(map[string]string)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list