[ARVADOS] updated: d7635fbe751b2d00dd722a038723577f344406e1

git at public.curoverse.com git at public.curoverse.com
Mon Jun 1 20:49:24 EDT 2015


Summary of changes:
 services/datamanager/collection/collection.go  |  2 ++
 services/datamanager/datamanager.go            | 50 ++++++++++++++++----------
 services/datamanager/summary/file.go           | 16 ++++++---
 services/datamanager/summary/pull_list.go      | 21 +++++++----
 services/datamanager/summary/pull_list_test.go | 16 +++++++++
 5 files changed, 77 insertions(+), 28 deletions(-)

       via  d7635fbe751b2d00dd722a038723577f344406e1 (commit)
      from  5824ee2e5198dd46a7813fe2adbd380a114f9ac4 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.


commit d7635fbe751b2d00dd722a038723577f344406e1
Author: mishaz <misha at curoverse.com>
Date:   Tue Jun 2 00:26:37 2015 +0000

    Changes in response to code review.
    
    Created a DataFetcher type to abstract over whether our data is read from remote servers or from local files.
    Moved code for reading from remote servers into BuildDataFetcher().
    Split summary.MaybeReadData() into ShouldReadData() and ReadData().
    
    Started honoring the new writable flag for Keep servers, which required reworking CreatePullServers somewhat.
    
    Cleaned up the code that adds a pull request to a new destination's pull list.
    Updated tests.

diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index e0929b7..ddc4f95 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -25,6 +25,8 @@ var (
 )
 
 const (
+	// TODO(misha): Read this value from the SDK once support is added
+	// as suggested in https://arvados.org/issues/3408#note-31
 	DefaultReplicationLevel = 2
 )
 
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 587d426..3e84308 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -79,30 +79,19 @@ func singlerun() {
 	}
 
 	var (
+		dataFetcher     summary.DataFetcher
 		readCollections collection.ReadCollections
 		keepServerInfo  keep.ReadServers
 	)
 
-	if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
-		collectionChannel := make(chan collection.ReadCollections)
-
-		go func() {
-			collectionChannel <- collection.GetCollectionsAndSummarize(
-				collection.GetCollectionsParams{
-					Client:    arv,
-					Logger:    arvLogger,
-					BatchSize: 50})
-		}()
-
-		keepServerInfo = keep.GetKeepServersAndSummarize(
-			keep.GetKeepServersParams{
-				Client: arv,
-				Logger: arvLogger,
-				Limit:  1000})
-
-		readCollections = <-collectionChannel
+	if summary.ShouldReadData() {
+		dataFetcher = summary.ReadData
+	} else {
+		dataFetcher = BuildDataFetcher(arv)
 	}
 
+	dataFetcher(arvLogger, &readCollections, &keepServerInfo)
+
 	summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
 
 	buckets := summary.BucketReplication(readCollections, keepServerInfo)
@@ -152,3 +141,28 @@ func singlerun() {
 		})
 	}
 }
+
+// Returns a data fetcher that fetches data from remote servers.
+func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
+	return func(arvLogger *logger.Logger,
+		readCollections *collection.ReadCollections,
+		keepServerInfo *keep.ReadServers) {
+		collectionChannel := make(chan collection.ReadCollections)
+
+		go func() {
+			collectionChannel <- collection.GetCollectionsAndSummarize(
+				collection.GetCollectionsParams{
+					Client:    arv,
+					Logger:    arvLogger,
+					BatchSize: 50})
+		}()
+
+		*keepServerInfo = keep.GetKeepServersAndSummarize(
+			keep.GetKeepServersParams{
+				Client: arv,
+				Logger: arvLogger,
+				Limit:  1000})
+
+		*readCollections = <-collectionChannel
+	}
+}
diff --git a/services/datamanager/summary/file.go b/services/datamanager/summary/file.go
index ce7d687..8c37e99 100644
--- a/services/datamanager/summary/file.go
+++ b/services/datamanager/summary/file.go
@@ -26,6 +26,10 @@ var (
 	readDataFrom string
 )
 
+type DataFetcher func(arvLogger *logger.Logger,
+	readCollections *collection.ReadCollections,
+	keepServerInfo *keep.ReadServers)
+
 func init() {
 	flag.StringVar(&writeDataTo,
 		"write-data-to",
@@ -71,6 +75,10 @@ func MaybeWriteData(arvLogger *logger.Logger,
 	}
 }
 
+func ShouldReadData() bool {
+	return readDataFrom != ""
+}
+
 // Reads data that we've written to a file.
 //
 // This is useful for development, so that we don't need to read all
@@ -78,11 +86,12 @@ func MaybeWriteData(arvLogger *logger.Logger,
 //
 // This should not be used outside of development, since you'll be
 // working with stale data.
-func MaybeReadData(arvLogger *logger.Logger,
+func ReadData(arvLogger *logger.Logger,
 	readCollections *collection.ReadCollections,
-	keepServerInfo *keep.ReadServers) bool {
+	keepServerInfo *keep.ReadServers) {
 	if readDataFrom == "" {
-		return false
+		loggerutil.FatalWithMessage(arvLogger,
+			"ReadData() called with empty filename.")
 	} else {
 		summaryFile, err := os.Open(readDataFrom)
 		if err != nil {
@@ -107,6 +116,5 @@ func MaybeReadData(arvLogger *logger.Logger,
 		*readCollections = data.ReadCollections
 		*keepServerInfo = data.KeepServerInfo
 		log.Printf("Read summary data from: %s", readDataFrom)
-		return true
 	}
 }
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
index 64a83a5..726f2c6 100644
--- a/services/datamanager/summary/pull_list.go
+++ b/services/datamanager/summary/pull_list.go
@@ -62,6 +62,12 @@ func ComputePullServers(kc *keepclient.KeepClient,
 	// copies of the same string.
 	var cs CanonicalString
 
+	// Servers that are writeable
+	writableServers := map[string]struct{}{}
+	for _, url := range kc.WritableLocalRoots() {
+		writableServers[cs.Get(RemoveProtocolPrefix(url))] = struct{}{}
+	}
+
 	for block, _ := range underReplicated {
 		serversStoringBlock := keepServerInfo.BlockToServers[block]
 		numCopies := len(serversStoringBlock)
@@ -79,14 +85,15 @@ func ComputePullServers(kc *keepclient.KeepClient,
 				serverHasBlock := map[string]struct{}{}
 				for _, info := range serversStoringBlock {
 					sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
-					serverHasBlock[sa.HostPort()] = struct{}{}
+					serverHasBlock[cs.Get(sa.HostPort())] = struct{}{}
 				}
 
 				roots := keepclient.NewRootSorter(kc.LocalRoots(),
 					block.String()).GetSortedRoots()
 
 				l := Locator{Digest: block}
-				m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
+				m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
+					roots, numCopiesMissing)
 			}
 		}
 	}
@@ -98,6 +105,7 @@ func ComputePullServers(kc *keepclient.KeepClient,
 // strings.
 func CreatePullServers(cs CanonicalString,
 	serverHasBlock map[string]struct{},
+	writableServers map[string]struct{},
 	sortedServers []string,
 	maxToFields int) (ps PullServers) {
 
@@ -114,7 +122,10 @@ func CreatePullServers(cs CanonicalString,
 		if hasBlock {
 			ps.From = append(ps.From, server)
 		} else if len(ps.To) < maxToFields {
-			ps.To = append(ps.To, server)
+			_, writable := writableServers[server]
+			if writable {
+				ps.To = append(ps.To, server)
+			}
 		}
 	}
 
@@ -137,11 +148,9 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
 			pullList, pullListExists := spl[destination]
 			if !pullListExists {
 				pullList = PullList{}
-				spl[destination] = pullList
 			}
-			pullList = append(pullList,
+			spl[destination] = append(pullList,
 				PullRequest{Locator: locator, Servers: pullServers.From})
-			spl[destination] = pullList
 		}
 	}
 	return
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
index 9d7075c..692af5c 100644
--- a/services/datamanager/summary/pull_list_test.go
+++ b/services/datamanager/summary/pull_list_test.go
@@ -46,6 +46,7 @@ func (s *MySuite) TestCreatePullServers(c *C) {
 	c.Check(
 		CreatePullServers(cs,
 			stringSet(),
+			stringSet(),
 			[]string{},
 			5),
 		DeepEquals,
@@ -54,6 +55,7 @@ func (s *MySuite) TestCreatePullServers(c *C) {
 	c.Check(
 		CreatePullServers(cs,
 			stringSet("keep0:25107", "keep1:25108"),
+			stringSet(),
 			[]string{},
 			5),
 		DeepEquals,
@@ -62,6 +64,7 @@ func (s *MySuite) TestCreatePullServers(c *C) {
 	c.Check(
 		CreatePullServers(cs,
 			stringSet("keep0:25107", "keep1:25108"),
+			stringSet("keep0:25107"),
 			[]string{"keep0:25107"},
 			5),
 		DeepEquals,
@@ -70,6 +73,7 @@ func (s *MySuite) TestCreatePullServers(c *C) {
 	c.Check(
 		CreatePullServers(cs,
 			stringSet("keep0:25107", "keep1:25108"),
+			stringSet("keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"),
 			[]string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
 			5),
 		DeepEquals,
@@ -79,6 +83,17 @@ func (s *MySuite) TestCreatePullServers(c *C) {
 	c.Check(
 		CreatePullServers(cs,
 			stringSet("keep0:25107", "keep1:25108"),
+			stringSet("keep3:25110", "keep1:25108", "keep0:25107"),
+			[]string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
+			5),
+		DeepEquals,
+		PullServers{To: []string{"keep3:25110"},
+			From: []string{"keep1:25108", "keep0:25107"}})
+
+	c.Check(
+		CreatePullServers(cs,
+			stringSet("keep0:25107", "keep1:25108"),
+			stringSet("keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"),
 			[]string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
 			1),
 		DeepEquals,
@@ -88,6 +103,7 @@ func (s *MySuite) TestCreatePullServers(c *C) {
 	c.Check(
 		CreatePullServers(cs,
 			stringSet("keep0:25107", "keep1:25108"),
+			stringSet("keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"),
 			[]string{"keep3:25110", "keep2:25109", "keep1:25108", "keep0:25107"},
 			0),
 		DeepEquals,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list