[ARVADOS] created: 5f6f2e8e908829f3b73f423cba82458eb4b221f1
From: Git user <git at public.curoverse.com>
Date: Mon Jan 2 13:59:44 EST 2017

at 5f6f2e8e908829f3b73f423cba82458eb4b221f1 (commit)
commit 5f6f2e8e908829f3b73f423cba82458eb4b221f1
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jan 2 13:57:31 2017 -0500
10682: Add backend stats for Azure volumes.
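
This applies the counting-wrapper pattern already used by the S3 volume code below: the SDK client is hidden behind a thin proxy whose methods bump shared atomic counters before delegating, and the volume hands the counter struct out through InternalStats() so it can be serialized as JSON. As a minimal stand-alone sketch of that pattern (the names backendStats, countingClient and doGet are illustrative only and do not appear in this commit):

package main

import (
    "encoding/json"
    "errors"
    "fmt"
    "sync/atomic"
)

// backendStats mirrors the statsTicker idea: exported uint64 fields,
// updated with atomic adds, so a snapshot can simply be json.Marshal()ed.
type backendStats struct {
    Ops    uint64
    GetOps uint64
    Errors uint64
}

func (s *backendStats) tick(counters ...*uint64) {
    for _, c := range counters {
        atomic.AddUint64(c, 1)
    }
}

func (s *backendStats) tickErr(err error) {
    if err != nil {
        s.tick(&s.Errors)
    }
}

// countingClient stands in for the wrapped SDK client: every call
// counts itself, then delegates (here the backend is just simulated).
type countingClient struct {
    stats backendStats
}

func (c *countingClient) doGet(ok bool) error {
    c.stats.tick(&c.stats.Ops, &c.stats.GetOps)
    var err error
    if !ok {
        err = errors.New("simulated backend failure")
    }
    c.stats.tickErr(err)
    return err
}

func main() {
    c := &countingClient{}
    c.doGet(true)
    c.doGet(false) // also increments Errors
    buf, _ := json.Marshal(&c.stats)
    fmt.Println(string(buf)) // {"Ops":2,"GetOps":2,"Errors":1}
}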
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 7534489..3837598 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -103,7 +103,27 @@ type AzureBlobVolume struct {
RequestTimeout arvados.Duration
azClient storage.Client
- bsClient storage.BlobStorageClient
+ bsClient *azureBlobClient
+}
+
+// azureBlobClient wraps storage.BlobStorageClient in order to count
+// I/O and API usage stats.
+type azureBlobClient struct {
+ client *storage.BlobStorageClient
+ stats azureBlobStats
+}
+
+type azureBlobStats struct {
+ statsTicker
+ Ops uint64
+ GetOps uint64
+ GetRangeOps uint64
+ CreateOps uint64
+ SetMetadataOps uint64
+ DelOps uint64
+ ListOps uint64
+
+ lock sync.Mutex
}
// Examples implements VolumeWithExamples.
@@ -147,7 +167,10 @@ func (v *AzureBlobVolume) Start() error {
v.azClient.HTTPClient = &http.Client{
Timeout: time.Duration(v.RequestTimeout),
}
- v.bsClient = v.azClient.GetBlobService()
+ bs := v.azClient.GetBlobService()
+ v.bsClient = &azureBlobClient{
+ client: &bs,
+ }
ok, err := v.bsClient.ContainerExists(v.ContainerName)
if err != nil {
@@ -623,3 +646,72 @@ func (v *AzureBlobVolume) EmptyTrash() {
log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
+
+// InternalStats returns container I/O and API call counters.
+func (v *AzureBlobVolume) InternalStats() interface{} {
+ return &v.bsClient.stats
+}
+
+func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
+ c.stats.Tick(&c.stats.Ops)
+ ok, err := c.client.ContainerExists(cname)
+ c.stats.TickErr(err)
+ return ok, err
+}
+
+func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
+ c.stats.Tick(&c.stats.Ops)
+ m, err := c.client.GetBlobMetadata(cname, bname)
+ c.stats.TickErr(err)
+ return m, err
+}
+
+func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
+ c.stats.Tick(&c.stats.Ops)
+ p, err := c.client.GetBlobProperties(cname, bname)
+ c.stats.TickErr(err)
+ return p, err
+}
+
+func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
+ c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
+ rdr, err := c.client.GetBlob(cname, bname)
+ c.stats.TickErr(err)
+ return NewCountingReader(rdr, c.stats.TickInBytes), err
+}
+
+func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
+ c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
+ rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
+ c.stats.TickErr(err)
+ return NewCountingReader(rdr, c.stats.TickInBytes), err
+}
+
+func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
+ c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
+ rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+ err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
+ c.stats.TickErr(err)
+ return err
+}
+
+func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
+ c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
+ err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
+ c.stats.TickErr(err)
+ return err
+}
+
+func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+ c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
+ resp, err := c.client.ListBlobs(cname, params)
+ c.stats.TickErr(err)
+ return resp, err
+}
+
+func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
+ c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
+ err := c.client.DeleteBlob(cname, bname, hdrs)
+ c.stats.TickErr(err)
+ return err
+}
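
Note that GetBlob and GetBlobRange above pass the response body through NewCountingReader, so InBytes reflects payload bytes actually read rather than just call counts (and CreateBlockBlobFromReader does the same for OutBytes). NewCountingReader is an existing helper in this package, already used by the S3 code; the stand-alone sketch below only approximates the idea and is not the helper's actual source:

package main

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
)

// countingReader reports each chunk of bytes it reads to a callback
// (e.g. statsTicker.TickInBytes) while passing the data through.
type countingReader struct {
    io.Reader
    counter func(uint64)
}

func (cr countingReader) Read(p []byte) (int, error) {
    n, err := cr.Reader.Read(p)
    cr.counter(uint64(n))
    return n, err
}

func main() {
    var total uint64
    rdr := countingReader{
        Reader:  bytes.NewBufferString("foo"),
        counter: func(n uint64) { total += n },
    }
    data, _ := ioutil.ReadAll(rdr)
    fmt.Println(string(data), total) // prints: foo 3
}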
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 232382c..a120f1a 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -5,6 +5,7 @@ import (
"context"
"crypto/md5"
"encoding/base64"
+ "encoding/json"
"encoding/xml"
"flag"
"fmt"
@@ -23,6 +24,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/curoverse/azure-sdk-for-go/storage"
+ check "gopkg.in/check.v1"
)
const (
@@ -366,12 +368,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
}
}
+ bs := azClient.GetBlobService()
v := &AzureBlobVolume{
ContainerName: container,
ReadOnly: readonly,
AzureReplication: replication,
azClient: azClient,
- bsClient: azClient.GetBlobService(),
+ bsClient: &azureBlobClient{client: &bs},
}
return &TestableAzureBlobVolume{
@@ -382,6 +385,29 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
}
}
+var _ = check.Suite(&StubbedAzureBlobSuite{})
+
+type StubbedAzureBlobSuite struct {
+ volume *TestableAzureBlobVolume
+ origHTTPTransport http.RoundTripper
+}
+
+func (s *StubbedAzureBlobSuite) SetUpTest(c *check.C) {
+ s.origHTTPTransport = http.DefaultTransport
+ http.DefaultTransport = &http.Transport{
+ Dial: (&azStubDialer{}).Dial,
+ }
+ azureWriteRaceInterval = time.Millisecond
+ azureWriteRacePollTime = time.Nanosecond
+
+ s.volume = NewTestableAzureBlobVolume(c, false, 3)
+}
+
+func (s *StubbedAzureBlobSuite) TearDownTest(c *check.C) {
+ s.volume.Teardown()
+ http.DefaultTransport = s.origHTTPTransport
+}
+
func TestAzureBlobVolumeWithGeneric(t *testing.T) {
defer func(t http.RoundTripper) {
http.DefaultTransport = t
@@ -640,6 +666,35 @@ func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Contex
}()
}
+func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
+ stats := func() string {
+ buf, err := json.Marshal(s.volume.InternalStats())
+ c.Check(err, check.IsNil)
+ return string(buf)
+ }
+
+ c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+ c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
+
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.NotNil)
+ c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+ c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
+ c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+ err = s.volume.Put(context.Background(), loc, []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+ c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
+
+ _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
v.azHandler.PutRaw(v.ContainerName, locator, data)
}
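
One thing this diff relies on but does not show: StubbedAzureBlobSuite is a gopkg.in/check.v1 (gocheck) suite, and gocheck suites only execute when some ordinary Go test hands control to check.TestingT. If the keepstore package does not already contain that bridge, it needs a one-liner along these conventional lines (sketch, in a *_test.go file):

package main

import (
    "testing"

    check "gopkg.in/check.v1"
)

// TestGocheck runs all suites registered with check.Suite under "go test".
func TestGocheck(t *testing.T) {
    check.TestingT(t)
}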
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index ca5b1a2..c1d2105 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -14,7 +14,6 @@ import (
"regexp"
"strings"
"sync"
- "sync/atomic"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -456,10 +455,10 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
}
- v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
- v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
- v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
if data.Key >= "g" {
// Conveniently, "recent/*" and "trash/*" are
// lexically greater than all hex-encoded data
@@ -481,12 +480,12 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
for recent != nil {
if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
recent = recentL.Next()
- v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
continue
} else if cmp == 0 {
stamp = recent
recent = recentL.Next()
- v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+ v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
break
} else {
// recent/X marker is missing: we'll
@@ -872,74 +871,58 @@ type s3bucket struct {
func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
rdr, err := b.Bucket.GetReader(path)
- b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
+ b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
b.stats.tickErr(err)
- return NewCountingReader(rdr, b.stats.tickInBytes), err
+ return NewCountingReader(rdr, b.stats.TickInBytes), err
}
func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
resp, err := b.Bucket.Head(path, headers)
- b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
+ b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
b.stats.tickErr(err)
return resp, err
}
func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
- err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
- b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+ err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
+ b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
b.stats.tickErr(err)
return err
}
func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
- err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options)
- b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+ err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options)
+ b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
b.stats.tickErr(err)
return err
}
func (b *s3bucket) Del(path string) error {
err := b.Bucket.Del(path)
- b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
+ b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
b.stats.tickErr(err)
return err
}
type s3bucketStats struct {
- Errors uint64
- Ops uint64
- GetOps uint64
- PutOps uint64
- HeadOps uint64
- DelOps uint64
- ListOps uint64
- InBytes uint64
- OutBytes uint64
+ statsTicker
+ Ops uint64
+ GetOps uint64
+ PutOps uint64
+ HeadOps uint64
+ DelOps uint64
+ ListOps uint64
ErrorCodes map[string]uint64 `json:",omitempty"`
lock sync.Mutex
}
-func (s *s3bucketStats) tickInBytes(n uint64) {
- atomic.AddUint64(&s.InBytes, n)
-}
-
-func (s *s3bucketStats) tickOutBytes(n uint64) {
- atomic.AddUint64(&s.OutBytes, n)
-}
-
-func (s *s3bucketStats) tick(counters ...*uint64) {
- for _, counter := range counters {
- atomic.AddUint64(counter, 1)
- }
-}
-
func (s *s3bucketStats) tickErr(err error) {
if err == nil {
return
}
- atomic.AddUint64(&s.Errors, 1)
+ s.TickErr(err)
errStr := fmt.Sprintf("%T", err)
if err, ok := err.(*s3.Error); ok {
errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
diff --git a/services/keepstore/stats.go b/services/keepstore/stats.go
new file mode 100644
index 0000000..02d260c
--- /dev/null
+++ b/services/keepstore/stats.go
@@ -0,0 +1,38 @@
+package main
+
+import (
+ "sync/atomic"
+)
+
+// statsTicker holds the byte and error counters shared by all backend stats types.
+type statsTicker struct {
+ Errors uint64
+ InBytes uint64
+ OutBytes uint64
+}
+
+// Tick increments each of the given counters by 1 using
+// atomic.AddUint64.
+func (s *statsTicker) Tick(counters ...*uint64) {
+ for _, counter := range counters {
+ atomic.AddUint64(counter, 1)
+ }
+}
+
+// TickErr increments the error counter, unless err is nil.
+func (s *statsTicker) TickErr(err error) {
+ if err == nil {
+ return
+ }
+ s.Tick(&s.Errors)
+}
+
+// TickInBytes adds n to the incoming-byte counter.
+func (s *statsTicker) TickInBytes(n uint64) {
+ atomic.AddUint64(&s.InBytes, n)
+}
+
+// TickOutBytes adds n to the outgoing-byte counter.
+func (s *statsTicker) TickOutBytes(n uint64) {
+ atomic.AddUint64(&s.OutBytes, n)
+}
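
A detail worth noting about the JSON shape TestStats depends on: statsTicker is embedded (not a named field) in azureBlobStats and s3bucketStats, so its Errors/InBytes/OutBytes fields are promoted and marshal into the same flat object as the per-operation counters. A small stand-alone demonstration (fooStats is illustrative only):

package main

import (
    "encoding/json"
    "fmt"
)

type statsTicker struct {
    Errors   uint64
    InBytes  uint64
    OutBytes uint64
}

// fooStats embeds statsTicker the same way azureBlobStats does, so the
// embedded counters appear alongside Ops in one flat JSON object.
type fooStats struct {
    statsTicker
    Ops uint64
}

func main() {
    s := fooStats{Ops: 2}
    s.InBytes = 6
    buf, _ := json.Marshal(&s)
    fmt.Println(string(buf)) // {"Errors":0,"InBytes":6,"OutBytes":0,"Ops":2}
}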
-----------------------------------------------------------------------
hooks/post-receive