[ARVADOS] created: 537a84cd45c915ea7b6f607b17c1cf665ed71561

git at public.curoverse.com git at public.curoverse.com
Thu Dec 3 17:27:08 EST 2015


        at  537a84cd45c915ea7b6f607b17c1cf665ed71561 (commit)


commit 537a84cd45c915ea7b6f607b17c1cf665ed71561
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Dec 3 17:35:30 2015 -0500

    7888: Option to use multiple concurrent range requests when fetching from Azure.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index e9fda2a..d57ec4f 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -11,12 +11,14 @@ import (
 	"os"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/curoverse/azure-sdk-for-go/storage"
 )
 
 var (
+	azureMaxGetBytes           int
 	azureStorageAccountName    string
 	azureStorageAccountKeyFile string
 	azureStorageReplication    int
@@ -85,6 +87,11 @@ func init() {
 		"azure-storage-replication",
 		3,
 		"Replication level to report to clients when data is stored in an Azure container.")
+	flag.IntVar(
+		&azureMaxGetBytes,
+		"azure-max-get-bytes",
+		BlockSize,
+		fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
 }
 
 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
@@ -163,20 +170,73 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
 }
 
 func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
-	rdr, err := v.bsClient.GetBlob(v.containerName, loc)
-	if err != nil {
-		return nil, v.translateError(err)
+	expectSize := BlockSize
+	if azureMaxGetBytes < BlockSize {
+		// Unfortunately the handler doesn't tell us how long the blob
+		// is expected to be, so we have to ask Azure.
+		props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+		if err != nil {
+			return nil, v.translateError(err)
+		}
+		if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
+			return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+		}
+		expectSize = int(props.ContentLength)
 	}
-	defer rdr.Close()
-	buf := bufs.Get(BlockSize)
-	n, err := io.ReadFull(rdr, buf)
-	switch err {
-	case nil, io.EOF, io.ErrUnexpectedEOF:
-		return buf[:n], nil
-	default:
-		bufs.Put(buf)
-		return nil, err
+
+	buf := bufs.Get(expectSize)
+	if expectSize == 0 {
+		return buf, nil
+	}
+
+	// We'll update this actualSize if/when we get the last piece.
+	actualSize := -1
+	pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
+	errors := make([]error, pieces)
+	var wg sync.WaitGroup
+	wg.Add(pieces)
+	for p := 0; p < pieces; p++ {
+		go func(p int) {
+			defer wg.Done()
+			startPos := p * azureMaxGetBytes
+			endPos := startPos + azureMaxGetBytes
+			if endPos > expectSize {
+				endPos = expectSize
+			}
+			var rdr io.ReadCloser
+			var err error
+			if startPos == 0 && endPos == expectSize {
+				rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+			} else {
+				rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d/*", startPos, endPos-1))
+			}
+			if err != nil {
+				errors[p] = err
+				return
+			}
+			defer rdr.Close()
+			n, err := io.ReadFull(rdr, buf[startPos:endPos])
+			if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+				// If we don't know the actual size,
+				// and just tried reading 64 MiB, it's
+				// normal to encounter EOF.
+			} else if err != nil {
+				log.Printf("debug: [%d:%d] %s", startPos, endPos, err)
+				errors[p] = err
+			}
+			if p == pieces-1 {
+				actualSize = startPos + n
+			}
+		}(p)
 	}
+	wg.Wait()
+	for _, err := range errors {
+		if err != nil {
+			bufs.Put(buf)
+			return nil, v.translateError(err)
+		}
+	}
+	return buf[:actualSize], nil
 }
 
 // Compare the given data with existing stored data.
@@ -317,6 +377,7 @@ func (v *AzureBlobVolume) translateError(err error) error {
 }
 
 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+
 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 	return keepBlockRegexp.MatchString(s)
 }
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index a240c23..d6c5060 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -92,6 +92,8 @@ func (h *azStubHandler) unlockAndRace() {
 	h.Lock()
 }
 
+var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)/(\*|\d+)$`)
+
 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	h.Lock()
 	defer h.Unlock()
@@ -204,11 +206,24 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 			rw.WriteHeader(http.StatusNotFound)
 			return
 		}
+		data := blob.Data
+		if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
+			b0, err0 := strconv.Atoi(rangeSpec[1])
+			b1, err1 := strconv.Atoi(rangeSpec[2])
+			if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
+				rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
+				rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+				return
+			}
+			rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
+			rw.WriteHeader(http.StatusPartialContent)
+			data = data[b0:b1+1]
+		}
 		rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
-		rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+		rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
 		if r.Method == "GET" {
-			if _, err := rw.Write(blob.Data); err != nil {
-				log.Printf("write %+q: %s", blob.Data, err)
+			if _, err := rw.Write(data); err != nil {
+				log.Printf("write %+q: %s", data, err)
 			}
 		}
 		h.unlockAndRace()
@@ -346,6 +361,26 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
 	})
 }
 
+func TestAzureBlobVolume4MiBRangeWithGeneric(t *testing.T) {
+	defer func(b int) {
+		azureMaxGetBytes = b
+	}(azureMaxGetBytes)
+	azureMaxGetBytes = 2 << 22
+
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+	azureWriteRaceInterval = time.Millisecond
+	azureWriteRacePollTime = time.Nanosecond
+	DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
+		return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+	})
+}
+
+
 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
 	defer func(t http.RoundTripper) {
 		http.DefaultTransport = t

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list