[ARVADOS] updated: d0dc273718a2f657643a2b2800d984c7a3a62f78
From: Git user <git at public.curoverse.com>
Date: Thu Dec 15 16:21:54 EST 2016
Summary of changes:
 services/keepstore/azure_blob_volume.go      | 45 +++++++++++++++----
 services/keepstore/azure_blob_volume_test.go | 64 ++++++++++++++++++++++++++++
 2 files changed, 101 insertions(+), 8 deletions(-)
  discards  64325b327855daea30553ac43bb95851bb7f860a (commit)
       via  d0dc273718a2f657643a2b2800d984c7a3a62f78 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (64325b327855daea30553ac43bb95851bb7f860a)
            \
             N -- N -- N (d0dc273718a2f657643a2b2800d984c7a3a62f78)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d0dc273718a2f657643a2b2800d984c7a3a62f78
Author: Tom Clegg <tom at curoverse.com>
Date: Thu Dec 15 16:20:10 2016 -0500
10467: Abandon Azure requests if client hangs up.
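The pattern this commit applies throughout the volume code is to run each blocking Azure SDK call in its own goroutine and select between its completion and ctx.Done(), closing any reader the abandoned call eventually produces. A minimal sketch of that shape, with fetch as a generic stand-in for calls like GetBlob (not an actual Arvados helper):

package sketch

import (
    "context"
    "io"
)

// fetchWithContext runs fetch (a stand-in for a blocking SDK call such
// as GetBlob) in its own goroutine. If ctx is cancelled first, it
// returns ctx.Err() immediately and arranges for the abandoned call's
// reader to be closed whenever the call eventually finishes.
func fetchWithContext(ctx context.Context, fetch func() (io.ReadCloser, error)) (io.ReadCloser, error) {
    var rdr io.ReadCloser
    var err error
    done := make(chan struct{})
    go func() {
        defer close(done)
        rdr, err = fetch()
    }()
    select {
    case <-ctx.Done():
        go func() {
            // Clean up after the abandoned call once it returns.
            <-done
            if err == nil {
                rdr.Close()
            }
        }()
        return nil, ctx.Err()
    case <-done:
        return rdr, err
    }
}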
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 43cf83a..7534489 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -187,7 +187,7 @@ func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int,
}
var deadline time.Time
haveDeadline := false
- size, err := v.get(loc, buf)
+ size, err := v.get(ctx, loc, buf)
for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
// Seeing a brand new empty block probably means we're
// in a race with CreateBlob, which under the hood
@@ -208,8 +208,12 @@ func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int,
} else if time.Now().After(deadline) {
break
}
- time.Sleep(azureWriteRacePollTime)
- size, err = v.get(loc, buf)
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ case <-time.After(azureWriteRacePollTime):
+ }
+ size, err = v.get(ctx, loc, buf)
}
if haveDeadline {
log.Printf("Race ended with size==%d", size)
@@ -217,7 +221,9 @@ func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int,
return size, err
}
-func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
expectSize := len(buf)
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
@@ -239,10 +245,18 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
// We'll update this actualSize if/when we get the last piece.
actualSize := -1
pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
- errors := make([]error, pieces)
+ errors := make(chan error, pieces)
var wg sync.WaitGroup
wg.Add(pieces)
for p := 0; p < pieces; p++ {
+ // Each goroutine retrieves one piece. If we hit an
+ // error, it is sent to the errors chan so get() can
+ // return it -- but only if the error happens before
+ // ctx is done. This way, if ctx is done before we hit
+ // any other error (e.g., requesting client has hung
+ // up), we return the original ctx.Err() instead of
+ // the secondary errors from the transfers that got
+ // interrupted as a result.
go func(p int) {
defer wg.Done()
startPos := p * azureMaxGetBytes
@@ -252,23 +266,51 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
}
var rdr io.ReadCloser
var err error
- if startPos == 0 && endPos == expectSize {
- rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
- } else {
- rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+ gotRdr := make(chan struct{})
+ go func() {
+ defer close(gotRdr)
+ if startPos == 0 && endPos == expectSize {
+ rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+ } else {
+ rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+ }
+ }()
+ select {
+ case <-ctx.Done():
+ go func() {
+ <-gotRdr
+ if err == nil {
+ rdr.Close()
+ }
+ }()
+ return
+ case <-gotRdr:
}
if err != nil {
- errors[p] = err
+ errors <- err
+ cancel()
return
}
- defer rdr.Close()
+ go func() {
+ // Close the reader when the client
+ // hangs up or another piece fails
+ // (possibly interrupting ReadFull())
+ // or when all pieces succeed and
+ // get() returns.
+ <-ctx.Done()
+ rdr.Close()
+ }()
n, err := io.ReadFull(rdr, buf[startPos:endPos])
if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
// If we don't know the actual size,
// and just tried reading 64 MiB, it's
// normal to encounter EOF.
} else if err != nil {
- errors[p] = err
+ if ctx.Err() == nil {
+ errors <- err
+ }
+ cancel()
+ return
}
if p == pieces-1 {
actualSize = startPos + n
@@ -276,10 +318,12 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
}(p)
}
wg.Wait()
- for _, err := range errors {
- if err != nil {
- return 0, v.translateError(err)
- }
+ close(errors)
+ if len(errors) > 0 {
+ return 0, v.translateError(<-errors)
+ }
+ if ctx.Err() != nil {
+ return 0, ctx.Err()
}
return actualSize, nil
}
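Stepping back from the diff noise: the reworked get() above fans out one goroutine per piece, collects the first real failure on a buffered channel, and cancels the shared context so the remaining transfers stop; errors caused by the cancellation itself are suppressed so the caller sees the original ctx.Err(). A stripped-down sketch of that shape (downloadPiece is a hypothetical stand-in for the per-range blob read):

package sketch

import (
    "context"
    "sync"
)

// getPieces runs one goroutine per piece. The first genuine error wins;
// errors that merely result from cancellation are dropped so the caller
// gets the original ctx.Err() instead.
func getPieces(ctx context.Context, pieces int, downloadPiece func(ctx context.Context, p int) error) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    errs := make(chan error, pieces) // buffered: senders never block
    var wg sync.WaitGroup
    wg.Add(pieces)
    for p := 0; p < pieces; p++ {
        go func(p int) {
            defer wg.Done()
            if err := downloadPiece(ctx, p); err != nil {
                if ctx.Err() == nil {
                    errs <- err
                }
                // Stop the other pieces either way.
                cancel()
            }
        }(p)
    }
    wg.Wait()
    close(errs)
    if err := <-errs; err != nil {
        return err
    }
    return ctx.Err()
}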
@@ -293,7 +337,23 @@ func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte
if trashed {
return os.ErrNotExist
}
- rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
+ var rdr io.ReadCloser
+ gotRdr := make(chan struct{})
+ go func() {
+ defer close(gotRdr)
+ rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+ }()
+ select {
+ case <-ctx.Done():
+ go func() {
+ <-gotRdr
+ if err == nil {
+ rdr.Close()
+ }
+ }()
+ return ctx.Err()
+ case <-gotRdr:
+ }
if err != nil {
return v.translateError(err)
}
@@ -306,7 +366,36 @@ func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) err
if v.ReadOnly {
return MethodDisabledError
}
- return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
+ // Send the block data through a pipe, so that (if we need to)
+ // we can close the pipe early and abandon our
+ // CreateBlockBlobFromReader() goroutine, without worrying
+ // about CreateBlockBlobFromReader() accessing our block
+ // buffer after we release it.
+ bufr, bufw := io.Pipe()
+ go func() {
+ io.Copy(bufw, bytes.NewReader(block))
+ bufw.Close()
+ }()
+ errChan := make(chan error)
+ go func() {
+ errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
+ }()
+ select {
+ case <-ctx.Done():
+ theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
+ // Our pipe might be stuck in Write(), waiting for
+ // io.Copy() to read. If so, un-stick it. This means
+ // CreateBlockBlobFromReader will get corrupt data,
+ // but that's OK: the size won't match, so the write
+ // will fail.
+ go io.Copy(ioutil.Discard, bufr)
+ // CloseWithError() will return once pending I/O is done.
+ bufw.CloseWithError(ctx.Err())
+ theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
+ return ctx.Err()
+ case err := <-errChan:
+ return err
+ }
}
// Touch updates the last-modified property of a block blob.
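The new Put() above interposes an io.Pipe between the caller's block buffer and the uploader, so that on cancellation Put can unblock the pipe, return immediately, and be sure the abandoned uploader never touches the buffer again. A self-contained sketch of that trick, with upload standing in for CreateBlockBlobFromReader (the buffered error channel here is an illustrative choice, not taken from the commit):

package sketch

import (
    "bytes"
    "context"
    "io"
    "io/ioutil"
)

// putWithContext feeds block to upload through a pipe. On cancellation
// it drains and closes the pipe so the function can return right away;
// the abandoned uploader then sees truncated data and fails on its own.
func putWithContext(ctx context.Context, block []byte, upload func(io.Reader) error) error {
    bufr, bufw := io.Pipe()
    go func() {
        io.Copy(bufw, bytes.NewReader(block))
        bufw.Close()
    }()
    errChan := make(chan error, 1) // buffered so the abandoned goroutine can exit
    go func() {
        errChan <- upload(bufr)
    }()
    select {
    case <-ctx.Done():
        // A pending pipe Write() may be stuck waiting for a reader;
        // un-stick it, then close the write end with the context error.
        go io.Copy(ioutil.Discard, bufr)
        bufw.CloseWithError(ctx.Err())
        return ctx.Err()
    case err := <-errChan:
        return err
    }
}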
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index c5dbc8f..232382c 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -576,6 +576,70 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
}
}
+func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
+ testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+ v.PutRaw(TestHash, TestBlock)
+ _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
+ return err
+ })
+}
+
+func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
+ testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+ return v.Put(ctx, TestHash, make([]byte, BlockSize))
+ })
+}
+
+func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
+ testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+ v.PutRaw(TestHash, TestBlock)
+ return v.Compare(ctx, TestHash, TestBlock2)
+ })
+}
+
+func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
+ defer func(t http.RoundTripper) {
+ http.DefaultTransport = t
+ }(http.DefaultTransport)
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+
+ v := NewTestableAzureBlobVolume(t, false, 3)
+ defer v.Teardown()
+ v.azHandler.race = make(chan chan struct{})
+
+ ctx, cancel := context.WithCancel(context.Background())
+ allDone := make(chan struct{})
+ go func() {
+ defer close(allDone)
+ err := testFunc(ctx, v)
+ if err != context.Canceled {
+ t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
+ }
+ }()
+ releaseHandler := make(chan struct{})
+ select {
+ case <-allDone:
+ t.Error("testFunc finished without waiting for v.azHandler.race")
+ case <-time.After(10 * time.Second):
+ t.Error("timed out waiting to enter handler")
+ case v.azHandler.race <- releaseHandler:
+ }
+
+ cancel()
+
+ select {
+ case <-time.After(10 * time.Second):
+ t.Error("timed out waiting to cancel")
+ case <-allDone:
+ }
+
+ go func() {
+ <-releaseHandler
+ }()
+}
+
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
v.azHandler.PutRaw(v.ContainerName, locator, data)
}
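The tests above all share one shape: run the operation with a cancellable context, hold the stub server mid-request via v.azHandler.race, cancel, and require the operation to return context.Canceled within a timeout. A stripped-down sketch of that shape without the stub-handler handshake (op is a placeholder for the Get/Put/Compare closures):

package sketch

import (
    "context"
    "testing"
    "time"
)

// checkCancelReturnsCanceled cancels the context while op is running and
// requires op to return context.Canceled promptly. The real tests also
// block the stub handler first, so cancellation is guaranteed to land
// mid-request; that handshake is omitted here.
func checkCancelReturnsCanceled(t *testing.T, op func(context.Context) error) {
    ctx, cancel := context.WithCancel(context.Background())
    done := make(chan error, 1)
    go func() { done <- op(ctx) }()
    cancel()
    select {
    case err := <-done:
        if err != context.Canceled {
            t.Errorf("got %v, expected %v", err, context.Canceled)
        }
    case <-time.After(10 * time.Second):
        t.Error("operation did not return after cancel")
    }
}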
-----------------------------------------------------------------------
hooks/post-receive
--