[ARVADOS] updated: 38b15ad14b941e8935bdfae3ddcdb83f7d8e20af

git at public.curoverse.com git at public.curoverse.com
Wed Dec 16 15:37:26 EST 2015


Summary of changes:
 .../workbench/test/integration/ajax_errors_test.rb |  1 +
 services/keepstore/azure_blob_volume.go            | 84 +++++++++++++++++---
 services/keepstore/azure_blob_volume_test.go       | 89 +++++++++++++++++++++-
 services/keepstore/volume_generic_test.go          | 18 ++---
 4 files changed, 167 insertions(+), 25 deletions(-)

       via  38b15ad14b941e8935bdfae3ddcdb83f7d8e20af (commit)
       via  80f547e339c452e5b03be5beee00e845d56d8e18 (commit)
       via  2653b4df0d2e8d13d039573820ee5cfb1b86b22a (commit)
       via  2bbb912c6a41bf8046cf36a77b40541f224b703d (commit)
      from  39ccab11524517c101fad39eab02603022f15a99 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 38b15ad14b941e8935bdfae3ddcdb83f7d8e20af
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Dec 16 15:34:43 2015 -0500

    7942: Skip unreliable test. refs #7942

diff --git a/apps/workbench/test/integration/ajax_errors_test.rb b/apps/workbench/test/integration/ajax_errors_test.rb
index 14ce35d..b8ed8a1 100644
--- a/apps/workbench/test/integration/ajax_errors_test.rb
+++ b/apps/workbench/test/integration/ajax_errors_test.rb
@@ -7,6 +7,7 @@ class AjaxErrorsTest < ActionDispatch::IntegrationTest
   end
 
   test 'load pane with deleted session' do
+    skip 'unreliable test'
     # Simulate loading a page in browser-tab A, hitting "Log out" in
     # browser-tab B, then returning to browser-tab A and choosing a
     # different tab. (Automatic tab refreshes will behave similarly.)

commit 80f547e339c452e5b03be5beee00e845d56d8e18
Merge: 39ccab1 2653b4d
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Dec 16 14:51:44 2015 -0500

    Merge branch '7888-azure-read-mux' refs #7888


commit 2653b4df0d2e8d13d039573820ee5cfb1b86b22a
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Dec 16 14:51:25 2015 -0500

    7888: Fix whitespace / gofmt

diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index fae4a9e..7580a20 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -14,15 +14,15 @@ import (
 )
 
 type TB interface {
-        Error(args ...interface{})
-        Errorf(format string, args ...interface{})
-        Fail()
-        FailNow()
-        Failed() bool
-        Fatal(args ...interface{})
-        Fatalf(format string, args ...interface{})
-        Log(args ...interface{})
-        Logf(format string, args ...interface{})
+	Error(args ...interface{})
+	Errorf(format string, args ...interface{})
+	Fail()
+	FailNow()
+	Failed() bool
+	Fatal(args ...interface{})
+	Fatalf(format string, args ...interface{})
+	Log(args ...interface{})
+	Logf(format string, args ...interface{})
 }
 
 // A TestableVolumeFactory returns a new TestableVolume. The factory

commit 2bbb912c6a41bf8046cf36a77b40541f224b703d
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Dec 16 12:43:35 2015 -0500

    7888: Option to use multiple concurrent range requests when fetching from Azure.
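
    The change fans a single block read out into several concurrent range
    requests whenever the configured per-request limit (-azure-max-get-bytes)
    is smaller than the block size; the real implementation is in the diff
    below. A minimal standalone sketch of that fan-out pattern, with
    hypothetical sizes and a stand-in fetchRange helper in place of the
    Azure SDK calls:

	package main

	import (
		"fmt"
		"sync"
	)

	// fetchRange stands in for a GetBlobRange call; it just fills its
	// slice deterministically so the sketch is self-contained.
	func fetchRange(dst []byte, start int) error {
		for i := range dst {
			dst[i] = byte((start + i) & 0xff)
		}
		return nil
	}

	func main() {
		const blockSize = 1 << 20 // hypothetical block size
		const maxGet = 300000     // hypothetical per-request limit
		buf := make([]byte, blockSize)

		pieces := (blockSize + maxGet - 1) / maxGet // ceiling division
		errs := make([]error, pieces)
		var wg sync.WaitGroup
		wg.Add(pieces)
		for p := 0; p < pieces; p++ {
			go func(p int) {
				defer wg.Done()
				start := p * maxGet
				end := start + maxGet
				if end > blockSize {
					end = blockSize
				}
				// Each goroutine fills its own disjoint slice of buf.
				errs[p] = fetchRange(buf[start:end], start)
			}(p)
		}
		wg.Wait()
		for _, err := range errs {
			if err != nil {
				fmt.Println("range fetch failed:", err)
				return
			}
		}
		fmt.Printf("fetched %d bytes in %d pieces\n", len(buf), pieces)
	}

    Ceiling division gives the piece count, and each goroutine writes only
    into its own disjoint slice of the shared buffer, so no synchronization
    beyond the WaitGroup is needed.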

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 0f98e6e..c0033d9 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -11,12 +11,14 @@ import (
 	"os"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/curoverse/azure-sdk-for-go/storage"
 )
 
 var (
+	azureMaxGetBytes           int
 	azureStorageAccountName    string
 	azureStorageAccountKeyFile string
 	azureStorageReplication    int
@@ -85,6 +87,11 @@ func init() {
 		"azure-storage-replication",
 		3,
 		"Replication level to report to clients when data is stored in an Azure container.")
+	flag.IntVar(
+		&azureMaxGetBytes,
+		"azure-max-get-bytes",
+		BlockSize,
+		fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
 }
 
 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
@@ -163,20 +170,72 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
 }
 
 func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
-	rdr, err := v.bsClient.GetBlob(v.containerName, loc)
-	if err != nil {
-		return nil, v.translateError(err)
+	expectSize := BlockSize
+	if azureMaxGetBytes < BlockSize {
+		// Unfortunately the handler doesn't tell us how long the blob
+		// is expected to be, so we have to ask Azure.
+		props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+		if err != nil {
+			return nil, v.translateError(err)
+		}
+		if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
+			return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+		}
+		expectSize = int(props.ContentLength)
 	}
-	defer rdr.Close()
-	buf := bufs.Get(BlockSize)
-	n, err := io.ReadFull(rdr, buf)
-	switch err {
-	case nil, io.EOF, io.ErrUnexpectedEOF:
-		return buf[:n], nil
-	default:
-		bufs.Put(buf)
-		return nil, err
+
+	buf := bufs.Get(expectSize)
+	if expectSize == 0 {
+		return buf, nil
+	}
+
+	// We'll update this actualSize if/when we get the last piece.
+	actualSize := -1
+	pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
+	errors := make([]error, pieces)
+	var wg sync.WaitGroup
+	wg.Add(pieces)
+	for p := 0; p < pieces; p++ {
+		go func(p int) {
+			defer wg.Done()
+			startPos := p * azureMaxGetBytes
+			endPos := startPos + azureMaxGetBytes
+			if endPos > expectSize {
+				endPos = expectSize
+			}
+			var rdr io.ReadCloser
+			var err error
+			if startPos == 0 && endPos == expectSize {
+				rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+			} else {
+				rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1))
+			}
+			if err != nil {
+				errors[p] = err
+				return
+			}
+			defer rdr.Close()
+			n, err := io.ReadFull(rdr, buf[startPos:endPos])
+			if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+				// If we don't know the actual size,
+				// and just tried reading 64 MiB, it's
+				// normal to encounter EOF.
+			} else if err != nil {
+				errors[p] = err
+			}
+			if p == pieces-1 {
+				actualSize = startPos + n
+			}
+		}(p)
 	}
+	wg.Wait()
+	for _, err := range errors {
+		if err != nil {
+			bufs.Put(buf)
+			return nil, v.translateError(err)
+		}
+	}
+	return buf[:actualSize], nil
 }
 
 // Compare the given data with existing stored data.
@@ -317,6 +376,7 @@ func (v *AzureBlobVolume) translateError(err error) error {
 }
 
 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+
 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 	return keepBlockRegexp.MatchString(s)
 }
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index b8bf5cb..439b402 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"crypto/md5"
 	"encoding/base64"
 	"encoding/xml"
 	"flag"
@@ -92,6 +93,8 @@ func (h *azStubHandler) unlockAndRace() {
 	h.Lock()
 }
 
+var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
+
 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	h.Lock()
 	defer h.Unlock()
@@ -204,11 +207,24 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 			rw.WriteHeader(http.StatusNotFound)
 			return
 		}
+		data := blob.Data
+		if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
+			b0, err0 := strconv.Atoi(rangeSpec[1])
+			b1, err1 := strconv.Atoi(rangeSpec[2])
+			if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
+				rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
+				rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+				return
+			}
+			rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
+			rw.WriteHeader(http.StatusPartialContent)
+			data = data[b0 : b1+1]
+		}
 		rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
-		rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+		rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
 		if r.Method == "GET" {
-			if _, err := rw.Write(blob.Data); err != nil {
-				log.Printf("write %+q: %s", blob.Data, err)
+			if _, err := rw.Write(data); err != nil {
+				log.Printf("write %+q: %s", data, err)
 			}
 		}
 		h.unlockAndRace()
@@ -346,6 +362,27 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
 	})
 }
 
+func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
+	defer func(b int) {
+		azureMaxGetBytes = b
+	}(azureMaxGetBytes)
+
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+	azureWriteRaceInterval = time.Millisecond
+	azureWriteRacePollTime = time.Nanosecond
+	// Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
+	for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
+		DoGenericVolumeTests(t, func(t TB) TestableVolume {
+			return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+		})
+	}
+}
+
 func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
 	defer func(t http.RoundTripper) {
 		http.DefaultTransport = t
@@ -360,6 +397,50 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
 	})
 }
 
+func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+
+	v := NewTestableAzureBlobVolume(t, false, 3)
+	defer v.Teardown()
+
+	for _, size := range []int{
+		2<<22 - 1, // one <max read
+		2 << 22,   // one =max read
+		2<<22 + 1, // one =max read, one <max
+		2 << 23,   // two =max reads
+		BlockSize - 1,
+		BlockSize,
+	} {
+		data := make([]byte, size)
+		for i := range data {
+			data[i] = byte((i + 7) & 0xff)
+		}
+		hash := fmt.Sprintf("%x", md5.Sum(data))
+		err := v.Put(hash, data)
+		if err != nil {
+			t.Error(err)
+		}
+		gotData, err := v.Get(hash)
+		if err != nil {
+			t.Error(err)
+		}
+		gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
+		gotLen := len(gotData)
+		bufs.Put(gotData)
+		if gotLen != size {
+			t.Errorf("length mismatch: got %d != %d", gotLen, size)
+		}
+		if gotHash != hash {
+			t.Errorf("hash mismatch: got %s != %s", gotHash, hash)
+		}
+	}
+}
+
 func TestAzureBlobVolumeReplication(t *testing.T) {
 	for r := 1; r <= 4; r++ {
 		v := NewTestableAzureBlobVolume(t, false, r)
@@ -435,7 +516,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 		t.Errorf("Index %+q should be empty", buf.Bytes())
 	}
 
-	v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
+	v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
 
 	allDone := make(chan struct{})
 	go func() {
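
    The range path above amounts to an ordinary HTTP GET carrying a Range
    header, answered with 206 Partial Content and a Content-Range header;
    that is the exchange the azStubHandler emulates. A small self-contained
    sketch of the same exchange using only the standard library (hypothetical
    server and data, not the Azure SDK):

	package main

	import (
		"bytes"
		"fmt"
		"io/ioutil"
		"net/http"
		"net/http/httptest"
		"time"
	)

	func main() {
		blob := []byte("0123456789abcdef")

		// http.ServeContent implements Range/206 handling, much as the
		// azStubHandler above does by hand.
		srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			http.ServeContent(w, r, "blob", time.Now(), bytes.NewReader(blob))
		}))
		defer srv.Close()

		req, err := http.NewRequest("GET", srv.URL, nil)
		if err != nil {
			panic(err)
		}
		// Inclusive byte range, equivalent to GetBlobRange(..., "4-7").
		req.Header.Set("Range", "bytes=4-7")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		body, _ := ioutil.ReadAll(resp.Body)
		fmt.Println(resp.StatusCode, resp.Header.Get("Content-Range"), string(body))
		// Prints: 206 bytes 4-7/16 4567
	}

    Note that the range is inclusive on both ends, matching the "%d-%d"
    string the new get() code passes to GetBlobRange.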

-----------------------------------------------------------------------


hooks/post-receive
-- 

More information about the arvados-commits mailing list