[ARVADOS] updated: 316f588fdd86e4dfb0193a78f0a71859b77d1714

Git user git at public.curoverse.com
Thu Mar 24 09:52:09 EDT 2016


Summary of changes:
 services/keepstore/azure_blob_volume.go      | 134 ++++++++++++++++++++++++---
 services/keepstore/azure_blob_volume_test.go |  21 ++---
 services/keepstore/volume_generic_test.go    |  21 +++--
 services/keepstore/volume_unix.go            |   4 +-
 4 files changed, 145 insertions(+), 35 deletions(-)

       via  316f588fdd86e4dfb0193a78f0a71859b77d1714 (commit)
      from  241aac492beec3a7480b6e07e4e21d17474455e1 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 316f588fdd86e4dfb0193a78f0a71859b77d1714
Author: radhika <radhika at curoverse.com>
Date:   Thu Mar 24 09:51:34 2016 -0400

    8556: add GetBlobMetadata support in azure stub implementation.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index c393cfb..576f22e 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -10,6 +10,7 @@ import (
 	"log"
 	"os"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -133,6 +134,18 @@ func (v *AzureBlobVolume) Check() error {
 	return nil
 }
 
+// Return NotFoundError if trash marker is found on the block
+func (v *AzureBlobVolume) checkTrashed(loc string) error {
+	metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+	if err != nil {
+		return err
+	}
+	if metadata["expires_at"] != "" {
+		return v.translateError(NotFoundError)
+	}
+	return nil
+}
+
 // Get reads a Keep block that has been stored as a block blob in the
 // container.
 //
@@ -140,6 +153,9 @@ func (v *AzureBlobVolume) Check() error {
 // unexpectedly empty, assume a PutBlob operation is in progress, and
 // wait for it to finish writing.
 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+	if err := v.checkTrashed(loc); err != nil {
+		return nil, err
+	}
 	var deadline time.Time
 	haveDeadline := false
 	buf, err := v.get(loc)
@@ -244,6 +260,9 @@ func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
 
 // Compare the given data with existing stored data.
 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+	if err := v.checkTrashed(loc); err != nil {
+		return err
+	}
 	rdr, err := v.bsClient.GetBlob(v.containerName, loc)
 	if err != nil {
 		return v.translateError(err)
@@ -260,18 +279,40 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error {
 	return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
 }
 
+func (v *AzureBlobVolume) addToMetadata(loc, name, value string) error {
+	metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+	if err != nil {
+		return err
+	}
+	metadata[name] = value
+	return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata)
+}
+
+func (v *AzureBlobVolume) removeFromMetadata(loc, name string) error {
+	metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+	if err != nil {
+		return err
+	}
+	delete(metadata, name)
+	return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata)
+}
+
 // Touch updates the last-modified property of a block blob.
 func (v *AzureBlobVolume) Touch(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
 	}
-	return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
-		"touch": fmt.Sprintf("%d", time.Now()),
-	})
+	if err := v.checkTrashed(loc); err != nil {
+		return err
+	}
+	return v.addToMetadata(loc, "touch", fmt.Sprintf("%d", time.Now()))
 }
 
 // Mtime returns the last-modified property of a block blob.
 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
+	if err := v.checkTrashed(loc); err != nil {
+		return time.Time{}, err
+	}
 	props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
 	if err != nil {
 		return time.Time{}, err
@@ -341,20 +382,34 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 		})
 	}
 	// Mark as trash
-	err = v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, 0, bytes.NewReader([]byte("")))
+	err = v.addToMetadata(loc, "expires_at", fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()))
 	if err != nil {
 		return err
 	}
-	err = v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
-		"expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
-	})
-	return v.bsClient.GetBlobMetadata(v.containerName, loc)
+	return v.bsClient.CreateBlockBlobFromReader(v.containerName,
+		fmt.Sprintf("trash.%d.%v", time.Now().Add(trashLifetime).Unix(), loc), 0, nil)
 }
 
 // Untrash a Keep block.
-// TBD
+// Delete the expires_at metadata attribute and trash marker
 func (v *AzureBlobVolume) Untrash(loc string) error {
-	return ErrNotImplemented
+	// if expires_at does not exist, return NotFoundError
+	metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+	if err != nil {
+		return err
+	}
+	if metadata["expires_at"] == "" {
+		return v.translateError(NotFoundError)
+	}
+	// reset expires_at metadata attribute
+	err = v.removeFromMetadata(loc, "expires_at")
+	if err != nil {
+		return err
+	}
+
+	// delete trash marker if exists
+	_, err = v.bsClient.DeleteBlobIfExists(v.containerName, fmt.Sprintf("trash.%v.%v", metadata["expires_at"], loc), map[string]string{})
+	return err
 }
 
 // Status returns a VolumeStatus struct with placeholder data.
@@ -403,8 +458,65 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 	return keepBlockRegexp.MatchString(s)
 }
 
+var azTrashLocRegexp = regexp.MustCompile(`trash\.(\d+)\.([0-9a-f]{32})$`)
+
 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
 // and deletes them from the volume.
-// TBD
 func (v *AzureBlobVolume) EmptyTrash() {
+	var bytesDeleted, bytesInTrash int64
+	var blocksDeleted, blocksInTrash int
+	var noMoreOldMarkers bool
+	params := storage.ListBlobsParameters{
+		Prefix: "trash.",
+	}
+	for {
+		resp, err := v.bsClient.ListBlobs(v.containerName, params)
+		if err != nil {
+			log.Printf("EmptyTrash: ListBlobs: %v", err)
+			break
+		}
+		for _, b := range resp.Blobs {
+			matches := azTrashLocRegexp.FindStringSubmatch(b.Name)
+			if len(matches) != 3 {
+				log.Printf("EmptyTrash: regexp mismatch for: %v", b.Name)
+				continue
+			}
+			blocksInTrash++
+			deadline, err := strconv.ParseInt(matches[1], 10, 64)
+			if err != nil {
+				log.Printf("EmptyTrash: %v: ParseInt(%v): %v", matches[1], err)
+				continue
+			}
+			if deadline > time.Now().Unix() {
+				noMoreOldMarkers = true
+				break
+			}
+
+			metadata, err := v.bsClient.GetBlobMetadata(v.containerName, matches[2])
+			if err != nil {
+				log.Printf("EmptyTrash: %v: GetBlobMetadata(%v): %v", matches[2], err)
+				continue
+			}
+
+			// Make sure the marker is for the current block, not an older one
+			if metadata["expires_at"] == matches[1] {
+				err = v.bsClient.DeleteBlob(v.containerName, matches[2], map[string]string{})
+				if err != nil {
+					log.Printf("EmptyTrash: %v: DeleteBlob(%v): %v", matches[2], err)
+				}
+				blocksDeleted++
+			}
+
+			err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{})
+			if err != nil {
+				log.Printf("EmptyTrash: %v: DeleteBlob(%v): %v", b.Name, err)
+			}
+		}
+		if resp.NextMarker == "" || noMoreOldMarkers == true {
+			break
+		}
+		params.Marker = resp.NextMarker
+	}
+
+	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index eb08d81..08166a8 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"crypto/md5"
 	"encoding/base64"
-	"encoding/gob"
 	"encoding/xml"
 	"flag"
 	"fmt"
@@ -72,10 +71,12 @@ func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
 func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
 	h.Lock()
 	defer h.Unlock()
+	metadata := make(map[string]string)
+	metadata["last_write_at"] = fmt.Sprintf("%d", time.Now().Unix())
 	h.blobs[container+"|"+hash] = &azBlob{
 		Data:        data,
 		Mtime:       time.Now(),
-		Metadata:    make(map[string]string),
+		Metadata:    metadata,
 		Uncommitted: make(map[string][]byte),
 	}
 }
@@ -138,6 +139,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 			h.blobs[container+"|"+hash] = &azBlob{
 				Mtime:       time.Now(),
 				Uncommitted: make(map[string][]byte),
+				Metadata:    make(map[string]string),
 				Etag:        makeEtag(),
 			}
 			h.unlockAndRace()
@@ -146,6 +148,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 			Data:        body,
 			Mtime:       time.Now(),
 			Uncommitted: make(map[string][]byte),
+			Metadata:    make(map[string]string),
 			Etag:        makeEtag(),
 		}
 		rw.WriteHeader(http.StatusCreated)
@@ -203,22 +206,16 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 		}
 		blob.Mtime = time.Now()
 		blob.Etag = makeEtag()
-	case r.Method == "GET" && r.Form.Get("comp") == "metadata" && hash != "":
+	case (r.Method == "GET" || r.Method == "HEAD") && r.Form.Get("comp") == "metadata" && hash != "":
 		// "Get Blob Metadata" API
 		if !blobExists {
 			rw.WriteHeader(http.StatusNotFound)
 			return
 		}
-		buf := new(bytes.Buffer)
-		encoder := gob.NewEncoder(buf)
-		err = encoder.Encode(blob.Metadata)
-		if err != nil {
-			log.Print(err)
-			rw.WriteHeader(http.StatusInternalServerError)
-			return
+		for k, v := range blob.Metadata {
+			rw.Header().Set(k, v)
 		}
-		rw.Write(buf.Bytes())
-		rw.Header().Set("Content-Length", strconv.Itoa(len(buf.Bytes())))
+		return
 	case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
 		// "Get Blob" API
 		if !blobExists {
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 5ac7290..2c1b266 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -120,7 +120,7 @@ func testCompareNonexistent(t TB, factory TestableVolumeFactory) {
 	defer v.Teardown()
 
 	err := v.Compare(TestHash, TestBlock)
-	if err != os.ErrNotExist {
+	if err != os.ErrNotExist && !strings.Contains(err.Error(), "Not Found") {
 		t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
 	}
 }
@@ -455,7 +455,8 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
 	if err := v.Trash(TestHash); err != nil {
 		t.Error(err)
 	}
-	if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
+	_, err := v.Get(TestHash)
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Errorf("os.IsNotExist(%v) should have been true", err)
 	}
 }
@@ -738,7 +739,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 		}
 	} else {
 		_, err = v.Get(TestHash)
-		if err == nil || !os.IsNotExist(err) {
+		if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 			t.Fatalf("os.IsNotExist(%v) should have been true", err)
 		}
 
@@ -799,7 +800,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	}
 
 	err = checkGet()
-	if err == nil || !os.IsNotExist(err) {
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
@@ -819,7 +820,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	// Untrash should fail if the only block in the trash has
 	// already been untrashed.
 	err = v.Untrash(TestHash)
-	if err == nil || !os.IsNotExist(err) {
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
@@ -839,7 +840,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 		t.Fatal(err)
 	}
 	err = checkGet()
-	if err == nil || !os.IsNotExist(err) {
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
@@ -858,20 +859,20 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 	// goes away.
 	err = v.Trash(TestHash)
 	err = checkGet()
-	if err == nil || !os.IsNotExist(err) {
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 	v.EmptyTrash()
 
 	// Untrash won't find it
 	err = v.Untrash(TestHash)
-	if err == nil || !os.IsNotExist(err) {
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
 	// Get block won't find it
 	err = checkGet()
-	if err == nil || !os.IsNotExist(err) {
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
@@ -888,7 +889,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
 		t.Fatal(err)
 	}
 	err = checkGet()
-	if err == nil || !os.IsNotExist(err) {
+	if err == nil || (!os.IsNotExist(err) && !strings.Contains(err.Error(), "Not Found")) {
 		t.Fatalf("os.IsNotExist(%v) should have been true", err)
 	}
 
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 996068c..e02035d 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -540,7 +540,7 @@ func (v *UnixVolume) translateError(err error) error {
 	}
 }
 
-var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
@@ -556,7 +556,7 @@ func (v *UnixVolume) EmptyTrash() {
 		if info.Mode().IsDir() {
 			return nil
 		}
-		matches := trashLocRegexp.FindStringSubmatch(path)
+		matches := unixTrashLocRegexp.FindStringSubmatch(path)
 		if len(matches) != 3 {
 			return nil
 		}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list