[arvados] created: 2.5.0-40-g7fd14a123
git repository hosting
git at public.arvados.org
Thu Jan 26 22:47:27 UTC 2023
at 7fd14a123d0099c5dcfb0646125afa0b643bd6d2 (commit)
commit 7fd14a123d0099c5dcfb0646125afa0b643bd6d2
Author: Tom Clegg <tom at curii.com>
Date: Thu Jan 26 17:47:17 2023 -0500
19923: Log keep-balance memory usage.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index d8cd84b51..317121450 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -15,8 +15,10 @@ import (
"log"
"math"
"os"
+ "regexp"
"runtime"
"sort"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -25,7 +27,6 @@ import (
"git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
@@ -73,7 +74,7 @@ type Balancer struct {
func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
nextRunOptions = runOptions
- ctxlog.FromContext(ctx).Info("acquiring active lock")
+ bal.logf("acquiring active lock")
if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
// context canceled
return
@@ -85,6 +86,8 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
defer cancel()
+ go bal.reportMemorySize(ctx)
+
var lbFile *os.File
if bal.LostBlocksFile != "" {
tmpfn := bal.LostBlocksFile + ".tmp"
@@ -153,6 +156,7 @@ func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *a
return
}
bal.ComputeChangeSets()
+ time.Sleep(time.Second)
bal.PrintStatistics()
if err = bal.CheckSanityLate(); err != nil {
return
@@ -1205,6 +1209,44 @@ func (bal *Balancer) time(name, help string) func() {
}
}
+// reportMemorySize logs the process's virtual memory size (the first
+// field of /proc/self/statm, converted from pages to bytes): once
+// now, at least once every 10 minutes, and whenever it has grown by
+// 40% since the last log entry. It returns when ctx is canceled, or
+// as soon as the size cannot be read/parsed (after logging why), so
+// a read failure cannot turn into a stream of bogus "0" entries.
+func (bal *Balancer) reportMemorySize(ctx context.Context) {
+ // statm reports sizes in pages; take the page size from the
+ // first KernelPageSize entry in smaps.
+ buf, _ := os.ReadFile("/proc/self/smaps")
+ m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+ var pagesize int64
+ if len(m) == 2 {
+ pagesize, _ = strconv.ParseInt(string(m[1]), 10, 64)
+ pagesize <<= 10
+ }
+ if pagesize == 0 {
+ bal.logf("cannot report memory size: failed to parse KernelPageSize from /proc/self/smaps")
+ return
+ }
+
+ var nextTime time.Time
+ var nextMem int64
+ const maxInterval = time.Minute * 10
+ const maxIncrease = 1.4
+
+ ticker := time.NewTicker(time.Second)
+ defer ticker.Stop()
+ for ctx.Err() == nil {
+ now := time.Now()
+ buf, err := os.ReadFile("/proc/self/statm")
+ if err != nil {
+ bal.logf("cannot report memory size: %s", err)
+ return
+ }
+ fields := strings.Fields(string(buf))
+ if len(fields) == 0 {
+ bal.logf("cannot report memory size: /proc/self/statm is empty")
+ return
+ }
+ pages, err := strconv.ParseInt(fields[0], 10, 64)
+ if err != nil {
+ bal.logf("cannot report memory size: parsing /proc/self/statm: %s", err)
+ return
+ }
+ mem := pages * pagesize
+ if now.After(nextTime) || mem >= nextMem {
+ bal.logf("process virtual memory size %d", mem)
+ nextMem = int64(float64(mem) * maxIncrease)
+ nextTime = now.Add(maxInterval)
+ }
+ // Wake on cancellation as well as on the ticker, so this
+ // goroutine exits promptly when Run's context ends.
+ select {
+ case <-ctx.Done():
+ case <-ticker.C:
+ }
+ }
+}
+
// Rendezvous hash sort function. Less efficient than sorting on
// precomputed rendezvous hashes, but also rarely used.
func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
commit 2d81dae6704870be0d70e0f25278c4c92a6e7cee
Author: Tom Clegg <tom at curii.com>
Date: Thu Jan 26 11:14:28 2023 -0500
19923: Log progress while reading keepstore indexes.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index 88728d956..5b6d71a4f 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -12,7 +12,10 @@ import (
"net/http"
"strconv"
"strings"
+ "sync/atomic"
"time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
)
// KeepService is an arvados#keepService record
@@ -164,10 +167,30 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
}
defer resp.Body.Close()
+ var progress int64
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ go func() {
+ log := ctxlog.FromContext(ctx)
+ logticker := time.NewTicker(5 * time.Minute)
+ defer logticker.Stop()
+ for {
+ select {
+ case <-logticker.C:
+ log.Printf("index progress: received %d blocks from %s", atomic.LoadInt64(&progress), url)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
var entries []KeepServiceIndexEntry
scanner := bufio.NewScanner(resp.Body)
sawEOF := false
for scanner.Scan() {
+ if ctx.Err() != nil {
+ return nil, ctx.Err()
+ }
if scanner.Err() != nil {
// If we encounter a read error (timeout,
// connection failure), stop now and return it
@@ -206,6 +229,7 @@ func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string)
SizedDigest: SizedDigest(fields[0]),
Mtime: mtime,
})
+ atomic.AddInt64(&progress, 1)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("Error scanning index response: %v", err)
commit e07a249c5a6683ea4139c331927c27b8d7711f45
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 24 11:29:46 2023 -0500
19923: keep-balance option to process a subset of blocks.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index eb7988422..88728d956 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -142,16 +142,16 @@ func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error
// IndexMount returns an unsorted list of blocks at the given mount point.
func (s *KeepService) IndexMount(ctx context.Context, c *Client, mountUUID string, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(ctx, c, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
+ return s.index(ctx, c, prefix, s.url("mounts/"+mountUUID+"/blocks?prefix="+prefix))
}
// Index returns an unsorted list of blocks that can be retrieved from
// this server.
func (s *KeepService) Index(ctx context.Context, c *Client, prefix string) ([]KeepServiceIndexEntry, error) {
- return s.index(ctx, c, s.url("index/"+prefix))
+ return s.index(ctx, c, prefix, s.url("index/"+prefix))
}
-func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepServiceIndexEntry, error) {
+func (s *KeepService) index(ctx context.Context, c *Client, prefix, url string) ([]KeepServiceIndexEntry, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("NewRequestWithContext(%v): %v", url, err)
@@ -187,6 +187,9 @@ func (s *KeepService) index(ctx context.Context, c *Client, url string) ([]KeepS
if len(fields) != 2 {
return nil, fmt.Errorf("Malformed index line %q: %d fields", line, len(fields))
}
+ if !strings.HasPrefix(fields[0], prefix) {
+ return nil, fmt.Errorf("Index response included block %q despite asking for prefix %q", fields[0], prefix)
+ }
mtime, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 9f581751d..d8cd84b51 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -46,6 +46,7 @@ type Balancer struct {
Dumper logrus.FieldLogger
Metrics *metrics
+ ChunkPrefix string
LostBlocksFile string
*BlockStateMap
@@ -403,7 +404,7 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
go func(mounts []*KeepMount) {
defer wg.Done()
bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
- idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
+ idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, bal.ChunkPrefix)
if err != nil {
select {
case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
@@ -495,6 +496,20 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
if coll.ReplicationDesired != nil {
repl = *coll.ReplicationDesired
}
+ if bal.ChunkPrefix != "" {
+ // Throw out blocks that don't match the requested
+ // prefix. (We save a bit of GC work here by
+ // preallocating based on each hex digit in
+ // ChunkPrefix reducing the expected size of the
+ // filtered set by ~16x.)
+ filtered := make([]arvados.SizedDigest, 0, len(blkids)>>(4*len(bal.ChunkPrefix)-1))
+ for _, blkid := range blkids {
+ if strings.HasPrefix(string(blkid), bal.ChunkPrefix) {
+ filtered = append(filtered, blkid)
+ }
+ }
+ blkids = filtered
+ }
bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
// Pass pdh to IncreaseDesired only if LostBlocksFile is being
// written -- otherwise it's just a waste of memory.
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 4772da55a..5c9a648b7 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -256,26 +256,32 @@ func (s *stubServer) serveKeepstoreMounts() *reqTracker {
}
func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
+ fooLine := func(mt int) string { return fmt.Sprintf("acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+mt) }
+ barLine := "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n"
rt := &reqTracker{}
s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
count := rt.Add(r)
- if r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
- io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+ if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.URL.Path[7:]) {
+ io.WriteString(w, barLine)
}
- fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n\n", 12345678+count)
+ if strings.HasPrefix(fooLine(count), r.URL.Path[7:]) {
+ io.WriteString(w, fooLine(count))
+ }
+ io.WriteString(w, "\n")
})
for _, mounts := range stubMounts {
for i, mnt := range mounts {
i := i
s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
count := rt.Add(r)
- if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" {
- io.WriteString(w, "37b51d194a7513e45b56f6524f2d51f2+3 12345678\n")
+ r.ParseForm()
+ if i == 0 && r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(barLine, r.Form.Get("prefix")) {
+ io.WriteString(w, barLine)
}
- if i == 0 {
- fmt.Fprintf(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 %d\n", 12345678+count)
+ if i == 0 && strings.HasPrefix(fooLine(count), r.Form.Get("prefix")) {
+ io.WriteString(w, fooLine(count))
}
- fmt.Fprintf(w, "\n")
+ io.WriteString(w, "\n")
})
}
}
@@ -283,21 +289,44 @@ func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker {
}
func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker {
+ fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
rt := &reqTracker{}
s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
rt.Add(r)
- io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
+ if r.Host == "keep0.zzzzz.arvadosapi.com:25107" && strings.HasPrefix(fooLine, r.URL.Path[7:]) {
+ io.WriteString(w, fooLine)
+ }
+ io.WriteString(w, "\n")
})
for _, mounts := range stubMounts {
for i, mnt := range mounts {
i := i
s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
rt.Add(r)
- if i == 0 {
- io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n")
- } else {
- io.WriteString(w, "\n")
+ if i == 0 && strings.HasPrefix(fooLine, r.Form.Get("prefix")) {
+ io.WriteString(w, fooLine)
}
+ io.WriteString(w, "\n")
+ })
+ }
+ }
+ return rt
+}
+
+func (s *stubServer) serveKeepstoreIndexIgnoringPrefix() *reqTracker {
+ fooLine := "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n"
+ rt := &reqTracker{}
+ s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) {
+ rt.Add(r)
+ io.WriteString(w, fooLine)
+ io.WriteString(w, "\n")
+ })
+ for _, mounts := range stubMounts {
+ for _, mnt := range mounts {
+ s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) {
+ rt.Add(r)
+ io.WriteString(w, fooLine)
+ io.WriteString(w, "\n")
})
}
}
@@ -379,6 +408,29 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
c.Check(pullReqs.Count(), check.Equals, 0)
}
+// TestRefuseBadIndex checks that a run fails when keepstore index
+// responses include blocks outside the requested ChunkPrefix, and
+// that no trash or pull operations are counted in bal.stats.
+func (s *runSuite) TestRefuseBadIndex(c *check.C) {
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ ChunkPrefix: "abc",
+ Logger: ctxlog.TestLogger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveKeepServices(stubServices)
+ s.stub.serveKeepstoreMounts()
+ // This stub returns the "foo" block (acbd18db...) regardless of
+ // the requested prefix, so the response violates prefix "abc".
+ s.stub.serveKeepstoreIndexIgnoringPrefix()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ srv := s.newServer(&opts)
+ bal, err := srv.runOnce(context.Background())
+ c.Check(err, check.ErrorMatches, ".*Index response included block .* despite asking for prefix \"abc\"")
+ // NOTE(review): trash requests are still expected (4) even though
+ // the run fails; presumably these are sent before indexes are
+ // read -- confirm against Balancer.Run.
+ c.Check(trashReqs.Count(), check.Equals, 4)
+ c.Check(pullReqs.Count(), check.Equals, 0)
+ c.Check(bal.stats.trashes, check.Equals, 0)
+ c.Check(bal.stats.pulls, check.Equals, 0)
+}
+
func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
opts := RunOptions{
CommitPulls: true,
@@ -518,6 +570,37 @@ func (s *runSuite) TestCommit(c *check.C) {
c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
}
+// TestChunkPrefix checks that with ChunkPrefix set, only blocks
+// matching the prefix ("foo") are balanced, while blocks outside it
+// ("bar") are neither pulled nor reported in the lost-blocks file.
+func (s *runSuite) TestChunkPrefix(c *check.C) {
+ s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
+ opts := RunOptions{
+ CommitPulls: true,
+ CommitTrash: true,
+ ChunkPrefix: "ac", // catch "foo" but not "bar"
+ Logger: ctxlog.TestLogger(c),
+ Dumper: ctxlog.TestLogger(c),
+ }
+ s.stub.serveCurrentUserAdmin()
+ s.stub.serveFooBarFileCollections()
+ s.stub.serveKeepServices(stubServices)
+ s.stub.serveKeepstoreMounts()
+ s.stub.serveKeepstoreIndexFoo4Bar1()
+ trashReqs := s.stub.serveKeepstoreTrash()
+ pullReqs := s.stub.serveKeepstorePull()
+ srv := s.newServer(&opts)
+ bal, err := srv.runOnce(context.Background())
+ c.Check(err, check.IsNil)
+ c.Check(trashReqs.Count(), check.Equals, 8)
+ c.Check(pullReqs.Count(), check.Equals, 4)
+ // "foo" block is overreplicated by 2
+ c.Check(bal.stats.trashes, check.Equals, 2)
+ // "bar" block is underreplicated but does not match prefix
+ c.Check(bal.stats.pulls, check.Equals, 0)
+
+ // "bar" is outside the prefix, so it must not be reported as lost
+ // even though the index stub omits it.
+ lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
+ c.Assert(err, check.IsNil)
+ c.Check(string(lost), check.Equals, "")
+}
+
func (s *runSuite) TestRunForever(c *check.C) {
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go
index b016db22f..6bc998958 100644
--- a/services/keep-balance/main.go
+++ b/services/keep-balance/main.go
@@ -38,6 +38,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
"update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
+ flags.StringVar(&options.ChunkPrefix, "chunk-prefix", "",
+ "operate only on blocks with the given prefix (experimental, see https://dev.arvados.org/issues/19923)")
// These options are implemented by service.Command, so we
// don't need the vars here -- we just need the flags
// to pass flags.Parse().
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
index fd53497f7..9bcaec43d 100644
--- a/services/keep-balance/server.go
+++ b/services/keep-balance/server.go
@@ -30,6 +30,7 @@ type RunOptions struct {
CommitPulls bool
CommitTrash bool
CommitConfirmedFields bool
+ ChunkPrefix string
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
@@ -86,6 +87,7 @@ func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
Dumper: srv.Dumper,
Metrics: srv.Metrics,
LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
+ ChunkPrefix: srv.RunOptions.ChunkPrefix,
}
var err error
srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list