[ARVADOS] updated: ac037435f2ee9688957f1c0be6bfa9066ac027ba
git at public.curoverse.com
git at public.curoverse.com
Fri Mar 20 11:29:23 EDT 2015
Summary of changes:
sdk/cli/bin/arv-run-pipeline-instance | 53 +------
sdk/cli/bin/crunch-job | 30 ++--
services/keepstore/keepstore.go | 5 +
services/keepstore/trash_worker.go | 46 ++++++
services/keepstore/trash_worker_test.go | 250 ++++++++++++++++++++++++++++++++
5 files changed, 329 insertions(+), 55 deletions(-)
create mode 100644 services/keepstore/trash_worker.go
create mode 100644 services/keepstore/trash_worker_test.go
via ac037435f2ee9688957f1c0be6bfa9066ac027ba (commit)
via 4925686f6aa7214568ebd60be3acaa49dbf9dd1a (commit)
via a578a055456693d66ccf4d65a09822380d2cc7a8 (commit)
via 1ed0df4d4b2221a06e37effce42ab5b2e23b29c8 (commit)
via a239e2db534cc36aa8c3e08077383d84bf6ba8e8 (commit)
via 3365d47ab4f504a1e849852691313cddd89d0f15 (commit)
from 6a13c4e44ce35056fb1ccbdf470a79b8b6b8bfa5 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit ac037435f2ee9688957f1c0be6bfa9066ac027ba
Author: Radhika Chippada <radhika at curoverse.com>
Date: Fri Mar 20 11:27:34 2015 -0400
3762: Run trash worker. Add tests to test the trash worker.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 83974ff..a363bac 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -283,9 +283,14 @@ func main() {
Client: &http.Client{},
}
+ // Initialize the pullq and worker
pullq = NewWorkQueue()
go RunPullWorker(pullq, keepClient)
+ // Initialize the trashq and worker
+ trashq = NewWorkQueue()
+ go RunTrashWorker(trashq)
+
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
term := make(chan os.Signal, 1)
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
new file mode 100644
index 0000000..6b8d60b
--- /dev/null
+++ b/services/keepstore/trash_worker.go
@@ -0,0 +1,46 @@
+package main
+
+import (
+ "log"
+ "time"
+)
+
+/*
+ Keepstore starts the trash worker in its own goroutine.
+ The worker receives the trash list through a work queue.
+ For each (next) trash request:
+ Delete the block indicated by the trash request Locator
+ Repeat
+
+ DEFAULT_TRASH_LIFE_TIME (declared below) is the age, in seconds, that a
+ trash request's BlockMtime must exceed before TrashItem deletes the block.
+*/
+
+var DEFAULT_TRASH_LIFE_TIME int64 = 1209600 // 2 weeks (14 * 24 * 60 * 60 seconds), for now
+
+// RunTrashWorker receives trash requests from trashq.NextItem and calls
+// TrashItem on each of them in turn. A failed request is logged and does
+// not stop the worker. The function returns once the NextItem channel is
+// closed, so it is meant to be run in its own goroutine.
+//
+// Every item on the queue must be a TrashRequest; any other type makes the
+// type assertion panic.
+func RunTrashWorker(trashq *WorkQueue) {
+	for item := range trashq.NextItem {
+		trashRequest := item.(TrashRequest)
+		if err := TrashItem(trashRequest); err != nil {
+			// TrashRequest is a struct holding an integer BlockMtime, so it
+			// has to be printed with %v: %s would render that field as
+			// "%!s(int64=...)".
+			log.Printf("Trash request error for %v: %v", trashRequest, err)
+		}
+	}
+}
+
+// TrashItem deletes the block named by trashRequest.Locator from each
+// volume of KeepVM on which both of the following hold:
+//
+//   - the volume's Mtime for the block, in whole seconds, equals
+//     trashRequest.BlockMtime, and
+//   - more than DEFAULT_TRASH_LIFE_TIME seconds have passed since
+//     trashRequest.BlockMtime.
+//
+// A volume whose Mtime lookup fails is skipped without reporting an error
+// (presumably the block is simply not stored there).
+//
+// It returns the error of the most recent failed Delete, or nil if no
+// Delete failed. All volumes are visited even after a failure.
+func TrashItem(trashRequest TrashRequest) (err error) {
+	for _, volume := range KeepVM.Volumes() {
+		// Use a distinct name for the lookup error: declaring "err" here
+		// with := would shadow the named result, and the Delete error
+		// below would never reach the caller.
+		mtime, mtimeErr := volume.Mtime(trashRequest.Locator)
+		if mtimeErr != nil {
+			continue
+		}
+
+		// Only trash the exact version of the block the request refers to.
+		if trashRequest.BlockMtime != mtime.Unix() {
+			continue
+		}
+
+		// Leave blocks alone until they have outlived the trash life time.
+		age := time.Now().Unix() - trashRequest.BlockMtime
+		if age <= DEFAULT_TRASH_LIFE_TIME {
+			continue
+		}
+
+		if deleteErr := volume.Delete(trashRequest.Locator); deleteErr != nil {
+			err = deleteErr
+		}
+	}
+	return err
+}
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
new file mode 100644
index 0000000..e4f7fb3
--- /dev/null
+++ b/services/keepstore/trash_worker_test.go
@@ -0,0 +1,250 @@
+package main
+
+import (
+ "container/list"
+ "testing"
+ "time"
+)
+
+type TrashWorkerTestData struct { // Inputs and expectations for one performTrashWorkerTest scenario.
+ Locator1 string // locator of the first block; written to volume 0 when CreateData is set
+ Block1 []byte // contents stored under Locator1
+ BlockMtime1 int64 // not referenced by the test code in this file
+
+ Locator2 string // locator of the second block; written to volume 1, or volume 0 if CreateInVolume1
+ Block2 []byte // contents stored under Locator2
+ BlockMtime2 int64 // not referenced by the test code in this file
+
+ CreateData bool // when false, no blocks are written before the trash request is issued
+ CreateInVolume1 bool // when true, the second block goes to volume 0 alongside the first
+ UseDelayToCreate bool // when true, sleep one second between writing the first and second block
+
+ DeleteLocator string // locator placed in the TrashRequest handed to the trash worker
+
+ ExpectLocator1 bool // whether GetBlock(Locator1) should still return data afterwards
+ ExpectLocator2 bool // same for Locator2; only checked when Locator1 != Locator2
+}
+
+// Issue a trash request for a locator while no test data has been written
+// (CreateData is false), so none of the keep volumes holds the block.
+// The block must still be unreadable afterwards.
+func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
+	performTrashWorkerTest(TrashWorkerTestData{
+		Locator1: "5d41402abc4b2a76b9719d911017c592",
+		Block1:   []byte("hello"),
+
+		Locator2: "5d41402abc4b2a76b9719d911017c592",
+		Block2:   []byte("hello"),
+
+		CreateData: false,
+
+		DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
+
+		ExpectLocator1: false,
+		ExpectLocator2: false,
+	}, t)
+}
+
+// Store one block on the first volume and a different block on the second
+// volume, then trash the first block's locator.
+// The first block must be gone and the second must still be readable.
+func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
+	performTrashWorkerTest(TrashWorkerTestData{
+		Locator1: TEST_HASH,
+		Block1:   TEST_BLOCK,
+
+		Locator2: TEST_HASH_2,
+		Block2:   TEST_BLOCK_2,
+
+		CreateData: true,
+
+		DeleteLocator: TEST_HASH, // first locator
+
+		ExpectLocator1: false,
+		ExpectLocator2: true,
+	}, t)
+}
+
+// Store one block on the first volume and a different block on the second
+// volume, then trash the second block's locator.
+// The second block must be gone and the first must still be readable.
+func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
+	performTrashWorkerTest(TrashWorkerTestData{
+		Locator1: TEST_HASH,
+		Block1:   TEST_BLOCK,
+
+		Locator2: TEST_HASH_2,
+		Block2:   TEST_BLOCK_2,
+
+		CreateData: true,
+
+		DeleteLocator: TEST_HASH_2, // locator 2
+
+		ExpectLocator1: true,
+		ExpectLocator2: false,
+	}, t)
+}
+
+// Store the same block on both volumes, without any delay between the two
+// writes, then trash its locator.
+// The block must no longer be readable afterwards.
+func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
+	performTrashWorkerTest(TrashWorkerTestData{
+		Locator1: TEST_HASH,
+		Block1:   TEST_BLOCK,
+
+		Locator2: TEST_HASH,
+		Block2:   TEST_BLOCK,
+
+		CreateData: true,
+
+		DeleteLocator: TEST_HASH,
+
+		ExpectLocator1: false,
+		ExpectLocator2: false,
+	}, t)
+}
+
+// Store the same block on both volumes, one second apart (UseDelayToCreate),
+// so that the two copies carry different Mtimes, then trash its locator.
+// The block must still be readable, and exactly one of the two volumes
+// must still hold a copy.
+func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
+	performTrashWorkerTest(TrashWorkerTestData{
+		Locator1: TEST_HASH,
+		Block1:   TEST_BLOCK,
+
+		Locator2: TEST_HASH,
+		Block2:   TEST_BLOCK,
+
+		CreateData:       true,
+		UseDelayToCreate: true,
+
+		DeleteLocator: TEST_HASH,
+
+		ExpectLocator1: true,
+		ExpectLocator2: false,
+	}, t)
+}
+
+// Store two different blocks, both on the first volume (CreateInVolume1),
+// then trash the first block's locator.
+// The first block must be gone and the second must still be readable.
+func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
+	performTrashWorkerTest(TrashWorkerTestData{
+		Locator1: TEST_HASH,
+		Block1:   TEST_BLOCK,
+
+		Locator2: TEST_HASH_2,
+		Block2:   TEST_BLOCK_2,
+
+		CreateData:      true,
+		CreateInVolume1: true,
+
+		DeleteLocator: TEST_HASH, // locator 1
+
+		ExpectLocator1: false,
+		ExpectLocator2: true,
+	}, t)
+}
+
+// performTrashWorkerTest runs the single trash worker scenario described by
+// testData:
+//
+//  1. installs a fresh two-volume test volume manager as KeepVM;
+//  2. if CreateData is set, stores Block1 on volume 0 and Block2 on volume 1
+//     (or on volume 0 when CreateInVolume1 is set), optionally one second
+//     apart;
+//  3. queues one TrashRequest for DeleteLocator, stamped with the current
+//     time, and lets RunTrashWorker process it;
+//  4. reports a test error when the presence of a block afterwards differs
+//     from ExpectLocator1 / ExpectLocator2.
+//
+// It overwrites the package-level KeepVM, DEFAULT_TRASH_LIFE_TIME,
+// permission_ttl and trashq variables and does not restore them.
+func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
+	// Create Keep Volumes
+	KeepVM = MakeTestVolumeManager(2)
+
+	// Set trash life time delta to 0 so that the test can delete the blocks right after create
+	DEFAULT_TRASH_LIFE_TIME = 0
+
+	// Delete from volume will not take place if the block MTime is within permission_ttl
+	permission_ttl = time.Duration(1) * time.Second
+
+	vols := KeepVM.Volumes()
+
+	// Put test content
+	if testData.CreateData {
+		vols[0].Put(testData.Locator1, testData.Block1)
+		vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
+
+		// One of the tests deletes a locator with different Mtimes in two different volumes
+		if testData.UseDelayToCreate {
+			time.Sleep(1 * time.Second)
+		}
+
+		// The second block goes to volume 0 or volume 1 depending on the scenario.
+		secondVolume := vols[1]
+		if testData.CreateInVolume1 {
+			secondVolume = vols[0]
+		}
+		secondVolume.Put(testData.Locator2, testData.Block2)
+		secondVolume.Put(testData.Locator2+".meta", []byte("metadata"))
+	}
+
+	// Create TrashRequest for the test.
+	// NOTE(review): BlockMtime comes from the clock rather than from the
+	// volume, so a match presumably relies on the preceding Put and this
+	// statement falling within the same second - confirm against the test
+	// volume's Mtime implementation.
+	trashRequest := TrashRequest{
+		Locator:    testData.DeleteLocator,
+		BlockMtime: time.Now().Unix(),
+	}
+
+	// delay by permission_ttl to allow deletes to work
+	time.Sleep(1 * time.Second)
+
+	// Run trash worker and put the trashRequest on trashq
+	trashList := list.New()
+	trashList.PushBack(trashRequest)
+	trashq = NewWorkQueue()
+	go RunTrashWorker(trashq)
+	trashq.ReplaceQueue(trashList)
+	time.Sleep(10 * time.Millisecond) // give it a moment to finish processing the trash list
+
+	// Verify Locator1 to be un/deleted as expected
+	data, _ := GetBlock(testData.Locator1, false)
+	switch present := len(data) > 0; {
+	case testData.ExpectLocator1 && !present:
+		t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
+	case !testData.ExpectLocator1 && present:
+		t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
+	}
+
+	// Verify Locator2 to be un/deleted as expected.
+	// Skipped when both locators are the same, since GetBlock cannot tell
+	// the two copies apart; that case is covered by the volume count below.
+	if testData.Locator1 != testData.Locator2 {
+		data, _ = GetBlock(testData.Locator2, false)
+		switch present := len(data) > 0; {
+		case testData.ExpectLocator2 && !present:
+			t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
+		case !testData.ExpectLocator2 && present:
+			t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
+		}
+	}
+
+	// One test used the same locator in two different volumes but with different Mtime values
+	// Hence let's verify that only one volume has it and the other is deleted
+	if testData.ExpectLocator1 && testData.Locator1 == testData.Locator2 {
+		locatorFoundIn := 0
+		for _, volume := range KeepVM.Volumes() {
+			if _, err := volume.Get(testData.Locator1); err == nil {
+				locatorFoundIn++
+			}
+		}
+		if locatorFoundIn != 1 {
+			// locatorFoundIn is an int, so it needs %d rather than %s.
+			t.Errorf("Expected locator to be found in only one volume after deleting. But found: %d", locatorFoundIn)
+		}
+	}
+
+	// Done
+	trashq.Close()
+	KeepVM.Quit()
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list