[ARVADOS] created: 5f6f2e8e908829f3b73f423cba82458eb4b221f1

Git user git at public.curoverse.com
Mon Jan 2 13:59:44 EST 2017


        at  5f6f2e8e908829f3b73f423cba82458eb4b221f1 (commit)


commit 5f6f2e8e908829f3b73f423cba82458eb4b221f1
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Jan 2 13:57:31 2017 -0500

    10682: Add backend stats for Azure volumes.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 7534489..3837598 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -103,7 +103,27 @@ type AzureBlobVolume struct {
 	RequestTimeout        arvados.Duration
 
 	azClient storage.Client
-	bsClient storage.BlobStorageClient
+	bsClient *azureBlobClient
+}
+
+// azureBlobClient wraps storage.BlobStorageClient in order to count
+// I/O and API usage stats.
+type azureBlobClient struct {
+	client *storage.BlobStorageClient
+	stats  azureBlobStats
+}
+
+type azureBlobStats struct {
+	statsTicker
+	Ops            uint64
+	GetOps         uint64
+	GetRangeOps    uint64
+	CreateOps      uint64
+	SetMetadataOps uint64
+	DelOps         uint64
+	ListOps        uint64
+
+	lock sync.Mutex
 }
 
 // Examples implements VolumeWithExamples.
@@ -147,7 +167,10 @@ func (v *AzureBlobVolume) Start() error {
 	v.azClient.HTTPClient = &http.Client{
 		Timeout: time.Duration(v.RequestTimeout),
 	}
-	v.bsClient = v.azClient.GetBlobService()
+	bs := v.azClient.GetBlobService()
+	v.bsClient = &azureBlobClient{
+		client: &bs,
+	}
 
 	ok, err := v.bsClient.ContainerExists(v.ContainerName)
 	if err != nil {
@@ -623,3 +646,72 @@ func (v *AzureBlobVolume) EmptyTrash() {
 
 	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
+
+// InternalStats returns I/O and API call counters for this volume's Azure blob client.
+func (v *AzureBlobVolume) InternalStats() interface{} {
+	return &v.bsClient.stats
+}
+
+func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
+	c.stats.Tick(&c.stats.Ops)
+	ok, err := c.client.ContainerExists(cname)
+	c.stats.TickErr(err)
+	return ok, err
+}
+
+func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
+	c.stats.Tick(&c.stats.Ops)
+	m, err := c.client.GetBlobMetadata(cname, bname)
+	c.stats.TickErr(err)
+	return m, err
+}
+
+func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
+	c.stats.Tick(&c.stats.Ops)
+	p, err := c.client.GetBlobProperties(cname, bname)
+	c.stats.TickErr(err)
+	return p, err
+}
+
+func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
+	c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
+	rdr, err := c.client.GetBlob(cname, bname)
+	c.stats.TickErr(err)
+	return NewCountingReader(rdr, c.stats.TickInBytes), err
+}
+
+func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
+	c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
+	rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
+	c.stats.TickErr(err)
+	return NewCountingReader(rdr, c.stats.TickInBytes), err
+}
+
+func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
+	c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
+	rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+	err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
+	c.stats.TickErr(err)
+	return err
+}
+
+func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
+	c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
+	err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
+	c.stats.TickErr(err)
+	return err
+}
+
+func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+	c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
+	resp, err := c.client.ListBlobs(cname, params)
+	c.stats.TickErr(err)
+	return resp, err
+}
+
+func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
+	c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
+	err := c.client.DeleteBlob(cname, bname, hdrs)
+	c.stats.TickErr(err)
+	return err
+}
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 232382c..a120f1a 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"crypto/md5"
 	"encoding/base64"
+	"encoding/json"
 	"encoding/xml"
 	"flag"
 	"fmt"
@@ -23,6 +24,7 @@ import (
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/curoverse/azure-sdk-for-go/storage"
+	check "gopkg.in/check.v1"
 )
 
 const (
@@ -366,12 +368,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
 		}
 	}
 
+	bs := azClient.GetBlobService()
 	v := &AzureBlobVolume{
 		ContainerName:    container,
 		ReadOnly:         readonly,
 		AzureReplication: replication,
 		azClient:         azClient,
-		bsClient:         azClient.GetBlobService(),
+		bsClient:         &azureBlobClient{client: &bs},
 	}
 
 	return &TestableAzureBlobVolume{
@@ -382,6 +385,29 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
 	}
 }
 
+var _ = check.Suite(&StubbedAzureBlobSuite{})
+
+type StubbedAzureBlobSuite struct {
+	volume            *TestableAzureBlobVolume
+	origHTTPTransport http.RoundTripper
+}
+
+func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
+	s.origHTTPTransport = http.DefaultTransport
+	http.DefaultTransport = &http.Transport{
+		Dial: (&azStubDialer{}).Dial,
+	}
+	azureWriteRaceInterval = time.Millisecond
+	azureWriteRacePollTime = time.Nanosecond
+
+	s.volume = NewTestableAzureBlobVolume(c, false, 3)
+}
+
+func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
+	s.volume.Teardown()
+	http.DefaultTransport = s.origHTTPTransport
+}
+
 func TestAzureBlobVolumeWithGeneric(t *testing.T) {
 	defer func(t http.RoundTripper) {
 		http.DefaultTransport = t
@@ -640,6 +666,35 @@ func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Contex
 	}()
 }
 
+func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
+	stats := func() string {
+		buf, err := json.Marshal(s.volume.InternalStats())
+		c.Check(err, check.IsNil)
+		return string(buf)
+	}
+
+	c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+	c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
+
+	loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+	_, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
+	c.Check(err, check.NotNil)
+	c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+	c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
+	c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+	err = s.volume.Put(context.Background(), loc, []byte("foo"))
+	c.Check(err, check.IsNil)
+	c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+	c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
+
+	_, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+	c.Check(err, check.IsNil)
+	_, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+	c.Check(err, check.IsNil)
+	c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
 	v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index ca5b1a2..c1d2105 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -14,7 +14,6 @@ import (
 	"regexp"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -456,10 +455,10 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 		Prefix:   "recent/" + prefix,
 		PageSize: v.IndexPageSize,
 	}
-	v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
-	v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+	v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
 	for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
-		v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+		v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
 		if data.Key >= "g" {
 			// Conveniently, "recent/*" and "trash/*" are
 			// lexically greater than all hex-encoded data
@@ -481,12 +480,12 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 		for recent != nil {
 			if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
 				recent = recentL.Next()
-				v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+				v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
 				continue
 			} else if cmp == 0 {
 				stamp = recent
 				recent = recentL.Next()
-				v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+				v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
 				break
 			} else {
 				// recent/X marker is missing: we'll
@@ -872,74 +871,58 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
 	rdr, err := b.Bucket.GetReader(path)
-	b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
+	b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
 	b.stats.tickErr(err)
-	return NewCountingReader(rdr, b.stats.tickInBytes), err
+	return NewCountingReader(rdr, b.stats.TickInBytes), err
 }
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
 	resp, err := b.Bucket.Head(path, headers)
-	b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
+	b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
 	b.stats.tickErr(err)
 	return resp, err
 }
 
 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
-	err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
-	b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+	err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
+	b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
 	b.stats.tickErr(err)
 	return err
 }
 
 func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
-	err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options)
-	b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+	err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options)
+	b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
 	b.stats.tickErr(err)
 	return err
 }
 
 func (b *s3bucket) Del(path string) error {
 	err := b.Bucket.Del(path)
-	b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
+	b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
 	b.stats.tickErr(err)
 	return err
 }
 
 type s3bucketStats struct {
-	Errors   uint64
-	Ops      uint64
-	GetOps   uint64
-	PutOps   uint64
-	HeadOps  uint64
-	DelOps   uint64
-	ListOps  uint64
-	InBytes  uint64
-	OutBytes uint64
+	statsTicker
+	Ops     uint64
+	GetOps  uint64
+	PutOps  uint64
+	HeadOps uint64
+	DelOps  uint64
+	ListOps uint64
 
 	ErrorCodes map[string]uint64 `json:",omitempty"`
 
 	lock sync.Mutex
 }
 
-func (s *s3bucketStats) tickInBytes(n uint64) {
-	atomic.AddUint64(&s.InBytes, n)
-}
-
-func (s *s3bucketStats) tickOutBytes(n uint64) {
-	atomic.AddUint64(&s.OutBytes, n)
-}
-
-func (s *s3bucketStats) tick(counters ...*uint64) {
-	for _, counter := range counters {
-		atomic.AddUint64(counter, 1)
-	}
-}
-
 func (s *s3bucketStats) tickErr(err error) {
 	if err == nil {
 		return
 	}
-	atomic.AddUint64(&s.Errors, 1)
+	s.TickErr(err)
 	errStr := fmt.Sprintf("%T", err)
 	if err, ok := err.(*s3.Error); ok {
 		errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
diff --git a/services/keepstore/stats.go b/services/keepstore/stats.go
new file mode 100644
index 0000000..02d260c
--- /dev/null
+++ b/services/keepstore/stats.go
@@ -0,0 +1,34 @@
+package main
+
+import (
+	"sync/atomic"
+)
+
+type statsTicker struct {
+	Errors   uint64
+	InBytes  uint64
+	OutBytes uint64
+}
+
+// Tick increments each of the given counters by 1 using
+// atomic.AddUint64.
+func (s *statsTicker) Tick(counters ...*uint64) {
+	for _, counter := range counters {
+		atomic.AddUint64(counter, 1)
+	}
+}
+
+func (s *statsTicker) TickErr(err error) {
+	if err == nil {
+		return
+	}
+	s.Tick(&s.Errors)
+}
+
+func (s *statsTicker) TickInBytes(n uint64) {
+	atomic.AddUint64(&s.InBytes, n)
+}
+
+func (s *statsTicker) TickOutBytes(n uint64) {
+	atomic.AddUint64(&s.OutBytes, n)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list