[ARVADOS] created: 1.1.3-222-g8fb5d2e

Git user git at public.curoverse.com
Mon Mar 19 14:34:32 EDT 2018


        at  8fb5d2ef73aabd2a54aefe51dd0594ac84ce8b20 (commit)


commit 8fb5d2ef73aabd2a54aefe51dd0594ac84ce8b20
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Mon Mar 19 13:19:41 2018 -0400

    7931: Trash excess replicas when a server has more than one.
    
    Previously a given server was classified only as "having" or "not
    having" a replica, regardless of how many it reported. With this
    change, once there are already enough distinct replicas on other
    servers, excess replicas will be trashed -- even if they are on the
    same server as a replica that is being retained.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
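
    To illustrate the slot bookkeeping the diff below introduces, here is a
    minimal, self-contained sketch -- not the keep-balance code itself; the
    server names and the replica/slot types are made up for illustration --
    of assigning replicas to rendezvous-ordered slots, with extra copies on
    an already-occupied server appended as non-optimal slots at the end:

	package main

	import "fmt"

	type replica struct {
		server string
		mtime  int64
	}

	type slot struct {
		server string
		repl   *replica // nil when no replica was found in this slot
	}

	// buildSlots assumes sortedServers is already in rendezvous order for
	// the block being considered. Each server gets one slot; any further
	// replicas found on a server whose slot is already occupied are
	// appended as extra, non-optimal slots at the end of the list.
	func buildSlots(sortedServers []string, replicas []replica) []slot {
		order := make(map[string]int, len(sortedServers))
		slots := make([]slot, len(sortedServers))
		for i, srv := range sortedServers {
			order[srv] = i
			slots[i].server = srv
		}
		for ri := range replicas {
			r := &replicas[ri]
			idx := order[r.server]
			if slots[idx].repl != nil {
				// Second (or later) copy on the same server: once enough
				// distinct replicas exist in better slots, this copy
				// becomes a candidate for trashing.
				idx = len(slots)
				slots = append(slots, slot{server: r.server})
			}
			slots[idx].repl = r
		}
		return slots
	}

	func main() {
		slots := buildSlots(
			[]string{"keep0", "keep1", "keep2"},
			[]replica{{"keep1", 1}, {"keep1", 2}, {"keep0", 3}},
		)
		for i, s := range slots {
			fmt.Printf("slot %d: %s occupied=%v\n", i, s.server, s.repl != nil)
		}
	}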

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 2d0324d..d87e2a2 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -405,14 +405,49 @@ var changeName = map[int]string{
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	debugf("balanceBlock: %v %+v", blkid, blk)
+
+	// A slot is somewhere a replica could potentially be trashed
+	// from, pulled from, or pulled to. Each KeepService gets
+	// either one empty slot, or one or more non-empty slots.
+	type slot struct {
+		srv  *KeepService // never nil
+		repl *Replica     // nil if none found
+	}
+
+	// First, we build an ordered list of all slots worth
+	// considering (including all slots where replicas have been
+	// found, as well as all of the optimal slots for this block).
+	// Then, when we consider each slot in that order, we will
+	// have all of the information we need to make a decision
+	// about that slot.
+
 	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
-	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
-	for _, repl := range blk.Replicas {
-		hasRepl[repl.KeepService.UUID] = repl
+	rendezvousOrder := make(map[*KeepService]int, len(uuids))
+	slots := make([]slot, len(uuids))
+	for i, uuid := range uuids {
+		srv := bal.KeepServices[uuid]
+		rendezvousOrder[srv] = i
+		slots[i].srv = srv
+	}
+	for ri := range blk.Replicas {
 		// TODO: when multiple copies are on one server, use
 		// the oldest one that doesn't have a timestamp
 		// collision with other replicas.
+		repl := &blk.Replicas[ri]
+		srv := repl.KeepService
+		slotIdx := rendezvousOrder[srv]
+		if slots[slotIdx].repl != nil {
+			// Additional replicas on a single server are
+			// considered non-optimal. Within this
+			// category, we don't try to optimize layout:
+			// we just say the optimal order is the order
+			// we encounter them.
+			slotIdx = len(slots)
+			slots = append(slots, slot{srv: srv})
+		}
+		slots[slotIdx].repl = repl
 	}
+
 	// number of replicas already found in positions better than
 	// the position we're contemplating now.
 	reportedBestRepl := 0
@@ -428,12 +463,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	// requested on rendezvous positions M<N will be successful.)
 	pulls := 0
 	var changes []string
-	for _, uuid := range uuids {
+	for _, slot := range slots {
 		change := changeNone
-		srv := bal.KeepServices[uuid]
+		srv, repl := slot.srv, slot.repl
 		// TODO: request a Touch if Mtime is duplicated.
-		repl, ok := hasRepl[srv.UUID]
-		if ok {
+		if repl != nil {
 			// This service has a replica. We should
 			// delete it if [1] we already have enough
 			// distinct replicas in better rendezvous
@@ -469,7 +503,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 			change = changePull
 		}
 		if bal.Dumper != nil {
-			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+			var mtime int64
+			if repl != nil {
+				mtime = repl.Mtime
+			}
+			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
 		}
 	}
 	if bal.Dumper != nil {
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index 88a2e1f..167e874 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -150,6 +150,47 @@ func (bal *balancerSuite) TestFixUnbalanced(c *check.C) {
 		shouldTrash: slots{7}})
 }
 
+func (bal *balancerSuite) TestMultipleReplicasPerService(c *check.C) {
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{0, 0},
+		shouldPull: slots{1}})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{2, 2},
+		shouldPull: slots{0, 1}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{0, 0, 1},
+		shouldTrash: slots{0}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 0},
+		shouldTrash: slots{1}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 0, 1, 0, 2},
+		shouldTrash: slots{0, 1, 2}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 1, 0, 2},
+		shouldTrash: slots{1, 1, 2}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 2},
+		shouldPull:  slots{0},
+		shouldTrash: slots{1}})
+	bal.try(c, tester{
+		desired:     2,
+		current:     slots{1, 1, 0},
+		timestamps:  []int64{12345678, 12345678, 12345679},
+		shouldTrash: nil})
+	bal.try(c, tester{
+		desired:    2,
+		current:    slots{1, 1},
+		shouldPull: slots{0}})
+}
+
 func (bal *balancerSuite) TestIncreaseReplTimestampCollision(c *check.C) {
 	// For purposes of increasing replication, we assume identical
 	// replicas are distinct.
@@ -232,8 +273,8 @@ func (bal *balancerSuite) try(c *check.C, t tester) {
 }
 
 // srvList returns the KeepServices, sorted in rendezvous order and
-// then selected by idx. For example, srvList(3, 0, 1, 4) returns the
-// the first-, second-, and fifth-best servers for storing
+// then selected by idx. For example, srvList(3, slots{0, 1, 4})
+// returns the first-, second-, and fifth-best servers for storing
 // bal.knownBlkid(3).
 func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepService) {
 	for _, i := range order {

commit 0d7c42c706f2c744ee6268707d9861328209621f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sun Mar 18 02:48:43 2018 -0400

    7931: keep-balance: use keepstore /mounts APIs instead of /index.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
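
    For context, the keepstore endpoints the balancer switches to are
    GET /mounts (enumerate the volumes behind a keepstore) and
    GET /mounts/{uuid}/blocks?prefix= (per-mount block index), as added to
    the Go SDK below. A rough standalone sketch of driving them with
    net/http follows; the host, token, and Authorization scheme are
    placeholders, not taken from this commit:

	package main

	import (
		"encoding/json"
		"fmt"
		"io"
		"net/http"
	)

	type keepMount struct {
		UUID     string `json:"uuid"`
		DeviceID string `json:"device_id"`
	}

	func main() {
		base := "http://keep0.example:25107" // placeholder keepstore address
		token := "example-token"             // placeholder data manager token

		get := func(path string) (*http.Response, error) {
			req, err := http.NewRequest("GET", base+path, nil)
			if err != nil {
				return nil, err
			}
			// The auth scheme here is an assumption; use whatever form
			// your keepstore expects for the data manager token.
			req.Header.Set("Authorization", "Bearer "+token)
			return http.DefaultClient.Do(req)
		}

		// Enumerate the mounts behind this keepstore.
		resp, err := get("/mounts")
		if err != nil {
			panic(err)
		}
		var mounts []keepMount
		err = json.NewDecoder(resp.Body).Decode(&mounts)
		resp.Body.Close()
		if err != nil {
			panic(err)
		}

		// Fetch each mount's block index separately.
		for _, m := range mounts {
			idx, err := get("/mounts/" + m.UUID + "/blocks?prefix=")
			if err != nil {
				panic(err)
			}
			body, _ := io.ReadAll(idx.Body)
			idx.Body.Close()
			fmt.Printf("mount %s (%s): index is %d bytes\n", m.UUID, m.DeviceID, len(body))
		}
	}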

diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 7f9d869..9797440 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -22,6 +22,14 @@ type KeepService struct {
 	ReadOnly       bool   `json:"read_only"`
 }
 
+type KeepMount struct {
+	UUID           string   `json:"uuid"`
+	DeviceID       string   `json:"device_id"`
+	ReadOnly       bool     `json:"read_only"`
+	Replication    int      `json:"replication"`
+	StorageClasses []string `json:"storage_classes"`
+}
+
 // KeepServiceList is an arvados#keepServiceList record
 type KeepServiceList struct {
 	Items          []KeepService `json:"items"`
@@ -77,10 +85,32 @@ func (s *KeepService) String() string {
 	return s.UUID
 }
 
+func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
+	url := s.url("mounts")
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	var mounts []KeepMount
+	err = c.DoAndDecode(&mounts, req)
+	if err != nil {
+		return nil, fmt.Errorf("GET %v: %v", url, err)
+	}
+	return mounts, nil
+}
+
+// IndexMount returns an unsorted list of blocks at the given mount point.
+func (s *KeepService) IndexMount(c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
+	return s.index(c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+}
+
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
 func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-	url := s.url("index/" + prefix)
+	return s.index(c, s.url("index/"+prefix))
+}
+
+func (s *KeepService) index(c *Client, url string) ([]KeepServiceIndexEntry, error) {
 	req, err := http.NewRequest("GET", url, nil)
 	if err != nil {
 		return nil, fmt.Errorf("NewRequest(%v): %v", url, err)
@@ -89,7 +119,7 @@ func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry,
 	if err != nil {
 		return nil, fmt.Errorf("Do(%v): %v", url, err)
 	} else if resp.StatusCode != 200 {
-		return nil, fmt.Errorf("%v: %v", url, resp.Status)
+		return nil, fmt.Errorf("%v: %d %v", url, resp.StatusCode, resp.Status)
 	}
 	defer resp.Body.Close()
 
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 0c4dd5b..2d0324d 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -82,6 +82,12 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
 	if err != nil {
 		return
 	}
+	for _, srv := range bal.KeepServices {
+		err = srv.discoverMounts(&config.Client)
+		if err != nil {
+			return
+		}
+	}
 
 	if err = bal.CheckSanityEarly(&config.Client); err != nil {
 		return
@@ -242,20 +248,24 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		wg.Add(1)
 		go func(srv *KeepService) {
 			defer wg.Done()
-			bal.logf("%s: retrieve index", srv)
-			idx, err := srv.Index(c, "")
-			if err != nil {
-				errs <- fmt.Errorf("%s: %v", srv, err)
-				return
-			}
-			if len(errs) > 0 {
-				// Some other goroutine encountered an
-				// error -- any further effort here
-				// will be wasted.
-				return
+			bal.logf("%s: retrieve indexes", srv)
+			for _, mount := range srv.mounts {
+				bal.logf("%s: retrieve index", mount)
+				idx, err := srv.IndexMount(c, mount.UUID, "")
+				if err != nil {
+					errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
+					return
+				}
+				if len(errs) > 0 {
+					// Some other goroutine encountered an
+					// error -- any further effort here
+					// will be wasted.
+					return
+				}
+				bal.logf("%s: add %d replicas to map", mount, len(idx))
+				bal.BlockStateMap.AddReplicas(mount, idx)
+				bal.logf("%s: done", mount)
 			}
-			bal.logf("%s: add %d replicas to map", srv, len(idx))
-			bal.BlockStateMap.AddReplicas(srv, idx)
 			bal.logf("%s: done", srv)
 		}(srv)
 	}
@@ -398,7 +408,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
 	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
 	for _, repl := range blk.Replicas {
-		hasRepl[repl.UUID] = repl
+		hasRepl[repl.KeepService.UUID] = repl
 		// TODO: when multiple copies are on one server, use
 		// the oldest one that doesn't have a timestamp
 		// collision with other replicas.
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 2d6dd2b..08cfcce 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -5,7 +5,7 @@
 package main
 
 import (
-	_ "encoding/json"
+	"encoding/json"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -41,6 +41,63 @@ func (rt *reqTracker) Add(req *http.Request) int {
 	return len(rt.reqs)
 }
 
+var stubServices = []arvados.KeepService{
+	{
+		UUID:           "zzzzz-bi6l4-000000000000000",
+		ServiceHost:    "keep0.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-000000000000001",
+		ServiceHost:    "keep1.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-000000000000002",
+		ServiceHost:    "keep2.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-000000000000003",
+		ServiceHost:    "keep3.zzzzz.arvadosapi.com",
+		ServicePort:    25107,
+		ServiceSSLFlag: false,
+		ServiceType:    "disk",
+	},
+	{
+		UUID:           "zzzzz-bi6l4-h0a0xwut9qa6g3a",
+		ServiceHost:    "keep.zzzzz.arvadosapi.com",
+		ServicePort:    25333,
+		ServiceSSLFlag: true,
+		ServiceType:    "proxy",
+	},
+}
+
+var stubMounts = map[string][]arvados.KeepMount{
+	"keep0.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-000000000000000",
+		DeviceID: "keep0-vol0",
+	}},
+	"keep1.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-100000000000000",
+		DeviceID: "keep1-vol0",
+	}},
+	"keep2.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-200000000000000",
+		DeviceID: "keep2-vol0",
+	}},
+	"keep3.zzzzz.arvadosapi.com:25107": {{
+		UUID:     "zzzzz-ivpuk-300000000000000",
+		DeviceID: "keep3-vol0",
+	}},
+}
+
 // stubServer is an HTTP transport that intercepts and processes all
 // requests using its own handlers.
 type stubServer struct {
@@ -156,17 +213,32 @@ func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
 }
 
 func (s *stubServer) serveZeroKeepServices() *reqTracker {
-	return s.serveStatic("/arvados/v1/keep_services",
-		`{"items":[],"items_available":0}`)
+	return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{})
 }
 
-func (s *stubServer) serveFourDiskKeepServices() *reqTracker {
-	return s.serveStatic("/arvados/v1/keep_services", `{"items_available":5,"items":[
-		{"uuid":"zzzzz-bi6l4-000000000000000","service_host":"keep0.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-000000000000001","service_host":"keep1.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-000000000000002","service_host":"keep2.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-000000000000003","service_host":"keep3.zzzzz.arvadosapi.com","service_port":25107,"service_ssl_flag":false,"service_type":"disk"},
-		{"uuid":"zzzzz-bi6l4-h0a0xwut9qa6g3a","service_host":"keep.zzzzz.arvadosapi.com","service_port":25333,"service_ssl_flag":true,"service_type":"proxy"}]}`)
+func (s *stubServer) serveKeepServices(svcs []arvados.KeepService) *reqTracker {
+	return s.serveJSON("/arvados/v1/keep_services", arvados.KeepServiceList{
+		ItemsAvailable: len(svcs),
+		Items:          svcs,
+	})
+}
+
+func (s *stubServer) serveJSON(path string, resp interface{}) *reqTracker {
+	rt := &reqTracker{}
+	s.mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+		rt.Add(r)
+		json.NewEncoder(w).Encode(resp)
+	})
+	return rt
+}
+
+func (s *stubServer) serveKeepstoreMounts() *reqTracker {
+	rt := &reqTracker{}
+	s.mux.HandleFunc("/mounts", func(w http.ResponseWriter, r *http.Request) {
+		rt.Add(r)
+		json.NewEncoder(w).Encode(stubMounts[r.Host])
+	})
+	return rt
 }
 
 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
@@ -178,6 +250,21 @@ func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
 		}
 		fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
 	})
+	for _, mounts := range stubMounts {
+		for i, mnt := range mounts {
+			i := i
+			s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
+				count := rt.Add(r)
+				if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
+					io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+				}
+				if i == 0 {
+					fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
+				}
+				fmt.Fprintf(w, "\n")
+			})
+		}
+	}
 	return rt
 }
 
@@ -238,7 +325,8 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveZeroCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -257,7 +345,8 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
 	s.config.KeepServiceTypes = []string{"unlisted-type"}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	_, err := (&Balancer{}).Run(s.config, opts)
@@ -274,7 +363,8 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
 	}
 	s.stub.serveCurrentUserNotAdmin()
 	s.stub.serveZeroCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
 	_, err := (&Balancer{}).Run(s.config, opts)
@@ -291,7 +381,8 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveCollectionsButSkipOne()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -309,7 +400,8 @@ func (s *runSuite) TestDryRun(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	collReqs := s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -336,7 +428,8 @@ func (s *runSuite) TestCommit(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
@@ -362,7 +455,8 @@ func (s *runSuite) TestRunForever(c *check.C) {
 	}
 	s.stub.serveCurrentUserAdmin()
 	s.stub.serveFooBarFileCollections()
-	s.stub.serveFourDiskKeepServices()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
 	s.stub.serveKeepstoreIndexFoo4Bar1()
 	trashReqs := s.stub.serveKeepstoreTrash()
 	pullReqs := s.stub.serveKeepstorePull()
diff --git a/services/keep-balance/balance_test.go b/services/keep-balance/balance_test.go
index edc8cf3..88a2e1f 100644
--- a/services/keep-balance/balance_test.go
+++ b/services/keep-balance/balance_test.go
@@ -76,6 +76,7 @@ func (bal *balancerSuite) SetUpTest(c *check.C) {
 				UUID: fmt.Sprintf("zzzzz-bi6l4-%015x", i),
 			},
 		}
+		srv.mounts = []*KeepMount{{KeepMount: arvados.KeepMount{UUID: fmt.Sprintf("mount-%015x", i)}, KeepService: srv}}
 		bal.srvs[i] = srv
 		bal.KeepServices[srv.UUID] = srv
 	}
@@ -246,7 +247,7 @@ func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepSe
 func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
 	mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
 	for _, srv := range bal.srvList(knownBlockID, order) {
-		repls = append(repls, Replica{srv, mtime})
+		repls = append(repls, Replica{srv.mounts[0], mtime})
 		mtime++
 	}
 	return
diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go
index d6dd0f3..958cdb5 100644
--- a/services/keep-balance/block_state.go
+++ b/services/keep-balance/block_state.go
@@ -14,7 +14,7 @@ import (
 // Azure storage container, etc.) as reported in a keepstore index
 // response.
 type Replica struct {
-	*KeepService
+	*KeepMount
 	Mtime int64
 }
 
@@ -73,16 +73,16 @@ func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
 	}
 }
 
-// AddReplicas updates the map to indicate srv has a replica of each
-// block in idx.
-func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
+// AddReplicas updates the map to indicate that mnt has a replica of
+// each block in idx.
+func (bsm *BlockStateMap) AddReplicas(mnt *KeepMount, idx []arvados.KeepServiceIndexEntry) {
 	bsm.mutex.Lock()
 	defer bsm.mutex.Unlock()
 
 	for _, ent := range idx {
 		bsm.get(ent.SizedDigest).addReplica(Replica{
-			KeepService: srv,
-			Mtime:       ent.Mtime,
+			KeepMount: mnt,
+			Mtime:     ent.Mtime,
 		})
 	}
 }
diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go
index 41f22ab..27d0af8 100644
--- a/services/keep-balance/keep_service.go
+++ b/services/keep-balance/keep_service.go
@@ -17,6 +17,7 @@ import (
 // KeepService represents a keepstore server that is being rebalanced.
 type KeepService struct {
 	arvados.KeepService
+	mounts []*KeepMount
 	*ChangeSet
 }
 
@@ -78,3 +79,28 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
 
 	return err
 }
+
+func (srv *KeepService) discoverMounts(c *arvados.Client) error {
+	mounts, err := srv.Mounts(c)
+	if err != nil {
+		return fmt.Errorf("%s: error retrieving mounts: %v", srv, err)
+	}
+	srv.mounts = nil
+	for _, m := range mounts {
+		srv.mounts = append(srv.mounts, &KeepMount{
+			KeepMount:   m,
+			KeepService: srv,
+		})
+	}
+	return nil
+}
+
+type KeepMount struct {
+	arvados.KeepMount
+	KeepService *KeepService
+}
+
+// String implements fmt.Stringer.
+func (mnt *KeepMount) String() string {
+	return fmt.Sprintf("%s (%s) on %s", mnt.UUID, mnt.DeviceID, mnt.KeepService)
+}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 1f8fba5..940cd14 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -287,12 +287,8 @@ type VolumeManager interface {
 
 // A VolumeMount is an attachment of a Volume to a VolumeManager.
 type VolumeMount struct {
-	UUID           string
-	DeviceID       string
-	ReadOnly       bool
-	Replication    int
-	StorageClasses []string
-	volume         Volume
+	arvados.KeepMount
+	volume Volume
 }
 
 // Generate a UUID the way API server would for a "KeepVolumeMount"

-----------------------------------------------------------------------


hooks/post-receive
-- 



