[ARVADOS] created: 837949653b69e357cfa90fb0b8855a37e9c406d7

Git user git at public.curoverse.com
Tue Jan 10 15:51:48 EST 2017


        at  837949653b69e357cfa90fb0b8855a37e9c406d7 (commit)


commit 837949653b69e357cfa90fb0b8855a37e9c406d7
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jan 10 15:51:34 2017 -0500

    10682: Report IO stats for filesystem-backed volumes.

diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 234eec1..681095d 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -111,6 +111,8 @@ type UnixVolume struct {
 	// something to lock during IO, typically a sync.Mutex (or nil
 	// to skip locking)
 	locker sync.Locker
+
+	os osWithStats
 }
 
 // Examples implements VolumeWithExamples.
@@ -145,7 +147,7 @@ func (v *UnixVolume) Start() error {
 	if v.DirectoryReplication == 0 {
 		v.DirectoryReplication = 1
 	}
-	_, err := os.Stat(v.Root)
+	_, err := v.os.Stat(v.Root)
 	return err
 }
 
@@ -155,7 +157,7 @@ func (v *UnixVolume) Touch(loc string) error {
 		return MethodDisabledError
 	}
 	p := v.blockPath(loc)
-	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+	f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
 	if err != nil {
 		return err
 	}
@@ -164,18 +166,21 @@ func (v *UnixVolume) Touch(loc string) error {
 		return err
 	}
 	defer v.unlock()
-	if e := lockfile(f); e != nil {
+	if e := v.lockfile(f); e != nil {
 		return e
 	}
-	defer unlockfile(f)
+	defer v.unlockfile(f)
 	ts := syscall.NsecToTimespec(time.Now().UnixNano())
-	return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
+	v.os.stats.Tick(&v.os.stats.UtimesOps)
+	err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
+	v.os.stats.TickErr(err)
+	return err
 }
 
 // Mtime returns the stored timestamp for the given locator.
 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 	p := v.blockPath(loc)
-	fi, err := os.Stat(p)
+	fi, err := v.os.Stat(p)
 	if err != nil {
 		return time.Time{}, err
 	}
@@ -189,17 +194,17 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader
 		return err
 	}
 	defer v.unlock()
-	f, err := os.Open(path)
+	f, err := v.os.Open(path)
 	if err != nil {
 		return err
 	}
 	defer f.Close()
-	return fn(f)
+	return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
 }
 
 // stat is os.Stat() with some extra sanity checks.
 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
-	stat, err := os.Stat(path)
+	stat, err := v.os.Stat(path)
 	if err == nil {
 		if stat.Size() < 0 {
 			err = os.ErrInvalid
@@ -268,7 +273,7 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 		return err
 	}
 
-	tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
+	tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
 	if tmperr != nil {
 		log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
 		return tmperr
@@ -280,21 +285,22 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 		return err
 	}
 	defer v.unlock()
-	if _, err := io.Copy(tmpfile, rdr); err != nil {
+	n, err := io.Copy(tmpfile, rdr)
+	v.os.stats.TickOutBytes(uint64(n))
+	if err != nil {
 		log.Printf("%s: writing to %s: %s\n", v, bpath, err)
 		tmpfile.Close()
-		os.Remove(tmpfile.Name())
+		v.os.Remove(tmpfile.Name())
 		return err
 	}
 	if err := tmpfile.Close(); err != nil {
 		log.Printf("closing %s: %s\n", tmpfile.Name(), err)
-		os.Remove(tmpfile.Name())
+		v.os.Remove(tmpfile.Name())
 		return err
 	}
-	if err := os.Rename(tmpfile.Name(), bpath); err != nil {
+	if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
 		log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
-		os.Remove(tmpfile.Name())
-		return err
+		return v.os.Remove(tmpfile.Name())
 	}
 	return nil
 }
@@ -303,18 +309,15 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 // current state, or nil if an error occurs.
 //
 func (v *UnixVolume) Status() *VolumeStatus {
-	var fs syscall.Statfs_t
-	var devnum uint64
-
-	if fi, err := os.Stat(v.Root); err == nil {
-		devnum = fi.Sys().(*syscall.Stat_t).Dev
-	} else {
+	fi, err := v.os.Stat(v.Root)
+	if err != nil {
 		log.Printf("%s: os.Stat: %s\n", v, err)
 		return nil
 	}
+	devnum := fi.Sys().(*syscall.Stat_t).Dev
 
-	err := syscall.Statfs(v.Root, &fs)
-	if err != nil {
+	var fs syscall.Statfs_t
+	if err := syscall.Statfs(v.Root, &fs); err != nil {
 		log.Printf("%s: statfs: %s\n", v, err)
 		return nil
 	}
@@ -350,7 +353,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 	var lastErr error
-	rootdir, err := os.Open(v.Root)
+	rootdir, err := v.os.Open(v.Root)
 	if err != nil {
 		return err
 	}
@@ -370,7 +373,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 			continue
 		}
 		blockdirpath := filepath.Join(v.Root, names[0])
-		blockdir, err := os.Open(blockdirpath)
+		blockdir, err := v.os.Open(blockdirpath)
 		if err != nil {
 			log.Print("Error reading ", blockdirpath, ": ", err)
 			lastErr = err
@@ -423,31 +426,31 @@ func (v *UnixVolume) Trash(loc string) error {
 	}
 	defer v.unlock()
 	p := v.blockPath(loc)
-	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+	f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
 	if err != nil {
 		return err
 	}
 	defer f.Close()
-	if e := lockfile(f); e != nil {
+	if e := v.lockfile(f); e != nil {
 		return e
 	}
-	defer unlockfile(f)
+	defer v.unlockfile(f)
 
 	// If the block has been PUT in the last blobSignatureTTL
 	// seconds, return success without removing the block. This
 	// protects data from garbage collection until it is no longer
 	// possible for clients to retrieve the unreferenced blocks
 	// anyway (because the permission signatures have expired).
-	if fi, err := os.Stat(p); err != nil {
+	if fi, err := v.os.Stat(p); err != nil {
 		return err
 	} else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
 		return nil
 	}
 
 	if theConfig.TrashLifetime == 0 {
-		return os.Remove(p)
+		return v.os.Remove(p)
 	}
-	return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
+	return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
 }
 
 // Untrash moves block from trash back into store
@@ -472,7 +475,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 	for _, f := range files {
 		if strings.HasPrefix(f.Name(), prefix) {
 			foundTrash = true
-			err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+			err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
 			if err == nil {
 				break
 			}
@@ -559,6 +562,11 @@ func (v *UnixVolume) Replication() int {
 	return v.DirectoryReplication
 }
 
+// InternalStats returns I/O and filesystem ops counters.
+func (v *UnixVolume) InternalStats() interface{} {
+	return &v.os.stats
+}
+
 // lock acquires the serialize lock, if one is in use. If ctx is done
 // before the lock is acquired, lock returns ctx.Err() instead of
 // acquiring the lock.
@@ -592,12 +600,17 @@ func (v *UnixVolume) unlock() {
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
-func lockfile(f *os.File) error {
-	return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+func (v *UnixVolume) lockfile(f *os.File) error {
+	v.os.stats.Tick(&v.os.stats.FlockOps)
+	err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+	v.os.stats.TickErr(err)
+	return err
 }
 
-func unlockfile(f *os.File) error {
-	return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+func (v *UnixVolume) unlockfile(f *os.File) error {
+	err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+	v.os.stats.TickErr(err)
+	return err
 }
 
 // Where appropriate, translate a more specific filesystem error to an
@@ -643,7 +656,7 @@ func (v *UnixVolume) EmptyTrash() {
 		if deadline > time.Now().Unix() {
 			return nil
 		}
-		err = os.Remove(path)
+		err = v.os.Remove(path)
 		if err != nil {
 			log.Printf("EmptyTrash: Remove %v: %v", path, err)
 			return nil
@@ -659,3 +672,68 @@ func (v *UnixVolume) EmptyTrash() {
 
 	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
+
+type unixStats struct {
+	statsTicker
+	OpenOps    uint64
+	StatOps    uint64
+	FlockOps   uint64
+	UtimesOps  uint64
+	CreateOps  uint64
+	RenameOps  uint64
+	UnlinkOps  uint64
+	ReaddirOps uint64
+}
+
+func (s *unixStats) TickErr(err error) {
+	if err == nil {
+		return
+	}
+	s.statsTicker.TickErr(err, fmt.Sprintf("%T", err))
+}
+
+type osWithStats struct {
+	stats unixStats
+}
+
+func (o *osWithStats) Open(name string) (*os.File, error) {
+	o.stats.Tick(&o.stats.OpenOps)
+	f, err := os.Open(name)
+	o.stats.TickErr(err)
+	return f, err
+}
+
+func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+	o.stats.Tick(&o.stats.OpenOps)
+	f, err := os.OpenFile(name, flag, perm)
+	o.stats.TickErr(err)
+	return f, err
+}
+
+func (o *osWithStats) Remove(path string) error {
+	o.stats.Tick(&o.stats.UnlinkOps)
+	err := os.Remove(path)
+	o.stats.TickErr(err)
+	return err
+}
+
+func (o *osWithStats) Rename(a, b string) error {
+	o.stats.Tick(&o.stats.RenameOps)
+	err := os.Rename(a, b)
+	o.stats.TickErr(err)
+	return err
+}
+
+func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+	o.stats.Tick(&o.stats.StatOps)
+	fi, err := os.Stat(path)
+	o.stats.TickErr(err)
+	return fi, err
+}
+
+func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+	o.stats.Tick(&o.stats.CreateOps)
+	f, err := ioutil.TempFile(dir, base)
+	o.stats.TickErr(err)
+	return f, err
+}
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index 7b02a15..0edf9b8 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -13,6 +14,8 @@ import (
 	"syscall"
 	"testing"
 	"time"
+
+	check "gopkg.in/check.v1"
 )
 
 type TestableUnixVolume struct {
@@ -362,3 +365,61 @@ func TestUnixVolumeContextCancelGet(t *testing.T) {
 		t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
 	}
 }
+
+var _ = check.Suite(&UnixVolumeSuite{})
+
+type UnixVolumeSuite struct {
+	volume *TestableUnixVolume
+}
+
+func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
+	if s.volume != nil {
+		s.volume.Teardown()
+	}
+}
+
+func (s *UnixVolumeSuite) TestStats(c *check.C) {
+	s.volume = NewTestableUnixVolume(c, false, false)
+	stats := func() string {
+		buf, err := json.Marshal(s.volume.InternalStats())
+		c.Check(err, check.IsNil)
+		return string(buf)
+	}
+
+	c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
+	c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
+
+	loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+	_, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
+	c.Check(err, check.NotNil)
+	c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
+	c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
+	c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
+	c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+	c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
+	c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
+
+	err = s.volume.Put(context.Background(), loc, []byte("foo"))
+	c.Check(err, check.IsNil)
+	c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+	c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
+	c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
+	c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
+
+	err = s.volume.Touch(loc)
+	c.Check(err, check.IsNil)
+	c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
+	c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
+	c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
+
+	_, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+	c.Check(err, check.IsNil)
+	err = s.volume.Compare(context.Background(), loc, []byte("foo"))
+	c.Check(err, check.IsNil)
+	c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+	c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
+
+	err = s.volume.Trash(loc)
+	c.Check(err, check.IsNil)
+	c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list