[ARVADOS] updated: 53f785c298338645b6880f22f26b0c36a7cfab4d

git at public.curoverse.com
Mon Apr 20 21:15:31 EDT 2015


Summary of changes:
 sdk/go/logger/logger.go                            |   7 +-
 sdk/go/logger/util.go                              |  20 +++
 services/datamanager/collection/collection.go      |  32 +++--
 services/datamanager/collection/collection_test.go |  10 +-
 services/datamanager/datamanager.go                |  35 +++--
 services/datamanager/keep/keep.go                  |  34 +++--
 services/datamanager/summary/file.go               |   4 +-
 services/datamanager/summary/summary.go            | 154 ++++++++++++++++++---
 services/datamanager/summary/summary_test.go       |  15 +-
 9 files changed, 239 insertions(+), 72 deletions(-)
 create mode 100644 sdk/go/logger/util.go

       via  53f785c298338645b6880f22f26b0c36a7cfab4d (commit)
       via  0191604164c265a7b972a5802680477f20451d9d (commit)
      from  8e1a832244c109351a20a7181da0f65c73e63987 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 53f785c298338645b6880f22f26b0c36a7cfab4d
Author: mishaz <misha at curoverse.com>
Date:   Tue Apr 21 01:04:36 2015 +0000

    Added logger util GetOrCreateMap() and started using it everywhere.
    Moved logging of summary info into ReadCollections.Summarize().
    Renamed ComputeBlockReplicationCounts(readServers *ReadServers) to ReadServers.Summarize() and moved logging into the method.
    Split SummarizeReplication() into BucketReplication() and SummarizeBuckets().
    Some general gofmt cleanup and documentation.
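
For context, here is a runnable sketch of the new helper's semantics, using
a local re-implementation with plain maps (the nested-map keys and field
names are illustrative examples taken from the diffs below, not code from
this commit):

    package main

    import "fmt"

    // getOrCreateMap mirrors the new logger helper: return the nested map
    // stored at parent[key], creating and storing it first if it is absent.
    func getOrCreateMap(parent map[string]interface{}, key string) map[string]interface{} {
        if read, exists := parent[key]; exists {
            return read.(map[string]interface{})
        }
        child := make(map[string]interface{})
        parent[key] = child
        return child
    }

    func main() {
        p := make(map[string]interface{})

        // The first use creates the nested map...
        getOrCreateMap(p, "collection_info")["batch_size"] = 50

        // ...later uses find and reuse it, so call sites no longer need to
        // know whether they run first or do a type assertion that panics
        // when the map has not been created yet.
        getOrCreateMap(p, "collection_info")["collections_read"] = 1234

        fmt.Println(p) // map[collection_info:map[batch_size:50 collections_read:1234]]
    }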

diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go
index ce18e90..a989afc 100644
--- a/sdk/go/logger/logger.go
+++ b/sdk/go/logger/logger.go
@@ -14,8 +14,11 @@
 // 	entry map[string]interface{}) {
 //   // Modify properties and entry however you want
 //   // properties is a shortcut for entry["properties"].(map[string]interface{})
-//   // properties can take any values you want to give it,
-//   // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
+//   // properties can take any (valid) values you want to give it,
+//   // entry will only take the fields listed at
+//   // http://doc.arvados.org/api/schema/Log.html
+//   // Valid values for properties are anything that can be JSON
+//   // encoded (i.e., json.Marshal() will not error when called on it).
 // })
 package logger
 
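The constraint described in the comment above can be checked mechanically: a
value is valid for properties exactly when json.Marshal() accepts it. A
minimal standalone sketch (the values are examples, not part of the commit):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    func main() {
        // JSON-encodable: string-keyed maps, slices, numbers, strings.
        ok := map[string]interface{}{"count": 3, "names": []string{"a", "b"}}
        if _, err := json.Marshal(ok); err == nil {
            fmt.Println("valid for properties")
        }

        // Not JSON-encodable: a channel makes json.Marshal return a
        // *json.UnsupportedTypeError.
        bad := map[string]interface{}{"ch": make(chan int)}
        if _, err := json.Marshal(bad); err != nil {
            fmt.Println("invalid for properties:", err)
        }
    }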
diff --git a/sdk/go/logger/util.go b/sdk/go/logger/util.go
new file mode 100644
index 0000000..6425aca
--- /dev/null
+++ b/sdk/go/logger/util.go
@@ -0,0 +1,20 @@
+// Helper methods for interacting with Logger.
+package logger
+
+// Retrieves the map[string]interface{} stored at parent[key] if it
+// exists; otherwise it creates one and stores it there.
+// This is useful for the logger because you may not know whether a
+// map you need has already been created.
+func GetOrCreateMap(
+	parent map[string]interface{},
+	key string) (child map[string]interface{}) {
+	read, exists := parent[key]
+	if exists {
+		child = read.(map[string]interface{})
+
+	} else {
+		child = make(map[string]interface{})
+		parent[key] = child
+	}
+	return
+}
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index 32cdcfb..f63b95f 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -98,17 +98,7 @@ func WriteHeapProfile() {
 
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
 	results = GetCollections(params)
-	results.Summarize()
-
-	if params.Logger != nil {
-		params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-			collectionInfo := p["collection_info"].(map[string]interface{})
-			// Since maps are shallow copied, we run a risk of concurrent
-			// updates here. By copying results.OwnerToCollectionSize into
-			// the log, we're assuming that it won't be updated.
-			collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
-		})
-	}
+	results.Summarize(params.Logger)
 
 	log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
 	log.Printf("Read and processed %d collections",
@@ -160,10 +150,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
 	if params.Logger != nil {
 		params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-			collectionInfo := make(map[string]interface{})
+			collectionInfo := logger.GetOrCreateMap(p, "collection_info")
 			collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
 			collectionInfo["batch_size"] = params.BatchSize
-			p["collection_info"] = collectionInfo
 		})
 	}
 
@@ -205,7 +194,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
 		if params.Logger != nil {
 			params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-				collectionInfo := p["collection_info"].(map[string]interface{})
+				collectionInfo := logger.GetOrCreateMap(p, "collection_info")
 				collectionInfo["collections_read"] = totalCollections
 				collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
 				collectionInfo["total_manifest_size"] = totalManifestSize
@@ -295,7 +284,7 @@ func ProcessCollections(arvLogger *logger.Logger,
 	return
 }
 
-func (readCollections *ReadCollections) Summarize() {
+func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
 	readCollections.OwnerToCollectionSize = make(map[string]int)
 	readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
 	numCollections := len(readCollections.UuidToCollection)
@@ -322,5 +311,18 @@ func (readCollections *ReadCollections) Summarize() {
 		}
 	}
 
+	if arvLogger != nil {
+		arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+			collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+			// Since maps are shallow copied, we run a risk of concurrent
+			// updates here. By copying results.OwnerToCollectionSize into
+			// the log, we're assuming that it won't be updated.
+			collectionInfo["owner_to_collection_size"] =
+				readCollections.OwnerToCollectionSize
+			collectionInfo["distinct_blocks_named"] =
+				len(readCollections.BlockToReplication)
+		})
+	}
+
 	return
 }
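The shallow-copy caveat noted in Summarize() above could be avoided by
logging a copy of the map rather than the map itself, at the cost of an
extra allocation. A runnable sketch of that alternative (this is not what
the commit does):

    package main

    import "fmt"

    // copyStringIntMap returns a fresh map with the same contents, so a
    // logged snapshot cannot be changed by later writes to the original.
    func copyStringIntMap(src map[string]int) map[string]int {
        dst := make(map[string]int, len(src))
        for k, v := range src {
            dst[k] = v
        }
        return dst
    }

    func main() {
        ownerToSize := map[string]int{"owner-a": 100}
        logged := copyStringIntMap(ownerToSize) // snapshot at log time
        ownerToSize["owner-a"] = 200            // later concurrent update
        fmt.Println(logged["owner-a"])          // still 100
    }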
diff --git a/services/datamanager/collection/collection_test.go b/services/datamanager/collection/collection_test.go
index 9117321..d42b2c9 100644
--- a/services/datamanager/collection/collection_test.go
+++ b/services/datamanager/collection/collection_test.go
@@ -63,10 +63,10 @@ func CompareSummarizedReadCollections(t *testing.T,
 func TestSummarizeSimple(t *testing.T) {
 	rc := MakeTestReadCollections([]TestCollectionSpec{TestCollectionSpec{
 		ReplicationLevel: 5,
-		Blocks: []int{1, 2},
+		Blocks:           []int{1, 2},
 	}})
 
-	rc.Summarize()
+	rc.Summarize(nil)
 
 	c := rc.UuidToCollection["col0"]
 
@@ -86,15 +86,15 @@ func TestSummarizeOverlapping(t *testing.T) {
 	rc := MakeTestReadCollections([]TestCollectionSpec{
 		TestCollectionSpec{
 			ReplicationLevel: 5,
-			Blocks: []int{1, 2},
+			Blocks:           []int{1, 2},
 		},
 		TestCollectionSpec{
 			ReplicationLevel: 8,
-			Blocks: []int{2, 3},
+			Blocks:           []int{2, 3},
 		},
 	})
 
-	rc.Summarize()
+	rc.Summarize(nil)
 
 	c0 := rc.UuidToCollection["col0"]
 	c1 := rc.UuidToCollection["col1"]
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 1f9d77b..d3efe62 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -103,19 +103,34 @@ func singlerun() {
 
 	summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
 
-	replicationSummary :=
-		summary.SummarizeReplication(readCollections, keepServerInfo)
+	buckets := summary.BucketReplication(readCollections, keepServerInfo)
+	bucketCounts := buckets.Counts()
+
+	replicationSummary := buckets.SummarizeBuckets(readCollections)
+	replicationCounts := replicationSummary.ComputeCounts()
 
 	log.Printf("Blocks In Collections: %d, "+
 		"\nBlocks In Keep: %d.",
 		len(readCollections.BlockToReplication),
 		len(keepServerInfo.BlockToServers))
-	log.Println(replicationSummary.ComputeCounts().PrettyPrint())
+	log.Println(replicationCounts.PrettyPrint())
+
+	log.Printf("Blocks Histogram:")
+	for _, rlbss := range bucketCounts {
+		log.Printf("%+v: %10d",
+			rlbss.Levels,
+			rlbss.Count)
+	}
 
 	// Log that we're finished. We force the recording, since go will
-	// not wait for the timer before exiting.
+	// not wait for the write timer before exiting.
 	if arvLogger != nil {
 		arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+			summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+			summaryInfo["block_replication_counts"] = bucketCounts
+			summaryInfo["replication_summary"] = replicationCounts
+			p["summary_info"] = summaryInfo
+
 			p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
 		})
 	}
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 93246bc..857ab86 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -116,7 +116,7 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer
 	results = GetKeepServers(params)
 	log.Printf("Returned %d keep disks", len(results.ServerToContents))
 
-	ComputeBlockReplicationCounts(&results)
+	results.Summarize(params.Logger)
 	log.Printf("Replication level distribution: %v",
 		results.BlockReplicationCounts)
 
@@ -146,13 +146,10 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
 
 	if params.Logger != nil {
 		params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-			keepInfo := make(map[string]interface{})
-
+			keepInfo := logger.GetOrCreateMap(p, "keep_info")
 			keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
 			keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
 			keepInfo["keep_servers"] = sdkResponse.KeepServers
-
-			p["keep_info"] = keepInfo
 		})
 	}
 
@@ -223,7 +220,10 @@ func GetServerContents(arvLogger *logger.Logger,
 	resp, err := client.Do(req)
 	if err != nil {
 		loggerutil.FatalWithMessage(arvLogger,
-			fmt.Sprintf("Error fetching %s: %v", req.URL.String(), err))
+			fmt.Sprintf("Error fetching %s: %v. Response was %+v",
+				req.URL.String(),
+				err,
+				resp))
 	}
 
 	return ReadServerResponse(arvLogger, keepServer, resp)
@@ -239,7 +239,7 @@ func GetServerStatus(arvLogger *logger.Logger,
 	if arvLogger != nil {
 		now := time.Now()
 		arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-			keepInfo := p["keep_info"].(map[string]interface{})
+			keepInfo := logger.GetOrCreateMap(p, "keep_info")
 			serverInfo := make(map[string]interface{})
 			serverInfo["status_request_sent_at"] = now
 			serverInfo["host"] = keepServer.Host
@@ -272,7 +272,7 @@ func GetServerStatus(arvLogger *logger.Logger,
 	if arvLogger != nil {
 		now := time.Now()
 		arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-			keepInfo := p["keep_info"].(map[string]interface{})
+			keepInfo := logger.GetOrCreateMap(p, "keep_info")
 			serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 			serverInfo["status_response_processed_at"] = now
 			serverInfo["status"] = keepStatus
@@ -288,7 +288,7 @@ func CreateIndexRequest(arvLogger *logger.Logger,
 	if arvLogger != nil {
 		now := time.Now()
 		arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-			keepInfo := p["keep_info"].(map[string]interface{})
+			keepInfo := logger.GetOrCreateMap(p, "keep_info")
 			serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 			serverInfo["index_request_sent_at"] = now
 		})
@@ -319,7 +319,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
 	if arvLogger != nil {
 		now := time.Now()
 		arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-			keepInfo := p["keep_info"].(map[string]interface{})
+			keepInfo := logger.GetOrCreateMap(p, "keep_info")
 			serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 			serverInfo["index_response_received_at"] = now
 		})
@@ -355,7 +355,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
 				log.Println(message)
 				if arvLogger != nil {
 					arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-						keepInfo := p["keep_info"].(map[string]interface{})
+						keepInfo := logger.GetOrCreateMap(p, "keep_info")
 						serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 						var error_list []string
 						read_error_list, has_list := serverInfo["error_list"]
@@ -393,7 +393,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
 		if arvLogger != nil {
 			now := time.Now()
 			arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-				keepInfo := p["keep_info"].(map[string]interface{})
+				keepInfo := logger.GetOrCreateMap(p, "keep_info")
 				serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 
 				serverInfo["processing_finished_at"] = now
@@ -435,10 +435,18 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err
 	return
 }
 
-func ComputeBlockReplicationCounts(readServers *ReadServers) {
+func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
 	readServers.BlockReplicationCounts = make(map[int]int)
 	for _, infos := range readServers.BlockToServers {
 		replication := len(infos)
 		readServers.BlockReplicationCounts[replication] += 1
 	}
+
+	if arvLogger != nil {
+		arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+			keepInfo := logger.GetOrCreateMap(p, "keep_info")
+			keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
+		})
+	}
+
 }
diff --git a/services/datamanager/summary/file.go b/services/datamanager/summary/file.go
index 15c3699..ce7d687 100644
--- a/services/datamanager/summary/file.go
+++ b/services/datamanager/summary/file.go
@@ -101,8 +101,8 @@ func MaybeReadData(arvLogger *logger.Logger,
 
 		// re-summarize data, so that we can update our summarizing
 		// functions without needing to do all our network i/o
-		data.ReadCollections.Summarize()
-		keep.ComputeBlockReplicationCounts(&data.KeepServerInfo)
+		data.ReadCollections.Summarize(arvLogger)
+		data.KeepServerInfo.Summarize(arvLogger)
 
 		*readCollections = data.ReadCollections
 		*keepServerInfo = data.KeepServerInfo
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
index 37e9373..8621d55 100644
--- a/services/datamanager/summary/summary.go
+++ b/services/datamanager/summary/summary.go
@@ -8,19 +8,30 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/blockdigest"
 	"git.curoverse.com/arvados.git/services/datamanager/collection"
 	"git.curoverse.com/arvados.git/services/datamanager/keep"
+	"sort"
 )
 
 type BlockSet map[blockdigest.BlockDigest]struct{}
 
+// Adds a single block to the set.
 func (bs BlockSet) Insert(digest blockdigest.BlockDigest) {
 	bs[digest] = struct{}{}
 }
 
+// Adds a set of blocks to the set.
+func (bs BlockSet) Union(obs BlockSet) {
+	for k, v := range obs {
+		bs[k] = v
+	}
+}
+
 // We use the collection index to save space. To convert to and from
 // the uuid, use collection.ReadCollections' fields
 // CollectionIndexToUuid and CollectionUuidToIndex.
 type CollectionIndexSet map[int]struct{}
 
+// Adds a single collection to the set. The collection is specified by
+// its index.
 func (cis CollectionIndexSet) Insert(collectionIndex int) {
 	cis[collectionIndex] = struct{}{}
 }
@@ -29,12 +40,37 @@ func (bs BlockSet) ToCollectionIndexSet(
 	readCollections collection.ReadCollections,
 	collectionIndexSet *CollectionIndexSet) {
 	for block := range bs {
-		for _,collectionIndex := range readCollections.BlockToCollectionIndices[block] {
+		for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
 			collectionIndexSet.Insert(collectionIndex)
 		}
 	}
 }
 
+// Keeps track of the requested and actual replication levels.
+// Currently this is only used for blocks but could easily be used for
+// collections as well.
+type ReplicationLevels struct {
+	// The requested replication level.
+	// For Blocks this is the maximum replication level among all the
+	// collections this block belongs to.
+	Requested int
+
+	// The actual number of keep servers this is on.
+	Actual int
+}
+
+// Maps from replication levels to their blocks.
+type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
+
+// An individual entry from ReplicationLevelBlockSetMap which only reports the number of blocks, not which blocks.
+type ReplicationLevelBlockCount struct {
+	Levels ReplicationLevels
+	Count  int
+}
+
+// An ordered list of ReplicationLevelBlockCount useful for reporting.
+type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
+
 type ReplicationSummary struct {
 	CollectionBlocksNotInKeep  BlockSet
 	UnderReplicatedBlocks      BlockSet
@@ -61,9 +97,66 @@ type ReplicationSummaryCounts struct {
 	CorrectlyReplicatedCollections int
 }
 
+// Gets the BlockSet for a given set of ReplicationLevels, creating it
+// if it doesn't already exist.
+func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
+	repLevels ReplicationLevels) (bs BlockSet) {
+	bs, exists := rlbs[repLevels]
+	if !exists {
+		bs = make(BlockSet)
+		rlbs[repLevels] = bs
+	}
+	return
+}
+
+// Adds a block to the set for a given replication level.
+func (rlbs ReplicationLevelBlockSetMap) Insert(
+	repLevels ReplicationLevels,
+	block blockdigest.BlockDigest) {
+	rlbs.GetOrCreate(repLevels).Insert(block)
+}
+
+// Adds a set of blocks to the set for a given replication level.
+func (rlbs ReplicationLevelBlockSetMap) Union(
+	repLevels ReplicationLevels,
+	bs BlockSet) {
+	rlbs.GetOrCreate(repLevels).Union(bs)
+}
+
+// Outputs a sorted list of ReplicationLevelBlockCounts.
+func (rlbs ReplicationLevelBlockSetMap) Counts() (
+	sorted ReplicationLevelBlockSetSlice) {
+	sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
+	i := 0
+	for levels, set := range rlbs {
+		sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
+		i++
+	}
+	sort.Sort(sorted)
+	return
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Len() int {
+	return len(rlbss)
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
+	return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
+		(rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
+			rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
+	rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
+}
+
 func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
-	// TODO(misha): Consider replacing this brute-force approach by
-	// iterating through the fields using reflection.
+	// TODO(misha): Consider rewriting this method to iterate through
+	// the fields using reflection, instead of explicitly listing the
+	// fields as we do now.
 	rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
 	rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
 	rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
@@ -99,34 +192,53 @@ func (rsc ReplicationSummaryCounts) PrettyPrint() string {
 		rsc.CorrectlyReplicatedCollections)
 }
 
-func SummarizeReplication(readCollections collection.ReadCollections,
-	keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+func BucketReplication(readCollections collection.ReadCollections,
+	keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) {
+	rlbsm = make(ReplicationLevelBlockSetMap)
+
+	for block, requestedReplication := range readCollections.BlockToReplication {
+		rlbsm.Insert(
+			ReplicationLevels{
+				Requested: requestedReplication,
+				Actual:    len(keepServerInfo.BlockToServers[block])},
+			block)
+	}
+
+	for block, servers := range keepServerInfo.BlockToServers {
+		if 0 == readCollections.BlockToReplication[block] {
+			rlbsm.Insert(
+				ReplicationLevels{Requested: 0, Actual: len(servers)},
+				block)
+		}
+	}
+	return
+}
+
+func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
+	readCollections collection.ReadCollections) (
+	rs ReplicationSummary) {
 	rs.CollectionBlocksNotInKeep = make(BlockSet)
 	rs.UnderReplicatedBlocks = make(BlockSet)
 	rs.OverReplicatedBlocks = make(BlockSet)
 	rs.CorrectlyReplicatedBlocks = make(BlockSet)
 	rs.KeepBlocksNotInCollections = make(BlockSet)
+
 	rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
 	rs.UnderReplicatedCollections = make(CollectionIndexSet)
 	rs.OverReplicatedCollections = make(CollectionIndexSet)
 	rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
 
-	for block, requestedReplication := range readCollections.BlockToReplication {
-		actualReplication := len(keepServerInfo.BlockToServers[block])
-		if actualReplication == 0 {
-			rs.CollectionBlocksNotInKeep.Insert(block)
-		} else if actualReplication < requestedReplication {
-			rs.UnderReplicatedBlocks.Insert(block)
-		} else if actualReplication > requestedReplication {
-			rs.OverReplicatedBlocks.Insert(block)
+	for levels, bs := range rlbsm {
+		if levels.Actual == 0 {
+			rs.CollectionBlocksNotInKeep.Union(bs)
+		} else if levels.Requested == 0 {
+			rs.KeepBlocksNotInCollections.Union(bs)
+		} else if levels.Actual < levels.Requested {
+			rs.UnderReplicatedBlocks.Union(bs)
+		} else if levels.Actual > levels.Requested {
+			rs.OverReplicatedBlocks.Union(bs)
 		} else {
-			rs.CorrectlyReplicatedBlocks.Insert(block)
-		}
-	}
-
-	for block, _ := range keepServerInfo.BlockToServers {
-		if 0 == readCollections.BlockToReplication[block] {
-			rs.KeepBlocksNotInCollections.Insert(block)
+			rs.CorrectlyReplicatedBlocks.Union(bs)
 		}
 	}
 
@@ -151,5 +263,5 @@ func SummarizeReplication(readCollections collection.ReadCollections,
 		}
 	}
 
-	return rs
+	return
 }
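The refactor above separates bucketing from classification: blocks are first
grouped by their (requested, actual) replication pair, and each bucket is
then classified wholesale instead of block by block. A self-contained sketch
of the same idea with simplified types (string digests and server names
instead of the real blockdigest and keep types):

    package main

    import "fmt"

    type levels struct{ requested, actual int }

    func main() {
        // Simplified inputs: requested replication per block, and the
        // servers holding each block.
        requested := map[string]int{"b1": 2, "b2": 2, "b3": 1}
        servers := map[string][]string{"b1": {"s1"}, "b2": {"s1", "s2"}, "b4": {"s3"}}

        // Bucket every block by its (requested, actual) pair, as
        // BucketReplication does.
        buckets := make(map[levels][]string)
        for block, req := range requested {
            l := levels{requested: req, actual: len(servers[block])}
            buckets[l] = append(buckets[l], block)
        }
        for block, srvs := range servers {
            if requested[block] == 0 {
                l := levels{requested: 0, actual: len(srvs)}
                buckets[l] = append(buckets[l], block)
            }
        }

        // Classify whole buckets at once, as SummarizeBuckets does.
        for l, blocks := range buckets {
            switch {
            case l.actual == 0:
                fmt.Println("not in keep:", blocks) // b3
            case l.requested == 0:
                fmt.Println("not in collections:", blocks) // b4
            case l.actual < l.requested:
                fmt.Println("under-replicated:", blocks) // b1
            case l.actual > l.requested:
                fmt.Println("over-replicated:", blocks)
            default:
                fmt.Println("correctly replicated:", blocks) // b2
            }
        }
    }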
diff --git a/services/datamanager/summary/summary_test.go b/services/datamanager/summary/summary_test.go
index 05db2fd..04ca5a5 100644
--- a/services/datamanager/summary/summary_test.go
+++ b/services/datamanager/summary/summary_test.go
@@ -36,6 +36,13 @@ func (cis CollectionIndexSet) ToSlice() (ints []int) {
 	return
 }
 
+// Helper function matching the interface expected by older tests.
+func SummarizeReplication(readCollections collection.ReadCollections,
+	keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+	return BucketReplication(readCollections, keepServerInfo).
+		SummarizeBuckets(readCollections)
+}
+
 // Takes a map from block digest to replication level and represents
 // it in a keep.ReadServers structure.
 func SpecifyReplication(digestToReplication map[int]int) (rs keep.ReadServers) {
@@ -94,7 +101,7 @@ func TestSimpleSummary(t *testing.T) {
 	rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
 		collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
 	})
-	rc.Summarize()
+	rc.Summarize(nil)
 	cIndex := rc.CollectionIndicesForTesting()
 
 	keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1})
@@ -123,7 +130,7 @@ func TestMissingBlock(t *testing.T) {
 	rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
 		collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
 	})
-	rc.Summarize()
+	rc.Summarize(nil)
 	cIndex := rc.CollectionIndicesForTesting()
 
 	keepInfo := SpecifyReplication(map[int]int{1: 1})
@@ -154,7 +161,7 @@ func TestUnderAndOverReplicatedBlocks(t *testing.T) {
 	rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
 		collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{1, 2}},
 	})
-	rc.Summarize()
+	rc.Summarize(nil)
 	cIndex := rc.CollectionIndicesForTesting()
 
 	keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 3})
@@ -187,7 +194,7 @@ func TestMixedReplication(t *testing.T) {
 		collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{3, 4}},
 		collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{5, 6}},
 	})
-	rc.Summarize()
+	rc.Summarize(nil)
 	cIndex := rc.CollectionIndicesForTesting()
 
 	keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1, 3: 1, 5: 1, 6: 3, 7: 2})

commit 0191604164c265a7b972a5802680477f20451d9d
Author: mishaz <misha at curoverse.com>
Date:   Tue Mar 17 17:53:54 2015 +0000

    gofmt cleanup.

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index cd141ee..1f9d77b 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -66,7 +66,7 @@ func singlerun() {
 	var arvLogger *logger.Logger
 	if logEventTypePrefix != "" {
 		arvLogger = logger.NewLogger(logger.LoggerParams{
-			Client: arv,
+			Client:          arv,
 			EventTypePrefix: logEventTypePrefix,
 			WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
 	}
@@ -78,7 +78,7 @@ func singlerun() {
 
 	var (
 		readCollections collection.ReadCollections
-		keepServerInfo keep.ReadServers
+		keepServerInfo  keep.ReadServers
 	)
 
 	if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
@@ -87,8 +87,8 @@ func singlerun() {
 		go func() {
 			collectionChannel <- collection.GetCollectionsAndSummarize(
 				collection.GetCollectionsParams{
-					Client: arv,
-					Logger: arvLogger,
+					Client:    arv,
+					Logger:    arvLogger,
 					BatchSize: 50})
 		}()
 
@@ -96,7 +96,7 @@ func singlerun() {
 			keep.GetKeepServersParams{
 				Client: arv,
 				Logger: arvLogger,
-				Limit: 1000})
+				Limit:  1000})
 
 		readCollections = <-collectionChannel
 	}
@@ -106,7 +106,7 @@ func singlerun() {
 	replicationSummary :=
 		summary.SummarizeReplication(readCollections, keepServerInfo)
 
-	log.Printf("Blocks In Collections: %d, " +
+	log.Printf("Blocks In Collections: %d, "+
 		"\nBlocks In Keep: %d.",
 		len(readCollections.BlockToReplication),
 		len(keepServerInfo.BlockToServers))

-----------------------------------------------------------------------


hooks/post-receive
-- 