[ARVADOS] updated: 38b15ad14b941e8935bdfae3ddcdb83f7d8e20af
git at public.curoverse.com
git at public.curoverse.com
Wed Dec 16 15:37:26 EST 2015
Summary of changes:
.../workbench/test/integration/ajax_errors_test.rb | 1 +
services/keepstore/azure_blob_volume.go | 84 +++++++++++++++++---
services/keepstore/azure_blob_volume_test.go | 89 +++++++++++++++++++++-
services/keepstore/volume_generic_test.go | 18 ++---
4 files changed, 167 insertions(+), 25 deletions(-)
via 38b15ad14b941e8935bdfae3ddcdb83f7d8e20af (commit)
via 80f547e339c452e5b03be5beee00e845d56d8e18 (commit)
via 2653b4df0d2e8d13d039573820ee5cfb1b86b22a (commit)
via 2bbb912c6a41bf8046cf36a77b40541f224b703d (commit)
from 39ccab11524517c101fad39eab02603022f15a99 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 38b15ad14b941e8935bdfae3ddcdb83f7d8e20af
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Dec 16 15:34:43 2015 -0500
7942: Skip unreliable test. refs #7942
diff --git a/apps/workbench/test/integration/ajax_errors_test.rb b/apps/workbench/test/integration/ajax_errors_test.rb
index 14ce35d..b8ed8a1 100644
--- a/apps/workbench/test/integration/ajax_errors_test.rb
+++ b/apps/workbench/test/integration/ajax_errors_test.rb
@@ -7,6 +7,7 @@ class AjaxErrorsTest < ActionDispatch::IntegrationTest
end
test 'load pane with deleted session' do
+ skip 'unreliable test'
# Simulate loading a page in browser-tab A, hitting "Log out" in
# browser-tab B, then returning to browser-tab A and choosing a
# different tab. (Automatic tab refreshes will behave similarly.)
commit 80f547e339c452e5b03be5beee00e845d56d8e18
Merge: 39ccab1 2653b4d
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Dec 16 14:51:44 2015 -0500
Merge branch '7888-azure-read-mux' refs #7888
commit 2653b4df0d2e8d13d039573820ee5cfb1b86b22a
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Dec 16 14:51:25 2015 -0500
7888: Fix whitespace / gofmt
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index fae4a9e..7580a20 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -14,15 +14,15 @@ import (
)
type TB interface {
- Error(args ...interface{})
- Errorf(format string, args ...interface{})
- Fail()
- FailNow()
- Failed() bool
- Fatal(args ...interface{})
- Fatalf(format string, args ...interface{})
- Log(args ...interface{})
- Logf(format string, args ...interface{})
+ Error(args ...interface{})
+ Errorf(format string, args ...interface{})
+ Fail()
+ FailNow()
+ Failed() bool
+ Fatal(args ...interface{})
+ Fatalf(format string, args ...interface{})
+ Log(args ...interface{})
+ Logf(format string, args ...interface{})
}
// A TestableVolumeFactory returns a new TestableVolume. The factory
commit 2bbb912c6a41bf8046cf36a77b40541f224b703d
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Dec 16 12:43:35 2015 -0500
7888: Option to use multiple concurrent range requests when fetching from Azure.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 0f98e6e..c0033d9 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -11,12 +11,14 @@ import (
"os"
"regexp"
"strings"
+ "sync"
"time"
"github.com/curoverse/azure-sdk-for-go/storage"
)
var (
+ azureMaxGetBytes int
azureStorageAccountName string
azureStorageAccountKeyFile string
azureStorageReplication int
@@ -85,6 +87,11 @@ func init() {
"azure-storage-replication",
3,
"Replication level to report to clients when data is stored in an Azure container.")
+ flag.IntVar(
+ &azureMaxGetBytes,
+ "azure-max-get-bytes",
+ BlockSize,
+ fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
}
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
@@ -163,20 +170,72 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
}
func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
- rdr, err := v.bsClient.GetBlob(v.containerName, loc)
- if err != nil {
- return nil, v.translateError(err)
+ expectSize := BlockSize
+ if azureMaxGetBytes < BlockSize {
+ // Unfortunately the handler doesn't tell us how long the blob
+ // is expected to be, so we have to ask Azure.
+ props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ if err != nil {
+ return nil, v.translateError(err)
+ }
+ if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
+ return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
+ }
+ expectSize = int(props.ContentLength)
}
- defer rdr.Close()
- buf := bufs.Get(BlockSize)
- n, err := io.ReadFull(rdr, buf)
- switch err {
- case nil, io.EOF, io.ErrUnexpectedEOF:
- return buf[:n], nil
- default:
- bufs.Put(buf)
- return nil, err
+
+ buf := bufs.Get(expectSize)
+ if expectSize == 0 {
+ return buf, nil
+ }
+
+ // We'll update this actualSize if/when we get the last piece.
+ actualSize := -1
+ pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
+ errors := make([]error, pieces)
+ var wg sync.WaitGroup
+ wg.Add(pieces)
+ for p := 0; p < pieces; p++ {
+ go func(p int) {
+ defer wg.Done()
+ startPos := p * azureMaxGetBytes
+ endPos := startPos + azureMaxGetBytes
+ if endPos > expectSize {
+ endPos = expectSize
+ }
+ var rdr io.ReadCloser
+ var err error
+ if startPos == 0 && endPos == expectSize {
+ rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+ } else {
+ rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1))
+ }
+ if err != nil {
+ errors[p] = err
+ return
+ }
+ defer rdr.Close()
+ n, err := io.ReadFull(rdr, buf[startPos:endPos])
+ if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
+ // If we don't know the actual size,
+ // and just tried reading 64 MiB, it's
+ // normal to encounter EOF.
+ } else if err != nil {
+ errors[p] = err
+ }
+ if p == pieces-1 {
+ actualSize = startPos + n
+ }
+ }(p)
}
+ wg.Wait()
+ for _, err := range errors {
+ if err != nil {
+ bufs.Put(buf)
+ return nil, v.translateError(err)
+ }
+ }
+ return buf[:actualSize], nil
}
// Compare the given data with existing stored data.
@@ -317,6 +376,7 @@ func (v *AzureBlobVolume) translateError(err error) error {
}
var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
+
func (v *AzureBlobVolume) isKeepBlock(s string) bool {
return keepBlockRegexp.MatchString(s)
}
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index b8bf5cb..439b402 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -2,6 +2,7 @@ package main
import (
"bytes"
+ "crypto/md5"
"encoding/base64"
"encoding/xml"
"flag"
@@ -92,6 +93,8 @@ func (h *azStubHandler) unlockAndRace() {
h.Lock()
}
+var rangeRegexp = regexp.MustCompile(`^bytes=(\d+)-(\d+)$`)
+
func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
h.Lock()
defer h.Unlock()
@@ -204,11 +207,24 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusNotFound)
return
}
+ data := blob.Data
+ if rangeSpec := rangeRegexp.FindStringSubmatch(r.Header.Get("Range")); rangeSpec != nil {
+ b0, err0 := strconv.Atoi(rangeSpec[1])
+ b1, err1 := strconv.Atoi(rangeSpec[2])
+ if err0 != nil || err1 != nil || b0 >= len(data) || b1 >= len(data) || b0 > b1 {
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data)))
+ rw.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ rw.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", b0, b1, len(data)))
+ rw.WriteHeader(http.StatusPartialContent)
+ data = data[b0 : b1+1]
+ }
rw.Header().Set("Last-Modified", blob.Mtime.Format(time.RFC1123))
- rw.Header().Set("Content-Length", strconv.Itoa(len(blob.Data)))
+ rw.Header().Set("Content-Length", strconv.Itoa(len(data)))
if r.Method == "GET" {
- if _, err := rw.Write(blob.Data); err != nil {
- log.Printf("write %+q: %s", blob.Data, err)
+ if _, err := rw.Write(data); err != nil {
+ log.Printf("write %+q: %s", data, err)
}
}
h.unlockAndRace()
@@ -346,6 +362,27 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
})
}
+func TestAzureBlobVolumeConcurrentRanges(t *testing.T) {
+ defer func(b int) {
+ azureMaxGetBytes = b
+ }(azureMaxGetBytes)
+
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ azureWriteRaceInterval = time.Millisecond
+ azureWriteRacePollTime = time.Nanosecond
+ // Test (BlockSize mod azureMaxGetBytes)==0 and !=0 cases
+ for _, azureMaxGetBytes = range []int{2 << 22, 2<<22 - 1} {
+ DoGenericVolumeTests(t, func(t TB) TestableVolume {
+ return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
+ })
+ }
+}
+
func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
defer func(t http.RoundTripper) {
http.DefaultTransport = t
@@ -360,6 +397,50 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
})
}
+func TestAzureBlobVolumeRangeFenceposts(t *testing.T) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+
+ v := NewTestableAzureBlobVolume(t, false, 3)
+ defer v.Teardown()
+
+ for _, size := range []int{
+ 2<<22 - 1, // one <max read
+ 2 << 22, // one =max read
+ 2<<22 + 1, // one =max read, one <max
+ 2 << 23, // two =max reads
+ BlockSize - 1,
+ BlockSize,
+ } {
+ data := make([]byte, size)
+ for i := range data {
+ data[i] = byte((i + 7) & 0xff)
+ }
+ hash := fmt.Sprintf("%x", md5.Sum(data))
+ err := v.Put(hash, data)
+ if err != nil {
+ t.Error(err)
+ }
+ gotData, err := v.Get(hash)
+ if err != nil {
+ t.Error(err)
+ }
+ gotHash := fmt.Sprintf("%x", md5.Sum(gotData))
+ gotLen := len(gotData)
+ bufs.Put(gotData)
+ if gotLen != size {
+ t.Error("length mismatch: got %d != %d", gotLen, size)
+ }
+ if gotHash != hash {
+ t.Error("hash mismatch: got %s != %s", gotHash, hash)
+ }
+ }
+}
+
func TestAzureBlobVolumeReplication(t *testing.T) {
for r := 1; r <= 4; r++ {
v := NewTestableAzureBlobVolume(t, false, r)
@@ -435,7 +516,7 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
t.Errorf("Index %+q should be empty", buf.Bytes())
}
- v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
+ v.TouchWithDate(TestHash, time.Now().Add(-1982*time.Millisecond))
allDone := make(chan struct{})
go func() {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list