[ARVADOS] created: c47c3d3b01cbb5e966a963513a02f20e5d840cc1
Git user
git at public.curoverse.com
Mon Jul 4 15:41:28 EDT 2016
at c47c3d3b01cbb5e966a963513a02f20e5d840cc1 (commit)
commit c47c3d3b01cbb5e966a963513a02f20e5d840cc1
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jul 4 13:24:07 2016 -0400
8788: Serve HEAD requests without using the buffer pool.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 48cb026..60565e6 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -259,6 +259,18 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
return actualSize, nil
}
+func (v *AzureBlobVolume) GetReader(loc string) (io.ReadCloser, error) {
+ trashed, _, err := v.checkTrashed(loc)
+ if err != nil {
+ return nil, err
+ }
+ if trashed {
+ return nil, os.ErrNotExist
+ }
+ rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ return rdr, v.translateError(err)
+}
+
// Compare the given data with existing stored data.
func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
trashed, _, err := v.checkTrashed(loc)
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index f698982..363ca70 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -31,16 +31,19 @@ import (
func MakeRESTRouter() *mux.Router {
rest := mux.NewRouter()
- rest.HandleFunc(
- `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/{hash:[0-9a-f]{32}}+{hints}`,
- GetBlockHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}+{hints}`, GetBlockHandler).Methods("GET")
+
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, Head).Methods("HEAD")
+ rest.HandleFunc(`/{hash:[0-9a-f]{32}}+{hints}`, Head).Methods("HEAD")
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+
rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
+
// List all blocks stored here. Privileged client only.
rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
+
// List blocks stored here whose hash has the given prefix.
// Privileged client only.
rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
@@ -64,19 +67,66 @@ func MakeRESTRouter() *mux.Router {
return rest
}
+func permissionOK(resp http.ResponseWriter, req *http.Request) bool {
+ if !enforcePermissions {
+ return true
+ }
+ locator := req.URL.Path[1:] // strip leading slash
+ if err := VerifySignature(locator, GetApiToken(req)); err == nil {
+ return true
+ } else {
+ http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
+ return false
+ }
+}
+
// BadRequestHandler is a HandleFunc to address bad requests.
func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
+// Head computes the checksum of stored data and returns 200 if it
+// finds a copy on any volume with a valid checksum.
+func Head(resp http.ResponseWriter, req *http.Request) {
+ if !permissionOK(resp, req) {
+ return
+ }
+ locator := mux.Vars(req)["hash"]
+ for _, vol := range KeepVM.AllReadable() {
+ rdr, err := vol.GetReader(locator)
+ if err != nil {
+ continue
+ }
+ h := md5.New()
+ size, err := io.Copy(h, rdr)
+ if err != nil {
+ if !os.IsNotExist(err) {
+ log.Printf("%s: read %s: %s", vol, locator, err)
+ }
+ continue
+ }
+ if rdr, ok := rdr.(io.Closer); ok {
+ err = rdr.Close()
+ if err != nil {
+ log.Printf("%s: read %s: close: %s", vol, locator, err)
+ }
+ continue
+ }
+ if h := fmt.Sprintf("%x", h); h != locator {
+ log.Printf("%s: verify %s: checksum mismatch: %s", vol, locator, h)
+ continue
+ }
+ resp.Header().Set("Content-Length", strconv.FormatInt(size, 10))
+ resp.Header().Set("Content-Type", "application/octet-stream")
+ return
+ }
+ http.Error(resp, "Not found", http.StatusNotFound)
+}
+
// GetBlockHandler is a HandleFunc to address Get block requests.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
- if enforcePermissions {
- locator := req.URL.Path[1:] // strip leading slash
- if err := VerifySignature(locator, GetApiToken(req)); err != nil {
- http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
- return
- }
+ if !permissionOK(resp, req) {
+ return
}
// TODO: Probe volumes to check whether the block _might_
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 98e1203..4628329 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -169,6 +169,14 @@ func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
}
}
+func (v *S3Volume) GetReader(loc string) (io.ReadCloser, error) {
+ rdr, err := v.Bucket.GetReader(loc)
+ if err != nil {
+ return nil, v.translateError(err)
+ }
+ return rdr, err
+}
+
func (v *S3Volume) Compare(loc string, expect []byte) error {
rdr, err := v.Bucket.GetReader(loc)
if err != nil {
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 8ae6660..cd475bc 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -47,6 +47,13 @@ type Volume interface {
// was encountered opening/reading the stored data.
Compare(loc string, data []byte) error
+ // Return an io.ReadCloser that reads data for the given
+ // locator.
+ //
+ // As with Get, if the block does not exist, the returned
+ // error should satisfy os.IsNotExist(err).
+ GetReader(loc string) (io.ReadCloser, error)
+
// Put writes a block to an underlying storage device.
//
// loc is as described in Get.
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 5671b8d..686b8e8 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
+ "io/ioutil"
"os"
"strings"
"sync"
@@ -125,6 +126,17 @@ func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
return 0, os.ErrNotExist
}
+func (v *MockVolume) GetReader(loc string) (io.ReadCloser, error) {
+ v.gotCall("GetReader")
+ <-v.Gate
+ if v.Bad {
+ return nil, errors.New("Bad volume")
+ } else if block, ok := v.Store[loc]; ok {
+ return ioutil.NopCloser(bytes.NewReader(block)), nil
+ }
+ return nil, os.ErrNotExist
+}
+
func (v *MockVolume) Put(loc string, block []byte) error {
v.gotCall("Put")
<-v.Gate
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 90189dc..a39e762 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -180,6 +180,25 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
return stat, err
}
+// GetReader returns a ReadCloser that respects the serialize lock.
+func (v *UnixVolume) GetReader(loc string) (io.ReadCloser, error) {
+ unlock := nop
+ if v.locker != nil {
+ v.locker.Lock()
+ unlock = v.locker.Unlock
+ }
+ path := v.blockPath(loc)
+ f, err := os.Open(path)
+ if err != nil {
+ unlock()
+ return nil, err
+ }
+ return &unlockingReadCloser{
+ ReadCloser: f,
+ unlock: unlock,
+ }, nil
+}
+
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
@@ -583,3 +602,15 @@ func (v *UnixVolume) EmptyTrash() {
log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
+
+type unlockingReadCloser struct {
+ io.ReadCloser
+ unlock func()
+}
+
+func (urc *unlockingReadCloser) Close() error {
+ defer urc.unlock()
+ return urc.ReadCloser.Close()
+}
+
+func nop() {}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list