[ARVADOS] updated: a8193ca9f94dcda612859af3c420183e5fa6dadb
Git user
git at public.curoverse.com
Sun Mar 13 08:54:02 EDT 2016
Summary of changes:
services/keepstore/keepstore.go | 39 ++++++++++---------
services/keepstore/volume_generic_test.go | 63 ++++++++-----------------------
services/keepstore/volume_unix.go | 31 ++++++++-------
3 files changed, 55 insertions(+), 78 deletions(-)
via a8193ca9f94dcda612859af3c420183e5fa6dadb (commit)
from 32f7abdccc3ad293266e8df3e50344614ecd0dac (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit a8193ca9f94dcda612859af3c420183e5fa6dadb
Author: radhika <radhika at curoverse.com>
Date: Sun Mar 13 08:52:49 2016 -0400
8554: Untrash returns os.ErrNotExist when no trash found for the given locator and other updates.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 0861623..9473690 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -11,6 +11,7 @@ import (
"net/http"
"os"
"os/signal"
+ "regexp"
"strings"
"syscall"
"time"
@@ -55,13 +56,15 @@ var dataManagerToken string
// actually deleting anything.
var neverDelete = true
-// trashLifetime is the time duration in seconds after a block is trashed
+// trashLifetime is the time duration after a block is trashed
// during which it can be recovered using an /untrash request
-var trashLifetime int
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashLifetime time.Duration
-// Interval in seconds at which the emptyTrash goroutine will check
-// and delete expired trashed blocks. Default is once a day.
-var trashCheckInterval int
+// trashCheckInterval is the time duration at which the emptyTrash goroutine
+// will check and delete expired trashed blocks. Default is one day.
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashCheckInterval time.Duration
var maxBuffers = 128
var bufs *bufferPool
@@ -124,6 +127,8 @@ var (
volumes volumeSet
)
+var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32}).trash.(\d+)$`)
+
func (vs *volumeSet) String() string {
return fmt.Sprintf("%+v", (*vs)[:])
}
@@ -209,16 +214,16 @@ func main() {
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
- flag.IntVar(
+ flag.DurationVar(
&trashLifetime,
"trash-lifetime",
- 0,
- "Interval in seconds after a block is trashed during which it can be recovered using an /untrash request")
- flag.IntVar(
+ 0*time.Second,
+ "Time duration after a block is trashed during which it can be recovered using an /untrash request")
+ flag.DurationVar(
&trashCheckInterval,
"trash-check-interval",
- 24*60*60,
- "Interval in seconds at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
+ 24*time.Hour,
+ "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
flag.Parse()
@@ -331,15 +336,16 @@ func main() {
go RunTrashWorker(trashq)
// Start emptyTrash goroutine
- go emptyTrash(trashCheckInterval)
+ doneEmptyingTrash := make(chan bool)
+ go emptyTrash(doneEmptyingTrash, trashCheckInterval)
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
term := make(chan os.Signal, 1)
go func(sig <-chan os.Signal) {
- doneEmptyingTrash <- true
s := <-sig
log.Println("caught signal:", s)
+ doneEmptyingTrash <- true
listener.Close()
}(term)
signal.Notify(term, syscall.SIGTERM)
@@ -350,12 +356,9 @@ func main() {
srv.Serve(listener)
}
-// Channel to stop emptying trash
-var doneEmptyingTrash = make(chan bool)
-
// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(trashCheckInterval int) {
- ticker := time.NewTicker(time.Duration(trashCheckInterval) * time.Second)
+func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
+ ticker := time.NewTicker(trashCheckInterval)
for {
select {
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index c614a08..5b0016f 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -78,7 +78,6 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
testPutFullBlock(t, factory)
testTrashUntrash(t, factory)
- testEmptyTrashTrashLifetime0s(t, factory)
testEmptyTrashTrashLifetime3600s(t, factory)
testEmptyTrashTrashLifetime1s(t, factory)
}
@@ -710,10 +709,10 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
defer func() {
- trashLifetime = 0
+ trashLifetime = 0 * time.Second
}()
- trashLifetime = 3600
+ trashLifetime = 3600 * time.Second
// put block and backdate it
v.PutRaw(TestHash, TestBlock)
@@ -762,40 +761,6 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
bufs.Put(buf)
}
-// With trashLifetime == 0, perform:
-// Trash an old block - which either raises ErrNotImplemented or succeeds to delete it
-// Untrash - which either raises ErrNotImplemented or is a no-op for the deleted block
-// Get - which must fail to find the block, since it was deleted and hence not untrashed
-func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
- v := factory(t)
- defer v.Teardown()
- defer func() {
- trashLifetime = 0
- doneEmptyingTrash <- true
- }()
-
- trashLifetime = 0
- trashCheckInterval = 1
-
- go emptyTrash(trashCheckInterval)
-
- // Trash old block; since trashLifetime = 0, Trash actually deletes the block
- err := trashUntrashOldBlock(t, v, 0)
-
- // Get it; for writable volumes, this should not find the block since it was deleted
- buf, err := v.Get(TestHash)
- if err != nil {
- if !os.IsNotExist(err) {
- t.Errorf("os.IsNotExist(%v) should have been true", err)
- }
- } else {
- if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
- }
- bufs.Put(buf)
- }
-}
-
// With large trashLifetime, perform:
// Run emptyTrash goroutine with much smaller trashCheckInterval
// Trash an old block - which either raises ErrNotImplemented or succeeds
@@ -804,15 +769,17 @@ func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+
+ doneEmptyingTrash := make(chan bool)
defer func() {
- trashLifetime = 0
+ trashLifetime = 0 * time.Second
doneEmptyingTrash <- true
}()
- trashLifetime = 3600
- trashCheckInterval = 1
+ trashLifetime = 3600 * time.Second
+ trashCheckInterval = 1 * time.Second
- go emptyTrash(trashCheckInterval)
+ go emptyTrash(doneEmptyingTrash, trashCheckInterval)
// Trash old block
err := trashUntrashOldBlock(t, v, 2)
@@ -840,17 +807,19 @@ func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
func testEmptyTrashTrashLifetime1s(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
+
+ doneEmptyingTrash := make(chan bool)
defer func() {
- trashLifetime = 0
+ trashLifetime = 0 * time.Second
doneEmptyingTrash <- true
}()
volumes = append(volumes, v)
- trashLifetime = 1
- trashCheckInterval = 1
+ trashLifetime = 1 * time.Second
+ trashCheckInterval = 1 * time.Second
- go emptyTrash(trashCheckInterval)
+ go emptyTrash(doneEmptyingTrash, trashCheckInterval)
// Trash old block and untrash a little after first trashCheckInterval
err := trashUntrashOldBlock(t, v, 3)
@@ -905,11 +874,11 @@ func trashUntrashOldBlock(t TB, v TestableVolume, untrashAfter int) error {
}
}
- // Untrash after give wait time
+ // Untrash after give wait time; it may have been deleted by emptyTrash goroutine
time.Sleep(time.Duration(untrashAfter) * time.Second)
err = v.Untrash(TestHash)
if err != nil {
- if err != ErrNotImplemented && err != MethodDisabledError {
+ if err != ErrNotImplemented && err != MethodDisabledError && err != os.ErrNotExist {
t.Fatal(err)
}
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index eca0aee..9183a39 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -409,7 +409,7 @@ func (v *UnixVolume) Trash(loc string) error {
if trashLifetime == 0 {
return os.Remove(p)
}
- return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Unix()+int64(trashLifetime)))
+ return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
}
// Untrash moves block from trash back into store
@@ -421,11 +421,17 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
}
prefix := fmt.Sprintf("%v.trash.", loc)
- files, _ := ioutil.ReadDir(v.blockDir(loc))
+ files, err := ioutil.ReadDir(v.blockDir(loc))
+ if err != nil {
+ return err
+ }
+ if len(files) == 0 {
+ return os.ErrNotExist
+ }
for _, f := range files {
if strings.HasPrefix(f.Name(), prefix) {
err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
- if err != nil {
+ if err == nil {
break
}
}
@@ -526,8 +532,6 @@ func (v *UnixVolume) translateError(err error) error {
}
}
-var trashRegexp = regexp.MustCompile(`.*([0-9a-fA-F]{32}).trash.(\d+)`)
-
// EmptyTrash walks hierarchy looking for {hash}.trash.*
// and deletes those with deadline < now.
func (v *UnixVolume) EmptyTrash() error {
@@ -536,15 +540,17 @@ func (v *UnixVolume) EmptyTrash() error {
err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
if err != nil {
- log.Printf("EmptyTrash error for %v: %v", path, err)
- } else if !info.Mode().IsDir() {
- matches := trashRegexp.FindStringSubmatch(path)
+ return err
+ }
+
+ if !info.Mode().IsDir() {
+ matches := trashLocRegexp.FindStringSubmatch(path)
if len(matches) == 3 {
- deadline, err := strconv.Atoi(matches[2])
+ deadline, err := strconv.ParseInt(matches[2], 10, 64)
if err != nil {
log.Printf("EmptyTrash error for %v: %v", matches[1], err)
} else {
- if int64(deadline) < time.Now().Unix() {
+ if int64(deadline) <= time.Now().Unix() {
err = os.Remove(path)
if err != nil {
log.Printf("Error deleting %v: %v", matches[1], err)
@@ -566,10 +572,9 @@ func (v *UnixVolume) EmptyTrash() error {
if err != nil {
log.Printf("EmptyTrash error for %v: %v", v.String(), err)
- } else {
- log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v",
- v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
}
+ log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v", v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
+
return nil
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list