[ARVADOS] updated: 1.3.0-2584-ge9fed17eb

Git user git at public.arvados.org
Fri May 29 14:41:48 UTC 2020


Summary of changes:
 lib/undelete/cmd.go                | 123 +++++++++++++++++++++++++++----------
 lib/undelete/cmd_test.go           | 108 ++++++++++++++++++++++++++++++++
 sdk/go/arvados/blob_signature.go   |   3 +-
 sdk/go/arvados/keep_service.go     |  18 ++++++
 services/keepstore/handler_test.go |  51 +++++++++++++++
 services/keepstore/handlers.go     |  30 +++++++++
 services/keepstore/volume_test.go  |   9 ++-
 7 files changed, 307 insertions(+), 35 deletions(-)
 create mode 100644 lib/undelete/cmd_test.go

       via  e9fed17eb1a7300d879a74a344dd52b00fb77d6d (commit)
       via  33f8bec994716df827b07bbddb88e5944ff201c1 (commit)
       via  29437a08213a7f295b95d9a14226ab41d98c5148 (commit)
      from  0ad80a1594e583be9821feb8369e8dc4f619ab65 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit e9fed17eb1a7300d879a74a344dd52b00fb77d6d
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Fri May 29 10:41:37 2020 -0400

    16427: Add test cases.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/undelete/cmd_test.go b/lib/undelete/cmd_test.go
new file mode 100644
index 000000000..1ea965995
--- /dev/null
+++ b/lib/undelete/cmd_test.go
@@ -0,0 +1,108 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package undelete
+
+import (
+	"bytes"
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+	check.TestingT(t) // run the gocheck suites below under "go test"
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{} // integration tests against a live API server and keepstores
+
+func (*Suite) SetUpSuite(c *check.C) {
+	arvadostest.StartAPI()         // started once for the whole suite
+	arvadostest.StartKeep(2, true) // presumably 2 keepstores; TODO confirm what `true` enables
+}
+
+func (*Suite) TestUnrecoverableBlock(c *check.C) {
+	tmp := c.MkDir()
+	mfile := tmp + "/manifest" // references a block that is neither stored nor trashed anywhere
+	c.Assert(ioutil.WriteFile(mfile, []byte(". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+410 0:410:Gone\n"), 0777), check.IsNil)
+	var stdout, stderr bytes.Buffer
+	exitcode := Command.RunCommand("undelete.test", []string{"-log-level=debug", mfile}, &bytes.Buffer{}, &stdout, &stderr)
+	c.Check(exitcode, check.Equals, 1)
+	c.Check(stdout.String(), check.Equals, "")
+	c.Log(stderr.String())
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="not found" block=aaaaa.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="untrash failed" block=aaaaa.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg=unrecoverable block=aaaaa.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="recovery failed".*`)
+}
+
+func (*Suite) TestUntrashAndTouchBlock(c *check.C) {
+	tmp := c.MkDir()
+	mfile := tmp + "/manifest"
+	c.Assert(ioutil.WriteFile(mfile, []byte(". dcd0348cb2532ee90c99f1b846efaee7+13 0:13:test.txt\n"), 0777), check.IsNil)
+
+	logger := ctxlog.TestLogger(c)
+	loader := config.NewLoader(&bytes.Buffer{}, logger)
+	cfg, err := loader.Load()
+	c.Assert(err, check.IsNil)
+	cluster, err := cfg.GetCluster("")
+	c.Assert(err, check.IsNil)
+	var datadirs []string
+	for _, v := range cluster.Volumes {
+		var params struct {
+			Root string
+		}
+		err := json.Unmarshal(v.DriverParameters, &params)
+		c.Assert(err, check.IsNil)
+		if params.Root != "" {
+			datadirs = append(datadirs, params.Root)
+			err := os.Remove(params.Root + "/dcd/dcd0348cb2532ee90c99f1b846efaee7")
+			if err != nil && !os.IsNotExist(err) {
+				c.Error(err)
+			}
+		}
+	}
+	c.Logf("keepstore datadirs are %q", datadirs)
+
+	for _, datadir := range datadirs {
+		trashfile := datadir + "/dcd/dcd0348cb2532ee90c99f1b846efaee7.trash.999999999"
+		os.Mkdir(datadir+"/dcd", 0777) // error ignored: directory may already exist
+		err = ioutil.WriteFile(trashfile, []byte("undelete test"), 0777)
+		c.Assert(err, check.IsNil)
+		t := time.Now().Add(-time.Hour * 24 * 365) // old enough that undelete must touch it
+		c.Assert(os.Chtimes(trashfile, t, t), check.IsNil)
+	}
+
+	var stdout, stderr bytes.Buffer
+	exitcode := Command.RunCommand("undelete.test", []string{"-log-level=debug", mfile}, &bytes.Buffer{}, &stdout, &stderr)
+	c.Check(exitcode, check.Equals, 0)
+	c.Check(stdout.String(), check.Matches, `zzzzz-4zz18-.{15}\n`)
+	c.Log(stderr.String())
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg=untrashed block=dcd0348.*`)
+	c.Check(stderr.String(), check.Matches, `(?ms).*msg="updated timestamp" block=dcd0348.*`)
+
+	found := false
+	for _, datadir := range datadirs {
+		buf, err := ioutil.ReadFile(datadir + "/dcd/dcd0348cb2532ee90c99f1b846efaee7")
+		if err == nil {
+			found = true
+			c.Check(buf, check.DeepEquals, []byte("undelete test"))
+			fi, err := os.Stat(datadir + "/dcd/dcd0348cb2532ee90c99f1b846efaee7")
+			if c.Check(err, check.IsNil) {
+				c.Logf("recovered block's modtime is %s", fi.ModTime())
+				c.Check(time.Since(fi.ModTime()) < time.Hour, check.Equals, true)
+			}
+		}
+	}
+	c.Check(found, check.Equals, true)
+}

commit 33f8bec994716df827b07bbddb88e5944ff201c1
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Fri May 29 10:36:13 2020 -0400

    16427: Touch block timestamps to avoid garbage collection race.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/lib/undelete/cmd.go b/lib/undelete/cmd.go
index 4c7bc5a85..291291587 100644
--- a/lib/undelete/cmd.go
+++ b/lib/undelete/cmd.go
@@ -12,6 +12,7 @@ import (
 	"io/ioutil"
 	"strings"
 	"sync"
+	"time"
 
 	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -92,13 +93,14 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 				exitcode = 1
 				continue
 			}
-			err = und.RecoverManifest(string(mtxt))
+			uuid, err := und.RecoverManifest(string(mtxt))
 			if err != nil {
 				logger.WithError(err).Error("recovery failed")
 				exitcode = 1
 				continue
 			}
-			logger.WithError(err).Info("recovery succeeded")
+			logger.WithField("UUID", uuid).Info("recovery succeeded")
+			fmt.Fprintln(stdout, uuid)
 		}
 	}
 	return exitcode
@@ -110,14 +112,73 @@ type undeleter struct {
 	logger  logrus.FieldLogger
 }
 
-func (und undeleter) RecoverManifest(mtxt string) error {
+// newestMtime asks svc for its index entries matching blk and
+// returns the most recent Mtime among them. The boolean result is
+// false (and the time is zero) when svc lists no copy of blk or the
+// index request fails; either case is logged via logger.
+func (und undeleter) newestMtime(logger logrus.FieldLogger, blk string, svc arvados.KeepService) (time.Time, bool) {
+	entries, err := svc.Index(und.client, blk)
+	if err != nil {
+		logger.WithError(err).Warn("error getting index")
+		return time.Time{}, false
+	}
+	if len(entries) == 0 {
+		logger.Debug("not found")
+		return time.Time{}, false
+	}
+	var newest time.Time
+	for i := range entries {
+		if ts := time.Unix(0, entries[i].Mtime); ts.After(newest) {
+			newest = ts
+		}
+	}
+	logger.WithField("latest", newest).Debug("found")
+	return newest, true
+}
+
+// ensureSafe makes sure blk exists on svc and will not be eligible
+// for trashing before our chosen deadline (blobsigexp), touching it
+// if its newest timestamp is too old. It returns false if svc has no
+// copy of blk, the timestamp is too old and cannot be updated, or
+// any error occurs. Problems are reported via logger.
+//
+// Rationale: after we decide a block is "safe" (whether or not we
+// had to untrash it), keep-balance might see that it is currently
+// unreferenced and trash it before the recovered collection is
+// saved. But keepstore refuses to trash a block whose timestamp is
+// more recent than blobsigttl, even when keep-balance asks it to.
+func (und undeleter) ensureSafe(ctx context.Context, logger logrus.FieldLogger, blk string, svc arvados.KeepService, blobsigttl time.Duration, blobsigexp time.Time) bool {
+	if latest, ok := und.newestMtime(logger, blk, svc); !ok {
+		return false
+	} else if latest.Add(blobsigttl).After(blobsigexp) { // already recent enough; no touch needed
+		return true
+	}
+	if err := svc.Touch(ctx, und.client, blk); err != nil {
+		logger.WithError(err).Warn("error updating timestamp")
+		return false
+	}
+	logger.Debug("updated timestamp")
+	if latest, ok := und.newestMtime(logger, blk, svc); !ok {
+		return false
+	} else if latest.Add(blobsigttl).After(blobsigexp) {
+		return true
+	} else {
+		logger.WithField("latest", latest).Error("BUG? touch returned success, but newest reported timestamp is still too old")
+		return false
+	}
+}
+
+// Untrash and update GC timestamps (as needed) on blocks referenced
+// by the given manifest, save a new collection and return the new
+// collection's UUID.
+func (und undeleter) RecoverManifest(mtxt string) (string, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	coll := arvados.Collection{ManifestText: mtxt}
 	blks, err := coll.SizedDigests()
 	if err != nil {
-		return err
+		return "", err
 	}
 	todo := make(chan int, len(blks))
 	for idx := range blks {
@@ -135,10 +196,15 @@ func (und undeleter) RecoverManifest(mtxt string) error {
 		return nil
 	})
 	if err != nil {
-		return fmt.Errorf("error getting list of keep services: %s", err)
+		return "", fmt.Errorf("error getting list of keep services: %s", err)
 	}
 	und.logger.WithField("services", services).Debug("got list of services")
 
+	// Choose a deadline for saving a rescued collection.
+	blobsigttl := und.cluster.Collections.BlobSigningTTL.Duration()
+	blobsigexp := time.Now().Add(blobsigttl / 2)
+	und.logger.WithField("blobsigexp", blobsigexp).Debug("chose save deadline")
+
 	blkFound := make([]bool, len(blks))
 	var wg sync.WaitGroup
 	for i := 0; i < 2*len(services); i++ {
@@ -149,26 +215,20 @@ func (und undeleter) RecoverManifest(mtxt string) error {
 			for idx := range todo {
 				blk := strings.SplitN(string(blks[idx]), "+", 2)[0]
 				logger := und.logger.WithField("block", blk)
-				for _, svc := range services {
-					logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
-					if found, err := svc.Index(und.client, blk); err != nil {
-						logger.WithError(err).Warn("error getting index")
-					} else if len(found) > 0 {
-						blkFound[idx] = true
-						logger.Debug("found")
-						continue nextblk
-					} else {
-						logger.Debug("not found")
-					}
-				}
-				for _, svc := range services {
-					logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
-					if err := svc.Untrash(ctx, und.client, blk); err != nil {
-						logger.WithError(err).Debug("untrash failed")
-					} else {
-						blkFound[idx] = true
-						logger.Info("untrashed")
-						continue nextblk
+				for _, untrashing := range []bool{false, true} {
+					for _, svc := range services {
+						logger := logger.WithField("service", fmt.Sprintf("%s:%d", svc.ServiceHost, svc.ServicePort))
+						if untrashing {
+							if err := svc.Untrash(ctx, und.client, blk); err != nil {
+								logger.WithError(err).Debug("untrash failed")
+								continue
+							}
+							logger.Info("untrashed")
+						}
+						if und.ensureSafe(ctx, logger, blk, svc, blobsigttl, blobsigexp) {
+							blkFound[idx] = true
+							continue nextblk
+						}
 					}
 				}
 				logger.Debug("unrecoverable")
@@ -189,23 +249,22 @@ func (und undeleter) RecoverManifest(mtxt string) error {
 		if have > 0 {
 			und.logger.Warn("partial recovery is not implemented")
 		}
-		return fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
+		return "", fmt.Errorf("unable to recover %d of %d blocks", havenot, have+havenot)
 	}
 
 	if und.cluster.Collections.BlobSigning {
-		ttl := und.cluster.Collections.BlobSigningTTL.Duration()
 		key := []byte(und.cluster.Collections.BlobSigningKey)
-		coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, ttl, key)
+		coll.ManifestText = arvados.SignManifest(coll.ManifestText, und.client.AuthToken, blobsigexp, blobsigttl, key)
 	}
-	und.logger.Info(coll.ManifestText)
+	und.logger.WithField("manifest", coll.ManifestText).Debug("updated blob signatures in manifest")
 	err = und.client.RequestAndDecodeContext(ctx, &coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
 		"collection": map[string]interface{}{
 			"manifest_text": coll.ManifestText,
 		},
 	})
 	if err != nil {
-		return fmt.Errorf("error saving new collection: %s", err)
+		return "", fmt.Errorf("error saving new collection: %s", err)
 	}
-	und.logger.WithField("UUID", coll.UUID).Info("created new collection")
-	return nil
+	und.logger.WithField("UUID", coll.UUID).Debug("created new collection")
+	return coll.UUID, nil
 }
diff --git a/sdk/go/arvados/blob_signature.go b/sdk/go/arvados/blob_signature.go
index 4a936026f..132939547 100644
--- a/sdk/go/arvados/blob_signature.go
+++ b/sdk/go/arvados/blob_signature.go
@@ -53,8 +53,7 @@ var (
 
 // SignManifest signs all locators in the given manifest, discarding
 // any existing signatures.
-func SignManifest(manifest string, apiToken string, ttl time.Duration, permissionSecret []byte) string {
-	expiry := time.Now().Add(ttl)
+func SignManifest(manifest string, apiToken string, expiry time.Time, ttl time.Duration, permissionSecret []byte) string {
 	return regexp.MustCompile(`\S+`).ReplaceAllStringFunc(manifest, func(tok string) string {
 		if mBlkRe.MatchString(tok) {
 			return SignLocator(mPermHintRe.ReplaceAllString(tok, ""), apiToken, expiry, ttl, permissionSecret)
diff --git a/sdk/go/arvados/keep_service.go b/sdk/go/arvados/keep_service.go
index a2263d179..3af747920 100644
--- a/sdk/go/arvados/keep_service.go
+++ b/sdk/go/arvados/keep_service.go
@@ -104,6 +104,24 @@ func (s *KeepService) Mounts(c *Client) ([]KeepMount, error) {
 	return mounts, nil
 }
 
+// Touch asks this keep service to update the timestamp of block blk.
+func (s *KeepService) Touch(ctx context.Context, c *Client, blk string) error {
+	touchReq, err := http.NewRequest("TOUCH", s.url(blk), nil)
+	if err != nil {
+		return err
+	}
+	resp, err := c.Do(touchReq.WithContext(ctx))
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	msg, _ := ioutil.ReadAll(resp.Body) // best effort: the body only decorates the error
+	return fmt.Errorf("%s %s: %s", resp.Proto, resp.Status, msg)
+}
+
 // Untrash moves/copies the given block out of trash.
 func (s *KeepService) Untrash(ctx context.Context, c *Client, blk string) error {
 	req, err := http.NewRequest("PUT", s.url("untrash/"+blk), nil)

commit 29437a08213a7f295b95d9a14226ab41d98c5148
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Thu May 28 09:40:44 2020 -0400

    16427: Add "touch" API to keepstore.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index f1a8c2925..17ed6402c 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -318,6 +318,57 @@ func (s *HandlerSuite) TestPutAndDeleteSkipReadonlyVolumes(c *check.C) {
 	}
 }
 
+// Test TOUCH requests: auth checks, missing block, timestamp update.
+func (s *HandlerSuite) TestTouchHandler(c *check.C) {
+	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+	vols := s.handler.volmgr.AllWritable()
+	c.Assert(vols[0].Put(context.Background(), TestHash, TestBlock), check.IsNil)
+	c.Assert(vols[0].Volume.(*MockVolume).TouchWithDate(TestHash, time.Now().Add(-time.Hour)), check.IsNil) // backdate so the touch is observable
+	afterPut := time.Now()
+	t, err := vols[0].Mtime(TestHash)
+	c.Assert(err, check.IsNil)
+	c.Assert(t.Before(afterPut), check.Equals, true)
+
+	ExpectStatusCode(c,
+		"touch with no credentials",
+		http.StatusUnauthorized,
+		IssueRequest(s.handler, &RequestTester{
+			method: "TOUCH",
+			uri:    "/" + TestHash,
+		}))
+
+	ExpectStatusCode(c,
+		"touch with non-root credentials",
+		http.StatusUnauthorized,
+		IssueRequest(s.handler, &RequestTester{
+			method:   "TOUCH",
+			uri:      "/" + TestHash,
+			apiToken: arvadostest.ActiveTokenV2,
+		}))
+
+	ExpectStatusCode(c,
+		"touch non-existent block",
+		http.StatusNotFound,
+		IssueRequest(s.handler, &RequestTester{
+			method:   "TOUCH",
+			uri:      "/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+			apiToken: s.cluster.SystemRootToken,
+		}))
+
+	beforeTouch := time.Now()
+	ExpectStatusCode(c,
+		"touch block",
+		http.StatusOK,
+		IssueRequest(s.handler, &RequestTester{
+			method:   "TOUCH",
+			uri:      "/" + TestHash,
+			apiToken: s.cluster.SystemRootToken,
+		}))
+	t, err = vols[0].Mtime(TestHash)
+	c.Assert(err, check.IsNil)
+	c.Assert(t.After(beforeTouch), check.Equals, true)
+}
+
 // Test /index requests:
 //   - unauthenticated /index request
 //   - unauthenticated /index/prefix request
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 3d0f893d8..eb0ea5ad2 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -66,6 +66,8 @@ func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *promethe
 	// List blocks stored here whose hash has the given prefix.
 	// Privileged client only.
 	rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+	// Update timestamp on existing block. Privileged client only.
+	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
 
 	// Internals/debugging info (runtime.MemStats)
 	rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
@@ -191,6 +193,34 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
 	}
 }
 
+// handleTOUCH updates the timestamp of an existing block. System token required.
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+	hash := mux.Vars(req)["hash"]
+	vols := rtr.volmgr.AllWritable()
+	if len(vols) == 0 {
+		http.Error(resp, "no volumes", http.StatusNotFound)
+		return
+	}
+	var err, touchErr error
+	for _, mnt := range vols {
+		if touchErr = mnt.Touch(hash); touchErr == nil {
+			return // success: implicit 200 OK after the first volume touched
+		}
+		if err == nil || os.IsNotExist(err) {
+			err = touchErr // report a real failure in preference to "not found"
+		}
+	}
+	if os.IsNotExist(err) {
+		http.Error(resp, err.Error(), http.StatusNotFound)
+	} else {
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+	}
+}
+
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 	ctx, cancel := contextForResponse(context.TODO(), resp)
 	defer cancel()
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index a928a71a2..2de21edde 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -178,13 +178,20 @@ func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
 }
 
 func (v *MockVolume) Touch(loc string) error {
+	return v.TouchWithDate(loc, time.Now()) // tests call TouchWithDate directly to backdate blocks
+}
+
+func (v *MockVolume) TouchWithDate(loc string, t time.Time) error {
 	v.gotCall("Touch")
 	<-v.Gate
 	if v.volume.ReadOnly {
 		return MethodDisabledError
 	}
+	if _, exists := v.Store[loc]; !exists {
+		return os.ErrNotExist
+	}
 	if v.Touchable {
-		v.Timestamps[loc] = time.Now()
+		v.Timestamps[loc] = t
 		return nil
 	}
 	return errors.New("Touch failed")

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list