[ARVADOS] updated: a5a4f79e91aa8bba1794394646808f6d4c444661

git at public.curoverse.com git at public.curoverse.com
Mon Feb 23 18:15:59 EST 2015


Summary of changes:
 doc/_includes/_navbar_top.liquid                   |   4 +-
 doc/_layouts/default.html.liquid                   |   6 +-
 doc/images/uses/shared.png                         | Bin 57235 -> 54402 bytes
 doc/images/uses/{share.png => sharing.png}         | Bin
 doc/index.html.liquid                              |  26 +++-
 .../getting_started/sharedata.html.textile.liquid  |   2 +-
 sdk/go/blockdigest/blockdigest.go                  |  10 +-
 sdk/go/logger/logger.go                            |  32 +++--
 services/datamanager/collection/collection.go      |  52 ++++++--
 services/datamanager/datamanager.go                |  81 ++++++++---
 services/datamanager/summary/summary.go            | 148 +++++++++++++++++++++
 11 files changed, 308 insertions(+), 53 deletions(-)
 rename doc/images/uses/{share.png => sharing.png} (100%)
 create mode 100644 services/datamanager/summary/summary.go

       via  a5a4f79e91aa8bba1794394646808f6d4c444661 (commit)
       via  d3f9fad0cc83a9af47589894998a781db9c60989 (commit)
       via  6b5b6890158830b26161b3879a0d1eeaa122659f (commit)
       via  248d2ebb4ab7ea3d9060838bcfbfe3b9330da5ee (commit)
       via  f5a886733b2f628f462dcc03f45d20621c8ee015 (commit)
       via  83ea6e36b19db2c9a45be87c900efbbd9ea8bdb9 (commit)
       via  1713f54c6b41cb18a69c09f361d97ca6384a9492 (commit)
       via  546b44cf5dfba972d2f5f91fa4434e9cb64fa0cd (commit)
       via  bd53829c4e3c43be8e8125c59940bfb8c81f5233 (commit)
       via  177e076e2602935b6549f9957f031353b09e77c0 (commit)
       via  520008a541ac57bb07b11b79bd68d2930ea508a2 (commit)
       via  17f611827a12edffec0245c27d4c15b785bc0436 (commit)
      from  6100a423c0a75736238747224d72afb112793fdb (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit a5a4f79e91aa8bba1794394646808f6d4c444661
Author: mishaz <misha at curoverse.com>
Date:   Mon Feb 23 23:14:17 2015 +0000

    Added block to collection index map. Started using collection index to save memory over using long uuid strings to identify collections.
    Started running summarize() methods after reading data from disk so that we can modify the summarization code and test it without network i/o.

diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index b15864e..a3f1e31 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -38,11 +38,13 @@ type Collection struct {
 }
 
 type ReadCollections struct {
-	ReadAllCollections    bool
-	UuidToCollection      map[string]Collection
-	OwnerToCollectionSize map[string]int
-	BlockToReplication    map[blockdigest.BlockDigest]int
-	// TODO(misha): add block to collection map
+	ReadAllCollections     bool
+	UuidToCollection       map[string]Collection
+	OwnerToCollectionSize  map[string]int
+	BlockToReplication     map[blockdigest.BlockDigest]int
+	CollectionUuidToIndex  map[string]int
+	CollectionIndexToUuid  []string
+	BlockToCollectionIndex map[blockdigest.BlockDigest]int
 }
 
 type GetCollectionsParams struct {
@@ -296,12 +298,22 @@ func ProcessCollections(arvLogger *logger.Logger,
 func Summarize(readCollections *ReadCollections) {
 	readCollections.OwnerToCollectionSize = make(map[string]int)
 	readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
+	numCollections := len(readCollections.UuidToCollection)
+	readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
+	readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
+	readCollections.BlockToCollectionIndex = make(map[blockdigest.BlockDigest]int)
 
 	for _, coll := range readCollections.UuidToCollection {
+		collectionIndex := len(readCollections.CollectionIndexToUuid)
+		readCollections.CollectionIndexToUuid =
+			append(readCollections.CollectionIndexToUuid, coll.Uuid)
+		readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
+
 		readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
 			readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
 
 		for block, _ := range coll.BlockDigestToSize {
+			readCollections.BlockToCollectionIndex[block] = collectionIndex
 			storedReplication := readCollections.BlockToReplication[block]
 			if coll.ReplicationLevel > storedReplication {
 				readCollections.BlockToReplication[block] = coll.ReplicationLevel
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
index e432330..6698062 100644
--- a/services/datamanager/summary/summary.go
+++ b/services/datamanager/summary/summary.go
@@ -103,6 +103,12 @@ func MaybeReadData(arvLogger *logger.Logger,
 			loggerutil.FatalWithMessage(arvLogger,
 				fmt.Sprintf("Failed to read summary data: %v", err))
 		}
+
+		// re-summarize data, so that we can update our summarizing
+		// functions without needing to do all our network i/o
+		collection.Summarize(&data.ReadCollections)
+		keep.ComputeBlockReplicationCounts(&data.KeepServerInfo)
+
 		*readCollections = data.ReadCollections
 		*keepServerInfo = data.KeepServerInfo
 		log.Printf("Read summary data from: %s", readDataFrom)

commit d3f9fad0cc83a9af47589894998a781db9c60989
Author: mishaz <misha at curoverse.com>
Date:   Mon Feb 23 22:49:18 2015 +0000

    Fixed bug where I wasn't copying saved data.
    Computed and logged summary of replication state.

diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index 0eca61c..b15864e 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -42,6 +42,7 @@ type ReadCollections struct {
 	UuidToCollection      map[string]Collection
 	OwnerToCollectionSize map[string]int
 	BlockToReplication    map[blockdigest.BlockDigest]int
+	// TODO(misha): add block to collection map
 }
 
 type GetCollectionsParams struct {
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 9035f11..ce8114a 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -103,9 +103,24 @@ func singlerun() {
 
 	summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
 
-	// TODO(misha): Use these together to verify replication.
-	_ = readCollections
-	_ = keepServerInfo
+	replicationSummary :=
+		summary.SummarizeReplication(arvLogger, readCollections, keepServerInfo)
+
+	log.Printf("Replication Counts:" +
+		"\nBlocks In Collections: %d, " +
+		"\nBlocks In Keep: %d, " +
+		"\nMissing From Keep: %d, " +
+		"\nUnder Replicated: %d, " +
+		"\nOver Replicated: %d, " +
+		"\nReplicated Just Right: %d, " +
+		"\nNot In Any Collection: %d.",
+		len(readCollections.BlockToReplication),
+		len(keepServerInfo.BlockToServers),
+		len(replicationSummary.CollectionBlocksNotInKeep),
+		len(replicationSummary.UnderReplicatedBlocks),
+		len(replicationSummary.OverReplicatedBlocks),
+		len(replicationSummary.CorrectlyReplicatedBlocks),
+		len(replicationSummary.KeepBlocksNotInCollections))
 
 	// Log that we're finished. We force the recording, since go will
 	// not wait for the timer before exiting.
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
index 8a381eb..e432330 100644
--- a/services/datamanager/summary/summary.go
+++ b/services/datamanager/summary/summary.go
@@ -6,6 +6,7 @@ import (
 	"encoding/gob"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/blockdigest"
 	"git.curoverse.com/arvados.git/sdk/go/logger"
 	"git.curoverse.com/arvados.git/services/datamanager/collection"
 	"git.curoverse.com/arvados.git/services/datamanager/keep"
@@ -14,17 +15,24 @@ import (
 	"os"
 )
 
-var (
-	// These are just used for development, to save network i/o
-	writeDataTo  string
-	readDataFrom string
-)
+type ReplicationSummary struct {
+	CollectionBlocksNotInKeep  map[blockdigest.BlockDigest]struct{}
+	UnderReplicatedBlocks      map[blockdigest.BlockDigest]struct{}
+	OverReplicatedBlocks       map[blockdigest.BlockDigest]struct{}
+	CorrectlyReplicatedBlocks  map[blockdigest.BlockDigest]struct{}
+	KeepBlocksNotInCollections map[blockdigest.BlockDigest]struct{}
+}
 
 type serializedData struct {
 	ReadCollections collection.ReadCollections
 	KeepServerInfo  keep.ReadServers
 }
 
+var (
+	writeDataTo  string
+	readDataFrom string
+)
+
 func init() {
 	flag.StringVar(&writeDataTo,
 		"write-data-to",
@@ -95,7 +103,40 @@ func MaybeReadData(arvLogger *logger.Logger,
 			loggerutil.FatalWithMessage(arvLogger,
 				fmt.Sprintf("Failed to read summary data: %v", err))
 		}
+		*readCollections = data.ReadCollections
+		*keepServerInfo = data.KeepServerInfo
 		log.Printf("Read summary data from: %s", readDataFrom)
 		return true
 	}
 }
+
+func SummarizeReplication(arvLogger *logger.Logger,
+	readCollections collection.ReadCollections,
+	keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
+	rs.CollectionBlocksNotInKeep = make(map[blockdigest.BlockDigest]struct{})
+	rs.UnderReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
+	rs.OverReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
+	rs.CorrectlyReplicatedBlocks = make(map[blockdigest.BlockDigest]struct{})
+	rs.KeepBlocksNotInCollections = make(map[blockdigest.BlockDigest]struct{})
+
+	for block, requestedReplication := range readCollections.BlockToReplication {
+		actualReplication := len(keepServerInfo.BlockToServers[block])
+		if actualReplication == 0 {
+			rs.CollectionBlocksNotInKeep[block] = struct{}{}
+		} else if actualReplication < requestedReplication {
+			rs.UnderReplicatedBlocks[block] = struct{}{}
+		} else if actualReplication > requestedReplication {
+			rs.OverReplicatedBlocks[block] = struct{}{}
+		} else {
+			rs.CorrectlyReplicatedBlocks[block] = struct{}{}
+		}
+	}
+
+	for block, _ := range keepServerInfo.BlockToServers {
+		if 0 == readCollections.BlockToReplication[block] {
+			rs.KeepBlocksNotInCollections[block] = struct{}{}
+		}
+	}
+
+	return rs
+}

commit 6b5b6890158830b26161b3879a0d1eeaa122659f
Author: mishaz <misha at curoverse.com>
Date:   Mon Feb 23 21:41:19 2015 +0000

    Added flags to write network data and then read it back. This is useful to speed up development, but should not be used in production since data will be stale.
    Unfortunately had to switch H,L fields in blockdigest from private to public, otherwise encoding/gob would not serialize them.

diff --git a/sdk/go/blockdigest/blockdigest.go b/sdk/go/blockdigest/blockdigest.go
index 9b818d3..ad2e365 100644
--- a/sdk/go/blockdigest/blockdigest.go
+++ b/sdk/go/blockdigest/blockdigest.go
@@ -11,12 +11,12 @@ import (
 // Stores a Block Locator Digest compactly, up to 128 bits.
 // Can be used as a map key.
 type BlockDigest struct {
-	h uint64
-	l uint64
+	H uint64
+	L uint64
 }
 
 func (d BlockDigest) String() string {
-	return fmt.Sprintf("%016x%016x", d.h, d.l)
+	return fmt.Sprintf("%016x%016x", d.H, d.L)
 }
 
 // Will create a new BlockDigest unless an error is encountered.
@@ -27,11 +27,11 @@ func FromString(s string) (dig BlockDigest, err error) {
 	}
 
 	var d BlockDigest
-	d.h, err = strconv.ParseUint(s[:16], 16, 64)
+	d.H, err = strconv.ParseUint(s[:16], 16, 64)
 	if err != nil {
 		return
 	}
-	d.l, err = strconv.ParseUint(s[16:], 16, 64)
+	d.L, err = strconv.ParseUint(s[16:], 16, 64)
 	if err != nil {
 		return
 	}
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 67dbf08..9035f11 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -10,6 +10,7 @@ import (
 	"git.curoverse.com/arvados.git/services/datamanager/collection"
 	"git.curoverse.com/arvados.git/services/datamanager/keep"
 	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+	"git.curoverse.com/arvados.git/services/datamanager/summary"
 	"log"
 	"time"
 )
@@ -75,23 +76,32 @@ func singlerun() {
 		arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
 	}
 
-	collectionChannel := make(chan collection.ReadCollections)
+	var (
+		readCollections collection.ReadCollections
+		keepServerInfo keep.ReadServers
+	)
 
-	go func() {
-		collectionChannel <- collection.GetCollectionsAndSummarize(
-			collection.GetCollectionsParams{
+	if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) {
+		collectionChannel := make(chan collection.ReadCollections)
+
+		go func() {
+			collectionChannel <- collection.GetCollectionsAndSummarize(
+				collection.GetCollectionsParams{
+					Client: arv,
+					Logger: arvLogger,
+					BatchSize: 50})
+		}()
+
+		keepServerInfo = keep.GetKeepServersAndSummarize(
+			keep.GetKeepServersParams{
 				Client: arv,
 				Logger: arvLogger,
-				BatchSize: 50})
-	}()
+				Limit: 1000})
 
-	keepServerInfo := keep.GetKeepServersAndSummarize(
-		keep.GetKeepServersParams{
-			Client: arv,
-			Logger: arvLogger,
-			Limit: 1000})
+		readCollections = <-collectionChannel
+	}
 
-	readCollections := <-collectionChannel
+	summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
 
 	// TODO(misha): Use these together to verify replication.
 	_ = readCollections
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
new file mode 100644
index 0000000..8a381eb
--- /dev/null
+++ b/services/datamanager/summary/summary.go
@@ -0,0 +1,101 @@
+/* Computes Summary based on data read from API server. */
+
+package summary
+
+import (
+	"encoding/gob"
+	"flag"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/logger"
+	"git.curoverse.com/arvados.git/services/datamanager/collection"
+	"git.curoverse.com/arvados.git/services/datamanager/keep"
+	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+	"log"
+	"os"
+)
+
+var (
+	// These are just used for development, to save network i/o
+	writeDataTo  string
+	readDataFrom string
+)
+
+type serializedData struct {
+	ReadCollections collection.ReadCollections
+	KeepServerInfo  keep.ReadServers
+}
+
+func init() {
+	flag.StringVar(&writeDataTo,
+		"write-data-to",
+		"",
+		"Write summary of data received to this file. Used for development only.")
+	flag.StringVar(&readDataFrom,
+		"read-data-from",
+		"",
+		"Avoid network i/o and read summary data from this file instead. Used for development only.")
+}
+
+// Writes data we've read to a file.
+//
+// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
+//
+// This should not be used outside of development, since you'll be
+// working with stale data.
+func MaybeWriteData(arvLogger *logger.Logger,
+	readCollections collection.ReadCollections,
+	keepServerInfo keep.ReadServers) bool {
+	if writeDataTo == "" {
+		return false
+	} else {
+		summaryFile, err := os.Create(writeDataTo)
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Failed to open %s: %v", writeDataTo, err))
+		}
+		defer summaryFile.Close()
+
+		enc := gob.NewEncoder(summaryFile)
+		data := serializedData{
+			ReadCollections: readCollections,
+			KeepServerInfo:  keepServerInfo}
+		err = enc.Encode(data)
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Failed to write summary data: %v", err))
+		}
+		log.Printf("Wrote summary data to: %s", writeDataTo)
+		return true
+	}
+}
+
+// Reads data that we've previously written to a file.
+//
+// This is useful for development, so that we don't need to read all our data from the network every time we tweak something.
+//
+// This should not be used outside of development, since you'll be
+// working with stale data.
+func MaybeReadData(arvLogger *logger.Logger,
+	readCollections *collection.ReadCollections,
+	keepServerInfo *keep.ReadServers) bool {
+	if readDataFrom == "" {
+		return false
+	} else {
+		summaryFile, err := os.Open(readDataFrom)
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Failed to open %s: %v", readDataFrom, err))
+		}
+		defer summaryFile.Close()
+
+		dec := gob.NewDecoder(summaryFile)
+		data := serializedData{}
+		err = dec.Decode(&data)
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Failed to read summary data: %v", err))
+		}
+		log.Printf("Read summary data from: %s", readDataFrom)
+		return true
+	}
+}

commit 248d2ebb4ab7ea3d9060838bcfbfe3b9330da5ee
Author: mishaz <misha at curoverse.com>
Date:   Mon Feb 23 19:41:29 2015 +0000

    Added BlockToReplication field to collection.ReadCollections.
    Lots of other little cleanup.

diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index 9a7a838..0eca61c 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -19,12 +19,16 @@ import (
 )
 
 var (
-	heap_profile_filename string
+	heapProfileFilename string
 	// globals for debugging
 	totalManifestSize uint64
 	maxManifestSize   uint64
 )
 
+const (
+	DefaultReplicationLevel = 2
+)
+
 type Collection struct {
 	Uuid              string
 	OwnerUuid         string
@@ -37,6 +41,7 @@ type ReadCollections struct {
 	ReadAllCollections    bool
 	UuidToCollection      map[string]Collection
 	OwnerToCollectionSize map[string]int
+	BlockToReplication    map[blockdigest.BlockDigest]int
 }
 
 type GetCollectionsParams struct {
@@ -59,7 +64,7 @@ type SdkCollectionList struct {
 }
 
 func init() {
-	flag.StringVar(&heap_profile_filename,
+	flag.StringVar(&heapProfileFilename,
 		"heap-profile",
 		"",
 		"File to write the heap profiles to. Leave blank to skip profiling.")
@@ -72,9 +77,9 @@ func init() {
 // Otherwise we would see cumulative numbers as explained here:
 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
 func WriteHeapProfile() {
-	if heap_profile_filename != "" {
+	if heapProfileFilename != "" {
 
-		heap_profile, err := os.Create(heap_profile_filename)
+		heap_profile, err := os.Create(heapProfileFilename)
 		if err != nil {
 			log.Fatal(err)
 		}
@@ -90,7 +95,7 @@ func WriteHeapProfile() {
 
 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
 	results = GetCollections(params)
-	ComputeSizeOfOwnedCollections(&results)
+	Summarize(&results)
 
 	if params.Logger != nil {
 		params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
@@ -237,13 +242,18 @@ func ProcessCollections(arvLogger *logger.Logger,
 					"Arvados SDK collection returned with unexpected zero "+
 						"modifcation date. This probably means that either we failed to "+
 						"parse the modification date or the API server has changed how "+
-						"it returns modification dates: %v",
+						"it returns modification dates: %+v",
 					collection))
 		}
 
 		if sdkCollection.ModifiedAt.After(latestModificationDate) {
 			latestModificationDate = sdkCollection.ModifiedAt
 		}
+
+		if collection.ReplicationLevel == 0 {
+			collection.ReplicationLevel = DefaultReplicationLevel
+		}
+
 		manifest := manifest.Manifest{sdkCollection.ManifestText}
 		manifestSize := uint64(len(sdkCollection.ManifestText))
 
@@ -282,11 +292,20 @@ func ProcessCollections(arvLogger *logger.Logger,
 	return
 }
 
-func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+func Summarize(readCollections *ReadCollections) {
 	readCollections.OwnerToCollectionSize = make(map[string]int)
+	readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
+
 	for _, coll := range readCollections.UuidToCollection {
 		readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
 			readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+
+		for block, _ := range coll.BlockDigestToSize {
+			storedReplication := readCollections.BlockToReplication[block]
+			if coll.ReplicationLevel > storedReplication {
+				readCollections.BlockToReplication[block] = coll.ReplicationLevel
+			}
+		}
 	}
 
 	return
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index a8e506e..67dbf08 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -64,7 +64,8 @@ func singlerun() {
 
 	var arvLogger *logger.Logger
 	if logEventTypePrefix != "" {
-		arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
+		arvLogger = logger.NewLogger(logger.LoggerParams{
+			Client: arv,
 			EventTypePrefix: logEventTypePrefix,
 			WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
 	}
@@ -79,11 +80,16 @@ func singlerun() {
 	go func() {
 		collectionChannel <- collection.GetCollectionsAndSummarize(
 			collection.GetCollectionsParams{
-				Client: arv, Logger: arvLogger, BatchSize: 50})
+				Client: arv,
+				Logger: arvLogger,
+				BatchSize: 50})
 	}()
 
 	keepServerInfo := keep.GetKeepServersAndSummarize(
-		keep.GetKeepServersParams{Client: arv, Logger: arvLogger, Limit: 1000})
+		keep.GetKeepServersParams{
+			Client: arv,
+			Logger: arvLogger,
+			Limit: 1000})
 
 	readCollections := <-collectionChannel
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list