[ARVADOS] updated: a5a4f79e91aa8bba1794394646808f6d4c444661
git at public.curoverse.com
Mon Feb 23 18:15:59 EST 2015
Summary of changes:
doc/_includes/_navbar_top.liquid | 4 +-
doc/_layouts/default.html.liquid | 6 +-
doc/images/uses/shared.png | Bin 57235 -> 54402 bytes
doc/images/uses/{share.png => sharing.png} | Bin
doc/index.html.liquid | 26 +++-
.../getting_started/sharedata.html.textile.liquid | 2 +-
sdk/go/blockdigest/blockdigest.go | 10 +-
sdk/go/logger/logger.go | 32 +++--
services/datamanager/collection/collection.go | 52 ++++++--
services/datamanager/datamanager.go | 81 ++++++++---
services/datamanager/summary/summary.go | 148 +++++++++++++++++++++
11 files changed, 308 insertions(+), 53 deletions(-)
rename doc/images/uses/{share.png => sharing.png} (100%)
create mode 100644 services/datamanager/summary/summary.go
via a5a4f79e91aa8bba1794394646808f6d4c444661 (commit)
via d3f9fad0cc83a9af47589894998a781db9c60989 (commit)
via 6b5b6890158830b26161b3879a0d1eeaa122659f (commit)
via 248d2ebb4ab7ea3d9060838bcfbfe3b9330da5ee (commit)
via f5a886733b2f628f462dcc03f45d20621c8ee015 (commit)
via 83ea6e36b19db2c9a45be87c900efbbd9ea8bdb9 (commit)
via 1713f54c6b41cb18a69c09f361d97ca6384a9492 (commit)
via 546b44cf5dfba972d2f5f91fa4434e9cb64fa0cd (commit)
via bd53829c4e3c43be8e8125c59940bfb8c81f5233 (commit)
via 177e076e2602935b6549f9957f031353b09e77c0 (commit)
via 520008a541ac57bb07b11b79bd68d2930ea508a2 (commit)
via 17f611827a12edffec0245c27d4c15b785bc0436 (commit)
from 6100a423c0a75736238747224d72afb112793fdb (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a5a4f79e91aa8bba1794394646808f6d4c444661
Author: mishaz <misha at curoverse.com>
Date: Mon Feb 23 23:14:17 2015 +0000
Added a block-to-collection-index map. Started using collection indexes instead of long uuid strings to identify collections, to save memory.
Started running summarize() methods after reading data from disk so that we can modify the summarization code and test it without network i/o.
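For context, the memory-saving idea in this commit is to intern each collection UUID once in a slice and key the per-block map by the resulting small integer instead of by the UUID string. A minimal standalone sketch of that indexing pattern (type and field names here are illustrative, not the actual datamanager structs):

package main

import "fmt"

// collectionIndexer interns collection UUIDs: each UUID is stored once in
// indexToUuid, and callers refer to it by its int index from then on.
type collectionIndexer struct {
	uuidToIndex map[string]int
	indexToUuid []string
}

func newCollectionIndexer() *collectionIndexer {
	return &collectionIndexer{uuidToIndex: make(map[string]int)}
}

// index returns the existing index for uuid, or assigns the next one.
func (c *collectionIndexer) index(uuid string) int {
	if i, ok := c.uuidToIndex[uuid]; ok {
		return i
	}
	i := len(c.indexToUuid)
	c.indexToUuid = append(c.indexToUuid, uuid)
	c.uuidToIndex[uuid] = i
	return i
}

func main() {
	idx := newCollectionIndexer()
	// A per-block map keyed by a small int is cheaper than one keyed by a
	// long UUID string repeated for every block. UUID below is illustrative.
	blockToCollection := map[string]int{
		"d41d8cd98f00b204e9800998ecf8427e": idx.index("zzzzz-4zz18-aaaaaaaaaaaaaaa"),
	}
	for block, ci := range blockToCollection {
		fmt.Printf("block %s belongs to collection %s\n", block, idx.indexToUuid[ci])
	}
}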
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index b15864e..a3f1e31 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -38,11 +38,13 @@ type Collection struct {
}
type ReadCollections struct {
- ReadAllCollections bool
- UuidToCollection map[string]Collection
- OwnerToCollectionSize map[string]int
- BlockToReplication map[blockdigest.BlockDigest]int
- // TODO(misha): add block to collection map
+ ReadAllCollections bool
+ UuidToCollection map[string]Collection
+ OwnerToCollectionSize map[string]int
+ BlockToReplication map[blockdigest.BlockDigest]int
+ CollectionUuidToIndex map[string]int
+ CollectionIndexToUuid []string
+ BlockToCollectionIndex map[blockdigest.BlockDigest]int
}
type GetCollectionsParams struct {
@@ -296,12 +298,22 @@ func ProcessCollections(arvLogger *logger.Logger,
func Summarize(readCollections *ReadCollections) {
readCollections.OwnerToCollectionSize = make(map[string]int)
readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
+ numCollections := len(readCollections.UuidToCollection)
+ readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
+ readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
+ readCollections.BlockToCollectionIndex = make(map[blockdigest.BlockDigest]int)
for _, coll := range readCollections.UuidToCollection {
+ collectionIndex := len(readCollections.CollectionIndexToUuid)
+ readCollections.CollectionIndexToUuid =
+ append(readCollections.CollectionIndexToUuid, coll.Uuid)
+ readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
+
readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
for block, _ := range coll.BlockDigestToSize {
+ readCollections.BlockToCollectionIndex[block] = collectionIndex
storedReplication := readCollections.BlockToReplication[block]
if coll.ReplicationLevel > storedReplication {
readCollections.BlockToReplication[block] = coll.ReplicationLevel
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
index e432330..6698062 100644
--- a/services/datamanager/summary/summary.go
+++ b/services/datamanager/summary/summary.go
@@ -103,6 +103,12 @@ func MaybeReadData(arvLogger *logger.Logger,
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf("Failed to read summary data: %v", err))
}
+
+ // re-summarize data, so that we can update our summarizing
+ // functions without needing to do all our network i/o
+ collection.Summarize(&data.ReadCollections)
+ keep.ComputeBlockReplicationCounts(&data.KeepServerInfo)
+
*readCollections = data.ReadCollections
*keepServerInfo = data.KeepServerInfo
log.Printf("Read summary data from: %s", readDataFrom)
commit d3f9fad0cc83a9af47589894998a781db9c60989
Author: mishaz <misha at curoverse.com>
Date: Mon Feb 23 22:49:18 2015 +0000
Fixed bug where I wasn't copying saved data.
Computed and logged summary of replication state.
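The replication summary introduced below groups blocks into sets, using Go's usual set idiom of a map with empty-struct values. A hedged, self-contained sketch of the same bucketing logic (keys simplified to strings; the real code keys on blockdigest.BlockDigest):

package main

import "fmt"

// summarize buckets each block by comparing its requested replication
// (from collections) against the number of servers actually holding it.
func summarize(requested, actual map[string]int) (missing, under, over, correct map[string]struct{}) {
	missing = make(map[string]struct{})
	under = make(map[string]struct{})
	over = make(map[string]struct{})
	correct = make(map[string]struct{})
	for block, want := range requested {
		have := actual[block]
		switch {
		case have == 0:
			missing[block] = struct{}{}
		case have < want:
			under[block] = struct{}{}
		case have > want:
			over[block] = struct{}{}
		default:
			correct[block] = struct{}{}
		}
	}
	return
}

func main() {
	req := map[string]int{"a": 2, "b": 2, "c": 2}
	act := map[string]int{"a": 1, "b": 3, "c": 2}
	missing, under, over, correct := summarize(req, act)
	fmt.Println(len(missing), len(under), len(over), len(correct)) // 0 1 1 1
}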
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index 0eca61c..b15864e 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -42,6 +42,7 @@ type ReadCollections struct {
UuidToCollection map[string]Collection
OwnerToCollectionSize map[string]int
BlockToReplication map[blockdigest.BlockDigest]int
+ // TODO(misha): add block to collection map
}
type GetCollectionsParams struct {
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 9035f11..ce8114a 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -103,9 +103,24 @@ func singlerun() {
summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
- // TODO(misha): Use these together to verify replication.
- _ = readCollections
- _ = keepServerInfo
+ replicationSummary :=
+ summary.SummarizeReplication(arvLogger, readCollections, keepServerInfo)
+
+ log.Printf("Replication Counts:" +
+ "\nBlocks In Collections: %d, " +
+ "\nBlocks In Keep: %d, " +
+ "\nMissing From Keep: %d, " +
+ "\nUnder Replicated: %d, " +
+ "\nOver Replicated: %d, " +
+ "\nReplicated Just Right: %d, " +
+ "\nNot In Any Collection: %d.",
+ len(readCollections.BlockToReplication),
+ len(keepServerInfo.BlockToServers),
+ len(replicationSummary.CollectionBlocksNotInKeep),
+ len(replicationSummary.UnderReplicatedBlocks),
+ len(replicationSummary.OverReplicatedBlocks),
+ len(replicationSummary.CorrectlyReplicatedBlocks),
+ len(replicationSummary.KeepBlocksNotInCollections))
// Log that we're finished. We force the recording, since go will
// not wait for the timer before exiting.
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
index 8a381eb..e432330 100644
--- a/services/datamanager/summary/summary.go
+++ b/services/datamanager/summary/summary.go
@@ -6,6 +6,7 @@ import (
"encoding/gob"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/blockdigest"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/collection"
"git.curoverse.com/arvados.git/services/datamanager/keep"
@@ -14,17 +15,24 @@ import (
"os"
)
-var (
- // These are just used for development, to save network i/o
- writeDataTo string
- readDataFrom string
-)
+type ReplicationSummary struct {
+ CollectionBlocksNotInKeep map[blockdigest.BlockDigest]struct{}
+ UnderReplicatedBlocks map[blockdigest.BlockDigest]struct{}
+ OverReplicatedBlocks map[blockdigest.BlockDigest]struct{}
+ CorrectlyReplicatedBlocks map[blockdigest.BlockDigest]struct{}
+ KeepBlocksNotInCollections map[blockdigest.BlockDigest]struct{}
+}
type serializedData struct {
ReadCollections collection.ReadCollections
KeepServerInfo keep.ReadServers
}
+var (
+ writeDataTo string
+ readDataFrom string
+)
+
func init() {
flag.StringVar(&writeDataTo,
"write-data-to",
@@ -95,7 +103,40 @@ func MaybeReadData(arvLogger *logger.Logger,
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf("Failed to read summary data: %v", err))
}
+ *readCollections = data.ReadCollections
+ *keepServerInfo = data.KeepServerInfo
log.Printf("Read summary data from: %s", readDataFrom)
return true
}
}
+
+func SummarizeReplication(arvLogger *logger.Logger,
+ readCollections collection.ReadCollections,
+ keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+ rs.CollectionBlocksNotInKeep = make(map[blockdigest.BlockDigest]struct{})
+ rs.UnderReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
+ rs.OverReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
+ rs.CorrectlyReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
+ rs.KeepBlocksNotInCollections = make(map[blockdigest.BlockDigest]struct{})
+
+ for block, requestedReplication := range readCollections.BlockToReplication {
+ actualReplication := len(keepServerInfo.BlockToServers[block])
+ if actualReplication == 0 {
+ rs.CollectionBlocksNotInKeep[block] = struct{}{}
+ } else if actualReplication < requestedReplication {
+ rs.UnderReplicatedBlocks[block] = struct{}{}
+ } else if actualReplication > requestedReplication {
+ rs.OverReplicatedBlocks[block] = struct{}{}
+ } else {
+ rs.CorrectlyReplicatedBlocks[block] = struct{}{}
+ }
+ }
+
+ for block, _ := range keepServerInfo.BlockToServers {
+ if 0 == readCollections.BlockToReplication[block] {
+ rs.KeepBlocksNotInCollections[block] = struct{}{}
+ }
+ }
+
+ return rs
+}
commit 6b5b6890158830b26161b3879a0d1eeaa122659f
Author: mishaz <misha at curoverse.com>
Date: Mon Feb 23 21:41:19 2015 +0000
Added flags to write network data and then read it back. This is useful to speed up development, but should not be used in production since data will be stale.
Unfortunately had to switch the H and L fields in blockdigest from private to public; otherwise they would not be exported.
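The visibility change matters because encoding/gob only serializes exported (capitalized) struct fields, so the cached-data files would otherwise drop the digest values. A small round-trip sketch of the same write-then-read pattern used by MaybeWriteData/MaybeReadData (the file name and struct here are illustrative):

package main

import (
	"encoding/gob"
	"fmt"
	"log"
	"os"
)

// digest mirrors the shape of blockdigest.BlockDigest: gob serializes H and L
// only because they are exported; lowercase h and l would be silently dropped.
type digest struct {
	H uint64
	L uint64
}

func main() {
	path := "summary.gob" // illustrative file name

	// Write: mirrors MaybeWriteData's gob.NewEncoder(...).Encode(data).
	out, err := os.Create(path)
	if err != nil {
		log.Fatal(err)
	}
	if err := gob.NewEncoder(out).Encode(digest{H: 1, L: 2}); err != nil {
		log.Fatal(err)
	}
	out.Close()

	// Read it back: mirrors MaybeReadData's gob.NewDecoder(...).Decode(&data).
	in, err := os.Open(path)
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()
	var d digest
	if err := gob.NewDecoder(in).Decode(&d); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read back %016x%016x\n", d.H, d.L)
}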
diff --git a/sdk/go/blockdigest/blockdigest.go b/sdk/go/blockdigest/blockdigest.go
index 9b818d3..ad2e365 100644
--- a/sdk/go/blockdigest/blockdigest.go
+++ b/sdk/go/blockdigest/blockdigest.go
@@ -11,12 +11,12 @@ import (
// Stores a Block Locator Digest compactly, up to 128 bits.
// Can be used as a map key.
type BlockDigest struct {
- h uint64
- l uint64
+ H uint64
+ L uint64
}
func (d BlockDigest) String() string {
- return fmt.Sprintf("%016x%016x", d.h, d.l)
+ return fmt.Sprintf("%016x%016x", d.H, d.L)
}
// Will create a new BlockDigest unless an error is encountered.
@@ -27,11 +27,11 @@ func FromString(s string) (dig BlockDigest, err error) {
}
var d BlockDigest
- d.h, err = strconv.ParseUint(s[:16], 16, 64)
+ d.H, err = strconv.ParseUint(s[:16], 16, 64)
if err != nil {
return
}
- d.l, err = strconv.ParseUint(s[16:], 16, 64)
+ d.L, err = strconv.ParseUint(s[16:], 16, 64)
if err != nil {
return
}
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 67dbf08..9035f11 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -10,6 +10,7 @@ import (
"git.curoverse.com/arvados.git/services/datamanager/collection"
"git.curoverse.com/arvados.git/services/datamanager/keep"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+ "git.curoverse.com/arvados.git/services/datamanager/summary"
"log"
"time"
)
@@ -75,23 +76,32 @@ func singlerun() {
arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
}
- collectionChannel := make(chan collection.ReadCollections)
+ var (
+ readCollections collection.ReadCollections
+ keepServerInfo keep.ReadServers
+ )
- go func() {
- collectionChannel <- collection.GetCollectionsAndSummarize(
- collection.GetCollectionsParams{
+ if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
+ collectionChannel := make(chan collection.ReadCollections)
+
+ go func() {
+ collectionChannel <- collection.GetCollectionsAndSummarize(
+ collection.GetCollectionsParams{
+ Client: arv,
+ Logger: arvLogger,
+ BatchSize: 50})
+ }()
+
+ keepServerInfo = keep.GetKeepServersAndSummarize(
+ keep.GetKeepServersParams{
Client: arv,
Logger: arvLogger,
- BatchSize: 50})
- }()
+ Limit: 1000})
- keepServerInfo := keep.GetKeepServersAndSummarize(
- keep.GetKeepServersParams{
- Client: arv,
- Logger: arvLogger,
- Limit: 1000})
+ readCollections = <-collectionChannel
+ }
- readCollections := <-collectionChannel
+ summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
// TODO(misha): Use these together to verify replication.
_ = readCollections
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
new file mode 100644
index 0000000..8a381eb
--- /dev/null
+++ b/services/datamanager/summary/summary.go
@@ -0,0 +1,101 @@
+/* Computes Summary based on data read from API server. */
+
+package summary
+
+import (
+ "encoding/gob"
+ "flag"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/logger"
+ "git.curoverse.com/arvados.git/services/datamanager/collection"
+ "git.curoverse.com/arvados.git/services/datamanager/keep"
+ "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+ "log"
+ "os"
+)
+
+var (
+ // These are just used for development, to save network i/o
+ writeDataTo string
+ readDataFrom string
+)
+
+type serializedData struct {
+ ReadCollections collection.ReadCollections
+ KeepServerInfo keep.ReadServers
+}
+
+func init() {
+ flag.StringVar(&writeDataTo,
+ "write-data-to",
+ "",
+ "Write summary of data received to this file. Used for development only.")
+ flag.StringVar(&readDataFrom,
+ "read-data-from",
+ "",
+ "Avoid network i/o and read summary data from this file instead. Used for development only.")
+}
+
+// Writes data we've read to a file.
+//
+// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
+//
+// This should not be used outside of development, since you'll be
+// working with stale data.
+func MaybeWriteData(arvLogger *logger.Logger,
+ readCollections collection.ReadCollections,
+ keepServerInfo keep.ReadServers) bool {
+ if writeDataTo == "" {
+ return false
+ } else {
+ summaryFile, err := os.Create(writeDataTo)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
+ }
+ defer summaryFile.Close()
+
+ enc := gob.NewEncoder(summaryFile)
+ data := serializedData{
+ ReadCollections: readCollections,
+ KeepServerInfo: keepServerInfo}
+ err = enc.Encode(data)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to write summary data: %v", err))
+ }
+ log.Printf("Wrote summary data to: %s", writeDataTo)
+ return true
+ }
+}
+
+// Reads data that we've read to a file.
+//
+// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
+//
+// This should not be used outside of development, since you'll be
+// working with stale data.
+func MaybeReadData(arvLogger *logger.Logger,
+ readCollections *collection.ReadCollections,
+ keepServerInfo *keep.ReadServers) bool {
+ if readDataFrom == "" {
+ return false
+ } else {
+ summaryFile, err := os.Open(readDataFrom)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to open %s: %v", readDataFrom, err))
+ }
+ defer summaryFile.Close()
+
+ dec := gob.NewDecoder(summaryFile)
+ data := serializedData{}
+ err = dec.Decode(&data)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to read summary data: %v", err))
+ }
+ log.Printf("Read summary data from: %s", readDataFrom)
+ return true
+ }
+}
commit 248d2ebb4ab7ea3d9060838bcfbfe3b9330da5ee
Author: mishaz <misha at curoverse.com>
Date: Mon Feb 23 19:41:29 2015 +0000
Added BlockToReplication field to collection.ReadCollections.
Lots of other little cleanup.
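The new field records, for each block, the highest replication level requested by any collection containing it, falling back to a default when a collection does not specify one. A hedged standalone sketch of that max-over-collections computation (types simplified; the real map is keyed by blockdigest.BlockDigest):

package main

import "fmt"

const defaultReplicationLevel = 2 // mirrors DefaultReplicationLevel in collection.go

type coll struct {
	replication int
	blocks      []string
}

// blockReplication returns, per block, the highest replication level
// requested by any collection that contains it.
func blockReplication(colls []coll) map[string]int {
	want := make(map[string]int)
	for _, c := range colls {
		level := c.replication
		if level == 0 {
			level = defaultReplicationLevel
		}
		for _, b := range c.blocks {
			if level > want[b] {
				want[b] = level
			}
		}
	}
	return want
}

func main() {
	colls := []coll{
		{replication: 0, blocks: []string{"a", "b"}}, // unset -> default of 2
		{replication: 3, blocks: []string{"b"}},
	}
	fmt.Println(blockReplication(colls)) // map[a:2 b:3]
}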
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index 9a7a838..0eca61c 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -19,12 +19,16 @@ import (
)
var (
- heap_profile_filename string
+ heapProfileFilename string
// globals for debugging
totalManifestSize uint64
maxManifestSize uint64
)
+const (
+ DefaultReplicationLevel = 2
+)
+
type Collection struct {
Uuid string
OwnerUuid string
@@ -37,6 +41,7 @@ type ReadCollections struct {
ReadAllCollections bool
UuidToCollection map[string]Collection
OwnerToCollectionSize map[string]int
+ BlockToReplication map[blockdigest.BlockDigest]int
}
type GetCollectionsParams struct {
@@ -59,7 +64,7 @@ type SdkCollectionList struct {
}
func init() {
- flag.StringVar(&heap_profile_filename,
+ flag.StringVar(&heapProfileFilename,
"heap-profile",
"",
"File to write the heap profiles to. Leave blank to skip profiling.")
@@ -72,9 +77,9 @@ func init() {
// Otherwise we would see cumulative numbers as explained here:
// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
func WriteHeapProfile() {
- if heap_profile_filename != "" {
+ if heapProfileFilename != "" {
- heap_profile, err := os.Create(heap_profile_filename)
+ heap_profile, err := os.Create(heapProfileFilename)
if err != nil {
log.Fatal(err)
}
@@ -90,7 +95,7 @@ func WriteHeapProfile() {
func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
results = GetCollections(params)
- ComputeSizeOfOwnedCollections(&results)
+ Summarize(&results)
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
@@ -237,13 +242,18 @@ func ProcessCollections(arvLogger *logger.Logger,
"Arvados SDK collection returned with unexpected zero "+
"modifcation date. This probably means that either we failed to "+
"parse the modification date or the API server has changed how "+
- "it returns modification dates: %v",
+ "it returns modification dates: %+v",
collection))
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
latestModificationDate = sdkCollection.ModifiedAt
}
+
+ if collection.ReplicationLevel == 0 {
+ collection.ReplicationLevel = DefaultReplicationLevel
+ }
+
manifest := manifest.Manifest{sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
@@ -282,11 +292,20 @@ func ProcessCollections(arvLogger *logger.Logger,
return
}
-func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+func Summarize(readCollections *ReadCollections) {
readCollections.OwnerToCollectionSize = make(map[string]int)
+ readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
+
for _, coll := range readCollections.UuidToCollection {
readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+
+ for block, _ := range coll.BlockDigestToSize {
+ storedReplication := readCollections.BlockToReplication[block]
+ if coll.ReplicationLevel > storedReplication {
+ readCollections.BlockToReplication[block] = coll.ReplicationLevel
+ }
+ }
}
return
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index a8e506e..67dbf08 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -64,7 +64,8 @@ func singlerun() {
var arvLogger *logger.Logger
if logEventTypePrefix != "" {
- arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
+ arvLogger = logger.NewLogger(logger.LoggerParams{
+ Client: arv,
EventTypePrefix: logEventTypePrefix,
WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
}
@@ -79,11 +80,16 @@ func singlerun() {
go func() {
collectionChannel <- collection.GetCollectionsAndSummarize(
collection.GetCollectionsParams{
- Client: arv, Logger: arvLogger, BatchSize: 50})
+ Client: arv,
+ Logger: arvLogger,
+ BatchSize: 50})
}()
keepServerInfo := keep.GetKeepServersAndSummarize(
- keep.GetKeepServersParams{Client: arv, Logger: arvLogger, Limit: 1000})
+ keep.GetKeepServersParams{
+ Client: arv,
+ Logger: arvLogger,
+ Limit: 1000})
readCollections := <-collectionChannel
-----------------------------------------------------------------------
hooks/post-receive
--