[ARVADOS] updated: a8193ca9f94dcda612859af3c420183e5fa6dadb

Git user git at public.curoverse.com
Sun Mar 13 08:54:02 EDT 2016


Summary of changes:
 services/keepstore/keepstore.go           | 39 ++++++++++---------
 services/keepstore/volume_generic_test.go | 63 ++++++++-----------------------
 services/keepstore/volume_unix.go         | 31 ++++++++-------
 3 files changed, 55 insertions(+), 78 deletions(-)

       via  a8193ca9f94dcda612859af3c420183e5fa6dadb (commit)
      from  32f7abdccc3ad293266e8df3e50344614ecd0dac (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit a8193ca9f94dcda612859af3c420183e5fa6dadb
Author: radhika <radhika at curoverse.com>
Date:   Sun Mar 13 08:52:49 2016 -0400

    8554: Untrash returns os.ErrNotExist when no trash found for the given locator and other updates.

diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 0861623..9473690 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -11,6 +11,7 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
+	"regexp"
 	"strings"
 	"syscall"
 	"time"
@@ -55,13 +56,15 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
-// trashLifetime is the time duration in seconds after a block is trashed
+// trashLifetime is the time duration after a block is trashed
 // during which it can be recovered using an /untrash request
-var trashLifetime int
+// Use 10s, 10m, or 10h to specify 10 seconds, 10 minutes, or 10 hours, respectively.
+var trashLifetime time.Duration
 
-// Interval in seconds at which the emptyTrash goroutine will check
-// and delete expired trashed blocks. Default is once a day.
-var trashCheckInterval int
+// trashCheckInterval is the interval at which the emptyTrash goroutine
+// checks for and deletes expired trashed blocks. Default is one day.
+// Use 10s, 10m, or 10h to specify 10 seconds, 10 minutes, or 10 hours, respectively.
+var trashCheckInterval time.Duration
 
 var maxBuffers = 128
 var bufs *bufferPool
@@ -124,6 +127,8 @@ var (
 	volumes         volumeSet
 )
 
+var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32}).trash.(\d+)$`)
+
 func (vs *volumeSet) String() string {
 	return fmt.Sprintf("%+v", (*vs)[:])
 }
@@ -209,16 +214,16 @@ func main() {
 		"max-buffers",
 		maxBuffers,
 		fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
-	flag.IntVar(
+	flag.DurationVar(
 		&trashLifetime,
 		"trash-lifetime",
-		0,
-		"Interval in seconds after a block is trashed during which it can be recovered using an /untrash request")
-	flag.IntVar(
+		0*time.Second,
+		"Time duration after a block is trashed during which it can be recovered using an /untrash request")
+	flag.DurationVar(
 		&trashCheckInterval,
 		"trash-check-interval",
-		24*60*60,
-		"Interval in seconds at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
+		24*time.Hour,
+		"Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
 
 	flag.Parse()
 
@@ -331,15 +336,16 @@ func main() {
 	go RunTrashWorker(trashq)
 
 	// Start emptyTrash goroutine
-	go emptyTrash(trashCheckInterval)
+	doneEmptyingTrash := make(chan bool)
+	go emptyTrash(doneEmptyingTrash, trashCheckInterval)
 
 	// Shut down the server gracefully (by closing the listener)
 	// if SIGTERM is received.
 	term := make(chan os.Signal, 1)
 	go func(sig <-chan os.Signal) {
-		doneEmptyingTrash <- true
 		s := <-sig
 		log.Println("caught signal:", s)
+		doneEmptyingTrash <- true
 		listener.Close()
 	}(term)
 	signal.Notify(term, syscall.SIGTERM)
@@ -350,12 +356,9 @@ func main() {
 	srv.Serve(listener)
 }
 
-// Channel to stop emptying trash
-var doneEmptyingTrash = make(chan bool)
-
 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(trashCheckInterval int) {
-	ticker := time.NewTicker(time.Duration(trashCheckInterval) * time.Second)
+func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
+	ticker := time.NewTicker(trashCheckInterval)
 
 	for {
 		select {
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index c614a08..5b0016f 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -78,7 +78,6 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
 	testPutFullBlock(t, factory)
 
 	testTrashUntrash(t, factory)
-	testEmptyTrashTrashLifetime0s(t, factory)
 	testEmptyTrashTrashLifetime3600s(t, factory)
 	testEmptyTrashTrashLifetime1s(t, factory)
 }
@@ -710,10 +709,10 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
 	defer func() {
-		trashLifetime = 0
+		trashLifetime = 0 * time.Second
 	}()
 
-	trashLifetime = 3600
+	trashLifetime = 3600 * time.Second
 
 	// put block and backdate it
 	v.PutRaw(TestHash, TestBlock)
@@ -762,40 +761,6 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	bufs.Put(buf)
 }
 
-// With trashLifetime == 0, perform:
-// Trash an old block - which either raises ErrNotImplemented or succeeds to delete it
-// Untrash - which either raises ErrNotImplemented or is a no-op for the deleted block
-// Get - which must fail to find the block, since it was deleted and hence not untrashed
-func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
-	v := factory(t)
-	defer v.Teardown()
-	defer func() {
-		trashLifetime = 0
-		doneEmptyingTrash <- true
-	}()
-
-	trashLifetime = 0
-	trashCheckInterval = 1
-
-	go emptyTrash(trashCheckInterval)
-
-	// Trash old block; since trashLifetime = 0, Trash actually deletes the block
-	err := trashUntrashOldBlock(t, v, 0)
-
-	// Get it; for writable volumes, this should not find the block since it was deleted
-	buf, err := v.Get(TestHash)
-	if err != nil {
-		if !os.IsNotExist(err) {
-			t.Errorf("os.IsNotExist(%v) should have been true", err)
-		}
-	} else {
-		if bytes.Compare(buf, TestBlock) != 0 {
-			t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
-		}
-		bufs.Put(buf)
-	}
-}
-
 // With large trashLifetime, perform:
 // Run emptyTrash goroutine with much smaller trashCheckInterval
 // Trash an old block - which either raises ErrNotImplemented or succeeds
@@ -804,15 +769,17 @@ func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
 func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
+
+	doneEmptyingTrash := make(chan bool)
 	defer func() {
-		trashLifetime = 0
+		trashLifetime = 0 * time.Second
 		doneEmptyingTrash <- true
 	}()
 
-	trashLifetime = 3600
-	trashCheckInterval = 1
+	trashLifetime = 3600 * time.Second
+	trashCheckInterval = 1 * time.Second
 
-	go emptyTrash(trashCheckInterval)
+	go emptyTrash(doneEmptyingTrash, trashCheckInterval)
 
 	// Trash old block
 	err := trashUntrashOldBlock(t, v, 2)
@@ -840,17 +807,19 @@ func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
 func testEmptyTrashTrashLifetime1s(t TB, factory TestableVolumeFactory) {
 	v := factory(t)
 	defer v.Teardown()
+
+	doneEmptyingTrash := make(chan bool)
 	defer func() {
-		trashLifetime = 0
+		trashLifetime = 0 * time.Second
 		doneEmptyingTrash <- true
 	}()
 
 	volumes = append(volumes, v)
 
-	trashLifetime = 1
-	trashCheckInterval = 1
+	trashLifetime = 1 * time.Second
+	trashCheckInterval = 1 * time.Second
 
-	go emptyTrash(trashCheckInterval)
+	go emptyTrash(doneEmptyingTrash, trashCheckInterval)
 
 	// Trash old block and untrash a little after first trashCheckInterval
 	err := trashUntrashOldBlock(t, v, 3)
@@ -905,11 +874,11 @@ func trashUntrashOldBlock(t TB, v TestableVolume, untrashAfter int) error {
 		}
 	}
 
-	// Untrash after give wait time
+	// Untrash after the given wait time; the block may have been deleted by the emptyTrash goroutine
 	time.Sleep(time.Duration(untrashAfter) * time.Second)
 	err = v.Untrash(TestHash)
 	if err != nil {
-		if err != ErrNotImplemented && err != MethodDisabledError {
+		if err != ErrNotImplemented && err != MethodDisabledError && err != os.ErrNotExist {
 			t.Fatal(err)
 		}
 	}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index eca0aee..9183a39 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -409,7 +409,7 @@ func (v *UnixVolume) Trash(loc string) error {
 	if trashLifetime == 0 {
 		return os.Remove(p)
 	}
-	return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Unix()+int64(trashLifetime)))
+	return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
 }
 
 // Untrash moves block from trash back into store
@@ -421,11 +421,17 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 	}
 
 	prefix := fmt.Sprintf("%v.trash.", loc)
-	files, _ := ioutil.ReadDir(v.blockDir(loc))
+	files, err := ioutil.ReadDir(v.blockDir(loc))
+	if err != nil {
+		return err
+	}
+	if len(files) == 0 {
+		return os.ErrNotExist
+	}
 	for _, f := range files {
 		if strings.HasPrefix(f.Name(), prefix) {
 			err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
-			if err != nil {
+			if err == nil {
 				break
 			}
 		}
@@ -526,8 +532,6 @@ func (v *UnixVolume) translateError(err error) error {
 	}
 }
 
-var trashRegexp = regexp.MustCompile(`.*([0-9a-fA-F]{32}).trash.(\d+)`)
-
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() error {
@@ -536,15 +540,17 @@ func (v *UnixVolume) EmptyTrash() error {
 
 	err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
 		if err != nil {
-			log.Printf("EmptyTrash error for %v: %v", path, err)
-		} else if !info.Mode().IsDir() {
-			matches := trashRegexp.FindStringSubmatch(path)
+			return err
+		}
+
+		if !info.Mode().IsDir() {
+			matches := trashLocRegexp.FindStringSubmatch(path)
 			if len(matches) == 3 {
-				deadline, err := strconv.Atoi(matches[2])
+				deadline, err := strconv.ParseInt(matches[2], 10, 64)
 				if err != nil {
 					log.Printf("EmptyTrash error for %v: %v", matches[1], err)
 				} else {
-					if int64(deadline) < time.Now().Unix() {
+					if int64(deadline) <= time.Now().Unix() {
 						err = os.Remove(path)
 						if err != nil {
 							log.Printf("Error deleting %v: %v", matches[1], err)
@@ -566,10 +572,9 @@ func (v *UnixVolume) EmptyTrash() error {
 
 	if err != nil {
 		log.Printf("EmptyTrash error for %v: %v", v.String(), err)
-	} else {
-		log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v",
-			v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
 	}
 
+	log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v", v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
+
 	return nil
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list