[ARVADOS] updated: 1.1.4-149-g956a920

Git user git at public.curoverse.com
Fri May 4 16:50:21 EDT 2018


Summary of changes:
 services/keepstore/azure_blob_volume.go | 77 ++++++++++++++++++++-------------
 services/keepstore/s3_volume.go         |  2 +-
 services/keepstore/usage.go             | 15 +++++++
 services/keepstore/volume_unix.go       | 51 ++++++++++++++++------
 4 files changed, 101 insertions(+), 44 deletions(-)

       via  956a920ec0d8ab0e9d6e2fb92318daa87b79f801 (commit)
      from  10803caaaf1b59caf5e4c49f78eca71685f1c4ae (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them in
full below.


commit 956a920ec0d8ab0e9d6e2fb92318daa87b79f801
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri May 4 16:48:50 2018 -0400

    13335: Implement EmptyTrashWorkers in azure and filesystem backends.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 828a1f1..5da2055 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -18,6 +18,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -620,49 +621,67 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
 	var bytesDeleted, bytesInTrash int64
-	var blocksDeleted, blocksInTrash int
-	params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+	var blocksDeleted, blocksInTrash int64
 
-	for {
-		resp, err := v.container.ListBlobs(params)
+	doBlob := func(b storage.Blob) {
+		// Check whether the block is flagged as trash
+		if b.Metadata["expires_at"] == "" {
+			return
+		}
+
+		atomic.AddInt64(&blocksInTrash, 1)
+		atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
+
+		expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
 		if err != nil {
-			log.Printf("EmptyTrash: ListBlobs: %v", err)
-			break
+			log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+			return
 		}
-		for _, b := range resp.Blobs {
-			// Check if the block is expired
-			if b.Metadata["expires_at"] == "" {
-				continue
-			}
 
-			blocksInTrash++
-			bytesInTrash += b.Properties.ContentLength
+		if expiresAt > time.Now().Unix() {
+			return
+		}
 
-			expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
-			if err != nil {
-				log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
-				continue
-			}
+		err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+			IfMatch: b.Properties.Etag,
+		})
+		if err != nil {
+			log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+			return
+		}
+		atomic.AddInt64(&blocksDeleted, 1)
+		atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
+	}
 
-			if expiresAt > time.Now().Unix() {
-				continue
+	var wg sync.WaitGroup
+	todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
+	for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for b := range todo {
+				doBlob(b)
 			}
+		}()
+	}
 
-			err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
-				IfMatch: b.Properties.Etag,
-			})
-			if err != nil {
-				log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
-				continue
-			}
-			blocksDeleted++
-			bytesDeleted += b.Properties.ContentLength
+	params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+	for {
+		resp, err := v.container.ListBlobs(params)
+		if err != nil {
+			log.Printf("EmptyTrash: ListBlobs: %v", err)
+			break
+		}
+		for _, b := range resp.Blobs {
+			todo <- b
 		}
 		if resp.NextMarker == "" {
 			break
 		}
 		params.Marker = resp.NextMarker
 	}
+	close(todo)
+	wg.Wait()
 
 	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index b5a1c97..9d4d801 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -850,7 +850,7 @@ func (v *S3Volume) EmptyTrash() {
 	}
 
 	var wg sync.WaitGroup
-	todo := make(chan *s3.Key)
+	todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
 	for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
 		wg.Add(1)
 		go func() {
diff --git a/services/keepstore/usage.go b/services/keepstore/usage.go
index 5f6fd90..672d7cf 100644
--- a/services/keepstore/usage.go
+++ b/services/keepstore/usage.go
@@ -118,6 +118,21 @@ TrashCheckInterval:
     How often to check for (and delete) trashed blocks whose
     TrashLifetime has expired.
 
+TrashWorkers:
+
+    Maximum number of concurrent trash operations. Default is 1, i.e.,
+    trash lists are processed serially.
+
+EmptyTrashWorkers:
+
+    Maximum number of concurrent block deletion operations (per
+    volume) when emptying trash. Default is 1.
+
+PullWorkers:
+
+    Maximum number of concurrent pull operations. Default is 1, i.e.,
+    pull lists are processed serially.
+
 Volumes:
 
     List of storage volumes. If omitted or empty, the default is to
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 5a04ffd..23d6753 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -18,6 +18,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 )
@@ -725,39 +726,61 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() {
 	var bytesDeleted, bytesInTrash int64
-	var blocksDeleted, blocksInTrash int
+	var blocksDeleted, blocksInTrash int64
 
-	err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
-			return nil
-		}
+	doFile := func(path string, info os.FileInfo) {
 		if info.Mode().IsDir() {
-			return nil
+			return
 		}
 		matches := unixTrashLocRegexp.FindStringSubmatch(path)
 		if len(matches) != 3 {
-			return nil
+			return
 		}
 		deadline, err := strconv.ParseInt(matches[2], 10, 64)
 		if err != nil {
 			log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
-			return nil
+			return
 		}
-		bytesInTrash += info.Size()
-		blocksInTrash++
+		atomic.AddInt64(&bytesInTrash, info.Size())
+		atomic.AddInt64(&blocksInTrash, 1)
 		if deadline > time.Now().Unix() {
-			return nil
+			return
 		}
 		err = v.os.Remove(path)
 		if err != nil {
 			log.Printf("EmptyTrash: Remove %v: %v", path, err)
+			return
+		}
+		atomic.AddInt64(&bytesDeleted, info.Size())
+		atomic.AddInt64(&blocksDeleted, 1)
+	}
+
+	type dirent struct {
+		path string
+		info os.FileInfo
+	}
+	var wg sync.WaitGroup
+	todo := make(chan dirent, theConfig.EmptyTrashWorkers)
+	for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for e := range todo {
+				doFile(e.path, e.info)
+			}
+		}()
+	}
+
+	err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
 			return nil
 		}
-		bytesDeleted += info.Size()
-		blocksDeleted++
+		todo <- dirent{path, info}
 		return nil
 	})
+	close(todo)
+	wg.Wait()
 
 	if err != nil {
 		log.Printf("EmptyTrash error for %v: %v", v.String(), err)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list