[ARVADOS] updated: e9f437d9e590cc37ada8534401d254bd5e0a5e85
git at public.curoverse.com
git at public.curoverse.com
Tue Sep 29 14:10:42 EDT 2015
Summary of changes:
services/keepstore/azure_blob_volume.go | 104 +++++++---
services/keepstore/azure_blob_volume_test.go | 291 +++++++++++++++++++++++++--
services/keepstore/collision.go | 35 ++++
services/keepstore/handler_test.go | 16 ++
services/keepstore/handlers.go | 19 +-
services/keepstore/keepstore_test.go | 22 +-
services/keepstore/pull_worker.go | 2 +-
services/keepstore/volume.go | 5 +
services/keepstore/volume_generic_test.go | 40 ++--
services/keepstore/volume_test.go | 4 +
services/keepstore/volume_unix.go | 41 +---
11 files changed, 464 insertions(+), 115 deletions(-)
via e9f437d9e590cc37ada8534401d254bd5e0a5e85 (commit)
via 852eadc79b7103b3889eed53a851a1c26c4daeab (commit)
via 4eac79ebafa9b7979bbd295c2da85acbb3981bac (commit)
via b7f7878f8f0648ba5a53e24abb109ce9ad59bfc3 (commit)
via b2bcd45082d2df2b5a17645eb60473cc17c76e88 (commit)
from da74a60c2d276ed8612f138d73e73787f450ea2e (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e9f437d9e590cc37ada8534401d254bd5e0a5e85
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Sep 29 14:19:40 2015 -0400
7241: Use If-Match header to address Delete/Put races.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 2bf80bf..79123a9 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -96,11 +96,11 @@ type AzureBlobVolume struct {
func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
return &AzureBlobVolume{
- azClient: client,
- bsClient: client.GetBlobService(),
+ azClient: client,
+ bsClient: client.GetBlobService(),
containerName: containerName,
- readonly: readonly,
- replication: replication,
+ readonly: readonly,
+ replication: replication,
}
}
@@ -200,16 +200,26 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
}
func (v *AzureBlobVolume) Delete(loc string) error {
- // TODO: Use leases to handle races with Touch and Put.
if v.readonly {
return MethodDisabledError
}
+ // Ideally we would use If-Unmodified-Since, but that
+ // particular condition seems to be ignored by Azure. Instead,
+ // we get the Etag before checking Mtime, and use If-Match to
+ // ensure we don't delete data if Put() or Touch() happens
+ // between our calls to Mtime() and DeleteBlob().
+ props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ if err != nil {
+ return err
+ }
if t, err := v.Mtime(loc); err != nil {
return err
} else if time.Since(t) < blobSignatureTTL {
return nil
}
- return v.bsClient.DeleteBlob(v.containerName, loc)
+ return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+ "If-Match": props.Etag,
+ })
}
func (v *AzureBlobVolume) Status() *VolumeStatus {
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index ba4af03..66b0ea0 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -4,8 +4,10 @@ import (
"encoding/base64"
"encoding/xml"
"flag"
+ "fmt"
"io/ioutil"
"log"
+ "math/rand"
"net"
"net/http"
"net/http/httptest"
@@ -23,7 +25,7 @@ import (
const (
// The same fake credentials used by Microsoft's Azure emulator
emulatorAccountName = "devstoreaccount1"
- emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
+ emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
)
var azureTestContainer string
@@ -36,15 +38,17 @@ func init() {
"Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
}
-type azBlob struct{
+type azBlob struct {
Data []byte
+ Etag string
+ Metadata map[string]string
Mtime time.Time
Uncommitted map[string][]byte
}
type azStubHandler struct {
sync.Mutex
- blobs map[string]*azBlob
+ blobs map[string]*azBlob
}
func newAzStubHandler() *azStubHandler {
@@ -54,7 +58,7 @@ func newAzStubHandler() *azStubHandler {
}
func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
- if blob, ok := h.blobs[container + "|" + hash]; !ok {
+ if blob, ok := h.blobs[container+"|"+hash]; !ok {
return
} else {
blob.Mtime = t
@@ -64,9 +68,9 @@ func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
h.Lock()
defer h.Unlock()
- h.blobs[container + "|" + hash] = &azBlob{
- Data: data,
- Mtime: time.Now(),
+ h.blobs[container+"|"+hash] = &azBlob{
+ Data: data,
+ Mtime: time.Now(),
Uncommitted: make(map[string][]byte),
}
}
@@ -99,17 +103,20 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
Uncommitted []string
}
- blob, blobExists := h.blobs[container + "|" + hash]
+ blob, blobExists := h.blobs[container+"|"+hash]
switch {
case r.Method == "PUT" && r.Form.Get("comp") == "":
- rw.WriteHeader(http.StatusCreated)
- h.blobs[container + "|" + hash] = &azBlob{
- Data: body,
- Mtime: time.Now(),
+ // "Put Blob" API
+ h.blobs[container+"|"+hash] = &azBlob{
+ Data: body,
+ Mtime: time.Now(),
Uncommitted: make(map[string][]byte),
+ Etag: makeEtag(),
}
+ rw.WriteHeader(http.StatusCreated)
case r.Method == "PUT" && r.Form.Get("comp") == "block":
+ // "Put Block" API
if !blobExists {
log.Printf("Got block for nonexistent blob: %+v", r)
rw.WriteHeader(http.StatusBadRequest)
@@ -124,6 +131,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
blob.Uncommitted[string(blockID)] = body
rw.WriteHeader(http.StatusCreated)
case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
+ // "Put Block List" API
bl := &blockListRequestBody{}
if err := xml.Unmarshal(body, bl); err != nil {
log.Printf("xml Unmarshal: %s", err)
@@ -138,10 +146,31 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
return
}
blob.Data = blob.Uncommitted[string(blockID)]
- log.Printf("body %+q, bl %+v, blockID %+q, data %+q", body, bl, blockID, blob.Data)
+ blob.Etag = makeEtag()
+ blob.Mtime = time.Now()
+ delete(blob.Uncommitted, string(blockID))
}
rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "metadata":
+ // "Set Metadata Headers" API. We don't bother
+ // stubbing "Get Metadata Headers": AzureBlobVolume
+ // sets metadata headers only as a way to bump Etag
+ // and Last-Modified.
+ if !blobExists {
+ log.Printf("Got metadata for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Metadata = make(map[string]string)
+ for k, v := range r.Header {
+ if strings.HasPrefix(strings.ToLower(k), "x-ms-meta-") {
+ blob.Metadata[k] = v[0]
+ }
+ }
+ blob.Mtime = time.Now()
+ blob.Etag = makeEtag()
case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
+ // "Get Blob" API
if !blobExists {
rw.WriteHeader(http.StatusNotFound)
return
@@ -154,13 +183,15 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
}
case r.Method == "DELETE" && hash != "":
+ // "Delete Blob" API
if !blobExists {
rw.WriteHeader(http.StatusNotFound)
return
}
- delete(h.blobs, container + "|" + hash)
+ delete(h.blobs, container+"|"+hash)
rw.WriteHeader(http.StatusAccepted)
case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
+ // "List Blobs" API
prefix := container + "|" + r.Form.Get("prefix")
marker := r.Form.Get("marker")
@@ -170,7 +201,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
}
resp := storage.BlobListResponse{
- Marker: marker,
+ Marker: marker,
NextMarker: "",
MaxResults: int64(maxResults),
}
@@ -187,12 +218,13 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
break
}
if len(resp.Blobs) > 0 || marker == "" || marker == hash {
- blob := h.blobs[container + "|" + hash]
+ blob := h.blobs[container+"|"+hash]
resp.Blobs = append(resp.Blobs, storage.Blob{
Name: hash,
Properties: storage.BlobProperties{
- LastModified: blob.Mtime.Format(time.RFC1123),
+ LastModified: blob.Mtime.Format(time.RFC1123),
ContentLength: int64(len(blob.Data)),
+ Etag: blob.Etag,
},
})
}
@@ -217,6 +249,7 @@ type azStubDialer struct {
}
var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
+
func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
if hp := localHostPortRe.FindString(address); hp != "" {
log.Println("azStubDialer: dial", hp, "instead of", address)
@@ -263,9 +296,9 @@ func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) Te
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
- azHandler: azHandler,
- azStub: azStub,
- t: t,
+ azHandler: azHandler,
+ azStub: azStub,
+ t: t,
}
}
@@ -314,3 +347,7 @@ func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Tim
func (v *TestableAzureBlobVolume) Teardown() {
v.azStub.Close()
}
+
+func makeEtag() string {
+ return fmt.Sprintf("0x%x", rand.Int63())
+}
commit 852eadc79b7103b3889eed53a851a1c26c4daeab
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Sep 28 14:41:24 2015 -0400
7241: Use new CreateBlockBlobFromReader and SetBlobMetadata APIs for Put and Touch.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index d45db9b..2bf80bf 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -1,6 +1,7 @@
package main
import (
+ "bytes"
"errors"
"flag"
"fmt"
@@ -11,7 +12,7 @@ import (
"strings"
"time"
- "github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/curoverse/azure-sdk-for-go/storage"
)
var (
@@ -155,26 +156,16 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error {
if v.readonly {
return MethodDisabledError
}
- if err := v.bsClient.CreateBlockBlob(v.containerName, loc); err != nil {
- return err
- }
- // We use the same block ID, base64("0")=="MA==", for everything.
- if err := v.bsClient.PutBlock(v.containerName, loc, "MA==", block); err != nil {
- return err
- }
- return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusUncommitted}})
+ return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
}
func (v *AzureBlobVolume) Touch(loc string) error {
if v.readonly {
return MethodDisabledError
}
- if exists, err := v.bsClient.BlobExists(v.containerName, loc); err != nil {
- return err
- } else if !exists {
- return os.ErrNotExist
- }
- return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusCommitted}})
+ return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
+ "touch": fmt.Sprintf("%d", time.Now()),
+ })
}
func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index dc1a7e4..ba4af03 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -17,7 +17,7 @@ import (
"testing"
"time"
- "github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/curoverse/azure-sdk-for-go/storage"
)
const (
@@ -102,7 +102,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
blob, blobExists := h.blobs[container + "|" + hash]
switch {
- case r.Method == "PUT" && r.Form.Get("comp") == "" && r.Header.Get("Content-Length") == "0":
+ case r.Method == "PUT" && r.Form.Get("comp") == "":
rw.WriteHeader(http.StatusCreated)
h.blobs[container + "|" + hash] = &azBlob{
Data: body,
commit 4eac79ebafa9b7979bbd295c2da85acbb3981bac
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Sep 25 15:31:27 2015 -0400
7241: Do not bother trying to add a volume if credential args are missing/empty.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 8f93c18..d45db9b 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -40,6 +40,9 @@ func (s *azureVolumeAdder) Set(containerName string) error {
if containerName == "" {
return errors.New("no container name given")
}
+ if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
+ return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
+ }
accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
if err != nil {
return err
commit b7f7878f8f0648ba5a53e24abb109ce9ad59bfc3
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Sep 25 14:56:27 2015 -0400
7241: Add -azure-storage-replication flag.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 35b1dc7..8f93c18 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -17,6 +17,7 @@ import (
var (
azureStorageAccountName string
azureStorageAccountKeyFile string
+ azureStorageReplication int
)
func readKeyFromFile(file string) (string, error) {
@@ -50,7 +51,7 @@ func (s *azureVolumeAdder) Set(containerName string) error {
if flagSerializeIO {
log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
}
- v := NewAzureBlobVolume(azClient, containerName, flagReadonly)
+ v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
if err := v.Check(); err != nil {
return err
}
@@ -72,6 +73,11 @@ func init() {
"azure-storage-account-key-file",
"",
"File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+ flag.IntVar(
+ &azureStorageReplication,
+ "azure-storage-replication",
+ 3,
+ "Replication level to report to clients when data is stored in an Azure container.")
}
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
@@ -81,14 +87,16 @@ type AzureBlobVolume struct {
bsClient storage.BlobStorageClient
containerName string
readonly bool
+ replication int
}
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool) *AzureBlobVolume {
+func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
return &AzureBlobVolume{
azClient: client,
bsClient: client.GetBlobService(),
containerName: containerName,
readonly: readonly,
+ replication: replication,
}
}
@@ -225,3 +233,7 @@ func (v *AzureBlobVolume) String() string {
func (v *AzureBlobVolume) Writable() bool {
return !v.readonly
}
+
+func (v *AzureBlobVolume) Replication() int {
+ return v.replication
+}
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 619c013..dc1a7e4 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -232,7 +232,7 @@ type TestableAzureBlobVolume struct {
t *testing.T
}
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume {
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
azHandler := newAzStubHandler()
azStub := httptest.NewServer(azHandler)
@@ -259,7 +259,7 @@ func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume {
}
}
- v := NewAzureBlobVolume(azClient, container, readonly)
+ v := NewAzureBlobVolume(azClient, container, readonly, replication)
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
@@ -277,7 +277,7 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
Dial: (&azStubDialer{}).Dial,
}
DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
- return NewTestableAzureBlobVolume(t, false)
+ return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
})
}
@@ -289,10 +289,20 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
Dial: (&azStubDialer{}).Dial,
}
DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
- return NewTestableAzureBlobVolume(t, true)
+ return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
})
}
+func TestAzureBlobVolumeReplication(t *testing.T) {
+ for r := 1; r <= 4; r++ {
+ v := NewTestableAzureBlobVolume(t, false, r)
+ defer v.Teardown()
+ if n := v.Replication(); n != r {
+ t.Errorf("Got replication %d, expected %d", n, r)
+ }
+ }
+}
+
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
v.azHandler.PutRaw(v.containerName, locator, data)
}
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index dd83666..38c06ac 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -924,3 +924,19 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
case <-ok:
}
}
+
+func TestPutReplicationHeader(t *testing.T) {
+ defer teardown()
+
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
+ resp := IssueRequest(&RequestTester{
+ method: "PUT",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
+ })
+ if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
+ t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
+ }
+}
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 2b96dbc..c44bfb0 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -120,7 +120,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- err = PutBlock(buf, hash)
+ replication, err := PutBlock(buf, hash)
bufs.Put(buf)
if err != nil {
@@ -137,6 +137,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
expiry := time.Now().Add(blobSignatureTTL)
returnHash = SignLocator(returnHash, apiToken, expiry)
}
+ resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
resp.Write([]byte(returnHash + "\n"))
}
@@ -517,40 +518,40 @@ func GetBlock(hash string) ([]byte, error) {
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) error {
+func PutBlock(block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return RequestHashError
+ return 0, RequestHashError
}
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
- return err
+ return 0, err
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
if err := vol.Put(hash, block); err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
}
writables := KeepVM.AllWritable()
if len(writables) == 0 {
log.Print("No writable volumes.")
- return FullError
+ return 0, FullError
}
allFull := true
for _, vol := range writables {
err := vol.Put(hash, block)
if err == nil {
- return nil // success!
+ return vol.Replication(), nil // success!
}
if err != FullError {
// The volume is not full but the
@@ -563,10 +564,10 @@ func PutBlock(block []byte, hash string) error {
if allFull {
log.Print("All volumes are full.")
- return FullError
+ return 0, FullError
}
// Already logged the non-full errors.
- return GenericError
+ return 0, GenericError
}
// CompareAndTouch returns nil if one of the volumes already has the
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index daa9199..8682e23 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -122,8 +122,8 @@ func TestPutBlockOK(t *testing.T) {
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols := KeepVM.AllReadable()
@@ -152,8 +152,8 @@ func TestPutBlockOneVol(t *testing.T) {
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
result, err := GetBlock(TestHash)
@@ -180,7 +180,7 @@ func TestPutBlockMD5Fail(t *testing.T) {
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+ if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
t.Error("Expected RequestHashError, got %v", err)
}
@@ -205,8 +205,8 @@ func TestPutBlockCorrupt(t *testing.T) {
// Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, BadBlock)
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Errorf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Errorf("PutBlock: n %d err %v", n, err)
}
// The block on disk should now match TestBlock.
@@ -235,10 +235,10 @@ func TestPutBlockCollision(t *testing.T) {
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
- if err := PutBlock(b1, locator); err != nil {
+ if _, err := PutBlock(b1, locator); err != nil {
t.Error(err)
}
- if err := PutBlock(b2, locator); err == nil {
+ if _, err := PutBlock(b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
t.Errorf("PutBlock returned %v", err)
@@ -269,8 +269,8 @@ func TestPutBlockTouchFails(t *testing.T) {
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if err := PutBlock(TestBlock, TestHash); err != nil {
- t.Fatalf("PutBlock: %v", err)
+ if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Touchable = true
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 9f0b96f..2626d4b 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -95,6 +95,6 @@ func GenerateRandomAPIToken() string {
// Put block
var PutContent = func(content []byte, locator string) (err error) {
- err = PutBlock(content, locator)
+ _, err = PutBlock(content, locator)
return
}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 9bf291b..7966c41 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -195,6 +195,11 @@ type Volume interface {
// will fail because it is full, but Mtime or Delete can
// succeed -- then Writable should return false.
Writable() bool
+
+ // Replication returns the storage redundancy of the
+ // underlying device. It will be passed on to clients in
+ // responses to PUT requests.
+ Replication() int
}
// A VolumeManager tells callers which volumes can read, which volumes
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index f272c84..d671436 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -214,3 +214,7 @@ func (v *MockVolume) String() string {
func (v *MockVolume) Writable() bool {
return !v.Readonly
}
+
+func (v *MockVolume) Replication() int {
+ return 1
+}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index f498c3c..98c31d1 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -467,6 +467,10 @@ func (v *UnixVolume) Writable() bool {
return !v.readonly
}
+func (v *UnixVolume) Replication() int {
+ return 1
+}
+
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func lockfile(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
commit b2bcd45082d2df2b5a17645eb60473cc17c76e88
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Sep 24 18:31:43 2015 -0400
7241: Stub Azure API calls
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 0d0e546..35b1dc7 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"log"
+ "os"
"strings"
"time"
@@ -18,6 +19,18 @@ var (
azureStorageAccountKeyFile string
)
+func readKeyFromFile(file string) (string, error) {
+ buf, err := ioutil.ReadFile(file)
+ if err != nil {
+ return "", errors.New("reading key from " + file + ": " + err.Error())
+ }
+ accountKey := strings.TrimSpace(string(buf))
+ if accountKey == "" {
+ return "", errors.New("empty account key in " + file)
+ }
+ return accountKey, nil
+}
+
type azureVolumeAdder struct {
*volumeSet
}
@@ -26,13 +39,9 @@ func (s *azureVolumeAdder) Set(containerName string) error {
if containerName == "" {
return errors.New("no container name given")
}
- buf, err := ioutil.ReadFile(azureStorageAccountKeyFile)
+ accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
if err != nil {
- return errors.New("reading key from " + azureStorageAccountKeyFile + ": " + err.Error())
- }
- accountKey := strings.TrimSpace(string(buf))
- if accountKey == "" {
- return errors.New("empty account key in " + azureStorageAccountKeyFile)
+ return err
}
azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
if err != nil {
@@ -98,6 +107,16 @@ func (v *AzureBlobVolume) Check() error {
func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
rdr, err := v.bsClient.GetBlob(v.containerName, loc)
if err != nil {
+ if strings.Contains(err.Error(), "404 Not Found") {
+ // "storage: service returned without a response body (404 Not Found)"
+ return nil, os.ErrNotExist
+ }
+ return nil, err
+ }
+ switch err := err.(type) {
+ case nil:
+ default:
+ log.Printf("ERROR IN Get(): %T %#v", err, err)
return nil, err
}
defer rdr.Close()
@@ -112,11 +131,19 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
}
}
-func (v *AzureBlobVolume) Compare(loc string, data []byte) error {
- return NotFoundError
+func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+ rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ if err != nil {
+ return err
+ }
+ defer rdr.Close()
+ return compareReaderWithBuf(rdr, expect, loc[:32])
}
func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
if err := v.bsClient.CreateBlockBlob(v.containerName, loc); err != nil {
return err
}
@@ -128,6 +155,14 @@ func (v *AzureBlobVolume) Put(loc string, block []byte) error {
}
func (v *AzureBlobVolume) Touch(loc string) error {
+ if v.readonly {
+ return MethodDisabledError
+ }
+ if exists, err := v.bsClient.BlobExists(v.containerName, loc); err != nil {
+ return err
+ } else if !exists {
+ return os.ErrNotExist
+ }
return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusCommitted}})
}
@@ -153,7 +188,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
if err != nil {
return err
}
- fmt.Fprintf(writer, "%s+%d\n", b.Name, t.Unix())
+ fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
}
if resp.NextMarker == "" {
return nil
@@ -164,6 +199,9 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
func (v *AzureBlobVolume) Delete(loc string) error {
// TODO: Use leases to handle races with Touch and Put.
+ if v.readonly {
+ return MethodDisabledError
+ }
if t, err := v.Mtime(loc); err != nil {
return err
} else if time.Since(t) < blobSignatureTTL {
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 59021c0..619c013 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -1,12 +1,19 @@
package main
import (
+ "encoding/base64"
+ "encoding/xml"
+ "flag"
+ "io/ioutil"
"log"
"net"
"net/http"
"net/http/httptest"
"regexp"
+ "sort"
+ "strconv"
"strings"
+ "sync"
"testing"
"time"
@@ -19,9 +26,187 @@ const (
emulatorAccountKey = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
)
-type azStubHandler struct {}
+var azureTestContainer string
-func (azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+func init() {
+ flag.StringVar(
+ &azureTestContainer,
+ "test.azure-storage-container-volume",
+ "",
+ "Name of Azure container to use for testing. Do not use a container with real data! Use -azure-storage-account-name and -azure-storage-key-file arguments to supply credentials.")
+}
+
+type azBlob struct{
+ Data []byte
+ Mtime time.Time
+ Uncommitted map[string][]byte
+}
+
+type azStubHandler struct {
+ sync.Mutex
+ blobs map[string]*azBlob
+}
+
+func newAzStubHandler() *azStubHandler {
+ return &azStubHandler{
+ blobs: make(map[string]*azBlob),
+ }
+}
+
+func (h *azStubHandler) TouchWithDate(container, hash string, t time.Time) {
+ if blob, ok := h.blobs[container + "|" + hash]; !ok {
+ return
+ } else {
+ blob.Mtime = t
+ }
+}
+
+func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
+ h.Lock()
+ defer h.Unlock()
+ h.blobs[container + "|" + hash] = &azBlob{
+ Data: data,
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ }
+}
+
+func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
+ h.Lock()
+ defer h.Unlock()
+ // defer log.Printf("azStubHandler: %+v", r)
+
+ path := strings.Split(r.URL.Path, "/")
+ container := path[1]
+ hash := ""
+ if len(path) > 2 {
+ hash = path[2]
+ }
+
+ if err := r.ParseForm(); err != nil {
+ log.Printf("azStubHandler(%+v): %s", r, err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+
+ body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return
+ }
+
+ type blockListRequestBody struct {
+ XMLName xml.Name `xml:"BlockList"`
+ Uncommitted []string
+ }
+
+ blob, blobExists := h.blobs[container + "|" + hash]
+
+ switch {
+ case r.Method == "PUT" && r.Form.Get("comp") == "" && r.Header.Get("Content-Length") == "0":
+ rw.WriteHeader(http.StatusCreated)
+ h.blobs[container + "|" + hash] = &azBlob{
+ Data: body,
+ Mtime: time.Now(),
+ Uncommitted: make(map[string][]byte),
+ }
+ case r.Method == "PUT" && r.Form.Get("comp") == "block":
+ if !blobExists {
+ log.Printf("Got block for nonexistent blob: %+v", r)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blockID, err := base64.StdEncoding.DecodeString(r.Form.Get("blockid"))
+ if err != nil || len(blockID) == 0 {
+ log.Printf("Invalid blockid: %+q", r.Form.Get("blockid"))
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Uncommitted[string(blockID)] = body
+ rw.WriteHeader(http.StatusCreated)
+ case r.Method == "PUT" && r.Form.Get("comp") == "blocklist":
+ bl := &blockListRequestBody{}
+ if err := xml.Unmarshal(body, bl); err != nil {
+ log.Printf("xml Unmarshal: %s", err)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ for _, encBlockID := range bl.Uncommitted {
+ blockID, err := base64.StdEncoding.DecodeString(encBlockID)
+ if err != nil || len(blockID) == 0 || blob.Uncommitted[string(blockID)] == nil {
+ log.Printf("Invalid blockid: %+q", encBlockID)
+ rw.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ blob.Data = blob.Uncommitted[string(blockID)]
+ log.Printf("body %+q, bl %+v, blockID %+q, data %+q", body, bl, blockID, blob.Data)
+ }
+ rw.WriteHeader(http.StatusCreated)
+ case (r.Method == "GET" || r.Method == "HEAD") && hash != "":
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
+ rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+ if r.Method == "GET" {
+ if _, err := rw.Write(blob.Data); err != nil {
+ log.Printf("write %+q: %s", blob.Data, err)
+ }
+ }
+ case r.Method == "DELETE" && hash != "":
+ if !blobExists {
+ rw.WriteHeader(http.StatusNotFound)
+ return
+ }
+ delete(h.blobs, container + "|" + hash)
+ rw.WriteHeader(http.StatusAccepted)
+ case r.Method == "GET" && r.Form.Get("comp") == "list" && r.Form.Get("restype") == "container":
+ prefix := container + "|" + r.Form.Get("prefix")
+ marker := r.Form.Get("marker")
+
+ maxResults := 2
+ if n, err := strconv.Atoi(r.Form.Get("maxresults")); err == nil && n >= 1 && n <= 5000 {
+ maxResults = n
+ }
+
+ resp := storage.BlobListResponse{
+ Marker: marker,
+ NextMarker: "",
+ MaxResults: int64(maxResults),
+ }
+ var hashes sort.StringSlice
+ for k := range h.blobs {
+ if strings.HasPrefix(k, prefix) {
+ hashes = append(hashes, k[len(container)+1:])
+ }
+ }
+ hashes.Sort()
+ for _, hash := range hashes {
+ if len(resp.Blobs) == maxResults {
+ resp.NextMarker = hash
+ break
+ }
+ if len(resp.Blobs) > 0 || marker == "" || marker == hash {
+ blob := h.blobs[container + "|" + hash]
+ resp.Blobs = append(resp.Blobs, storage.Blob{
+ Name: hash,
+ Properties: storage.BlobProperties{
+ LastModified: blob.Mtime.Format(time.RFC1123),
+ ContentLength: int64(len(blob.Data)),
+ },
+ })
+ }
+ }
+ buf, err := xml.Marshal(resp)
+ if err != nil {
+ log.Print(err)
+ rw.WriteHeader(http.StatusInternalServerError)
+ }
+ rw.Write(buf)
+ default:
+ log.Printf("azStubHandler: not implemented: %+v Body:%+q", r, body)
+ rw.WriteHeader(http.StatusNotImplemented)
+ }
}
// azStubDialer is a net.Dialer that notices when the Azure driver
@@ -34,7 +219,7 @@ type azStubDialer struct {
var localHostPortRe = regexp.MustCompile(`(127\.0\.0\.1|localhost|\[::1\]):\d+`)
func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
if hp := localHostPortRe.FindString(address); hp != "" {
- log.Println("custom dialer: dial", hp, "instead of", address)
+ log.Println("azStubDialer: dial", hp, "instead of", address)
address = hp
}
return d.Dialer.Dial(network, address)
@@ -42,23 +227,43 @@ func (d *azStubDialer) Dial(network, address string) (net.Conn, error) {
type TestableAzureBlobVolume struct {
*AzureBlobVolume
- azStub *httptest.Server
- t *testing.T
+ azHandler *azStubHandler
+ azStub *httptest.Server
+ t *testing.T
}
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool) *TestableAzureBlobVolume {
- azStub := httptest.NewServer(azStubHandler{})
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool) TestableVolume {
+ azHandler := newAzStubHandler()
+ azStub := httptest.NewServer(azHandler)
- stubURLBase := strings.Split(azStub.URL, "://")[1]
- azClient, err := storage.NewClient(emulatorAccountName, emulatorAccountKey, stubURLBase, storage.DefaultAPIVersion, false)
- if err != nil {
- t.Fatal(err)
+ var azClient storage.Client
+
+ container := azureTestContainer
+ if container == "" {
+ // Connect to stub instead of real Azure storage service
+ stubURLBase := strings.Split(azStub.URL, "://")[1]
+ var err error
+ if azClient, err = storage.NewClient(emulatorAccountName, emulatorAccountKey, stubURLBase, storage.DefaultAPIVersion, false); err != nil {
+ t.Fatal(err)
+ }
+ container = "fakecontainername"
+ } else {
+ // Connect to real Azure storage service
+ accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
+ if err != nil {
+ t.Fatal(err)
+ }
+ azClient, err = storage.NewBasicClient(azureStorageAccountName, accountKey)
+ if err != nil {
+ t.Fatal(err)
+ }
}
- v := NewAzureBlobVolume(azClient, "fakecontainername", readonly)
+ v := NewAzureBlobVolume(azClient, container, readonly)
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
+ azHandler: azHandler,
azStub: azStub,
t: t,
}
@@ -89,10 +294,11 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
}
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
- v.Put(locator, data)
+ v.azHandler.PutRaw(v.containerName, locator, data)
}
func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
+ v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
}
func (v *TestableAzureBlobVolume) Teardown() {
diff --git a/services/keepstore/collision.go b/services/keepstore/collision.go
index 210286a..447ffa0 100644
--- a/services/keepstore/collision.go
+++ b/services/keepstore/collision.go
@@ -1,6 +1,7 @@
package main
import (
+ "bytes"
"crypto/md5"
"fmt"
"io"
@@ -47,3 +48,37 @@ func collisionOrCorrupt(expectMD5 string, buf1, buf2 []byte, rdr io.Reader) erro
}
return <-outcome
}
+
+func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+ bufLen := 1 << 20
+ if bufLen > len(expect) && len(expect) > 0 {
+ // No need for bufLen to be longer than
+ // expect, except that len(buf)==0 would
+ // prevent us from handling empty readers the
+ // same way as non-empty readers: reading 0
+ // bytes at a time never reaches EOF.
+ bufLen = len(expect)
+ }
+ buf := make([]byte, bufLen)
+ cmp := expect
+
+ // Loop invariants: all data read so far matched what
+ // we expected, and the first N bytes of cmp are
+ // expected to equal the next N bytes read from
+ // rdr.
+ for {
+ n, err := rdr.Read(buf)
+ if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
+ }
+ cmp = cmp[n:]
+ if err == io.EOF {
+ if len(cmp) != 0 {
+ return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], nil, nil)
+ }
+ return nil
+ } else if err != nil {
+ return err
+ }
+ }
+}
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index b64f345..503e6b9 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -171,14 +171,14 @@ func testPutBlockWithDifferentContent(t *testing.T, factory TestableVolumeFactor
// Put must not return a nil error unless it has
// overwritten the existing data.
if bytes.Compare(buf, TestBlock2) != 0 {
- t.Errorf("Put succeeded but Get returned %+v, expected %+v", buf, TestBlock2)
+ t.Errorf("Put succeeded but Get returned %+q, expected %+q", buf, TestBlock2)
}
} else {
// It is permissible for Put to fail, but it must
// leave us with either the original data, the new
// data, or nothing at all.
if getErr == nil && bytes.Compare(buf, TestBlock) != 0 && bytes.Compare(buf, TestBlock2) != 0 {
- t.Errorf("Put failed but Get returned %+v, which is neither %+v nor %+v", buf, TestBlock, TestBlock2)
+ t.Errorf("Put failed but Get returned %+q, which is neither %+q nor %+q", buf, TestBlock, TestBlock2)
}
}
if getErr == nil {
@@ -214,26 +214,32 @@ func testPutMultipleBlocks(t *testing.T, factory TestableVolumeFactory) {
data, err := v.Get(TestHash)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock)
+ } else {
+ if bytes.Compare(data, TestBlock) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
data, err = v.Get(TestHash2)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock2) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock2)
+ } else {
+ if bytes.Compare(data, TestBlock2) != 0 {
+ t.Errorf("Block present, but got %+q, expected %+q", data, TestBlock2)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
data, err = v.Get(TestHash3)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock3) != 0 {
- t.Errorf("Block present, but content is incorrect: Expected: %v Found: %v", data, TestBlock3)
+ } else {
+ if bytes.Compare(data, TestBlock3) != 0 {
+ t.Errorf("Block present, but to %+q, expected %+q", data, TestBlock3)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
}
// testPutAndTouch
@@ -360,6 +366,7 @@ func testIndexTo(t *testing.T, factory TestableVolumeFactory) {
func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+ blobSignatureTTL = 300 * time.Second
if v.Writable() == false {
return
@@ -373,10 +380,12 @@ func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
data, err := v.Get(TestHash)
if err != nil {
t.Error(err)
- } else if bytes.Compare(data, TestBlock) != 0 {
- t.Error("Block still present, but content is incorrect: %+v != %+v", data, TestBlock)
+ } else {
+ if bytes.Compare(data, TestBlock) != 0 {
+ t.Errorf("Got data %+q, expected %+q", data, TestBlock)
+ }
+ bufs.Put(data)
}
- bufs.Put(data)
}
// Calling Delete() for a block with a timestamp older than
@@ -385,13 +394,14 @@ func testDeleteNewBlock(t *testing.T, factory TestableVolumeFactory) {
func testDeleteOldBlock(t *testing.T, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+ blobSignatureTTL = 300 * time.Second
if v.Writable() == false {
return
}
v.Put(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL*time.Second))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
if err := v.Delete(TestHash); err != nil {
t.Error(err)
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 5d09e84..f498c3c 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -2,7 +2,6 @@ package main
import (
"bufio"
- "bytes"
"errors"
"flag"
"fmt"
@@ -209,43 +208,11 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
// bytes.Compare(), but uses less memory.
func (v *UnixVolume) Compare(loc string, expect []byte) error {
path := v.blockPath(loc)
- stat, err := v.stat(path)
- if err != nil {
+ if _, err := v.stat(path); err != nil {
return err
}
- bufLen := 1 << 20
- if int64(bufLen) > stat.Size() {
- bufLen = int(stat.Size())
- if bufLen < 1 {
- // len(buf)==0 would prevent us from handling
- // empty files the same way as non-empty
- // files, because reading 0 bytes at a time
- // never reaches EOF.
- bufLen = 1
- }
- }
- cmp := expect
- buf := make([]byte, bufLen)
return v.getFunc(path, func(rdr io.Reader) error {
- // Loop invariants: all data read so far matched what
- // we expected, and the first N bytes of cmp are
- // expected to equal the next N bytes read from
- // reader.
- for {
- n, err := rdr.Read(buf)
- if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
- return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
- }
- cmp = cmp[n:]
- if err == io.EOF {
- if len(cmp) != 0 {
- return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
- }
- return nil
- } else if err != nil {
- return err
- }
- }
+ return compareReaderWithBuf(rdr, expect, loc[:32])
})
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list