[ARVADOS] updated: 1.3.0-2584-ge9fed17eb
Git user
git at public.arvados.org
Fri May 29 14:41:48 UTC 2020
Summary of changes:
lib/undelete/cmd.go | 123 +++++++++++++++++++++++++++----------
lib/undelete/cmd_test.go | 108 ++++++++++++++++++++++++++++++++
sdk/go/arvados/blob_signature.go | 3 +-
sdk/go/arvados/keep_service.go | 18 ++++++
services/keepstore/handler_test.go | 51 +++++++++++++++
services/keepstore/handlers.go | 30 +++++++++
services/keepstore/volume_test.go | 9 ++-
7 files changed, 307 insertions(+), 35 deletions(-)
create mode 100644 lib/undelete/cmd_test.go
via e9fed17eb1a7300d879a74a344dd52b00fb77d6d (commit)
via 33f8bec994716df827b07bbddb88e5944ff201c1 (commit)
via 29437a08213a7f295b95d9a14226ab41d98c5148 (commit)
from 0ad80a1594e583be9821feb8369e8dc4f619ab65 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.
commit e9fed17eb1a7300d879a74a344dd52b00fb77d6d
Author: Tom Clegg <tom at tomclegg.ca>
Date: Fri May 29 10:41:37 2020 -0400
16427: Add test cases.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/undelete/cmd_test.go b/lib/undelete/cmd_test.go
new file mode 100644
index 000000000..1ea965995
--- /dev/null
+++ b/lib/undelete/cmd_test.go
@@ -0,0 +1,108 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package undelete
+
+import (
+ "bytes"
+ "encoding/json"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "gopkg.in/check.v1"
+)
+
+// Test hooks every gocheck suite registered in this package into the
+// standard "go test" runner.
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+// Suite groups the tests for the undelete command.
+type Suite struct{}
+
+// Register Suite with gocheck so that Test runs it.
+var _ = check.Suite(&Suite{})
+
+// SetUpSuite starts the test API server and Keep services (via the
+// arvadostest helpers) before any test in the suite runs.
+func (*Suite) SetUpSuite(c *check.C) {
+	arvadostest.StartAPI()
+	// NOTE(review): arguments presumably mean "2 keepstore servers,
+	// permission enforcement on" -- confirm against arvadostest.StartKeep.
+	arvadostest.StartKeep(2, true)
+}
+
+// TestUnrecoverableBlock checks that a manifest referencing a block
+// which exists on no keepstore (neither live nor in trash) makes the
+// command exit 1, print no collection UUID, and log why each recovery
+// step failed.
+func (*Suite) TestUnrecoverableBlock(c *check.C) {
+	tmp := c.MkDir()
+	mfile := tmp + "/manifest"
+	// Fail fast on fixture problems instead of letting them surface as
+	// a misleading "recovery failed" from the command under test.
+	err := ioutil.WriteFile(mfile, []byte(". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+410 0:410:Gone\n"), 0777)
+	c.Assert(err, check.IsNil)
+
+	var stdout, stderr bytes.Buffer
+	exitcode := Command.RunCommand("undelete.test", []string{"-log-level=debug", mfile}, &bytes.Buffer{}, &stdout, &stderr)
+	c.Check(exitcode, check.Equals, 1)
+	c.Check(stdout.String(), check.Equals, "")
+	c.Log(stderr.String())
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="not found" block=aaaaa.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="untrash failed" block=aaaaa.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg=unrecoverable block=aaaaa.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="recovery failed".*`)
+}
+
+// TestUntrashAndTouchBlock plants an old, trashed copy of a block in
+// every keepstore data directory, runs the undelete command on a
+// manifest referencing it, and checks that the block was untrashed,
+// its timestamp refreshed, and a new collection UUID printed.
+func (*Suite) TestUntrashAndTouchBlock(c *check.C) {
+	tmp := c.MkDir()
+	mfile := tmp + "/manifest"
+	err := ioutil.WriteFile(mfile, []byte(". dcd0348cb2532ee90c99f1b846efaee7+13 0:13:test.txt\n"), 0777)
+	c.Assert(err, check.IsNil)
+
+	logger := ctxlog.TestLogger(c)
+	loader := config.NewLoader(&bytes.Buffer{}, logger)
+	cfg, err := loader.Load()
+	c.Assert(err, check.IsNil)
+	cluster, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	var datadirs []string
+	for _, v := range cluster.Volumes {
+		var params struct {
+			Root string
+		}
+		err := json.Unmarshal(v.DriverParameters, &params)
+		c.Assert(err, check.IsNil)
+		if params.Root != "" {
+			datadirs = append(datadirs, params.Root)
+			// Make sure there is no live copy, so the block can only
+			// be found by untrashing it.
+			err := os.Remove(params.Root + "/dcd/dcd0348cb2532ee90c99f1b846efaee7")
+			if err != nil && !os.IsNotExist(err) {
+				c.Error(err)
+			}
+		}
+	}
+	c.Logf("keepstore datadirs are %q", datadirs)
+
+	// Plant a trashed copy with a year-old timestamp in each data
+	// directory, so the command must both untrash and touch it.
+	for _, datadir := range datadirs {
+		trashfile := datadir + "/dcd/dcd0348cb2532ee90c99f1b846efaee7.trash.999999999"
+		if err := os.Mkdir(datadir+"/dcd", 0777); err != nil && !os.IsExist(err) {
+			c.Fatal(err)
+		}
+		err = ioutil.WriteFile(trashfile, []byte("undelete test"), 0777)
+		c.Assert(err, check.IsNil)
+		t := time.Now().Add(-time.Hour * 24 * 365)
+		err = os.Chtimes(trashfile, t, t)
+		c.Assert(err, check.IsNil)
+	}
+
+	var stdout, stderr bytes.Buffer
+	exitcode := Command.RunCommand("undelete.test", []string{"-log-level=debug", mfile}, &bytes.Buffer{}, &stdout, &stderr)
+	c.Check(exitcode, check.Equals, 0)
+	c.Check(stdout.String(), check.Matches, `zzzzz-4zz18-.{15}\n`)
+	c.Log(stderr.String())
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg=untrashed block=dcd0348.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="updated timestamp" block=dcd0348.*`)
+
+	// At least one data directory should now hold a live copy with the
+	// original content and a fresh modification time.
+	found := false
+	for _, datadir := range datadirs {
+		blkfile := datadir + "/dcd/dcd0348cb2532ee90c99f1b846efaee7"
+		buf, err := ioutil.ReadFile(blkfile)
+		if err != nil {
+			continue
+		}
+		found = true
+		c.Check(buf, check.DeepEquals, []byte("undelete test"))
+		fi, err := os.Stat(blkfile)
+		if c.Check(err, check.IsNil) {
+			c.Logf("recovered block's modtime is %s", fi.ModTime())
+			c.Check(time.Now().Sub(fi.ModTime()) < time.Hour, check.Equals, true)
+		}
+	}
+	c.Check(found, check.Equals, true)
+}
commit 33f8bec994716df827b07bbddb88e5944ff201c1
Author: Tom Clegg <tom at tomclegg.ca>
Date: Fri May 29 10:36:13 2020 -0400
16427: Touch block timestamps to avoid garbage collection race.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/lib/undelete/cmd.go b/lib/undelete/cmd.go
index 4c7bc5a85..291291587 100644
--- a/lib/undelete/cmd.go
+++ b/lib/undelete/cmd.go
@@ -12,6 +12,7 @@ import (
"io/ioutil"
"strings"
"sync"
+ "time"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -92,13 +93,14 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
exitcode = 1
continue
}
- err = und.RecoverManifest(string(mtxt))
+ uuid, err := und.RecoverManifest(string(mtxt))
if err != nil {
logger.WithError(err).Error("recovery failed")
exitcode = 1
continue
}
- logger.WithError(err).Info("recovery succeeded")
+ logger.WithField("UUID", uuid).Info("recovery succeeded")
+ fmt.Fprintln(stdout, uuid)
}
}
return exitcode
@@ -110,14 +112,73 @@ type undeleter struct {
logger logrus.FieldLogger
}
-func (und undeleter) RecoverManifest(mtxt string) error {
+// Return the timestamp of the newest copy of blk on svc. Second
+// return value is false if blk is not on svc at all, or an error
+// occurs.
+func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, bool) {
+ found, err := svc.Index(und.client, blk)
+ if err != nil {
+ logger.WithError(err).Warn("error getting index")
+ return time.Time{}, false
+ }
+ if len(found) == 0 {
+ logger.Debug("not found")
+ return time.Time{}, false
+ }
+ var latest time.Time
+ for _, ent := range found {
+ t := time.Unix(0, ent.Mtime)
+ if t.After(latest) {
+ latest = t
+ }
+ }
+ logger.WithField("latest", latest).Debug("found")
+ return latest, true
+}
+
+// ensureSafe makes sure blk is present on svc and will not become
+// eligible for trashing before the chosen save deadline, blobsigexp.
+// If the newest copy's timestamp is too old, it asks svc to touch the
+// block and then checks again. It returns false, after logging the
+// reason via logger, if the block is absent, the timestamp cannot be
+// updated, or a request fails.
+//
+// Rationale: after a block is deemed "safe" (untrashed or not),
+// keep-balance may still see it as unreferenced and trash it before
+// the recovered collection is saved. Keepstore refuses to trash a
+// block whose timestamp is more recent than blobsigttl, so a fresh
+// enough timestamp closes that race.
+func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) bool {
+	// protectedPastDeadline reports whether a copy last modified at
+	// mtime will still be protected from trashing at blobsigexp.
+	protectedPastDeadline := func(mtime time.Time) bool {
+		return mtime.Add(blobsigttl).After(blobsigexp)
+	}
+
+	latest, ok := und.newestMtime(logger, blk, svc)
+	if !ok {
+		return false
+	}
+	if protectedPastDeadline(latest) {
+		return true
+	}
+
+	if err := svc.Touch(ctx, und.client, blk); err != nil {
+		logger.WithError(err).Warn("error updating timestamp")
+		return false
+	}
+	logger.Debug("updated timestamp")
+
+	latest, ok = und.newestMtime(logger, blk, svc)
+	if !ok {
+		return false
+	}
+	if !protectedPastDeadline(latest) {
+		logger.WithField("latest", latest).Error("BUG? touch return success, but newest reported timestamp is still too old")
+		return false
+	}
+	return true
+}
+
+// Untrash and update GC timestamps (as needed) on blocks referenced
+// by the given manifest, save a new collection and return the new
+// collection's UUID.
+func (und undeleter) RecoverManifest(mtxt string) (string, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
coll := arvados.Collection{ManifestText: mtxt}
blks, err := coll.SizedDigests()
if err != nil {
- return err
+ return "", err
}
todo := make(chan int, len(blks))
for idx := range blks {
@@ -135,10 +196,15 @@ func (und undeleter) RecoverManifest(mtxt string) error {
return nil
})
if err != nil {
- return fmt.Errorf("error getting list of keep services: %s", err)
+ return "", fmt.Errorf("error getting list of keep services: %s", err)
}
und.logger.WithField("services", services).Debug("got list of services")
+ // Choose a deadline for saving a rescued collection.
+ blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
+ blobsigexp := time.Now().Add(blobsigttl / 2)
+ und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
+
blkFound := make([]bool, len(blks))
var wg sync.WaitGroup
for i := 0; i < 2*len(services); i++ {
@@ -149,26 +215,20 @@ func (und undeleter) RecoverManifest(mtxt string) error {
for idx := range todo {
blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
logger := und.logger.WithField("block", blk)
- for _, svc := range services {
- logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
- if found, err := svc.Index(und.client, blk); err != nil {
- logger.WithError(err).Warn("error getting index")
- } else if len(found) > 0 {
- blkFound[idx] = true
- logger.Debug("found")
- continue nextblk
- } else {
- logger.Debug("not found")
- }
- }
- for _, svc := range services {
- logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
- if err := svc.Untrash(ctx, und.client, blk); err != nil {
- logger.WithError(err).Debug("untrash failed")
- } else {
- blkFound[idx] = true
- logger.Info("untrashed")
- continue nextblk
+ for _, untrashing := range []bool{false, true} {
+ for _, svc := range services {
+ logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
+ if untrashing {
+ if err := svc.Untrash(ctx, und.client, blk); err != nil {
+ logger.WithError(err).Debug("untrash failed")
+ continue
+ }
+ logger.Info("untrashed")
+ }
+ if und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp) {
+ blkFound[idx] = true
+ continue nextblk
+ }
}
}
logger.Debug("unrecoverable")
@@ -189,23 +249,22 @@ func (und undeleter) RecoverManifest(mtxt string) error {
if have > 0 {
und.logger.Warn("partial recovery is not implemented")
}
- return fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
+ return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
}
if und.cluster.Collections.BlobSigning {
- ttl := und.cluster.Collections.BlobSigningTTL.Duration()
key := []byte(und.cluster.Collections.BlobSigningKey)
- coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, ttl, key)
+ coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
}
- und.logger.Info(coll.ManifestText)
+ und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
"collection": map[string]interface{}{
"manifest_text": coll.ManifestText,
},
})
if err != nil {
- return fmt.Errorf("error saving new collection: %s", err)
+ return "", fmt.Errorf("error saving new collection: %s", err)
}
- und.logger.WithField("UUID", coll.UUID).Info("created new collection")
- return nil
+ und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
+ return coll.UUID, nil
}
diff --git a/sdk/go/arvados/blob_signature.go b/sdk/go/arvados/blob_signature.go
index 4a936026f..132939547 100644
--- a/sdk/go/arvados/blob_signature.go
+++ b/sdk/go/arvados/blob_signature.go
@@ -53,8 +53,7 @@ var (
// SignManifest signs all locators in the given manifest, discarding
// any existing signatures.
-func SignManifest(manifest string, apiToken string, ttl time.Duration, permissionSecret []byte) string {
- expiry := time.Now().Add(ttl)
+func SignManifest(manifest string, apiToken string, expiry time.Time, ttl time.Duration, permissionSecret []byte) string {
return regexp.MustCompile(`\S+`).ReplaceAllStringFunc(manifest, func(tok string) string {
if mBlkRe.MatchString(tok) {
return SignLocator(mPermHintRe.ReplaceAllString(tok, ""), apiToken, expiry, ttl, permissionSecret)
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index a2263d179..3af747920 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -104,6 +104,24 @@ func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
return mounts, nil
}
+// Touch asks the keep service to update the timestamp on the given
+// block by sending a TOUCH request to the block's URL. Any response
+// other than 200 OK is returned as an error that includes the
+// protocol, status, and response body.
+func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
+	req, err := http.NewRequest("TOUCH", s.url(blk), nil)
+	if err != nil {
+		return err
+	}
+	req = req.WithContext(ctx)
+	resp, err := c.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	body, _ := ioutil.ReadAll(resp.Body)
+	return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, body)
+}
+
// Untrash moves/copies the given block out of trash.
func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)
commit 29437a08213a7f295b95d9a14226ab41d98c5148
Author: Tom Clegg <tom at tomclegg.ca>
Date: Thu May 28 09:40:44 2020 -0400
16427: Add "touch" API to keepstore.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index f1a8c2925..17ed6402c 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -318,6 +318,57 @@ func (s *HandlerSuite) TestPutAndDeleteSkipReadonlyVolumes(c *check.C) {
}
}
+// Test TOUCH requests: unauthenticated and non-root requests are
+// rejected, a missing block yields 404, and touching an existing
+// block updates its timestamp.
+func (s *HandlerSuite) TestTouchHandler(c *check.C) {
+	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+	vols := s.handler.volmgr.AllWritable()
+	c.Assert(vols, check.Not(check.HasLen), 0)
+	// Store the test block and backdate it, so the effect of a
+	// successful TOUCH is observable. Assert both steps: a silently
+	// failed setup would make the checks below meaningless.
+	c.Assert(vols[0].Put(context.Background(), TestHash, TestBlock), check.IsNil)
+	c.Assert(vols[0].Volume.(*MockVolume).TouchWithDate(TestHash, time.Now().Add(-time.Hour)), check.IsNil)
+	afterPut := time.Now()
+	t, err := vols[0].Mtime(TestHash)
+	c.Assert(err, check.IsNil)
+	c.Assert(t.Before(afterPut), check.Equals, true)
+
+	ExpectStatusCode(c,
+		"touch with no credentials",
+		http.StatusUnauthorized,
+		IssueRequest(s.handler, &RequestTester{
+			method: "TOUCH",
+			uri:    "/" + TestHash,
+		}))
+
+	ExpectStatusCode(c,
+		"touch with non-root credentials",
+		http.StatusUnauthorized,
+		IssueRequest(s.handler, &RequestTester{
+			method:   "TOUCH",
+			uri:      "/" + TestHash,
+			apiToken: arvadostest.ActiveTokenV2,
+		}))
+
+	ExpectStatusCode(c,
+		"touch non-existent block",
+		http.StatusNotFound,
+		IssueRequest(s.handler, &RequestTester{
+			method:   "TOUCH",
+			uri:      "/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+			apiToken: s.cluster.SystemRootToken,
+		}))
+
+	beforeTouch := time.Now()
+	ExpectStatusCode(c,
+		"touch block",
+		http.StatusOK,
+		IssueRequest(s.handler, &RequestTester{
+			method:   "TOUCH",
+			uri:      "/" + TestHash,
+			apiToken: s.cluster.SystemRootToken,
+		}))
+	t, err = vols[0].Mtime(TestHash)
+	c.Assert(err, check.IsNil)
+	c.Assert(t.After(beforeTouch), check.Equals, true)
+}
+
// Test /index requests:
// - unauthenticated /index request
// - unauthenticated /index/prefix request
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 3d0f893d8..eb0ea5ad2 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -66,6 +66,8 @@ func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *promethe
// List blocks stored here whose hash has the given prefix.
// Privileged client only.
rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+ // Update timestamp on existing block. Privileged client only.
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
// Internals/debugging info (runtime.MemStats)
rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
@@ -191,6 +193,34 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
}
}
+// handleTOUCH updates the timestamp of an existing block on the first
+// writable volume that accepts the touch. Only the system root token
+// is accepted.
+//
+// Responses: 200 on success. 404 if there are no writable volumes or
+// every volume reports the block does not exist. 500 if any volume
+// failed for another reason and none succeeded.
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+	hash := mux.Vars(req)["hash"]
+	vols := rtr.volmgr.AllWritable()
+	if len(vols) == 0 {
+		http.Error(resp, "no volumes", http.StatusNotFound)
+		return
+	}
+	// Track "not found" separately from real failures. Otherwise the
+	// status would depend on which volume happened to be tried last,
+	// and an I/O error on the volume that actually holds the block
+	// could be reported as 404.
+	var notFoundErr, otherErr error
+	for _, mnt := range vols {
+		err := mnt.Touch(hash)
+		switch {
+		case err == nil:
+			return
+		case os.IsNotExist(err):
+			if notFoundErr == nil {
+				notFoundErr = err
+			}
+		default:
+			if otherErr == nil {
+				otherErr = err
+			}
+		}
+	}
+	if otherErr != nil {
+		http.Error(resp, otherErr.Error(), http.StatusInternalServerError)
+		return
+	}
+	// len(vols) > 0 and no volume succeeded or failed otherwise, so
+	// every volume reported not-exist and notFoundErr is set.
+	http.Error(resp, notFoundErr.Error(), http.StatusNotFound)
+}
+
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index a928a71a2..2de21edde 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -178,13 +178,20 @@ func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
}
+// Touch sets loc's mock timestamp to the current time by delegating to TouchWithDate.
func (v *MockVolume) Touch(loc string) error {
+ return v.TouchWithDate(loc, time.Now())
+}
+
+func (v *MockVolume) TouchWithDate(loc string, t time.Time) error {
v.gotCall("Touch")
<-v.Gate
if v.volume.ReadOnly {
return MethodDisabledError
}
+ if _, exists := v.Store[loc]; !exists {
+ return os.ErrNotExist
+ }
if v.Touchable {
- v.Timestamps[loc] = time.Now()
+ v.Timestamps[loc] = t
return nil
}
return errors.New("Touch failed")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list