[ARVADOS] created: 1.1.4-149-gdcb4760

Git user git at public.curoverse.com
Fri May 4 16:53:12 EDT 2018


        at  dcb4760843cc0ed4647e8eaa43abb5d2f049cd0c (commit)


commit dcb4760843cc0ed4647e8eaa43abb5d2f049cd0c
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri May 4 16:48:50 2018 -0400

    13383: Implement EmptyTrashWorkers in azure and filesystem backends.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 828a1f1..5da2055 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -18,6 +18,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -620,49 +621,67 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
 	var bytesDeleted, bytesInTrash int64
-	var blocksDeleted, blocksInTrash int
-	params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+	var blocksDeleted, blocksInTrash int64
 
-	for {
-		resp, err := v.container.ListBlobs(params)
+	doBlob := func(b storage.Blob) {
+		// Check whether the block is flagged as trash
+		if b.Metadata["expires_at"] == "" {
+			return
+		}
+
+		atomic.AddInt64(&blocksInTrash, 1)
+		atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
+
+		expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
 		if err != nil {
-			log.Printf("EmptyTrash: ListBlobs: %v", err)
-			break
+			log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
+			return
 		}
-		for _, b := range resp.Blobs {
-			// Check if the block is expired
-			if b.Metadata["expires_at"] == "" {
-				continue
-			}
 
-			blocksInTrash++
-			bytesInTrash += b.Properties.ContentLength
+		if expiresAt > time.Now().Unix() {
+			return
+		}
 
-			expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
-			if err != nil {
-				log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
-				continue
-			}
+		err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+			IfMatch: b.Properties.Etag,
+		})
+		if err != nil {
+			log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
+			return
+		}
+		atomic.AddInt64(&blocksDeleted, 1)
+		atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
+	}
 
-			if expiresAt > time.Now().Unix() {
-				continue
+	var wg sync.WaitGroup
+	todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
+	for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for b := range todo {
+				doBlob(b)
 			}
+		}()
+	}
 
-			err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
-				IfMatch: b.Properties.Etag,
-			})
-			if err != nil {
-				log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
-				continue
-			}
-			blocksDeleted++
-			bytesDeleted += b.Properties.ContentLength
+	params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
+	for {
+		resp, err := v.container.ListBlobs(params)
+		if err != nil {
+			log.Printf("EmptyTrash: ListBlobs: %v", err)
+			break
+		}
+		for _, b := range resp.Blobs {
+			todo <- b
 		}
 		if resp.NextMarker == "" {
 			break
 		}
 		params.Marker = resp.NextMarker
 	}
+	close(todo)
+	wg.Wait()
 
 	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index b5a1c97..9d4d801 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -850,7 +850,7 @@ func (v *S3Volume) EmptyTrash() {
 	}
 
 	var wg sync.WaitGroup
-	todo := make(chan *s3.Key)
+	todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
 	for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
 		wg.Add(1)
 		go func() {
diff --git a/services/keepstore/usage.go b/services/keepstore/usage.go
index 5f6fd90..672d7cf 100644
--- a/services/keepstore/usage.go
+++ b/services/keepstore/usage.go
@@ -118,6 +118,21 @@ TrashCheckInterval:
     How often to check for (and delete) trashed blocks whose
     TrashLifetime has expired.
 
+TrashWorkers:
+
+    Maximum number of concurrent trash operations. Default is 1, i.e.,
+    trash lists are processed serially.
+
+EmptyTrashWorkers:
+
+    Maximum number of concurrent block deletion operations (per
+    volume) when emptying trash. Default is 1.
+
+PullWorkers:
+
+    Maximum number of concurrent pull operations. Default is 1, i.e.,
+    pull lists are processed serially.
+
 Volumes:
 
     List of storage volumes. If omitted or empty, the default is to
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 5a04ffd..23d6753 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -18,6 +18,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 )
@@ -725,39 +726,61 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() {
 	var bytesDeleted, bytesInTrash int64
-	var blocksDeleted, blocksInTrash int
+	var blocksDeleted, blocksInTrash int64
 
-	err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
-			return nil
-		}
+	doFile := func(path string, info os.FileInfo) {
 		if info.Mode().IsDir() {
-			return nil
+			return
 		}
 		matches := unixTrashLocRegexp.FindStringSubmatch(path)
 		if len(matches) != 3 {
-			return nil
+			return
 		}
 		deadline, err := strconv.ParseInt(matches[2], 10, 64)
 		if err != nil {
 			log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
-			return nil
+			return
 		}
-		bytesInTrash += info.Size()
-		blocksInTrash++
+		atomic.AddInt64(&bytesInTrash, info.Size())
+		atomic.AddInt64(&blocksInTrash, 1)
 		if deadline > time.Now().Unix() {
-			return nil
+			return
 		}
 		err = v.os.Remove(path)
 		if err != nil {
 			log.Printf("EmptyTrash: Remove %v: %v", path, err)
+			return
+		}
+		atomic.AddInt64(&bytesDeleted, info.Size())
+		atomic.AddInt64(&blocksDeleted, 1)
+	}
+
+	type dirent struct {
+		path string
+		info os.FileInfo
+	}
+	var wg sync.WaitGroup
+	todo := make(chan dirent, theConfig.EmptyTrashWorkers)
+	for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for e := range todo {
+				doFile(e.path, e.info)
+			}
+		}()
+	}
+
+	err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
 			return nil
 		}
-		bytesDeleted += info.Size()
-		blocksDeleted++
+		todo <- dirent{path, info}
 		return nil
 	})
+	close(todo)
+	wg.Wait()
 
 	if err != nil {
 		log.Printf("EmptyTrash error for %v: %v", v.String(), err)

commit 72b75c62d530a4d3083502d99dcd1012e7dcbba2
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Apr 24 17:03:50 2018 -0400

    13383: Fix warning about expected 404 error.
    
    The expected error (object not found immediately after deleting it)
    was being logged as a warning, and the warning erroneously stated the
    error was from "HEAD recent/X" rather than "HEAD X".
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 532a082..b5a1c97 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -835,13 +835,17 @@ func (v *S3Volume) EmptyTrash() {
 		atomic.AddInt64(&blocksDeleted, 1)
 
 		_, err = v.bucket.Head(loc, nil)
-		if os.IsNotExist(err) {
-			err = v.bucket.Del("recent/" + loc)
-			if err != nil {
-				log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
-			}
-		} else if err != nil {
-			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+		if err == nil {
+			log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+			return
+		}
+		if !os.IsNotExist(v.translateError(err)) {
+			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+			return
+		}
+		err = v.bucket.Del("recent/" + loc)
+		if err != nil {
+			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
 		}
 	}
 

commit 71e686d5be7e425ab3358505b8edf5098a2f09f2
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Apr 24 16:32:24 2018 -0400

    13383: Add PullWorkers, TrashWorkers, EmptyTrashWorkers configs.
    
    EmptyTrashWorkers only applies to S3 volumes.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/config.go b/services/keepstore/config.go
index 17d6acd..19dc7f6 100644
--- a/services/keepstore/config.go
+++ b/services/keepstore/config.go
@@ -40,6 +40,9 @@ type Config struct {
 	EnableDelete        bool
 	TrashLifetime       arvados.Duration
 	TrashCheckInterval  arvados.Duration
+	PullWorkers         int
+	TrashWorkers        int
+	EmptyTrashWorkers   int
 
 	Volumes VolumeList
 
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 03eef7e..c742752 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -165,19 +165,23 @@ func main() {
 		log.Fatal(err)
 	}
 
-	// Initialize Pull queue and worker
+	// Initialize keepclient for pull workers
 	keepClient := &keepclient.KeepClient{
 		Arvados:       &arvadosclient.ArvadosClient{},
 		Want_replicas: 1,
 	}
 
-	// Initialize the pullq and worker
+	// Initialize the pullq and workers
 	pullq = NewWorkQueue()
-	go RunPullWorker(pullq, keepClient)
+	for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+		go RunPullWorker(pullq, keepClient)
+	}
 
-	// Initialize the trashq and worker
+	// Initialize the trashq and workers
 	trashq = NewWorkQueue()
-	go RunTrashWorker(trashq)
+	for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+		go RunTrashWorker(trashq)
+	}
 
 	// Start emptyTrash goroutine
 	doneEmptyingTrash := make(chan bool)
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index a60b2fc..532a082 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -18,6 +18,7 @@ import (
 	"regexp"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -764,26 +765,21 @@ func (v *S3Volume) translateError(err error) error {
 func (v *S3Volume) EmptyTrash() {
 	var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
 
-	// Use a merge sort to find matching sets of trash/X and recent/X.
-	trashL := s3Lister{
-		Bucket:   v.bucket.Bucket,
-		Prefix:   "trash/",
-		PageSize: v.IndexPageSize,
-	}
 	// Define "ready to delete" as "...when EmptyTrash started".
 	startT := time.Now()
-	for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+
+	emptyOneKey := func(trash *s3.Key) {
 		loc := trash.Key[6:]
 		if !v.isKeepBlock(loc) {
-			continue
+			return
 		}
-		bytesInTrash += trash.Size
-		blocksInTrash++
+		atomic.AddInt64(&bytesInTrash, trash.Size)
+		atomic.AddInt64(&blocksInTrash, 1)
 
 		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
 		if err != nil {
 			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
-			continue
+			return
 		}
 		recent, err := v.bucket.Head("recent/"+loc, nil)
 		if err != nil && os.IsNotExist(v.translateError(err)) {
@@ -792,15 +788,15 @@ func (v *S3Volume) EmptyTrash() {
 			if err != nil {
 				log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
 			}
-			continue
+			return
 		} else if err != nil {
 			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
-			continue
+			return
 		}
 		recentT, err := v.lastModified(recent)
 		if err != nil {
 			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
-			continue
+			return
 		}
 		if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
 			if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
@@ -815,28 +811,28 @@ func (v *S3Volume) EmptyTrash() {
 				log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
 				v.fixRace(loc)
 				v.Touch(loc)
-				continue
+				return
 			}
 			_, err := v.bucket.Head(loc, nil)
 			if os.IsNotExist(err) {
 				log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
 				v.fixRace(loc)
-				continue
+				return
 			} else if err != nil {
 				log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
-				continue
+				return
 			}
 		}
 		if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
-			continue
+			return
 		}
 		err = v.bucket.Del(trash.Key)
 		if err != nil {
 			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
-			continue
+			return
 		}
-		bytesDeleted += trash.Size
-		blocksDeleted++
+		atomic.AddInt64(&bytesDeleted, trash.Size)
+		atomic.AddInt64(&blocksDeleted, 1)
 
 		_, err = v.bucket.Head(loc, nil)
 		if os.IsNotExist(err) {
@@ -848,6 +844,30 @@ func (v *S3Volume) EmptyTrash() {
 			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
 		}
 	}
+
+	var wg sync.WaitGroup
+	todo := make(chan *s3.Key)
+	for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for key := range todo {
+				emptyOneKey(key)
+			}
+		}()
+	}
+
+	trashL := s3Lister{
+		Bucket:   v.bucket.Bucket,
+		Prefix:   "trash/",
+		PageSize: v.IndexPageSize,
+	}
+	for trash := trashL.First(); trash != nil; trash = trashL.Next() {
+		todo <- trash
+	}
+	close(todo)
+	wg.Wait()
+
 	if err := trashL.Error(); err != nil {
 		log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list