[ARVADOS] created: 1.3.0-819-gb6a55c243

Git user git at public.curoverse.com
Wed Apr 24 19:23:32 UTC 2019


        at  b6a55c2430033ee1a17b2266b529b425e641d19f (commit)


commit b6a55c2430033ee1a17b2266b529b425e641d19f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Wed Apr 24 15:22:44 2019 -0400

    15112: Add config option to save list of lost blocks to a file.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/doc/install/install-keep-balance.html.textile.liquid b/doc/install/install-keep-balance.html.textile.liquid
index 68bf07a4a..4a35f448e 100644
--- a/doc/install/install-keep-balance.html.textile.liquid
+++ b/doc/install/install-keep-balance.html.textile.liquid
@@ -81,11 +81,11 @@ Client:
   AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
 KeepServiceTypes:
   - disk
-Listen: :9005
 ManagementToken: <span class="userinput">xyzzy</span>
 RunPeriod: 10m
 CollectionBatchSize: 100000
 CollectionBuffers: 1000
+LostBlocksFile: /tmp/keep-balance-lost-blocks.txt    # If given, this file will be updated atomically during each successful run.
 </code></pre>
 </notextile>
 
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index fd39ee693..bd3152fd8 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -8,12 +8,16 @@ import (
 	"bytes"
 	"crypto/md5"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"log"
 	"math"
+	"os"
 	"runtime"
 	"sort"
 	"strings"
 	"sync"
+	"syscall"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -35,6 +39,8 @@ type Balancer struct {
 	Dumper  logrus.FieldLogger
 	Metrics *metrics
 
+	LostBlocksFile string
+
 	*BlockStateMap
 	KeepServices       map[string]*KeepService
 	DefaultReplication int
@@ -48,6 +54,7 @@ type Balancer struct {
 	errors        []error
 	stats         balancerStats
 	mutex         sync.Mutex
+	lostBlocks    io.Writer
 }
 
 // Run performs a balance operation using the given config and
@@ -64,6 +71,23 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
 
 	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
+	var lbFile *os.File
+	if bal.LostBlocksFile != "" {
+		tmpfn := bal.LostBlocksFile + ".tmp"
+		lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
+		if err != nil {
+			return
+		}
+		err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+		if err != nil {
+			return
+		}
+		defer os.Remove(tmpfn)
+		bal.lostBlocks = lbFile
+	} else {
+		bal.lostBlocks = ioutil.Discard
+	}
+
 	if len(config.KeepServiceList.Items) > 0 {
 		err = bal.SetKeepServices(config.KeepServiceList)
 	} else {
@@ -107,6 +131,20 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
 	if err = bal.CheckSanityLate(); err != nil {
 		return
 	}
+	if lbFile != nil {
+		err = lbFile.Sync()
+		if err != nil {
+			return
+		}
+		err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
+		if err != nil {
+			return
+		}
+		err = lbFile.Close()
+		if err != nil {
+			return
+		}
+	}
 	if runOptions.CommitPulls {
 		err = bal.CommitPulls(&config.Client)
 		if err != nil {
@@ -882,6 +920,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
 			s.lost.replicas -= surplus
 			s.lost.blocks++
 			s.lost.bytes += bytes * int64(-surplus)
+			fmt.Fprintf(bal.lostBlocks, "%s\n", strings.SplitN(string(result.blkid), "+", 2)[0])
 		case surplus < 0:
 			s.underrep.replicas -= surplus
 			s.underrep.blocks++
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 3b7e2db9a..ee7aeb9c8 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -11,6 +11,7 @@ import (
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
+	"os"
 	"strings"
 	"sync"
 	"time"
@@ -270,6 +271,28 @@ func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
 	return rt
 }
 
+func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
+	rt := &reqTracker{}
+	s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
+		rt.Add(r)
+		io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
+	})
+	for _, mounts := range stubMounts {
+		for i, mnt := range mounts {
+			i := i
+			s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
+				rt.Add(r)
+				if i == 0 {
+					io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
+				} else {
+					io.WriteString(w, "\n")
+				}
+			})
+		}
+	}
+	return rt
+}
+
 func (s *stubServer) serveKeepstoreTrash() *reqTracker {
 	return s.serveStatic("/trash", `{}`)
 }
@@ -406,6 +429,32 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
 	c.Check(pullReqs.Count(), check.Equals, 0)
 }
 
+func (s *runSuite) TestWriteLostBlocks(c *check.C) {
+	lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
+	c.Assert(err, check.IsNil)
+	s.config.LostBlocksFile = lostf.Name()
+	defer os.Remove(lostf.Name())
+	opts := RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		Logger:      s.logger(c),
+	}
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveFooBarFileCollections()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
+	s.stub.serveKeepstoreIndexFoo1()
+	s.stub.serveKeepstoreTrash()
+	s.stub.serveKeepstorePull()
+	srv, err := NewServer(s.config, opts)
+	c.Assert(err, check.IsNil)
+	_, err = srv.Run()
+	c.Check(err, check.IsNil)
+	lost, err := ioutil.ReadFile(lostf.Name())
+	c.Assert(err, check.IsNil)
+	c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2\n")
+}
+
 func (s *runSuite) TestDryRun(c *check.C) {
 	opts := RunOptions{
 		CommitPulls: false,
@@ -435,6 +484,11 @@ func (s *runSuite) TestDryRun(c *check.C) {
 }
 
 func (s *runSuite) TestCommit(c *check.C) {
+	lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
+	c.Assert(err, check.IsNil)
+	s.config.LostBlocksFile = lostf.Name()
+	defer os.Remove(lostf.Name())
+
 	s.config.Listen = ":"
 	s.config.ManagementToken = "xyzzy"
 	opts := RunOptions{
@@ -462,6 +516,10 @@ func (s *runSuite) TestCommit(c *check.C) {
 	// in a poor rendezvous position
 	c.Check(bal.stats.pulls, check.Equals, 2)
 
+	lost, err := ioutil.ReadFile(lostf.Name())
+	c.Assert(err, check.IsNil)
+	c.Check(string(lost), check.Equals, "")
+
 	metrics := s.getMetrics(c, srv)
 	c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
 	c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index c867d7b10..894056c9f 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -57,6 +57,10 @@ type Config struct {
 
 	// Timeout for outgoing http request/response cycle.
 	RequestTimeout arvados.Duration
+
+	// Destination filename for the list of lost block hashes, one
+	// per line. Updated atomically during each successful run.
+	LostBlocksFile string
 }
 
 // RunOptions controls runtime behavior. The flags/options that belong
@@ -142,9 +146,10 @@ func (srv *Server) start() error {
 
 func (srv *Server) Run() (*Balancer, error) {
 	bal := &Balancer{
-		Logger:  srv.Logger,
-		Dumper:  srv.Dumper,
-		Metrics: srv.metrics,
+		Logger:         srv.Logger,
+		Dumper:         srv.Dumper,
+		Metrics:        srv.metrics,
+		LostBlocksFile: srv.config.LostBlocksFile,
 	}
 	var err error
 	srv.runOptions, err = bal.Run(srv.config, srv.runOptions)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list