[ARVADOS] updated: 1.1.3-223-g7b6c2fc

Git user git at public.curoverse.com
Tue Mar 20 13:11:56 EDT 2018


Summary of changes:
 services/keepstore/azure_blob_volume.go |  2 +-
 services/keepstore/volume.go            | 16 ++++++++++------
 2 files changed, 11 insertions(+), 7 deletions(-)

  discards  8fb5d2ef73aabd2a54aefe51dd0594ac84ce8b20 (commit)
  discards  0d7c42c706f2c744ee6268707d9861328209621f (commit)
       via  7b6c2fc3137cc59ca2ada663697bd33158447223 (commit)
       via  758fe7367b2af4d6a1483efa163c7d114b774856 (commit)
       via  bf42b1bd8d68e746ba6db0b09e3d18c82d24a946 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (8fb5d2ef73aabd2a54aefe51dd0594ac84ce8b20)
            \
             N -- N -- N (7b6c2fc3137cc59ca2ada663697bd33158447223)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7b6c2fc3137cc59ca2ada663697bd33158447223
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Mar 20 13:10:38 2018 -0400

    7931: Fix format/type mismatch.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index f18d82c..828a1f1 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -452,7 +452,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
 		return os.ErrNotExist
 	}
 
-	metadata["touch"] = fmt.Sprintf("%d", time.Now())
+	metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
 	return v.container.SetBlobMetadata(loc, metadata, nil)
 }
 

commit 758fe7367b2af4d6a1483efa163c7d114b774856
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Mar 19 13:19:41 2018 -0400

    7931: Trash excess replicas when a server has more than one.
    
    Previously a given server was classified only as "having" or "not
    having" a replica, regardless of how many it reported. With this
    change, once there are already enough distinct replicas on other
    servers, excess replicas will be trashed -- even if they are on the
    same server as a replica that is being retained.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 2d0324d..d87e2a2 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -405,14 +405,49 @@ var changeName = map[int]string{
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	debugf("balanceBlock: %v %+v", blkid, blk)
+
+	// A slot is somewhere a replica could potentially be trashed
+	// from, pulled from, or pulled to. Each KeepService gets
+	// either one empty slot, or one or more non-empty slots.
+	type slot struct {
+		srv  *KeepService // never nil
+		repl *Replica     // nil if none found
+	}
+
+	// First, we build an ordered list of all slots worth
+	// considering (including all slots where replicas have been
+	// found, as well as all of the optimal slots for this block).
+	// Then, when we consider each slot in that order, we will
+	// have all of the information we need to make a decision
+	// about that slot.
+
 	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
-	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
-	for _, repl := range blk.Replicas {
-		hasRepl[repl.KeepService.UUID] = repl
+	rendezvousOrder := make(map[*KeepService]int, len(uuids))
+	slots := make([]slot, len(uuids))
+	for i, uuid := range uuids {
+		srv := bal.KeepServices[uuid]
+		rendezvousOrder[srv] = i
+		slots[i].srv = srv
+	}
+	for ri := range blk.Replicas {
 		// TODO: when multiple copies are on one server, use
 		// the oldest one that doesn't have a timestamp
 		// collision with other replicas.
+		repl := &blk.Replicas[ri]
+		srv := repl.KeepService
+		slotIdx := rendezvousOrder[srv]
+		if slots[slotIdx].repl != nil {
+			// Additional replicas on a single server are
+			// considered non-optimal. Within this
+			// category, we don't try to optimize layout:
+			// we just say the optimal order is the order
+			// we encounter them.
+			slotIdx = len(slots)
+			slots = append(slots, slot{srv: srv})
+		}
+		slots[slotIdx].repl = repl
 	}
+
 	// number of replicas already found in positions better than
 	// the position we're contemplating now.
 	reportedBestRepl := 0
@@ -428,12 +463,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	// requested on rendezvous positions M<N will be successful.)
 	pulls := 0
 	var changes []string
-	for _, uuid := range uuids {
+	for _, slot := range slots {
 		change := changeNone
-		srv := bal.KeepServices[uuid]
+		srv, repl := slot.srv, slot.repl
 		// TODO: request a Touch if Mtime is duplicated.
-		repl, ok := hasRepl[srv.UUID]
-		if ok {
+		if repl != nil {
 			// This service has a replica. We should
 			// delete it if [1] we already have enough
 			// distinct replicas in better rendezvous
@@ -469,7 +503,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 			change = changePull
 		}
 		if bal.Dumper != nil {
-			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+			var mtime int64
+			if repl != nil {
+				mtime = repl.Mtime
+			}
+			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
 		}
 	}
 	if bal.Dumper != nil {
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index 88a2e1f..167e874 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -150,6 +150,47 @@ func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
 		shouldTrash: slots{7}})
 }
 
+func (bal *balancerSuite) TestMultipleReplicasPerService(c *check.C) {
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 0},
+		shouldPull: slots{1}})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{2, 2},
+		shouldPull: slots{0, 1}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{0, 0, 1},
+		shouldTrash: slots{0}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 0},
+		shouldTrash: slots{1}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 0, 1, 0, 2},
+		shouldTrash: slots{0, 1, 2}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 1, 0, 2},
+		shouldTrash: slots{1, 1, 2}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 2},
+		shouldPull:  slots{0},
+		shouldTrash: slots{1}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 0},
+		timestamps:  []int64{12345678, 12345678, 12345679},
+		shouldTrash: nil})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{1, 1},
+		shouldPull: slots{0}})
+}
+
 func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
 	// For purposes of increasing replication, we assume identical
 	// replicas are distinct.
@@ -232,8 +273,8 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
 }
 
 // srvList returns the KeepServices, sorted in rendezvous order and
-// then selected by idx. For example, srvList(3, 0, 1, 4) returns the
-// the first-, second-, and fifth-best servers for storing
+// then selected by idx. For example, srvList(3, slots{0, 1, 4})
+// returns the the first-, second-, and fifth-best servers for storing
 // bal.knownBlkid(3).
 func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
 	for _, i := range order {

commit bf42b1bd8d68e746ba6db0b09e3d18c82d24a946
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sun Mar 18 02:48:43 2018 -0400

    7931: keep-balance: use keepstore /mounts APIs instead of /index.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 7f9d869..9797440 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -22,6 +22,14 @@ type KeepService struct {
 	ReadOnly       bool   `json:"read_only"`
 }
 
+type KeepMount struct {
+	UUID           string   `json:"uuid"`
+	DeviceID       string   `json:"device_id"`
+	ReadOnly       bool     `json:"read_only"`
+	Replication    int      `json:"replication"`
+	StorageClasses []string `json:"storage_classes"`
+}
+
 // KeepServiceList is an arvados#keepServiceList record
 type KeepServiceList struct {
 	Items          []KeepService `json:"items"`
@@ -77,10 +85,32 @@ func (s *KeepService) String() string {
 	return s.UUID
 }
 
+func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
+	url := s.url("mounts")
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	var mounts []KeepMount
+	err = c.DoAndDecode(&mounts, req)
+	if err != nil {
+		return nil, fmt.Errorf("GET %v: %v", url, err)
+	}
+	return mounts, nil
+}
+
+// Index returns an unsorted list of blocks at the given mount point.
+func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+	return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+}
+
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
 func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-	url := s.url("index/" + prefix)
+	return s.index(c, s.url("index/"+prefix))
+}
+
+func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
 	req, err := http.NewRequest("GET", url, nil)
 	if err != nil {
 		return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
@@ -89,7 +119,7 @@ func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry,
 	if err != nil {
 		return nil, fmt.Errorf("Do(%v): %v", url, err)
 	} else if resp.StatusCode != 200 {
-		return nil, fmt.Errorf("%v: %v", url, resp.Status)
+		return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
 	}
 	defer resp.Body.Close()
 
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 0c4dd5b..2d0324d 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -82,6 +82,12 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
 	if err != nil {
 		return
 	}
+	for _, srv := range bal.KeepServices {
+		err = srv.discoverMounts(&config.Client)
+		if err != nil {
+			return
+		}
+	}
 
 	if err = bal.CheckSanityEarly(&config.Client); err != nil {
 		return
@@ -242,20 +248,24 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		wg.Add(1)
 		go func(srv *KeepService) {
 			defer wg.Done()
-			bal.logf("%s: retrieve index", srv)
-			idx, err := srv.Index(c, "")
-			if err != nil {
-				errs <- fmt.Errorf("%s: %v", srv, err)
-				return
-			}
-			if len(errs) > 0 {
-				// Some other goroutine encountered an
-				// error -- any further effort here
-				// will be wasted.
-				return
+			bal.logf("%s: retrieve indexes", srv)
+			for _, mount := range srv.mounts {
+				bal.logf("%s: retrieve index", mount)
+				idx, err := srv.IndexMount(c, mount.UUID, "")
+				if err != nil {
+					errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
+					return
+				}
+				if len(errs) > 0 {
+					// Some other goroutine encountered an
+					// error -- any further effort here
+					// will be wasted.
+					return
+				}
+				bal.logf("%s: add %d replicas to map", mount, len(idx))
+				bal.BlockStateMap.AddReplicas(mount, idx)
+				bal.logf("%s: done", mount)
 			}
-			bal.logf("%s: add %d replicas to map", srv, len(idx))
-			bal.BlockStateMap.AddReplicas(srv, idx)
 			bal.logf("%s: done", srv)
 		}(srv)
 	}
@@ -398,7 +408,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
 	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
 	for _, repl := range blk.Replicas {
-		hasRepl[repl.UUID] = repl
+		hasRepl[repl.KeepService.UUID] = repl
 		// TODO: when multiple copies are on one server, use
 		// the oldest one that doesn't have a timestamp
 		// collision with other replicas.
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 2d6dd2b..08cfcce 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -5,7 +5,7 @@
 package main
 
 import (
-	_ "encoding/json"
+	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -41,6 +41,63 @@ func (rt *reqTracker) Add(req *http.Request) int {
 	return len(rt.reqs)
 }
 
+var stubServices = []arvados.KeepService{
+	{
+		UUID:           "zzzzz-bi6l4-000000000000000",
+		ServiceHost:    "keep0.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-000000000000001",
+		ServiceHost:    "keep1.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-000000000000002",
+		ServiceHost:    "keep2.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-000000000000003",
+		ServiceHost:    "keep3.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
+		ServiceHost:    "keep.zzzzz.arvadosapi.com",
+		ServicePort:    25333,
+		ServiceSSLFlag: true,
+		ServiceType:    "proxy",
+	},
+}
+
+var stubMounts = map[string][]arvados.KeepMount{
+	"keep0.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-000000000000000",
+		DeviceID: "keep0-vol0",
+	}},
+	"keep1.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-100000000000000",
+		DeviceID: "keep1-vol0",
+	}},
+	"keep2.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-200000000000000",
+		DeviceID: "keep2-vol0",
+	}},
+	"keep3.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-300000000000000",
+		DeviceID: "keep3-vol0",
+	}},
+}
+
 // stubServer is an HTTP transport that intercepts and processes all
 // requests using its own handlers.
 type stubServer struct {
@@ -156,17 +213,32 @@ func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
 }
 
 func (s *stubServer) serveZeroKeepServices() *reqTracker {
-	return s.serveStatic("/arvados/v1/keep_services",
-		`{"items":[],"items_available":0}`)
+	return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
 }
 
-func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
-	return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
-		{"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
+func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
+	return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
+		ItemsAvailable: len(svcs),
+		Items:          svcs,
+	})
+}
+
+func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
+	rt := &reqTracker{}
+	s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+		rt.Add(r)
+		json.NewEncoder(w).Encode(resp)
+	})
+	return rt
+}
+
+func (s *stubServer) serveKeepstoreMounts() *reqTracker {
+	rt := &reqTracker{}
+	s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
+		rt.Add(r)
+		json.NewEncoder(w).Encode(stubMounts[r.Host])
+	})
+	return rt
 }
 
 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
@@ -178,6 +250,21 @@ func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
 		}
 		fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
 	})
+	for _, mounts := range stubMounts {
+		for i, mnt := range mounts {
+			i := i
+			s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
+				count := rt.Add(r)
+				if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
+					io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+				}
+				if i == 0 {
+					fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
+				}
+				fmt.Fprintf(w, "\n")
+			})
+		}
+	}
 	return rt
 }
 
@@ -238,7 +325,8 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveZeroCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -257,7 +345,8 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
 	s.config.KeepServiceTypes = []string{"unlisted-type"}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	_, err := (&Balancer{}).Run(s.config, opts)
@@ -274,7 +363,8 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
 	}
 	s.stub.serveCurrentUserNotAdmin()
 	s.stub.serveZeroCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	_, err := (&Balancer{}).Run(s.config, opts)
@@ -291,7 +381,8 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveCollectionsButSkipOne()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -309,7 +400,8 @@ func (s *runSuite) TestDryRun(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	collReqs := s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -336,7 +428,8 @@ func (s *runSuite) TestCommit(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -362,7 +455,8 @@ func (s *runSuite) TestRunForever(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index edc8cf3..88a2e1f 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -76,6 +76,7 @@ func (bal *balancerSuite) SetUpTest(c *check.C) {
 				UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
 			},
 		}
+		srv.mounts = []*KeepMount{{KeepMount: arvados.KeepMount{UUID: fmt.Sprintf("mount-%015x", i)}, KeepService: srv}}
 		bal.srvs[i] = srv
 		bal.KeepServices[srv.UUID] = srv
 	}
@@ -246,7 +247,7 @@ func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepSe
 func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
 	mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
 	for _, srv := range bal.srvList(knownBlockID, order) {
-		repls = append(repls, Replica{srv, mtime})
+		repls = append(repls, Replica{srv.mounts[0], mtime})
 		mtime++
 	}
 	return
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index d6dd0f3..958cdb5 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -14,7 +14,7 @@ import (
 // Azure storage container, etc.) as reported in a keepstore index
 // response.
 type Replica struct {
-	*KeepService
+	*KeepMount
 	Mtime int64
 }
 
@@ -73,16 +73,16 @@ func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
 	}
 }
 
-// AddReplicas updates the map to indicate srv has a replica of each
-// block in idx.
-func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+// AddReplicas updates the map to indicate that mnt has a replica of
+// each block in idx.
+func (bsm *BlockStateMap) AddReplicas(mnt *KeepMount, idx []arvados.KeepServiceIndexEntry) {
 	bsm.mutex.Lock()
 	defer bsm.mutex.Unlock()
 
 	for _, ent := range idx {
 		bsm.get(ent.SizedDigest).addReplica(Replica{
-			KeepService: srv,
-			Mtime:       ent.Mtime,
+			KeepMount: mnt,
+			Mtime:     ent.Mtime,
 		})
 	}
 }
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index 41f22ab..27d0af8 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -17,6 +17,7 @@ import (
 // KeepService represents a keepstore server that is being rebalanced.
 type KeepService struct {
 	arvados.KeepService
+	mounts []*KeepMount
 	*ChangeSet
 }
 
@@ -78,3 +79,28 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
 
 	return err
 }
+
+func (srv *KeepService) discoverMounts(c *arvados.Client) error {
+	mounts, err := srv.Mounts(c)
+	if err != nil {
+		return fmt.Errorf("%s: error retrieving mounts: %v", srv, err)
+	}
+	srv.mounts = nil
+	for _, m := range mounts {
+		srv.mounts = append(srv.mounts, &KeepMount{
+			KeepMount:   m,
+			KeepService: srv,
+		})
+	}
+	return nil
+}
+
+type KeepMount struct {
+	arvados.KeepMount
+	KeepService *KeepService
+}
+
+// String implements fmt.Stringer.
+func (mnt *KeepMount) String() string {
+	return fmt.Sprintf("%s (%s) on %s", mnt.UUID, mnt.DeviceID, mnt.KeepService)
+}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 1f8fba5..6bce05b 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -12,6 +12,8 @@ import (
 	"math/big"
 	"sync/atomic"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
 type BlockWriter interface {
@@ -287,12 +289,8 @@ type VolumeManager interface {
 
 // A VolumeMount is an attachment of a Volume to a VolumeManager.
 type VolumeMount struct {
-	UUID           string
-	DeviceID       string
-	ReadOnly       bool
-	Replication    int
-	StorageClasses []string
-	volume         Volume
+	arvados.KeepMount
+	volume Volume
 }
 
 // Generate a UUID the way API server would for a "KeepVolumeMount"
@@ -334,12 +332,14 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
 			sc = []string{"default"}
 		}
 		mnt := &VolumeMount{
-			UUID:           (*VolumeMount)(nil).generateUUID(),
-			DeviceID:       v.DeviceID(),
-			ReadOnly:       !v.Writable(),
-			Replication:    v.Replication(),
-			StorageClasses: sc,
-			volume:         v,
+			KeepMount: arvados.KeepMount{
+				UUID:           (*VolumeMount)(nil).generateUUID(),
+				DeviceID:       v.DeviceID(),
+				ReadOnly:       !v.Writable(),
+				Replication:    v.Replication(),
+				StorageClasses: sc,
+			},
+			volume: v,
 		}
 		vm.iostats[v] = &ioStats{}
 		vm.mounts = append(vm.mounts, mnt)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list