[ARVADOS] updated: 53f785c298338645b6880f22f26b0c36a7cfab4d
git at public.curoverse.com
Mon Apr 20 21:15:31 EDT 2015
Summary of changes:
sdk/go/logger/logger.go                             |   7 +-
sdk/go/logger/util.go                               |  20 +++
services/datamanager/collection/collection.go       |  32 +++--
services/datamanager/collection/collection_test.go  |  10 +-
services/datamanager/datamanager.go                 |  35 +++--
services/datamanager/keep/keep.go                   |  34 +++--
services/datamanager/summary/file.go                |   4 +-
services/datamanager/summary/summary.go             | 154 ++++++++++++++++++---
services/datamanager/summary/summary_test.go        |  15 +-
9 files changed, 239 insertions(+), 72 deletions(-)
create mode 100644 sdk/go/logger/util.go
via 53f785c298338645b6880f22f26b0c36a7cfab4d (commit)
via 0191604164c265a7b972a5802680477f20451d9d (commit)
from 8e1a832244c109351a20a7181da0f65c73e63987 (commit)
The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.
commit 53f785c298338645b6880f22f26b0c36a7cfab4d
Author: mishaz <misha at curoverse.com>
Date: Tue Apr 21 01:04:36 2015 +0000
Added logger util GetOrCreateMap() and started using it everywhere.
Moved logging of summary info into ReadCollections.Summarize().
Renamed ComputeBlockReplicationCounts(readServers *ReadServers) to ReadServers.Summarize() and moved logging into the method.
Split SummarizeReplication() into BucketReplication() and SummarizeBuckets().
Some general gofmt-ing, cleanup, and documentation.
diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go
index ce18e90..a989afc 100644
--- a/sdk/go/logger/logger.go
+++ b/sdk/go/logger/logger.go
@@ -14,8 +14,11 @@
// entry map[string]interface{}) {
// // Modify properties and entry however you want
// // properties is a shortcut for entry["properties"].(map[string]interface{})
-// // properties can take any values you want to give it,
-// // entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
+// // properties can take any (valid) values you want to give it,
+// // entry will only take the fields listed at
+// // http://doc.arvados.org/api/schema/Log.html
+// Valid values for properties are anything that can be JSON
+// encoded (i.e. will not error if you call json.Marshal() on it).
// })
package logger
diff --git a/sdk/go/logger/util.go b/sdk/go/logger/util.go
new file mode 100644
index 0000000..6425aca
--- /dev/null
+++ b/sdk/go/logger/util.go
@@ -0,0 +1,20 @@
+// Helper methods for interacting with Logger.
+package logger
+
+// Retrieves the map[string]interface{} stored at parent[key] if it
+// exists; otherwise it creates one and stores it there.
+// This is useful for the logger because you may not know whether a
+// map you need has already been created.
+func GetOrCreateMap(
+ parent map[string]interface{},
+ key string) (child map[string]interface{}) {
+ read, exists := parent[key]
+ if exists {
+ child = read.(map[string]interface{})
+ } else {
+ child = make(map[string]interface{})
+ parent[key] = child
+ }
+ return
+}
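
As a quick illustration of the new helper, here is a minimal sketch of how GetOrCreateMap replaces the make-then-store pattern inside a Logger.Update callback (the callback signature matches the call sites in this commit; the "example_info" key and its value are hypothetical):

    params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
        // Reuses p["example_info"] if an earlier update already created it,
        // otherwise creates the map and stores it under that key.
        exampleInfo := logger.GetOrCreateMap(p, "example_info")
        exampleInfo["items_processed"] = 42
    })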
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index 32cdcfb..f63b95f 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -98,17 +98,7 @@ func WriteHeapProfile() {
func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
results = GetCollections(params)
- results.Summarize()
-
- if params.Logger != nil {
- params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := p["collection_info"].(map[string]interface{})
- // Since maps are shallow copied, we run a risk of concurrent
- // updates here. By copying results.OwnerToCollectionSize into
- // the log, we're assuming that it won't be updated.
- collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
- })
- }
+ results.Summarize(params.Logger)
log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
log.Printf("Read and processed %d collections",
@@ -160,10 +150,9 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := make(map[string]interface{})
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
collectionInfo["batch_size"] = params.BatchSize
- p["collection_info"] = collectionInfo
})
}
@@ -205,7 +194,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := p["collection_info"].(map[string]interface{})
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
collectionInfo["collections_read"] = totalCollections
collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
collectionInfo["total_manifest_size"] = totalManifestSize
@@ -295,7 +284,7 @@ func ProcessCollections(arvLogger *logger.Logger,
return
}
-func (readCollections *ReadCollections) Summarize() {
+func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
readCollections.OwnerToCollectionSize = make(map[string]int)
readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
numCollections := len(readCollections.UuidToCollection)
@@ -322,5 +311,18 @@ func (readCollections *ReadCollections) Summarize() {
}
}
+ if arvLogger != nil {
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+ // Since maps are shallow copied, we run a risk of concurrent
+ // updates here. By copying readCollections.OwnerToCollectionSize
+ // into the log, we're assuming that it won't be updated.
+ collectionInfo["owner_to_collection_size"] =
+ readCollections.OwnerToCollectionSize
+ collectionInfo["distinct_blocks_named"] =
+ len(readCollections.BlockToReplication)
+ })
+ }
+
return
}
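
With the logging moved inside Summarize, callers that want no log output (such as the updated tests below) can simply pass a nil logger, in which case the Update call is skipped:

    rc.Summarize(nil) // summarize collections without writing log entries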
diff --git a/services/datamanager/collection/collection_test.go b/services/datamanager/collection/collection_test.go
index 9117321..d42b2c9 100644
--- a/services/datamanager/collection/collection_test.go
+++ b/services/datamanager/collection/collection_test.go
@@ -63,10 +63,10 @@ func CompareSummarizedReadCollections(t *testing.T,
func TestSummarizeSimple(t *testing.T) {
rc := MakeTestReadCollections([]TestCollectionSpec{TestCollectionSpec{
ReplicationLevel: 5,
- Blocks: []int{1, 2},
+ Blocks: []int{1, 2},
}})
- rc.Summarize()
+ rc.Summarize(nil)
c := rc.UuidToCollection["col0"]
@@ -86,15 +86,15 @@ func TestSummarizeOverlapping(t *testing.T) {
rc := MakeTestReadCollections([]TestCollectionSpec{
TestCollectionSpec{
ReplicationLevel: 5,
- Blocks: []int{1, 2},
+ Blocks: []int{1, 2},
},
TestCollectionSpec{
ReplicationLevel: 8,
- Blocks: []int{2, 3},
+ Blocks: []int{2, 3},
},
})
- rc.Summarize()
+ rc.Summarize(nil)
c0 := rc.UuidToCollection["col0"]
c1 := rc.UuidToCollection["col1"]
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 1f9d77b..d3efe62 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -103,19 +103,34 @@ func singlerun() {
summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
- replicationSummary :=
- summary.SummarizeReplication(readCollections, keepServerInfo)
+ buckets := summary.BucketReplication(readCollections, keepServerInfo)
+ bucketCounts := buckets.Counts()
+
+ replicationSummary := buckets.SummarizeBuckets(readCollections)
+ replicationCounts := replicationSummary.ComputeCounts()
log.Printf("Blocks In Collections: %d, "+
"\nBlocks In Keep: %d.",
len(readCollections.BlockToReplication),
len(keepServerInfo.BlockToServers))
- log.Println(replicationSummary.ComputeCounts().PrettyPrint())
+ log.Println(replicationCounts.PrettyPrint())
+
+ log.Printf("Blocks Histogram:")
+ for _, rlbss := range bucketCounts {
+ log.Printf("%+v: %10d",
+ rlbss.Levels,
+ rlbss.Count)
+ }
// Log that we're finished. We force the recording, since go will
- // not wait for the timer before exiting.
+ // not wait for the write timer before exiting.
if arvLogger != nil {
arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+ summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+ summaryInfo["block_replication_counts"] = bucketCounts
+ summaryInfo["replication_summary"] = replicationCounts
+ p["summary_info"] = summaryInfo
+
p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
})
}
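
Read together, the datamanager changes replace the single SummarizeReplication() call with a two-stage pipeline; schematically (names exactly as in the diff above):

    buckets := summary.BucketReplication(readCollections, keepServerInfo)
    bucketCounts := buckets.Counts()                        // sorted per-(Requested, Actual) histogram
    replicationSummary := buckets.SummarizeBuckets(readCollections)
    replicationCounts := replicationSummary.ComputeCounts() // aggregate totals for PrettyPrint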
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 93246bc..857ab86 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -116,7 +116,7 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer
results = GetKeepServers(params)
log.Printf("Returned %d keep disks", len(results.ServerToContents))
- ComputeBlockReplicationCounts(&results)
+ results.Summarize(params.Logger)
log.Printf("Replication level distribution: %v",
results.BlockReplicationCounts)
@@ -146,13 +146,10 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := make(map[string]interface{})
-
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
keepInfo["keep_servers"] = sdkResponse.KeepServers
-
- p["keep_info"] = keepInfo
})
}
@@ -223,7 +220,10 @@ func GetServerContents(arvLogger *logger.Logger,
resp, err := client.Do(req)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error fetching %s: %v", req.URL.String(), err))
+ fmt.Sprintf("Error fetching %s: %v. Response was %+v",
+ req.URL.String(),
+ err,
+ resp))
}
return ReadServerResponse(arvLogger, keepServer, resp)
@@ -239,7 +239,7 @@ func GetServerStatus(arvLogger *logger.Logger,
if arvLogger != nil {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
serverInfo := make(map[string]interface{})
serverInfo["status_request_sent_at"] = now
serverInfo["host"] = keepServer.Host
@@ -272,7 +272,7 @@ func GetServerStatus(arvLogger *logger.Logger,
if arvLogger != nil {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
serverInfo["status_response_processed_at"] = now
serverInfo["status"] = keepStatus
@@ -288,7 +288,7 @@ func CreateIndexRequest(arvLogger *logger.Logger,
if arvLogger != nil {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
serverInfo["index_request_sent_at"] = now
})
@@ -319,7 +319,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
if arvLogger != nil {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
serverInfo["index_response_received_at"] = now
})
@@ -355,7 +355,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
log.Println(message)
if arvLogger != nil {
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
var error_list []string
read_error_list, has_list := serverInfo["error_list"]
@@ -393,7 +393,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
if arvLogger != nil {
now := time.Now()
arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
serverInfo["processing_finished_at"] = now
@@ -435,10 +435,18 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err
return
}
-func ComputeBlockReplicationCounts(readServers *ReadServers) {
+func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
readServers.BlockReplicationCounts = make(map[int]int)
for _, infos := range readServers.BlockToServers {
replication := len(infos)
readServers.BlockReplicationCounts[replication] += 1
}
+
+ if arvLogger != nil {
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
+ keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
+ })
+ }
}
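
To make the new Summarize concrete: BlockReplicationCounts is a histogram from replication level to the number of blocks stored at that level. Here is a standalone sketch of the counting loop, using hypothetical sample data (the real code keys on blockdigest.BlockDigest and per-server info structs):

    blockToServers := map[string][]string{
        "block-a": {"keep0", "keep1"},
        "block-b": {"keep0", "keep1"},
        "block-c": {"keep2"},
    }
    counts := make(map[int]int) // replication level -> number of blocks
    for _, servers := range blockToServers {
        counts[len(servers)]++
    }
    // counts is now map[int]int{1: 1, 2: 2}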
diff --git a/services/datamanager/summary/file.go b/services/datamanager/summary/file.go
index 15c3699..ce7d687 100644
--- a/services/datamanager/summary/file.go
+++ b/services/datamanager/summary/file.go
@@ -101,8 +101,8 @@ func MaybeReadData(arvLogger *logger.Logger,
// re-summarize data, so that we can update our summarizing
// functions without needing to do all our network i/o
- data.ReadCollections.Summarize()
- keep.ComputeBlockReplicationCounts(&data.KeepServerInfo)
+ data.ReadCollections.Summarize(arvLogger)
+ data.KeepServerInfo.Summarize(arvLogger)
*readCollections = data.ReadCollections
*keepServerInfo = data.KeepServerInfo
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
index 37e9373..8621d55 100644
--- a/services/datamanager/summary/summary.go
+++ b/services/datamanager/summary/summary.go
@@ -8,19 +8,30 @@ import (
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"git.curoverse.com/arvados.git/services/datamanager/collection"
"git.curoverse.com/arvados.git/services/datamanager/keep"
+ "sort"
)
type BlockSet map[blockdigest.BlockDigest]struct{}
+// Adds a single block to the set.
func (bs BlockSet) Insert(digest blockdigest.BlockDigest) {
bs[digest] = struct{}{}
}
+// Adds a set of blocks to the set.
+func (bs BlockSet) Union(obs BlockSet) {
+ for k, v := range obs {
+ bs[k] = v
+ }
+}
+
// We use the collection index to save space. To convert to and from
// the uuid, use collection.ReadCollections' fields
// CollectionIndexToUuid and CollectionUuidToIndex.
type CollectionIndexSet map[int]struct{}
+// Adds a single collection to the set. The collection is specified by
+// its index.
func (cis CollectionIndexSet) Insert(collectionIndex int) {
cis[collectionIndex] = struct{}{}
}
@@ -29,12 +40,37 @@ func (bs BlockSet) ToCollectionIndexSet(
readCollections collection.ReadCollections,
collectionIndexSet *CollectionIndexSet) {
for block := range bs {
- for _,collectionIndex := range readCollections.BlockToCollectionIndices[block] {
+ for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
collectionIndexSet.Insert(collectionIndex)
}
}
}
+// Keeps track of the requested and actual replication levels.
+// Currently this is only used for blocks but could easily be used for
+// collections as well.
+type ReplicationLevels struct {
+ // The requested replication level.
+ // For Blocks this is the maximum replication level among all the
+ // collections this block belongs to.
+ Requested int
+
+ // The actual number of keep servers this is on.
+ Actual int
+}
+
+// Maps from replication levels to their blocks.
+type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
+
+// An individual entry from ReplicationLevelBlockSetMap which only
+// reports the number of blocks, not which blocks.
+type ReplicationLevelBlockCount struct {
+ Levels ReplicationLevels
+ Count int
+}
+
+// An ordered list of ReplicationLevelBlockCount useful for reporting.
+type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
+
type ReplicationSummary struct {
CollectionBlocksNotInKeep BlockSet
UnderReplicatedBlocks BlockSet
@@ -61,9 +97,66 @@ type ReplicationSummaryCounts struct {
CorrectlyReplicatedCollections int
}
+// Gets the BlockSet for a given set of ReplicationLevels, creating it
+// if it doesn't already exist.
+func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
+ repLevels ReplicationLevels) (bs BlockSet) {
+ bs, exists := rlbs[repLevels]
+ if !exists {
+ bs = make(BlockSet)
+ rlbs[repLevels] = bs
+ }
+ return
+}
+
+// Adds a block to the set for a given replication level.
+func (rlbs ReplicationLevelBlockSetMap) Insert(
+ repLevels ReplicationLevels,
+ block blockdigest.BlockDigest) {
+ rlbs.GetOrCreate(repLevels).Insert(block)
+}
+
+// Adds a set of blocks to the set for a given replication level.
+func (rlbs ReplicationLevelBlockSetMap) Union(
+ repLevels ReplicationLevels,
+ bs BlockSet) {
+ rlbs.GetOrCreate(repLevels).Union(bs)
+}
+
+// Outputs a sorted list of ReplicationLevelBlockCounts.
+func (rlbs ReplicationLevelBlockSetMap) Counts() (
+ sorted ReplicationLevelBlockSetSlice) {
+ sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
+ i := 0
+ for levels, set := range rlbs {
+ sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
+ i++
+ }
+ sort.Sort(sorted)
+ return
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Len() int {
+ return len(rlbss)
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
+ return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
+ (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
+ rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
+}
+
+// Implemented to meet sort.Interface
+func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
+ rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
+}
+
func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
- // TODO(misha): Consider replacing this brute-force approach by
- // iterating through the fields using reflection.
+ // TODO(misha): Consider rewriting this method to iterate through
+ // the fields using reflection, instead of explicitly listing the
+ // fields as we do now.
rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
@@ -99,34 +192,53 @@ func (rsc ReplicationSummaryCounts) PrettyPrint() string {
rsc.CorrectlyReplicatedCollections)
}
-func SummarizeReplication(readCollections collection.ReadCollections,
- keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+func BucketReplication(readCollections collection.ReadCollections,
+ keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) {
+ rlbsm = make(ReplicationLevelBlockSetMap)
+
+ for block, requestedReplication := range readCollections.BlockToReplication {
+ rlbsm.Insert(
+ ReplicationLevels{
+ Requested: requestedReplication,
+ Actual: len(keepServerInfo.BlockToServers[block])},
+ block)
+ }
+
+ for block, servers := range keepServerInfo.BlockToServers {
+ if 0 == readCollections.BlockToReplication[block] {
+ rlbsm.Insert(
+ ReplicationLevels{Requested: 0, Actual: len(servers)},
+ block)
+ }
+ }
+ return
+}
+
+func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets(
+ readCollections collection.ReadCollections) (
+ rs ReplicationSummary) {
rs.CollectionBlocksNotInKeep = make(BlockSet)
rs.UnderReplicatedBlocks = make(BlockSet)
rs.OverReplicatedBlocks = make(BlockSet)
rs.CorrectlyReplicatedBlocks = make(BlockSet)
rs.KeepBlocksNotInCollections = make(BlockSet)
+
rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
rs.UnderReplicatedCollections = make(CollectionIndexSet)
rs.OverReplicatedCollections = make(CollectionIndexSet)
rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
- for block, requestedReplication := range readCollections.BlockToReplication {
- actualReplication := len(keepServerInfo.BlockToServers[block])
- if actualReplication == 0 {
- rs.CollectionBlocksNotInKeep.Insert(block)
- } else if actualReplication < requestedReplication {
- rs.UnderReplicatedBlocks.Insert(block)
- } else if actualReplication > requestedReplication {
- rs.OverReplicatedBlocks.Insert(block)
+ for levels, bs := range rlbsm {
+ if levels.Actual == 0 {
+ rs.CollectionBlocksNotInKeep.Union(bs)
+ } else if levels.Requested == 0 {
+ rs.KeepBlocksNotInCollections.Union(bs)
+ } else if levels.Actual < levels.Requested {
+ rs.UnderReplicatedBlocks.Union(bs)
+ } else if levels.Actual > levels.Requested {
+ rs.OverReplicatedBlocks.Union(bs)
} else {
- rs.CorrectlyReplicatedBlocks.Insert(block)
- }
- }
-
- for block, _ := range keepServerInfo.BlockToServers {
- if 0 == readCollections.BlockToReplication[block] {
- rs.KeepBlocksNotInCollections.Insert(block)
+ rs.CorrectlyReplicatedBlocks.Union(bs)
}
}
@@ -151,5 +263,5 @@ func SummarizeReplication(readCollections collection.ReadCollections,
}
}
- return rs
+ return
}
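
The bucketing means each distinct (Requested, Actual) pair is classified once rather than once per block, and Counts() reports buckets in a stable order: ascending by Requested, then by Actual. A rough sketch of the expected ordering, assuming the summary package is imported as in the diffs above (blockA, blockB, blockC stand in for blockdigest.BlockDigest values):

    rlbsm := make(summary.ReplicationLevelBlockSetMap)
    rlbsm.Insert(summary.ReplicationLevels{Requested: 2, Actual: 3}, blockA)
    rlbsm.Insert(summary.ReplicationLevels{Requested: 1, Actual: 1}, blockB)
    rlbsm.Insert(summary.ReplicationLevels{Requested: 2, Actual: 1}, blockC)

    for _, rlbc := range rlbsm.Counts() {
        // Prints {1 1}: 1, then {2 1}: 1, then {2 3}: 1,
        // i.e. ordered by Requested, then Actual.
        fmt.Printf("%+v: %d\n", rlbc.Levels, rlbc.Count)
    }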
diff --git a/services/datamanager/summary/summary_test.go b/services/datamanager/summary/summary_test.go
index 05db2fd..04ca5a5 100644
--- a/services/datamanager/summary/summary_test.go
+++ b/services/datamanager/summary/summary_test.go
@@ -36,6 +36,13 @@ func (cis CollectionIndexSet) ToSlice() (ints []int) {
return
}
+// Helper function to meet the interface expected by older tests.
+func SummarizeReplication(readCollections collection.ReadCollections,
+ keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+ return BucketReplication(readCollections, keepServerInfo).
+ SummarizeBuckets(readCollections)
+}
+
// Takes a map from block digest to replication level and represents
// it in a keep.ReadServers structure.
func SpecifyReplication(digestToReplication map[int]int) (rs keep.ReadServers) {
@@ -94,7 +101,7 @@ func TestSimpleSummary(t *testing.T) {
rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
})
- rc.Summarize()
+ rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1})
@@ -123,7 +130,7 @@ func TestMissingBlock(t *testing.T) {
rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
})
- rc.Summarize()
+ rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
keepInfo := SpecifyReplication(map[int]int{1: 1})
@@ -154,7 +161,7 @@ func TestUnderAndOverReplicatedBlocks(t *testing.T) {
rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{1, 2}},
})
- rc.Summarize()
+ rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 3})
@@ -187,7 +194,7 @@ func TestMixedReplication(t *testing.T) {
collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{3, 4}},
collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{5, 6}},
})
- rc.Summarize()
+ rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1, 3: 1, 5: 1, 6: 3, 7: 2})
commit 0191604164c265a7b972a5802680477f20451d9d
Author: mishaz <misha at curoverse.com>
Date: Tue Mar 17 17:53:54 2015 +0000
gofmt cleanup.
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index cd141ee..1f9d77b 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -66,7 +66,7 @@ func singlerun() {
var arvLogger *logger.Logger
if logEventTypePrefix != "" {
arvLogger = logger.NewLogger(logger.LoggerParams{
- Client: arv,
+ Client: arv,
EventTypePrefix: logEventTypePrefix,
WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
}
@@ -78,7 +78,7 @@ func singlerun() {
var (
readCollections collection.ReadCollections
- keepServerInfo keep.ReadServers
+ keepServerInfo keep.ReadServers
)
if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
@@ -87,8 +87,8 @@ func singlerun() {
go func() {
collectionChannel <- collection.GetCollectionsAndSummarize(
collection.GetCollectionsParams{
- Client: arv,
- Logger: arvLogger,
+ Client: arv,
+ Logger: arvLogger,
BatchSize: 50})
}()
@@ -96,7 +96,7 @@ func singlerun() {
keep.GetKeepServersParams{
Client: arv,
Logger: arvLogger,
- Limit: 1000})
+ Limit: 1000})
readCollections = <-collectionChannel
}
@@ -106,7 +106,7 @@ func singlerun() {
replicationSummary :=
summary.SummarizeReplication(readCollections, keepServerInfo)
- log.Printf("Blocks In Collections: %d, " +
+ log.Printf("Blocks In Collections: %d, "+
"\nBlocks In Keep: %d.",
len(readCollections.BlockToReplication),
len(keepServerInfo.BlockToServers))
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list