[ARVADOS] created: 25be2534e8746475ddc799042fefa08bd0548e9d

Git user git at public.curoverse.com
Tue May 24 12:40:02 EDT 2016


        at  25be2534e8746475ddc799042fefa08bd0548e9d (commit)


commit 25be2534e8746475ddc799042fefa08bd0548e9d
Author: radhika <radhika at curoverse.com>
Date:   Tue May 24 12:39:28 2016 -0400

    8556: implement trash/untrash for azure volumes.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 5bd7f10..99da2a3 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -10,6 +10,7 @@ import (
 	"log"
 	"os"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -133,6 +134,18 @@ func (v *AzureBlobVolume) Check() error {
 	return nil
 }
 
+// Return true if expires_at metadata attribute is found on the block
+func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
+	metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+	if err != nil {
+		return false, metadata, v.translateError(err)
+	}
+	if metadata["expires_at"] != "" {
+		return true, metadata, nil
+	}
+	return false, metadata, nil
+}
+
 // Get reads a Keep block that has been stored as a block blob in the
 // container.
 //
@@ -140,6 +153,13 @@ func (v *AzureBlobVolume) Check() error {
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
 func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+	trashed, _, err := v.checkTrashed(loc)
+	if err != nil {
+		return 0, err
+	}
+	if trashed {
+		return 0, os.ErrNotExist
+	}
 	var deadline time.Time
 	haveDeadline := false
 	size, err := v.get(loc, buf)
@@ -241,6 +261,13 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
 
 // Compare the given data with existing stored data.
 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+	trashed, _, err := v.checkTrashed(loc)
+	if err != nil {
+		return err
+	}
+	if trashed {
+		return os.ErrNotExist
+	}
 	rdr, err := v.bsClient.GetBlob(v.containerName, loc)
 	if err != nil {
 		return v.translateError(err)
@@ -262,13 +289,28 @@ func (v *AzureBlobVolume) Touch(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
-	return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
-		"touch": fmt.Sprintf("%d", time.Now()),
-	}, nil)
+	trashed, metadata, err := v.checkTrashed(loc)
+	if err != nil {
+		return err
+	}
+	if trashed {
+		return os.ErrNotExist
+	}
+
+	metadata["touch"] = fmt.Sprintf("%d", time.Now())
+	return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
 }
 
 // Mtime returns the last-modified property of a block blob.
 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+	trashed, _, err := v.checkTrashed(loc)
+	if err != nil {
+		return time.Time{}, err
+	}
+	if trashed {
+		return time.Time{}, os.ErrNotExist
+	}
+
 	props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
 	if err != nil {
 		return time.Time{}, err
@@ -280,7 +322,8 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
 // container.
 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 	params := storage.ListBlobsParameters{
-		Prefix: prefix,
+		Prefix:  prefix,
+		Include: "metadata",
 	}
 	for {
 		resp, err := v.bsClient.ListBlobs(v.containerName, params)
@@ -303,6 +346,10 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 				// value.
 				continue
 			}
+			if b.Metadata["expires_at"] != "" {
+				// Trashed blob; exclude it from response
+				continue
+			}
 			fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
 		}
 		if resp.NextMarker == "" {
@@ -318,10 +365,6 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 		return MethodDisabledError
 	}
 
-	if trashLifetime != 0 {
-		return ErrNotImplemented
-	}
-
 	// Ideally we would use If-Unmodified-Since, but that
 	// particular condition seems to be ignored by Azure. Instead,
 	// we get the Etag before checking Mtime, and use If-Match to
@@ -336,15 +379,38 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 	} else if time.Since(t) < blobSignatureTTL {
 		return nil
 	}
-	return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+
+	// If trashLifetime == 0, just delete it
+	if trashLifetime == 0 {
+		return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+			"If-Match": props.Etag,
+		})
+	}
+
+	// Otherwise, mark as trash
+	return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
+		"expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+	}, map[string]string{
 		"If-Match": props.Etag,
 	})
 }
 
 // Untrash a Keep block.
-// TBD
+// Delete the expires_at metadata attribute
 func (v *AzureBlobVolume) Untrash(loc string) error {
-	return ErrNotImplemented
+	// if expires_at does not exist, return NotFoundError
+	metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+	if err != nil {
+		return v.translateError(err)
+	}
+	if metadata["expires_at"] == "" {
+		return os.ErrNotExist
+	}
+
+	// reset expires_at metadata attribute
+	metadata["expires_at"] = ""
+	err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+	return v.translateError(err)
 }
 
 // Status returns a VolumeStatus struct with placeholder data.
@@ -379,7 +445,7 @@ func (v *AzureBlobVolume) translateError(err error) error {
 	switch {
 	case err == nil:
 		return err
-	case strings.Contains(err.Error(), "404 Not Found"):
+	case strings.Contains(err.Error(), "Not Found"):
 		// "storage: service returned without a response body (404 Not Found)"
 		return os.ErrNotExist
 	default:
@@ -395,6 +461,51 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
-// TBD
 func (v *AzureBlobVolume) EmptyTrash() {
+	var bytesDeleted, bytesInTrash int64
+	var blocksDeleted, blocksInTrash int
+	params := storage.ListBlobsParameters{Include: "metadata"}
+
+	for {
+		resp, err := v.bsClient.ListBlobs(v.containerName, params)
+		if err != nil {
+			log.Printf("EmptyTrash: ListBlobs: %v", err)
+			break
+		}
+		for _, b := range resp.Blobs {
+			// Check if the block is expired
+			if b.Metadata["expires_at"] == "" {
+				continue
+			}
+
+			blocksInTrash++
+			bytesInTrash += b.Properties.ContentLength
+
+			expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
+			if err != nil {
+				log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+				continue
+			}
+
+			if expiresAt > time.Now().Unix() {
+				continue
+			}
+
+			err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+				"If-Match": b.Properties.Etag,
+			})
+			if err != nil {
+				log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+				continue
+			}
+			blocksDeleted++
+			bytesDeleted += b.Properties.ContentLength
+		}
+		if resp.NextMarker == "" {
+			break
+		}
+		params.Marker = resp.NextMarker
+	}
+
+	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index e3c0e27..5d556b3 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -74,6 +74,7 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
 	h.blobs[container+"|"+hash] = &azBlob{
 		Data:        data,
 		Mtime:       time.Now(),
+		Metadata:    make(map[string]string),
 		Uncommitted: make(map[string][]byte),
 	}
 }
@@ -136,14 +137,23 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 			h.blobs[container+"|"+hash] = &azBlob{
 				Mtime:       time.Now(),
 				Uncommitted: make(map[string][]byte),
+				Metadata:    make(map[string]string),
 				Etag:        makeEtag(),
 			}
 			h.unlockAndRace()
 		}
+		metadata := make(map[string]string)
+		for k, v := range r.Header {
+			if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+				name := k[len("x-ms-meta-"):]
+				metadata[strings.ToLower(name)] = v[0]
+			}
+		}
 		h.blobs[container+"|"+hash] = &azBlob{
 			Data:        body,
 			Mtime:       time.Now(),
 			Uncommitted: make(map[string][]byte),
+			Metadata:    metadata,
 			Etag:        makeEtag(),
 		}
 		rw.WriteHeader(http.StatusCreated)
@@ -196,11 +206,22 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 		blob.Metadata = make(map[string]string)
 		for k, v := range r.Header {
 			if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
-				blob.Metadata[k] = v[0]
+				name := k[len("x-ms-meta-"):]
+				blob.Metadata[strings.ToLower(name)] = v[0]
 			}
 		}
 		blob.Mtime = time.Now()
 		blob.Etag = makeEtag()
+	case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
+		// "Get Blob Metadata" API
+		if !blobExists {
+			rw.WriteHeader(http.StatusNotFound)
+			return
+		}
+		for k, v := range blob.Metadata {
+			rw.Header().Set(fmt.Sprintf("x-ms-meta-%s", k), v)
+		}
+		return
 	case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
 		// "Get Blob" API
 		if !blobExists {
@@ -265,14 +286,20 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 			}
 			if len(resp.Blobs) > 0 || marker == "" || marker == hash {
 				blob := h.blobs[container+"|"+hash]
-				resp.Blobs = append(resp.Blobs, storage.Blob{
+				bmeta := map[string]string(nil)
+				if r.Form.Get("include") == "metadata" {
+					bmeta = blob.Metadata
+				}
+				b := storage.Blob{
 					Name: hash,
 					Properties: storage.BlobProperties{
 						LastModified:  blob.Mtime.Format(time.RFC1123),
 						ContentLength: int64(len(blob.Data)),
 						Etag:          blob.Etag,
 					},
-				})
+					Metadata: bmeta,
+				}
+				resp.Blobs = append(resp.Blobs, b)
 			}
 		}
 		buf, err := xml.Marshal(resp)
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index d068b2a..80a7c89 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -10,6 +10,7 @@ import (
 	"net/http"
 	"os"
 	"regexp"
+	"strings"
 	"time"
 
 	"github.com/AdRoll/goamz/aws"
@@ -310,7 +311,8 @@ func (v *S3Volume) isKeepBlock(s string) bool {
 func (v *S3Volume) translateError(err error) error {
 	switch err := err.(type) {
 	case *s3.Error:
-		if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
+		if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
+			strings.Contains(err.Error(), "Not Found") {
 			return os.ErrNotExist
 		}
 		// Other 404 errors like NoSuchVersion and
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 105795c..f8fe0d0 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -453,6 +453,27 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 	if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
 		t.Errorf("os.IsNotExist(%v) should have been true", err)
 	}
+
+	_, err := v.Mtime(TestHash)
+	if err == nil || !os.IsNotExist(err) {
+		t.Fatalf("os.IsNotExist(%v) should have been true", err)
+	}
+
+	err = v.Compare(TestHash, TestBlock)
+	if err == nil || !os.IsNotExist(err) {
+		t.Fatalf("os.IsNotExist(%v) should have been true", err)
+	}
+
+	indexBuf := new(bytes.Buffer)
+	v.IndexTo("", indexBuf)
+	if strings.Contains(string(indexBuf.Bytes()), TestHash) {
+		t.Fatalf("Found trashed block in IndexTo")
+	}
+
+	err = v.Touch(TestHash)
+	if err == nil || !os.IsNotExist(err) {
+		t.Fatalf("os.IsNotExist(%v) should have been true", err)
+	}
 }
 
 // Calling Delete() for a block that does not exist should result in error.
@@ -723,11 +744,11 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	err = v.Trash(TestHash)
 	if v.Writable() == false {
 		if err != MethodDisabledError {
-			t.Error(err)
+			t.Fatal(err)
 		}
 	} else if err != nil {
 		if err != ErrNotImplemented {
-			t.Error(err)
+			t.Fatal(err)
 		}
 	} else {
 		_, err = v.Get(TestHash, buf)
@@ -768,6 +789,23 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 		if bytes.Compare(buf[:n], TestBlock) != 0 {
 			t.Fatalf("Got data %+q, expected %+q", buf[:n], TestBlock)
 		}
+
+		_, err = v.Mtime(TestHash)
+		if err != nil {
+			return err
+		}
+
+		err = v.Compare(TestHash, TestBlock)
+		if err != nil {
+			return err
+		}
+
+		indexBuf := new(bytes.Buffer)
+		v.IndexTo("", indexBuf)
+		if !strings.Contains(string(indexBuf.Bytes()), TestHash) {
+			return os.ErrNotExist
+		}
+
 		return nil
 	}
 
@@ -783,6 +821,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 		t.Fatal(err)
 	}
 
+	// Trash the block
 	err = v.Trash(TestHash)
 	if err == MethodDisabledError || err == ErrNotImplemented {
 		// Skip the trash tests for read-only volumes, and
@@ -795,6 +834,11 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
+	err = v.Touch(TestHash)
+	if err == nil || !os.IsNotExist(err) {
+		t.Fatalf("os.IsNotExist(%v) should have been true", err)
+	}
+
 	v.EmptyTrash()
 
 	// Even after emptying the trash, we can untrash our block
@@ -803,11 +847,20 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	if err != nil {
 		t.Fatal(err)
 	}
+
 	err = checkGet()
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	err = v.Touch(TestHash)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Because we Touch'ed, need to backdate again for next set of tests
+	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
 	// Untrash should fail if the only block in the trash has
 	// already been untrashed.
 	err = v.Untrash(TestHash)
@@ -848,11 +901,14 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 
 	// Trash it again, and this time call EmptyTrash so it really
 	// goes away.
+	// (In Azure volumes, un/trash changes Mtime, so first backdate again)
+	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 	err = v.Trash(TestHash)
 	err = checkGet()
 	if err == nil || !os.IsNotExist(err) {
-		t.Errorf("os.IsNotExist(%v) should have been true", err)
+		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
+	// EmptyTrash
 	v.EmptyTrash()
 
 	// Untrash won't find it
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index edec048..7aff85e 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -538,7 +538,7 @@ func (v *UnixVolume) translateError(err error) error {
 	}
 }
 
-var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
@@ -554,7 +554,7 @@ func (v *UnixVolume) EmptyTrash() {
 		if info.Mode().IsDir() {
 			return nil
 		}
-		matches := trashLocRegexp.FindStringSubmatch(path)
+		matches := unixTrashLocRegexp.FindStringSubmatch(path)
 		if len(matches) != 3 {
 			return nil
 		}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list