[arvados] created: 2.5.0-40-g7fd14a123

git repository hosting git at public.arvados.org
Thu Jan 26 22:47:27 UTC 2023


        at  7fd14a123d0099c5dcfb0646125afa0b643bd6d2 (commit)


commit 7fd14a123d0099c5dcfb0646125afa0b643bd6d2
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jan 26 17:47:17 2023 -0500

    19923: Log keep-balance memory usage.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d8cd84b51..317121450 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -15,8 +15,10 @@ import (
 	"log"
 	"math"
 	"os"
+	"regexp"
 	"runtime"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -25,7 +27,6 @@ import (
 
 	"git.arvados.org/arvados.git/lib/controller/dblock"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
-	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
@@ -73,7 +74,7 @@ type Balancer struct {
 func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
 	nextRunOptions = runOptions
 
-	ctxlog.FromContext(ctx).Info("acquiring active lock")
+	bal.logf("acquiring active lock")
 	if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
 		// context canceled
 		return
@@ -85,6 +86,8 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
 	defer cancel()
 
+	go bal.reportMemorySize(ctx)
+
 	var lbFile *os.File
 	if bal.LostBlocksFile != "" {
 		tmpfn := bal.LostBlocksFile + ".tmp"
@@ -153,6 +156,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
 		return
 	}
 	bal.ComputeChangeSets()
+	time.Sleep(time.Second)
 	bal.PrintStatistics()
 	if err = bal.CheckSanityLate(); err != nil {
 		return
@@ -1205,6 +1209,44 @@ func (bal *Balancer) time(name, help string) func() {
 	}
 }
 
+// Log current memory usage: once now, at least once every 10 minutes,
+// and when memory grows by 40% since the last log. Stop when ctx is
+// canceled.
+func (bal *Balancer) reportMemorySize(ctx context.Context) {
+	buf, _ := os.ReadFile("/proc/self/smaps")
+	m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+	var pagesize int64
+	if len(m) == 2 {
+		pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
+		pagesize <<= 10
+	}
+	if pagesize == 0 {
+		bal.logf("cannot report memory size: failed to parse KernelPageSize from /proc/self/smaps")
+		return
+	}
+
+	var nextTime time.Time
+	var nextMem int64
+	const maxInterval = time.Minute * 10
+	const maxIncrease = 1.4
+
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+	for ctx.Err() == nil {
+		now := time.Now()
+		buf, _ := os.ReadFile("/proc/self/statm")
+		fields := strings.Split(string(buf), " ")
+		mem, _ := strconv.ParseInt(fields[0], 10, 64)
+		mem *= pagesize
+		if now.After(nextTime) || mem >= nextMem {
+			bal.logf("process virtual memory size %d", mem)
+			nextMem = int64(float64(mem) * maxIncrease)
+			nextTime = now.Add(maxInterval)
+		}
+		<-ticker.C
+	}
+}
+
 // Rendezvous hash sort function. Less efficient than sorting on
 // precomputed rendezvous hashes, but also rarely used.
 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {

commit 2d81dae6704870be0d70e0f25278c4c92a6e7cee
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jan 26 11:14:28 2023 -0500

    19923: Log progress while reading keepstore indexes.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 88728d956..5b6d71a4f 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -12,7 +12,10 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"time"
+
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
 // KeepService is an arvados#keepService record
@@ -164,10 +167,30 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
 	}
 	defer resp.Body.Close()
 
+	var progress int64
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	go func() {
+		log := ctxlog.FromContext(ctx)
+		logticker := time.NewTicker(5 * time.Minute)
+		defer logticker.Stop()
+		for {
+			select {
+			case <-logticker.C:
+				log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
 	var entries []KeepServiceIndexEntry
 	scanner := bufio.NewScanner(resp.Body)
 	sawEOF := false
 	for scanner.Scan() {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
 		if scanner.Err() != nil {
 			// If we encounter a read error (timeout,
 			// connection failure), stop now and return it
@@ -206,6 +229,7 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
 			SizedDigest: SizedDigest(fields[0]),
 			Mtime:       mtime,
 		})
+		atomic.AddInt64(&progress, 1)
 	}
 	if err := scanner.Err(); err != nil {
 		return nil, fmt.Errorf("Error scanning index response: %v", err)

commit e07a249c5a6683ea4139c331927c27b8d7711f45
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jan 24 11:29:46 2023 -0500

    19923: keep-balance option to process a subset of blocks.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index eb7988422..88728d956 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -142,16 +142,16 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
 
 // IndexMount returns an unsorted list of blocks at the given mount point.
 func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
-	return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+	return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
 }
 
 // Index returns an unsorted list of blocks that can be retrieved from
 // this server.
 func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
-	return s.index(ctx, c, s.url("index/"+prefix))
+	return s.index(ctx, c, prefix, s.url("index/"+prefix))
 }
 
-func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
 	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
 	if err != nil {
 		return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
@@ -187,6 +187,9 @@ func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepS
 		if len(fields) != 2 {
 			return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
 		}
+		if !strings.HasPrefix(fields[0], prefix) {
+			return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
+		}
 		mtime, err := strconv.ParseInt(fields[1], 10, 64)
 		if err != nil {
 			return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 9f581751d..d8cd84b51 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -46,6 +46,7 @@ type Balancer struct {
 	Dumper  logrus.FieldLogger
 	Metrics *metrics
 
+	ChunkPrefix    string
 	LostBlocksFile string
 
 	*BlockStateMap
@@ -403,7 +404,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
 		go func(mounts []*KeepMount) {
 			defer wg.Done()
 			bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-			idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
+			idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
 			if err != nil {
 				select {
 				case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
@@ -495,6 +496,20 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 	if coll.ReplicationDesired != nil {
 		repl = *coll.ReplicationDesired
 	}
+	if bal.ChunkPrefix != "" {
+		// Throw out blocks that don't match the requested
+		// prefix.  (We save a bit of GC work here by
+		// preallocating based on each hex digit in
+		// ChunkPrefix reducing the expected size of the
+		// filtered set by ~16x.)
+		filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
+		for _, blkid := range blkids {
+			if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
+				filtered = append(filtered, blkid)
+			}
+		}
+		blkids = filtered
+	}
 	bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
 	// Pass pdh to IncreaseDesired only if LostBlocksFile is being
 	// written -- otherwise it's just a waste of memory.
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 4772da55a..5c9a648b7 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -256,26 +256,32 @@ func (s *stubServer) serveKeepstoreMounts() *reqTracker {
 }
 
 func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
+	fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
+	barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
 	rt := &reqTracker{}
 	s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
 		count := rt.Add(r)
-		if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
-			io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+		if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
+			io.WriteString(w, barLine)
 		}
-		fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
+		if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
+			io.WriteString(w, fooLine(count))
+		}
+		io.WriteString(w, "\n")
 	})
 	for _, mounts := range stubMounts {
 		for i, mnt := range mounts {
 			i := i
 			s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
 				count := rt.Add(r)
-				if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
-					io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+				r.ParseForm()
+				if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
+					io.WriteString(w, barLine)
 				}
-				if i == 0 {
-					fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
+				if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
+					io.WriteString(w, fooLine(count))
 				}
-				fmt.Fprintf(w, "\n")
+				io.WriteString(w, "\n")
 			})
 		}
 	}
@@ -283,21 +289,44 @@ func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
 }
 
 func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
+	fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
 	rt := &reqTracker{}
 	s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
 		rt.Add(r)
-		io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
+		if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
+			io.WriteString(w, fooLine)
+		}
+		io.WriteString(w, "\n")
 	})
 	for _, mounts := range stubMounts {
 		for i, mnt := range mounts {
 			i := i
 			s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
 				rt.Add(r)
-				if i == 0 {
-					io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
-				} else {
-					io.WriteString(w, "\n")
+				if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
+					io.WriteString(w, fooLine)
 				}
+				io.WriteString(w, "\n")
+			})
+		}
+	}
+	return rt
+}
+
+func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
+	fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
+	rt := &reqTracker{}
+	s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
+		rt.Add(r)
+		io.WriteString(w, fooLine)
+		io.WriteString(w, "\n")
+	})
+	for _, mounts := range stubMounts {
+		for _, mnt := range mounts {
+			s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
+				rt.Add(r)
+				io.WriteString(w, fooLine)
+				io.WriteString(w, "\n")
 			})
 		}
 	}
@@ -379,6 +408,29 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
 	c.Check(pullReqs.Count(), check.Equals, 0)
 }
 
+func (s *runSuite) TestRefuseBadIndex(c *check.C) {
+	opts := RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		ChunkPrefix: "abc",
+		Logger:      ctxlog.TestLogger(c),
+	}
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveFooBarFileCollections()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
+	s.stub.serveKeepstoreIndexIgnoringPrefix()
+	trashReqs := s.stub.serveKeepstoreTrash()
+	pullReqs := s.stub.serveKeepstorePull()
+	srv := s.newServer(&opts)
+	bal, err := srv.runOnce(context.Background())
+	c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
+	c.Check(trashReqs.Count(), check.Equals, 4)
+	c.Check(pullReqs.Count(), check.Equals, 0)
+	c.Check(bal.stats.trashes, check.Equals, 0)
+	c.Check(bal.stats.pulls, check.Equals, 0)
+}
+
 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
 	opts := RunOptions{
 		CommitPulls: true,
@@ -518,6 +570,37 @@ func (s *runSuite) TestCommit(c *check.C) {
 	c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
 }
 
+func (s *runSuite) TestChunkPrefix(c *check.C) {
+	s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
+	opts := RunOptions{
+		CommitPulls: true,
+		CommitTrash: true,
+		ChunkPrefix: "ac", // catch "foo" but not "bar"
+		Logger:      ctxlog.TestLogger(c),
+		Dumper:      ctxlog.TestLogger(c),
+	}
+	s.stub.serveCurrentUserAdmin()
+	s.stub.serveFooBarFileCollections()
+	s.stub.serveKeepServices(stubServices)
+	s.stub.serveKeepstoreMounts()
+	s.stub.serveKeepstoreIndexFoo4Bar1()
+	trashReqs := s.stub.serveKeepstoreTrash()
+	pullReqs := s.stub.serveKeepstorePull()
+	srv := s.newServer(&opts)
+	bal, err := srv.runOnce(context.Background())
+	c.Check(err, check.IsNil)
+	c.Check(trashReqs.Count(), check.Equals, 8)
+	c.Check(pullReqs.Count(), check.Equals, 4)
+	// "foo" block is overreplicated by 2
+	c.Check(bal.stats.trashes, check.Equals, 2)
+	// "bar" block is underreplicated but does not match prefix
+	c.Check(bal.stats.pulls, check.Equals, 0)
+
+	lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
+	c.Assert(err, check.IsNil)
+	c.Check(string(lost), check.Equals, "")
+}
+
 func (s *runSuite) TestRunForever(c *check.C) {
 	s.config.ManagementToken = "xyzzy"
 	opts := RunOptions{
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index b016db22f..6bc998958 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -38,6 +38,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
 	flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
 		"update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
+	flags.StringVar(&options.ChunkPrefix, "chunk-prefix", "",
+		"operate only on blocks with the given prefix (experimental, see https://dev.arvados.org/issues/19923)")
 	// These options are implemented by service.Command, so we
 	// don't need the vars here -- we just need the flags
 	// to pass flags.Parse().
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index fd53497f7..9bcaec43d 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -30,6 +30,7 @@ type RunOptions struct {
 	CommitPulls           bool
 	CommitTrash           bool
 	CommitConfirmedFields bool
+	ChunkPrefix           string
 	Logger                logrus.FieldLogger
 	Dumper                logrus.FieldLogger
 
@@ -86,6 +87,7 @@ func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
 		Dumper:         srv.Dumper,
 		Metrics:        srv.Metrics,
 		LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
+		ChunkPrefix:    srv.RunOptions.ChunkPrefix,
 	}
 	var err error
 	srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list