[ARVADOS] created: 537a84cd45c915ea7b6f607b17c1cf665ed71561
git at public.curoverse.com
git at public.curoverse.com
Thu Dec 3 17:27:08 EST 2015
at 537a84cd45c915ea7b6f607b17c1cf665ed71561 (commit)
commit 537a84cd45c915ea7b6f607b17c1cf665ed71561
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Dec 3 17:35:30 2015 -0500
7888: Option to use multiple concurrent range requests when fetching from Azure.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index e9fda2a..d57ec4f 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -11,12 +11,14 @@ import (
"os"
"regexp"
"strings"
+ "sync"
"time"
"github.com/curoverse/azure-sdk-for-go/storage"
)
var (
+ azureMaxGetBytes int
azureStorageAccountName string
azureStorageAccountKeyFile string
azureStorageReplication int
@@ -85,6 +87,11 @@ func init() {
"azure-storage-replication",
3,
"Replication level to report to clients when data is stored in an Azure container.")
+ flag.IntVar(
+ &azureMaxGetBytes,
+ "azure-max-get-bytes",
+ BlockSize,
+ fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
}
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
@@ -163,20 +170,73 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
}
func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
- rdr, err := v.bsClient.GetBlob(v.containerName, loc)
- if err != nil {
- return nil, v.translateError(err)
+ expectSize := BlockSize
+ if azureMaxGetBytes < BlockSize {
+ // Unfortunately the handler doesn't tell us how long the blob
+ // is expected to be, so we have to ask Azure.
+ props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ if err != nil {
+ return nil, v.translateError(err)
+ }
+ if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
+ return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+ }
+ expectSize = int(props.ContentLength)
}
- defer rdr.Close()
- buf := bufs.Get(BlockSize)
- n, err := io.ReadFull(rdr, buf)
- switch err {
- case nil, io.EOF, io.ErrUnexpectedEOF:
- return buf[:n], nil
- default:
- bufs.Put(buf)
- return nil, err
+
+ buf := bufs.Get(expectSize)
+ if expectSize == 0 {
+ return buf, nil
+ }
+
+ // We'll update this actualSize if/when we get the last piece.
+ actualSize := -1
+ pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
+ errors := make([]error, pieces)
+ var wg sync.WaitGroup
+ wg.Add(pieces)
+ for p := 0; p < pieces; p++ {
+ go func(p int) {
+ defer wg.Done()
+ startPos := p * azureMaxGetBytes
+ endPos := startPos + azureMaxGetBytes
+ if endPos > expectSize {
+ endPos = expectSize
+ }
+ var rdr io.ReadCloser
+ var err error
+ if startPos == 0 && endPos == expectSize {
+ rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+ } else {
+ rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d/*", startPos, endPos-1))
+ }
+ if err != nil {
+ errors[p] = err
+ return
+ }
+ defer rdr.Close()
+ n, err := io.ReadFull(rdr, buf[startPos:endPos])
+ if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+ // If we don't know the actual size,
+ // and just tried reading 64 MiB, it's
+ // normal to encounter EOF.
+ } else if err != nil {
+ log.Printf("debug: [%d:%d] %s", startPos, endPos, err)
+ errors[p] = err
+ }
+ if p == pieces-1 {
+ actualSize = startPos + n
+ }
+ }(p)
}
+ wg.Wait()
+ for _, err := range errors {
+ if err != nil {
+ bufs.Put(buf)
+ return nil, v.translateError(err)
+ }
+ }
+ return buf[:actualSize], nil
}
// Compare the given data with existing stored data.
@@ -317,6 +377,7 @@ func (v *AzureBlobVolume) translateError(err error) error {
}
var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+
func (v *AzureBlobVolume) isKeepBlock(s string) bool {
return keepBlockRegexp.MatchString(s)
}
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index a240c23..d6c5060 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -92,6 +92,8 @@ func (h *azStubHandler) unlockAndRace() {
h.Lock()
}
+// rangeRegexp parses the Range request header seen by the stub
+// handler. Submatch 1 is the first byte position and submatch 2 the
+// last. The trailing "/*" or "/<length>" part (submatch 3) is
+// optional, so a standard "bytes=first-last" header is honored as a
+// range request too instead of falling through to a full-body reply.
+var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)(?:/(\*|\d+))?$`)
+
func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
@@ -204,11 +206,24 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusNotFound)
return
}
+ data := blob.Data
+ if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
+ b0, err0 := strconv.Atoi(rangeSpec[1])
+ b1, err1 := strconv.Atoi(rangeSpec[2])
+ if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
+ rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
+ rw.WriteHeader(http.StatusPartialContent)
+ data = data[b0:b1+1]
+ }
rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
- rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+ rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
if r.Method == "GET" {
- if _, err := rw.Write(blob.Data); err != nil {
- log.Printf("write %+q: %s", blob.Data, err)
+ if _, err := rw.Write(data); err != nil {
+ log.Printf("write %+q: %s", data, err)
}
}
h.unlockAndRace()
@@ -346,6 +361,26 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
})
}
+// TestAzureBlobVolume4MiBRangeWithGeneric runs the generic volume
+// tests against the Azure stub with azureMaxGetBytes lowered to 4 MiB.
+// The previous azureMaxGetBytes and http.DefaultTransport values are
+// restored when the test returns.
+func TestAzureBlobVolume4MiBRangeWithGeneric(t *testing.T) {
+ defer func(b int) {
+ azureMaxGetBytes = b
+ }(azureMaxGetBytes)
+ // 1 << 22 bytes is 4 MiB, matching the test name (2 << 22 is 8 MiB).
+ azureMaxGetBytes = 1 << 22
+
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ azureWriteRaceInterval = time.Millisecond
+ azureWriteRacePollTime = time.Nanosecond
+ DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+ return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+ })
+}
+
func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
defer func(t http.RoundTripper) {
http.DefaultTransport = t
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list