[ARVADOS] updated: 32f7abdccc3ad293266e8df3e50344614ecd0dac
Git user
git at public.curoverse.com
Wed Mar 9 11:31:11 EST 2016
Summary of changes:
services/keepstore/azure_blob_volume.go | 7 ++
services/keepstore/keepstore.go | 43 +++++++-
services/keepstore/s3_volume.go | 8 ++
services/keepstore/volume.go | 4 +
services/keepstore/volume_generic_test.go | 159 +++++++++++++++++++++++++++++-
services/keepstore/volume_test.go | 4 +
services/keepstore/volume_unix.go | 80 ++++++++++++---
7 files changed, 288 insertions(+), 17 deletions(-)
via 32f7abdccc3ad293266e8df3e50344614ecd0dac (commit)
from 2be52f02cbf3d13f10c50617228a20db613221f5 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 32f7abdccc3ad293266e8df3e50344614ecd0dac
Author: radhika <radhika at curoverse.com>
Date: Wed Mar 9 11:29:57 2016 -0500
8554: Add EmptyTrash api to Volume and implementation in volume_unix. Add emptyTrash goroutine.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 7dfb84d..d096dc6 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -395,3 +395,10 @@ var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
func (v *AzureBlobVolume) isKeepBlock(s string) bool {
return keepBlockRegexp.MatchString(s)
}
+
+// EmptyTrash is not yet implemented for Azure blob volumes: it deletes
+// nothing and always returns nil.
+// TBD: look for trashed blocks that exceeded trashLifetime and delete them.
+func (v *AzureBlobVolume) EmptyTrash() error {
+	return nil
+}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 104ae89..0861623 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -55,9 +55,13 @@ var dataManagerToken string
// actually deleting anything.
var neverDelete = true
-// trashLifetime is the time duration after a block is trashed
+// trashLifetime is the time duration in seconds after a block is trashed
// during which it can be recovered using an /untrash request
-var trashLifetime time.Duration
+var trashLifetime int
+
+// trashCheckInterval is the interval in seconds at which the emptyTrash
+// goroutine checks for and deletes expired trashed blocks. It is set by
+// the -trash-check-interval flag, whose default is one day.
+var trashCheckInterval int
var maxBuffers = 128
var bufs *bufferPool
@@ -205,11 +209,16 @@ func main() {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
- flag.DurationVar(
+ flag.IntVar(
&trashLifetime,
"trash-lifetime",
- 0*time.Second,
+ 0,
"Interval in seconds after a block is trashed during which it can be recovered using an /untrash request")
+ flag.IntVar(
+ &trashCheckInterval,
+ "trash-check-interval",
+ 24*60*60,
+ "Interval in seconds at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
flag.Parse()
@@ -321,10 +330,14 @@ func main() {
trashq = NewWorkQueue()
go RunTrashWorker(trashq)
+ // Start emptyTrash goroutine
+ go emptyTrash(trashCheckInterval)
+
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
term := make(chan os.Signal, 1)
go func(sig <-chan os.Signal) {
+ doneEmptyingTrash <- true
s := <-sig
log.Println("caught signal:", s)
listener.Close()
@@ -336,3 +349,25 @@ func main() {
srv := &http.Server{Addr: listen}
srv.Serve(listener)
}
+
+// doneEmptyingTrash stops the emptyTrash goroutine: sending a value on it
+// makes one running emptyTrash return.
+var doneEmptyingTrash = make(chan bool)
+
+// emptyTrash calls EmptyTrash on every writable volume once every
+// trashCheckInterval seconds, until a value is received on
+// doneEmptyingTrash. An error from one volume is logged and does not stop
+// the loop or skip the remaining volumes.
+//
+// A non-positive trashCheckInterval disables periodic emptying (rather than
+// letting time.NewTicker panic); the goroutine still waits for the stop
+// signal so that a sender on doneEmptyingTrash does not block forever.
+//
+// NOTE(review): main currently sends on doneEmptyingTrash at the start of
+// its SIGTERM-handler goroutine, before waiting for a signal, which stops
+// this loop right after startup. That send probably belongs after the
+// signal has been received.
+func emptyTrash(trashCheckInterval int) {
+	if trashCheckInterval <= 0 {
+		log.Printf("emptyTrash: trash-check-interval is %d; not emptying trash", trashCheckInterval)
+		<-doneEmptyingTrash
+		return
+	}
+
+	ticker := time.NewTicker(time.Duration(trashCheckInterval) * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			for _, v := range volumes {
+				if !v.Writable() {
+					continue
+				}
+				if err := v.EmptyTrash(); err != nil {
+					log.Printf("EmptyTrash error for %v: %v", v, err)
+				}
+			}
+		case <-doneEmptyingTrash:
+			return
+		}
+	}
+}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 7d9ba8a..5bcab1d 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -260,6 +260,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
return nil
}
+// Trash a Keep block.
func (v *S3Volume) Trash(loc string) error {
if v.readonly {
return MethodDisabledError
@@ -321,3 +322,10 @@ func (v *S3Volume) translateError(err error) error {
}
return err
}
+
+// EmptyTrash is not yet implemented for S3 volumes: it deletes nothing and
+// always returns nil.
+// TBD: look for trashed blocks that exceeded trashLifetime and delete them.
+func (v *S3Volume) EmptyTrash() error {
+	return nil
+}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 58710c0..bec1ee6 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -204,6 +204,10 @@ type Volume interface {
// underlying device. It will be passed on to clients in
// responses to PUT requests.
Replication() int
+
+ // EmptyTrash looks for trashed blocks that exceeded trashLifetime
+ // and deletes them from the volume.
+ EmptyTrash() error
}
// A VolumeManager tells callers which volumes can read, which volumes
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 5810411..c614a08 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -78,6 +78,9 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
testPutFullBlock(t, factory)
testTrashUntrash(t, factory)
+ testEmptyTrashTrashLifetime0s(t, factory)
+ testEmptyTrashTrashLifetime3600s(t, factory)
+ testEmptyTrashTrashLifetime1s(t, factory)
}
// Put a test block, get it and verify content
@@ -710,7 +713,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
trashLifetime = 0
}()
- trashLifetime = 3600 * time.Second
+ trashLifetime = 3600
// put block and backdate it
v.PutRaw(TestHash, TestBlock)
@@ -758,3 +761,157 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
}
bufs.Put(buf)
}
+
+// testEmptyTrashTrashLifetime0s exercises trashing with trashLifetime == 0
+// while the emptyTrash goroutine is running:
+//   - Trash an old block: it is either deleted outright or the call is
+//     refused with ErrNotImplemented / MethodDisabledError.
+//   - Untrash: expected to be a no-op for a deleted block.
+//   - Get: if it fails, the error must satisfy os.IsNotExist; if it
+//     succeeds (the block was not deleted, e.g. on a read-only volume),
+//     the data must match TestBlock.
+func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
+	v := factory(t)
+	defer v.Teardown()
+	oldCheckInterval := trashCheckInterval
+	defer func() {
+		trashLifetime = 0
+		trashCheckInterval = oldCheckInterval
+		doneEmptyingTrash <- true
+	}()
+
+	trashLifetime = 0
+	trashCheckInterval = 1
+
+	go emptyTrash(trashCheckInterval)
+
+	// With trashLifetime == 0 a successful Trash deletes the block. The
+	// helper fails the test on any unexpected error, so its return value
+	// is not needed here.
+	_ = trashUntrashOldBlock(t, v, 0)
+
+	buf, err := v.Get(TestHash)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			t.Errorf("os.IsNotExist(%v) should have been true", err)
+		}
+		return
+	}
+	if !bytes.Equal(buf, TestBlock) {
+		t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+	}
+	bufs.Put(buf)
+}
+
+// testEmptyTrashTrashLifetime3600s checks that, with a trashLifetime much
+// longer than trashCheckInterval, a trashed block survives the running
+// emptyTrash goroutine and can be untrashed:
+//   - Trash an old block and Untrash it about two seconds later. If Trash
+//     is refused (ErrNotImplemented / MethodDisabledError), the helper
+//     returns without waiting or untrashing.
+//   - Get is expected to find the block. The check reports a data
+//     mismatch, and any Get error that does not satisfy os.IsNotExist.
+func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
+	v := factory(t)
+	defer v.Teardown()
+	oldCheckInterval := trashCheckInterval
+	defer func() {
+		trashLifetime = 0
+		trashCheckInterval = oldCheckInterval
+		doneEmptyingTrash <- true
+	}()
+
+	trashLifetime = 3600
+	trashCheckInterval = 1
+
+	go emptyTrash(trashCheckInterval)
+
+	// The helper fails the test on any unexpected error, so its return
+	// value is not needed here.
+	_ = trashUntrashOldBlock(t, v, 2)
+
+	// The block was untrashed long before its deadline, so Get should find
+	// it; it is also still present on read-only volumes.
+	buf, err := v.Get(TestHash)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			t.Errorf("os.IsNotExist(%v) should have been true", err)
+		}
+		return
+	}
+	if !bytes.Equal(buf, TestBlock) {
+		t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+	}
+	bufs.Put(buf)
+}
+
+// testEmptyTrashTrashLifetime1s checks that a block trashed with
+// trashLifetime == 1 is removed by the emptyTrash goroutine before it can
+// be untrashed:
+//   - v is temporarily added to the global volumes list so that emptyTrash
+//     visits it; the list is restored when the test ends.
+//   - Trash an old block and try to Untrash it three seconds later, by
+//     which time emptyTrash (ticking every second) should have deleted it.
+//   - Get is expected to fail with os.IsNotExist. A successful Get (e.g. on
+//     a read-only volume, where the block was never trashed) must return
+//     TestBlock.
+func testEmptyTrashTrashLifetime1s(t TB, factory TestableVolumeFactory) {
+	v := factory(t)
+	defer v.Teardown()
+	oldVolumes := volumes
+	oldCheckInterval := trashCheckInterval
+	defer func() {
+		trashLifetime = 0
+		doneEmptyingTrash <- true
+		// Restore globals only after emptyTrash has received the stop
+		// signal, so it is no longer iterating over volumes; this also
+		// keeps the torn-down v out of later tests.
+		volumes = oldVolumes
+		trashCheckInterval = oldCheckInterval
+	}()
+
+	volumes = append(volumes, v)
+
+	trashLifetime = 1
+	trashCheckInterval = 1
+
+	go emptyTrash(trashCheckInterval)
+
+	// The helper fails the test on any unexpected error, so its return
+	// value is not needed here.
+	_ = trashUntrashOldBlock(t, v, 3)
+
+	buf, err := v.Get(TestHash)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			t.Errorf("os.IsNotExist(%v) should have been true", err)
+		}
+		return
+	}
+	if !bytes.Equal(buf, TestBlock) {
+		t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+	}
+	bufs.Put(buf)
+}
+
+// trashUntrashOldBlock stores TestBlock under TestHash, backdates it by
+// twice blobSignatureTTL, trashes it, waits untrashAfter seconds and then
+// untrashes it.
+//
+// Any error other than ErrNotImplemented or MethodDisabledError from Trash
+// or Untrash fails the test. If Trash returns one of those two errors, that
+// error is returned immediately, skipping the wait and the Untrash call.
+// Otherwise the result of Untrash is returned.
+func trashUntrashOldBlock(t TB, v TestableVolume, untrashAfter int) error {
+	v.PutRaw(TestHash, TestBlock)
+	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
+	stored, err := v.Get(TestHash)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(stored, TestBlock) {
+		t.Fatalf("Got data %+q, expected %+q", stored, TestBlock)
+	}
+	bufs.Put(stored)
+
+	switch err = v.Trash(TestHash); {
+	case err == nil:
+		// A successful Trash must hide the block from Get. os.IsNotExist(nil)
+		// is false, so a Get that still succeeds also fails here.
+		if _, err = v.Get(TestHash); !os.IsNotExist(err) {
+			t.Fatalf("os.IsNotExist(%v) should have been true", err)
+		}
+	case err == ErrNotImplemented || err == MethodDisabledError:
+		// Nothing was trashed, so there is nothing for emptyTrash to act
+		// on. Skip the sleep, which only matters for volumes that support
+		// trashLifetime > 0; testTrashUntrash covers this case.
+		return err
+	default:
+		t.Fatal(err)
+	}
+
+	// Give the emptyTrash goroutine untrashAfter seconds before untrashing.
+	time.Sleep(time.Duration(untrashAfter) * time.Second)
+	err = v.Untrash(TestHash)
+	if err != nil && err != ErrNotImplemented && err != MethodDisabledError {
+		t.Fatal(err)
+	}
+	return err
+}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 53ffeef..508c7fa 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -223,3 +223,7 @@ func (v *MockVolume) Writable() bool {
func (v *MockVolume) Replication() int {
return 1
}
+
+// EmptyTrash is a no-op for MockVolume; it always returns nil.
+func (v *MockVolume) EmptyTrash() error {
+	return nil
+}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 27ee242..eca0aee 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -23,9 +23,6 @@ type unixVolumeAdder struct {
}
func (vs *unixVolumeAdder) Set(value string) error {
- if trashLifetime != 0 {
- return ErrNotImplemented
- }
if dirs := strings.Split(value, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
for _, dir := range dirs {
@@ -365,22 +362,22 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
}
}
-// Delete deletes the block data from the unix storage
+// Trash trashes the block data from the unix storage
+// If trashLifetime == 0, the block is deleted
+// Else, the block is renamed as path/{loc}.trash.{deadline},
+// where deadline = now + trashLifetime
func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
- // and Delete() because either (a) the file will be deleted and Touch()
+ // and Trash() because either (a) the file will be trashed and Touch()
// will signal to the caller that the file is not present (and needs to
// be re-written), or (b) Touch() will update the file's timestamp and
- // Delete() will read the correct up-to-date timestamp and choose not to
- // delete the file.
+ // Trash() will read the correct up-to-date timestamp and choose not to
+ // trash the file.
if v.readonly {
return MethodDisabledError
}
- if trashLifetime != 0 {
- return ErrNotImplemented
- }
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
@@ -408,11 +405,21 @@ func (v *UnixVolume) Trash(loc string) error {
return nil
}
}
- return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+
+ if trashLifetime == 0 {
+ return os.Remove(p)
+ }
+ return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Unix()+int64(trashLifetime)))
}
// Untrash moves block from trash back into store
+// Look for path/{loc}.trash.{deadline} in storage,
+// and rename the first such file as path/{loc}
func (v *UnixVolume) Untrash(loc string) (err error) {
+ if v.readonly {
+ return MethodDisabledError
+ }
+
prefix := fmt.Sprintf("%v.trash.", loc)
files, _ := ioutil.ReadDir(v.blockDir(loc))
for _, f := range files {
@@ -423,7 +430,8 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
}
}
}
- return err
+
+ return
}
// blockDir returns the fully qualified directory name for the directory
@@ -517,3 +525,51 @@ func (v *UnixVolume) translateError(err error) error {
return err
}
}
+
+// trashRegexp matches the base name of a trashed block file,
+// "{hash}.trash.{deadline}", capturing the hash and the deadline (Unix
+// seconds). It is anchored at both ends and the dots are literal, so files
+// that merely contain "trash" and digits are never treated as trash.
+var trashRegexp = regexp.MustCompile(`^([0-9a-fA-F]{32})\.trash\.(\d+)$`)
+
+// EmptyTrash walks the volume's directory tree looking for files named
+// {hash}.trash.{deadline} and deletes those whose deadline has passed.
+// Errors on individual files are logged and skipped so that one bad file
+// does not stop the sweep; a summary of deleted and remaining trash is
+// logged at the end. It returns MethodDisabledError on a read-only volume.
+func (v *UnixVolume) EmptyTrash() error {
+	if v.readonly {
+		return MethodDisabledError
+	}
+
+	var bytesDeleted, bytesInTrash int64
+	var blocksDeleted, blocksInTrash int
+	// Judge every file against the same cutoff for the whole sweep.
+	now := time.Now().Unix()
+
+	err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			log.Printf("EmptyTrash error for %v: %v", path, err)
+			return nil
+		}
+		if info.Mode().IsDir() {
+			return nil
+		}
+		matches := trashRegexp.FindStringSubmatch(info.Name())
+		if len(matches) != 3 {
+			return nil
+		}
+		deadline, err := strconv.ParseInt(matches[2], 10, 64)
+		if err != nil {
+			log.Printf("EmptyTrash error for %v: %v", matches[1], err)
+			return nil
+		}
+		if deadline >= now {
+			// Not expired yet: still recoverable via Untrash.
+			bytesInTrash += info.Size()
+			blocksInTrash++
+			return nil
+		}
+		if err := os.Remove(path); err != nil {
+			log.Printf("Error deleting %v: %v", matches[1], err)
+			bytesInTrash += info.Size()
+			blocksInTrash++
+			return nil
+		}
+		bytesDeleted += info.Size()
+		blocksDeleted++
+		return nil
+	})
+
+	if err != nil {
+		log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+		return err
+	}
+
+	log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v",
+		v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
+	return nil
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list