[ARVADOS] updated: 32f7abdccc3ad293266e8df3e50344614ecd0dac

Git user git at public.curoverse.com
Wed Mar 9 11:31:11 EST 2016


Summary of changes:
 services/keepstore/azure_blob_volume.go   |   7 ++
 services/keepstore/keepstore.go           |  43 +++++++-
 services/keepstore/s3_volume.go           |   8 ++
 services/keepstore/volume.go              |   4 +
 services/keepstore/volume_generic_test.go | 159 +++++++++++++++++++++++++++++-
 services/keepstore/volume_test.go         |   4 +
 services/keepstore/volume_unix.go         |  80 ++++++++++++---
 7 files changed, 288 insertions(+), 17 deletions(-)

       via  32f7abdccc3ad293266e8df3e50344614ecd0dac (commit)
      from  2be52f02cbf3d13f10c50617228a20db613221f5 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 32f7abdccc3ad293266e8df3e50344614ecd0dac
Author: radhika <radhika at curoverse.com>
Date:   Wed Mar 9 11:29:57 2016 -0500

    8554: Add EmptyTrash API to the Volume interface, implement it in volume_unix, and add the emptyTrash goroutine.

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 7dfb84d..d096dc6 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -395,3 +395,10 @@ var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 	return keepBlockRegexp.MatchString(s)
 }
+
+// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// and deletes them from the volume.
+// TODO: not implemented for Azure yet; currently a no-op returning nil.
+func (v *AzureBlobVolume) EmptyTrash() error {
+	return nil
+}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 104ae89..0861623 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -55,9 +55,13 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
-// trashLifetime is the time duration after a block is trashed
+// trashLifetime is the time duration in seconds after a block is trashed
 // during which it can be recovered using an /untrash request
-var trashLifetime time.Duration
+var trashLifetime int
+
+// trashCheckInterval is the interval in seconds at which the emptyTrash
+// goroutine checks for and deletes expired trashed blocks (default: one day).
+var trashCheckInterval int
 
 var maxBuffers = 128
 var bufs *bufferPool
@@ -205,11 +209,16 @@ func main() {
 		"max-buffers",
 		maxBuffers,
 		fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
-	flag.DurationVar(
+	flag.IntVar(
 		&trashLifetime,
 		"trash-lifetime",
-		0*time.Second,
+		0,
 		"Interval in seconds after a block is trashed during which it can be recovered using an /untrash request")
+	flag.IntVar(
+		&trashCheckInterval,
+		"trash-check-interval",
+		24*60*60,
+		"Interval in seconds at which the emptyTrash goroutine will check and delete expired trashed blocks. Zero or negative disables emptying the trash. Default is one day.")
 
 	flag.Parse()
 
@@ -321,10 +330,14 @@ func main() {
 	trashq = NewWorkQueue()
 	go RunTrashWorker(trashq)
 
+	// Start emptyTrash goroutine
+	go emptyTrash(trashCheckInterval)
+
 	// Shut down the server gracefully (by closing the listener)
 	// if SIGTERM is received.
 	term := make(chan os.Signal, 1)
 	go func(sig <-chan os.Signal) {
 		s := <-sig
 		log.Println("caught signal:", s)
+		doneEmptyingTrash <- true
 		listener.Close()
@@ -336,3 +349,40 @@ func main() {
 	srv := &http.Server{Addr: listen}
 	srv.Serve(listener)
 }
+
+// doneEmptyingTrash stops the emptyTrash goroutine when a value is sent on it.
+var doneEmptyingTrash = make(chan bool)
+
+// emptyTrash invokes EmptyTrash on every writable volume once every
+// trashCheckInterval seconds, until a value is received on
+// doneEmptyingTrash. The channel is unbuffered, so a sender blocks until
+// any EmptyTrash pass already in progress has finished.
+//
+// A non-positive trashCheckInterval disables emptying the trash (it would
+// make time.NewTicker panic), but the stop signal is still consumed.
+func emptyTrash(trashCheckInterval int) {
+	if trashCheckInterval <= 0 {
+		log.Printf("trash-check-interval is %d: not emptying trash", trashCheckInterval)
+		<-doneEmptyingTrash
+		return
+	}
+
+	ticker := time.NewTicker(time.Duration(trashCheckInterval) * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			for _, v := range volumes {
+				if !v.Writable() {
+					continue
+				}
+				if err := v.EmptyTrash(); err != nil {
+					log.Printf("EmptyTrash error for %v: %v", v, err)
+				}
+			}
+		case <-doneEmptyingTrash:
+			return
+		}
+	}
+}
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 7d9ba8a..5bcab1d 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -260,6 +260,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 	return nil
 }
 
+// Trash trashes a Keep block.
 func (v *S3Volume) Trash(loc string) error {
 	if v.readonly {
 		return MethodDisabledError
@@ -321,3 +322,10 @@ func (v *S3Volume) translateError(err error) error {
 	}
 	return err
 }
+
+// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// and deletes them from the volume.
+// TODO: not implemented for S3 yet; currently a no-op returning nil.
+func (v *S3Volume) EmptyTrash() error {
+	return nil
+}
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 58710c0..bec1ee6 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -204,6 +204,10 @@ type Volume interface {
 	// underlying device. It will be passed on to clients in
 	// responses to PUT requests.
 	Replication() int
+
+	// EmptyTrash permanently deletes trashed blocks whose trash
+	// deadline (trashLifetime after they were trashed) has passed.
+	EmptyTrash() error
 }
 
 // A VolumeManager tells callers which volumes can read, which volumes
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index 5810411..c614a08 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -78,6 +78,9 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
 	testPutFullBlock(t, factory)
 
 	testTrashUntrash(t, factory)
+	testEmptyTrashTrashLifetime0s(t, factory)
+	testEmptyTrashTrashLifetime3600s(t, factory)
+	testEmptyTrashTrashLifetime1s(t, factory)
 }
 
 // Put a test block, get it and verify content
@@ -710,7 +713,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 		trashLifetime = 0
 	}()
 
-	trashLifetime = 3600 * time.Second
+	trashLifetime = 3600
 
 	// put block and backdate it
 	v.PutRaw(TestHash, TestBlock)
@@ -758,3 +761,146 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
 	}
 	bufs.Put(buf)
 }
+
+// With trashLifetime == 0, Trash deletes the block outright, so the
+// following Untrash has nothing to restore and Get must fail. A volume
+// whose Trash returns ErrNotImplemented or MethodDisabledError keeps the
+// block, so Get must still find it.
+func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
+	v := factory(t)
+	defer v.Teardown()
+	defer restoreTrashSettings(trashLifetime, trashCheckInterval)
+
+	trashLifetime = 0
+	trashCheckInterval = 1
+
+	stopEmptyTrash := startEmptyTrash(v)
+	defer stopEmptyTrash()
+
+	trashErr, _ := trashUntrashOldBlock(t, v, 0)
+	checkGet(t, v, trashErr != nil)
+}
+
+// With a large trashLifetime, the emptyTrash goroutine (ticking every
+// second) must not delete the trashed block, so the Untrash issued two
+// seconds later restores it and Get must find it. If Trash is unsupported
+// the block was never trashed; if only Untrash is unsupported the block
+// stays in the trash and Get must fail.
+func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
+	v := factory(t)
+	defer v.Teardown()
+	defer restoreTrashSettings(trashLifetime, trashCheckInterval)
+
+	trashLifetime = 3600
+	trashCheckInterval = 1
+
+	stopEmptyTrash := startEmptyTrash(v)
+	defer stopEmptyTrash()
+
+	trashErr, untrashErr := trashUntrashOldBlock(t, v, 2)
+	checkGet(t, v, trashErr != nil || untrashErr == nil)
+}
+
+// With trashLifetime == 1, the emptyTrash goroutine (ticking every
+// second) deletes the trashed block before the Untrash issued three
+// seconds later, so Get must fail. If Trash is unsupported the block was
+// never trashed and Get must still find it.
+func testEmptyTrashTrashLifetime1s(t TB, factory TestableVolumeFactory) {
+	v := factory(t)
+	defer v.Teardown()
+	defer restoreTrashSettings(trashLifetime, trashCheckInterval)
+
+	trashLifetime = 1
+	trashCheckInterval = 1
+
+	stopEmptyTrash := startEmptyTrash(v)
+	defer stopEmptyTrash()
+
+	trashErr, _ := trashUntrashOldBlock(t, v, 3)
+	checkGet(t, v, trashErr != nil)
+}
+
+// restoreTrashSettings resets the trash-related globals modified by the
+// emptyTrash tests. Defer it with the values to restore.
+func restoreTrashSettings(lifetime, checkInterval int) {
+	trashLifetime = lifetime
+	trashCheckInterval = checkInterval
+}
+
+// startEmptyTrash adds v to the global volume list and starts the
+// emptyTrash goroutine with the current trashCheckInterval. The returned
+// function stops the goroutine and then restores the volume list; defer
+// it so it runs before v is torn down.
+func startEmptyTrash(v TestableVolume) func() {
+	origVolumes := volumes
+	volumes = append(volumes, v)
+	go emptyTrash(trashCheckInterval)
+	return func() {
+		doneEmptyingTrash <- true
+		volumes = origVolumes
+	}
+}
+
+// checkGet verifies that Get(TestHash) returns TestBlock when expectFound
+// is true, and fails with a "not exist" error when it is false.
+func checkGet(t TB, v TestableVolume, expectFound bool) {
+	buf, err := v.Get(TestHash)
+	if !expectFound {
+		if err == nil {
+			bufs.Put(buf)
+			t.Errorf("Get(%s) succeeded, expected the block to be gone", TestHash)
+		} else if !os.IsNotExist(err) {
+			t.Errorf("os.IsNotExist(%v) should have been true", err)
+		}
+		return
+	}
+	if err != nil {
+		t.Errorf("Get(%s): %v", TestHash, err)
+		return
+	}
+	if !bytes.Equal(buf, TestBlock) {
+		t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+	}
+	bufs.Put(buf)
+}
+
+// trashUntrashOldBlock puts TestBlock, backdates it past blobSignatureTTL,
+// trashes it, waits untrashAfter seconds, and untrashes it. It returns the
+// Trash and Untrash errors, each of which may only be nil,
+// ErrNotImplemented or MethodDisabledError (anything else fails the test).
+// If Trash fails, Untrash is not attempted and untrashErr is nil.
+func trashUntrashOldBlock(t TB, v TestableVolume, untrashAfter int) (trashErr, untrashErr error) {
+	// put block and backdate it
+	v.PutRaw(TestHash, TestBlock)
+	v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
+	buf, err := v.Get(TestHash)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(buf, TestBlock) {
+		t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
+	}
+	bufs.Put(buf)
+
+	trashErr = v.Trash(TestHash)
+	if trashErr != nil {
+		if trashErr != ErrNotImplemented && trashErr != MethodDisabledError {
+			t.Fatal(trashErr)
+		}
+		// The block was not trashed, so there is nothing to wait for or
+		// untrash (testTrashUntrash already covers this case).
+		return trashErr, nil
+	}
+	if _, err = v.Get(TestHash); !os.IsNotExist(err) {
+		t.Fatalf("os.IsNotExist(%v) should have been true", err)
+	}
+
+	// Give the emptyTrash goroutine untrashAfter seconds to act first.
+	time.Sleep(time.Duration(untrashAfter) * time.Second)
+	untrashErr = v.Untrash(TestHash)
+	if untrashErr != nil && untrashErr != ErrNotImplemented && untrashErr != MethodDisabledError {
+		t.Fatal(untrashErr)
+	}
+	return trashErr, untrashErr
+}
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 53ffeef..508c7fa 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -223,3 +223,8 @@ func (v *MockVolume) Writable() bool {
 func (v *MockVolume) Replication() int {
 	return 1
 }
+
+// EmptyTrash does nothing and returns nil, so MockVolume satisfies Volume.
+func (v *MockVolume) EmptyTrash() error {
+	return nil
+}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 27ee242..eca0aee 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -23,9 +23,6 @@ type unixVolumeAdder struct {
 }
 
 func (vs *unixVolumeAdder) Set(value string) error {
-	if trashLifetime != 0 {
-		return ErrNotImplemented
-	}
 	if dirs := strings.Split(value, ","); len(dirs) > 1 {
 		log.Print("DEPRECATED: using comma-separated volume list.")
 		for _, dir := range dirs {
@@ -365,22 +362,22 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 	}
 }
 
-// Delete deletes the block data from the unix storage
+// Trash moves the block data into the trash on the unix storage.
+// If trashLifetime == 0, the block file is removed outright;
+// otherwise it is renamed to path/{loc}.trash.{deadline}, where
+// deadline is the Unix time now + trashLifetime seconds.
 func (v *UnixVolume) Trash(loc string) error {
 	// Touch() must be called before calling Write() on a block.  Touch()
 	// also uses lockfile().  This avoids a race condition between Write()
-	// and Delete() because either (a) the file will be deleted and Touch()
+	// and Trash() because either (a) the file will be trashed and Touch()
 	// will signal to the caller that the file is not present (and needs to
 	// be re-written), or (b) Touch() will update the file's timestamp and
-	// Delete() will read the correct up-to-date timestamp and choose not to
-	// delete the file.
+	// Trash() will read the correct up-to-date timestamp and choose not to
+	// trash the file.
 
 	if v.readonly {
 		return MethodDisabledError
 	}
-	if trashLifetime != 0 {
-		return ErrNotImplemented
-	}
 	if v.locker != nil {
 		v.locker.Lock()
 		defer v.locker.Unlock()
@@ -408,11 +405,21 @@ func (v *UnixVolume) Trash(loc string) error {
 			return nil
 		}
 	}
-	return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+
+	if trashLifetime == 0 {
+		return os.Remove(p)
+	}
+	return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Unix()+int64(trashLifetime)))
 }
 
 // Untrash moves block from trash back into store
+// It looks for path/{loc}.trash.{deadline} files in the block directory
+// and renames the first such file back to path/{loc}.
 func (v *UnixVolume) Untrash(loc string) (err error) {
+	if v.readonly {
+		return MethodDisabledError
+	}
+
 	prefix := fmt.Sprintf("%v.trash.", loc)
 	files, _ := ioutil.ReadDir(v.blockDir(loc))
 	for _, f := range files {
@@ -423,7 +430,8 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
 			}
 		}
 	}
-	return err
+
+	return
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -517,3 +525,57 @@ func (v *UnixVolume) translateError(err error) error {
 		return err
 	}
 }
+
+// trashRegexp matches the base name of a trashed block file,
+// "{hash}.trash.{deadline}", capturing the hash and the Unix-time deadline.
+var trashRegexp = regexp.MustCompile(`^([0-9a-fA-F]{32})\.trash\.(\d+)$`)
+
+// EmptyTrash walks the volume looking for {hash}.trash.{deadline} files
+// and deletes those whose deadline has passed. Errors on individual files
+// are logged and do not stop the walk.
+func (v *UnixVolume) EmptyTrash() error {
+	var bytesDeleted, bytesInTrash int64
+	var blocksDeleted, blocksInTrash int
+	now := time.Now().Unix()
+
+	err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			log.Printf("EmptyTrash error for %v: %v", path, err)
+			return nil
+		}
+		if info.Mode().IsDir() {
+			return nil
+		}
+		matches := trashRegexp.FindStringSubmatch(info.Name())
+		if matches == nil {
+			return nil
+		}
+		deadline, err := strconv.ParseInt(matches[2], 10, 64)
+		if err != nil {
+			log.Printf("EmptyTrash error for %v: %v", matches[1], err)
+			return nil
+		}
+		if deadline >= now {
+			bytesInTrash += info.Size()
+			blocksInTrash++
+			return nil
+		}
+		if err := os.Remove(path); err != nil {
+			log.Printf("Error deleting %v: %v", matches[1], err)
+			bytesInTrash += info.Size()
+			blocksInTrash++
+			return nil
+		}
+		bytesDeleted += info.Size()
+		blocksDeleted++
+		return nil
+	})
+
+	if err != nil {
+		log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+		return err
+	}
+	log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v",
+		v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
+	return nil
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list