[ARVADOS] created: eca9e074e76ea7e619af5d9b36f8fba428318c6f

git at public.curoverse.com git at public.curoverse.com
Fri Sep 25 09:15:47 EDT 2015


        at  eca9e074e76ea7e619af5d9b36f8fba428318c6f (commit)


commit eca9e074e76ea7e619af5d9b36f8fba428318c6f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jul 15 11:32:33 2015 -0400

    6611: Send pull lists to keepstore

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 28d558b..627e33d 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -129,8 +129,7 @@ func singlerun() {
 		&keepServerInfo,
 		replicationSummary.KeepBlocksNotInCollections)
 
-	summary.WritePullLists(arvLogger, pullLists)
-
+	keep.SendPullLists(arvLogger, kc, pullLists)
 	keep.SendTrashLists(arvLogger, kc, trashLists)
 
 	// Log that we're finished. We force the recording, since go will
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 112823e..9327fcd 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -454,9 +454,15 @@ func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
 
 }
 
+type Locator blockdigest.DigestWithSize
+
+func (l Locator) MarshalJSON() ([]byte, error) {
+	return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
+}
+
 type TrashRequest struct {
-	Locator    string `json:"locator"`
-	BlockMtime int64  `json:"block_mtime"`
+	Locator    Locator `json:"locator"`
+	BlockMtime int64   `json:"block_mtime"`
 }
 
 type TrashList []TrashRequest
@@ -511,3 +517,63 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
 		<-rendezvous
 	}
 }
+
+// One entry in the Pull List
+type PullRequest struct {
+	Locator Locator  `json:"locator"`
+	Servers []string `json:"servers"`
+}
+
+// The Pull List for a particular server
+type PullList []PullRequest
+
+// SendPullLists PUTs each pull list in spl as JSON to "<url>/pull", one
+// concurrent request per map key, and returns when all have finished.
+func SendPullLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]PullList) {
+	count := 0
+	rendezvous := make(chan bool)
+
+	for url, v := range spl {
+		count += 1
+		log.Printf("Sending pull list to %v", url)
+
+		go (func(url string, v PullList) {
+			defer (func() {
+				rendezvous <- true
+			})()
+
+			pipeReader, pipeWriter := io.Pipe()
+			req, err := http.NewRequest("PUT", fmt.Sprintf("%s/pull", url), pipeReader)
+			if err != nil {
+				log.Printf("Error creating pull list request for %v error: %v", url, err.Error())
+				return
+			}
+
+			// Encode only after the request exists (no writer stranded on an
+			// unread pipe); an encoding error fails the request via the pipe.
+			go (func() {
+				pipeWriter.CloseWithError(json.NewEncoder(pipeWriter).Encode(v))
+			})()
+
+			// Add api token header
+			req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+
+			resp, err := kc.Client.Do(req)
+			if err != nil {
+				log.Printf("Error sending pull list to %v error: %v", url, err.Error())
+				return
+			}
+			defer resp.Body.Close()
+
+			// err is nil here, so report the HTTP status rather than err.Error().
+			if resp.StatusCode != http.StatusOK {
+				log.Printf("Error sending pull list to %v error: %v", url, resp.Status)
+			}
+			io.Copy(ioutil.Discard, resp.Body)
+		})(url, v)
+	}
+
+	for i := 0; i < count; i += 1 {
+		<-rendezvous
+	}
+}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
index 3a246a2..f317517 100644
--- a/services/datamanager/summary/pull_list.go
+++ b/services/datamanager/summary/pull_list.go
@@ -14,24 +14,9 @@ import (
 	"strings"
 )
 
-type Locator blockdigest.DigestWithSize
-
-func (l Locator) MarshalJSON() ([]byte, error) {
-	return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
-}
-
-// One entry in the Pull List
-type PullRequest struct {
-	Locator Locator  `json:"locator"`
-	Servers []string `json:"servers"`
-}
-
-// The Pull List for a particular server
-type PullList []PullRequest
-
 // PullListByLocator implements sort.Interface for PullList based on
 // the Digest.
-type PullListByLocator PullList
+type PullListByLocator keep.PullList
 
 func (a PullListByLocator) Len() int      { return len(a) }
 func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
@@ -66,8 +51,8 @@ type PullServers struct {
 func ComputePullServers(kc *keepclient.KeepClient,
 	keepServerInfo *keep.ReadServers,
 	blockToDesiredReplication map[blockdigest.DigestWithSize]int,
-	underReplicated BlockSet) (m map[Locator]PullServers) {
-	m = map[Locator]PullServers{}
+	underReplicated BlockSet) (m map[keep.Locator]PullServers) {
+	m = map[keep.Locator]PullServers{}
 	// We use CanonicalString to avoid filling memory with dupicate
 	// copies of the same string.
 	var cs CanonicalString
@@ -100,7 +85,7 @@ func ComputePullServers(kc *keepclient.KeepClient,
 				roots := keepclient.NewRootSorter(kc.LocalRoots(),
 					block.String()).GetSortedRoots()
 
-				l := Locator(block)
+				l := keep.Locator(block)
 				m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
 					roots, numCopiesMissing)
 			}
@@ -148,8 +133,8 @@ func RemoveProtocolPrefix(url string) string {
 }
 
 // Produces a PullList for each keep server.
-func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
-	spl = map[string]PullList{}
+func BuildPullLists(lps map[keep.Locator]PullServers) (spl map[string]keep.PullList) {
+	spl = map[string]keep.PullList{}
 	// We don't worry about canonicalizing our strings here, because we
 	// assume lps was created by ComputePullServers() which already
 	// canonicalized the strings for us.
@@ -157,10 +142,10 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
 		for _, destination := range pullServers.To {
 			pullList, pullListExists := spl[destination]
 			if !pullListExists {
-				pullList = PullList{}
+				pullList = keep.PullList{}
 			}
 			spl[destination] = append(pullList,
-				PullRequest{Locator: locator, Servers: pullServers.From})
+				keep.PullRequest{Locator: locator, Servers: pullServers.From})
 		}
 	}
 	return
@@ -172,7 +157,7 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
 // This is just a hack for prototyping, it is not expected to be used
 // in production.
 func WritePullLists(arvLogger *logger.Logger,
-	pullLists map[string]PullList) {
+	pullLists map[string]keep.PullList) {
 	r := strings.NewReplacer(":", ".")
 	for host, list := range pullLists {
 		filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
index efb40e2..bb2fb80 100644
--- a/services/datamanager/summary/trash_list.go
+++ b/services/datamanager/summary/trash_list.go
@@ -46,7 +46,7 @@ func BuildTrashLists(kc *keepclient.KeepClient,
 				_, writable := writableServers[srv]
 
 				if writable {
-					m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+					m[srv] = append(m[srv], keep.TrashRequest{Locator: keep.Locator(block), BlockMtime: block_on_server.Mtime})
 				}
 			}
 		}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 3dfdce2..33511bd 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -56,6 +56,9 @@ var data_manager_token string
 // actually deleting anything.
 var never_delete = true
 
+// accept but don't process pull lists
+var never_pull = true
+
 var maxBuffers = 128
 var bufs *bufferPool
 
@@ -233,8 +236,13 @@ func main() {
 		&never_delete,
 		"never-delete",
 		true,
-		"If set, nothing will be deleted. HTTP 405 will be returned "+
-			"for valid DELETE requests.")
+		"If true, nothing will be deleted. HTTP 405 will be returned "+
+			"for valid DELETE requests, and don't process trash lists.")
+	flag.BoolVar(
+		&never_pull,
+		"never-pull",
+		true,
+		"If true, don't process pull lists")
 	flag.StringVar(
 		&blob_signing_key_file,
 		"permission-key-file",
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 3d67cf2..20f3471 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -40,6 +40,11 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
 		Write to storage
 */
 func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
+
+	if never_pull {
+		return errors.New(fmt.Sprintf("never_pull is true"))
+	}
+
 	keepClient.Arvados.ApiToken = token
 
 	service_roots := make(map[string]string)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list