[ARVADOS] updated: 97ece5619eb80acce46ec5c7b521c08ecaaf0e86

git at public.curoverse.com git at public.curoverse.com
Mon Oct 12 12:00:51 EDT 2015


Summary of changes:
 services/keepstore/azure_blob_volume.go      |  44 +++++++++
 services/keepstore/azure_blob_volume_test.go | 129 ++++++++++++++++++++++++++-
 2 files changed, 172 insertions(+), 1 deletion(-)

       via  97ece5619eb80acce46ec5c7b521c08ecaaf0e86 (commit)
       via  5b8507cca827ef8de5d2bdc74dd74659f2c67e39 (commit)
       via  cf85808b2e97062450565c93ace5025bbd6db365 (commit)
       via  e028e5daadb83ef695b7e0b1ef6bd45eef4cfc29 (commit)
       via  634d2e25d36b20961d660df0e69628e60099d893 (commit)
       via  14a47e56d3afc1d3f3d7be3bf5ad9d9f6d60b6a2 (commit)
      from  e55f6e9fcb652a2b1505364b34d9d48e79adaeaf (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 97ece5619eb80acce46ec5c7b521c08ecaaf0e86
Merge: e55f6e9 5b8507c
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Oct 12 12:06:47 2015 -0400

    Merge branch '7159-empty-blob-race' refs #7159


commit 5b8507cca827ef8de5d2bdc74dd74659f2c67e39
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Oct 12 09:33:24 2015 -0400

    7159: Shorten race waits during generic tests

diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index c3fea9a..a4c6e62 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -339,6 +339,8 @@ func TestAzureBlobVolumeWithGeneric(t *testing.T) {
 	http.DefaultTransport = &http.Transport{
 		Dial: (&azStubDialer{}).Dial,
 	}
+	azureWriteRaceInterval = time.Millisecond
+	azureWriteRacePollTime = time.Nanosecond
 	DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
 		return NewTestableAzureBlobVolume(t, false, azureStorageReplication)
 	})
@@ -351,6 +353,8 @@ func TestReadonlyAzureBlobVolumeWithGeneric(t *testing.T) {
 	http.DefaultTransport = &http.Transport{
 		Dial: (&azStubDialer{}).Dial,
 	}
+	azureWriteRaceInterval = time.Millisecond
+	azureWriteRacePollTime = time.Nanosecond
 	DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
 		return NewTestableAzureBlobVolume(t, true, azureStorageReplication)
 	})

commit cf85808b2e97062450565c93ace5025bbd6db365
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Oct 9 17:09:42 2015 -0400

    7159: Log when waiting for get/put races

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 7545e52..9531564 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -122,7 +122,7 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
 	var deadline time.Time
 	haveDeadline := false
 	buf, err := v.get(loc)
-	for err == nil && len(buf) == 0 && loc[:32] != "d41d8cd98f00b204e9800998ecf8427e" {
+	for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
 		// Seeing a brand new empty block probably means we're
 		// in a race with CreateBlob, which under the hood
 		// (apparently) does "CreateEmpty" and "CommitData"
@@ -134,15 +134,21 @@ func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
 				break
 			}
 			deadline = t.Add(azureWriteRaceInterval)
+			if time.Now().After(deadline) {
+				break
+			}
+			log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
 			haveDeadline = true
-		}
-		if time.Now().After(deadline) {
+		} else if time.Now().After(deadline) {
 			break
 		}
 		bufs.Put(buf)
 		time.Sleep(azureWriteRacePollTime)
 		buf, err = v.get(loc)
 	}
+	if haveDeadline {
+		log.Printf("Race ended with len(buf)==%d", len(buf))
+	}
 	return buf, err
 }
 

commit e028e5daadb83ef695b7e0b1ef6bd45eef4cfc29
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Oct 8 14:00:00 2015 -0400

    7159: Exclude new empty blocks from index.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 836b8f1..7545e52 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -220,6 +220,14 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
 			if err != nil {
 				return err
 			}
+			if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
+				// A new zero-length blob is probably
+				// just a new non-empty blob that
+				// hasn't committed its data yet (see
+				// Get()), and in any case has no
+				// value.
+				continue
+			}
 			fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
 		}
 		if resp.NextMarker == "" {
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 74c94ec..c3fea9a 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"encoding/base64"
 	"encoding/xml"
 	"flag"
@@ -423,6 +424,13 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 	azureWriteRacePollTime = 5 * time.Millisecond
 
 	v.PutRaw(TestHash, []byte{})
+
+	buf := new(bytes.Buffer)
+	v.IndexTo("", buf)
+	if buf.Len() != 0 {
+		t.Errorf("Index %+q should be empty", buf.Bytes())
+	}
+
 	v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
 
 	allDone := make(chan struct{})
@@ -443,6 +451,12 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
 	case <-time.After(time.Second):
 		t.Error("Get should have stopped waiting for race when block was 2s old")
 	}
+
+	buf.Reset()
+	v.IndexTo("", buf)
+	if !bytes.HasPrefix(buf.Bytes(), []byte(TestHash+"+0")) {
+		t.Errorf("Index %+q should have %+q", buf.Bytes(), TestHash+"+0")
+	}
 }
 
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {

commit 634d2e25d36b20961d660df0e69628e60099d893
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Oct 8 13:30:18 2015 -0400

    7159: Test race deadline

diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 889a026..74c94ec 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -408,6 +408,43 @@ func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
 	<-allDone
 }
 
+func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+
+	v := NewTestableAzureBlobVolume(t, false, 3)
+	defer v.Teardown()
+
+	azureWriteRaceInterval = 2 * time.Second
+	azureWriteRacePollTime = 5 * time.Millisecond
+
+	v.PutRaw(TestHash, []byte{})
+	v.TouchWithDate(TestHash, time.Now().Add(-1982 * time.Millisecond))
+
+	allDone := make(chan struct{})
+	go func() {
+		defer close(allDone)
+		buf, err := v.Get(TestHash)
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		if len(buf) != 0 {
+			t.Errorf("Got %+q, expected empty buf", buf)
+		}
+		bufs.Put(buf)
+	}()
+	select {
+	case <-allDone:
+	case <-time.After(time.Second):
+		t.Error("Get should have stopped waiting for race when block was 2s old")
+	}
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
 	v.azHandler.PutRaw(v.containerName, locator, data)
 }

commit 14a47e56d3afc1d3f3d7be3bf5ad9d9f6d60b6a2
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Oct 8 12:52:17 2015 -0400

    7159: Work around CreateBlob race by polling for updates when a brand new blob is found empty.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 79123a9..836b8f1 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -19,6 +19,8 @@ var (
 	azureStorageAccountName    string
 	azureStorageAccountKeyFile string
 	azureStorageReplication    int
+	azureWriteRaceInterval     time.Duration = 15 * time.Second
+	azureWriteRacePollTime     time.Duration = time.Second
 )
 
 func readKeyFromFile(file string) (string, error) {
@@ -117,6 +119,34 @@ func (v *AzureBlobVolume) Check() error {
 }
 
 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
+	var deadline time.Time
+	haveDeadline := false
+	buf, err := v.get(loc)
+	for err == nil && len(buf) == 0 && loc[:32] != "d41d8cd98f00b204e9800998ecf8427e" {
+		// Seeing a brand new empty block probably means we're
+		// in a race with CreateBlob, which under the hood
+		// (apparently) does "CreateEmpty" and "CommitData"
+		// with no additional transaction locking.
+		if !haveDeadline {
+			t, err := v.Mtime(loc)
+			if err != nil {
+				log.Print("Got empty block (possible race) but Mtime failed: ", err)
+				break
+			}
+			deadline = t.Add(azureWriteRaceInterval)
+			haveDeadline = true
+		}
+		if time.Now().After(deadline) {
+			break
+		}
+		bufs.Put(buf)
+		time.Sleep(azureWriteRacePollTime)
+		buf, err = v.get(loc)
+	}
+	return buf, err
+}
+
+func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
 	rdr, err := v.bsClient.GetBlob(v.containerName, loc)
 	if err != nil {
 		if strings.Contains(err.Error(), "404 Not Found") {
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 66b0ea0..889a026 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -49,6 +49,7 @@ type azBlob struct {
 type azStubHandler struct {
 	sync.Mutex
 	blobs map[string]*azBlob
+	race  chan chan struct{}
 }
 
 func newAzStubHandler() *azStubHandler {
@@ -75,6 +76,21 @@ func (h *azStubHandler) PutRaw(container, hash string, data []byte) {
 	}
 }
 
+func (h *azStubHandler) unlockAndRace() {
+	if h.race == nil {
+		return
+	}
+	h.Unlock()
+	// Signal caller that race is starting by reading from
+	// h.race. If we get a channel, block until that channel is
+	// ready to receive. If we get nil (or h.race is closed) just
+	// proceed.
+	if c := <-h.race; c != nil {
+		c <- struct{}{}
+	}
+	h.Lock()
+}
+
 func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	h.Lock()
 	defer h.Unlock()
@@ -108,6 +124,18 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 	switch {
 	case r.Method == "PUT" && r.Form.Get("comp") == "":
 		// "Put Blob" API
+		if _, ok := h.blobs[container+"|"+hash]; !ok {
+			// Like the real Azure service, we offer a
+			// race window during which other clients can
+			// list/get the new blob before any data is
+			// committed.
+			h.blobs[container+"|"+hash] = &azBlob{
+				Mtime:       time.Now(),
+				Uncommitted: make(map[string][]byte),
+				Etag:        makeEtag(),
+			}
+			h.unlockAndRace()
+		}
 		h.blobs[container+"|"+hash] = &azBlob{
 			Data:        body,
 			Mtime:       time.Now(),
@@ -182,6 +210,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
 				log.Printf("write %+q: %s", blob.Data, err)
 			}
 		}
+		h.unlockAndRace()
 	case r.Method == "DELETE" && hash != "":
 		// "Delete Blob" API
 		if !blobExists {
@@ -265,7 +294,7 @@ type TestableAzureBlobVolume struct {
 	t         *testing.T
 }
 
-func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) TestableVolume {
+func NewTestableAzureBlobVolume(t *testing.T, readonly bool, replication int) *TestableAzureBlobVolume {
 	azHandler := newAzStubHandler()
 	azStub := httptest.NewServer(azHandler)
 
@@ -336,6 +365,49 @@ func TestAzureBlobVolumeReplication(t *testing.T) {
 	}
 }
 
+func TestAzureBlobVolumeCreateBlobRace(t *testing.T) {
+	defer func(t http.RoundTripper) {
+		http.DefaultTransport = t
+	}(http.DefaultTransport)
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+
+	v := NewTestableAzureBlobVolume(t, false, 3)
+	defer v.Teardown()
+
+	azureWriteRaceInterval = time.Second
+	azureWriteRacePollTime = time.Millisecond
+
+	allDone := make(chan struct{})
+	v.azHandler.race = make(chan chan struct{})
+	go func() {
+		err := v.Put(TestHash, TestBlock)
+		if err != nil {
+			t.Error(err)
+		}
+	}()
+	continuePut := make(chan struct{})
+	// Wait for the stub's Put to create the empty blob
+	v.azHandler.race <- continuePut
+	go func() {
+		buf, err := v.Get(TestHash)
+		if err != nil {
+			t.Error(err)
+		} else {
+			bufs.Put(buf)
+		}
+		close(allDone)
+	}()
+	// Wait for the stub's Get to get the empty blob
+	close(v.azHandler.race)
+	// Allow stub's Put to continue, so the real data is ready
+	// when the volume's Get retries
+	<-continuePut
+	// Wait for volume's Get to return the real data
+	<-allDone
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
 	v.azHandler.PutRaw(v.containerName, locator, data)
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list