[ARVADOS] created: 2.1.0-1192-ge0632f47b

Git user git at public.arvados.org
Fri Aug 13 15:22:46 UTC 2021


        at  e0632f47bb83bda5badccc47cc2d8dbb70d92678 (commit)


commit e0632f47bb83bda5badccc47cc2d8dbb70d92678
Author: Tom Clegg <tom at curii.com>
Date:   Fri Aug 13 11:06:34 2021 -0400

    17698: Do concurrent writes when multiple volumes are needed.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
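
The diff below replaces the serial write loop in PutBlock with concurrent writes: one goroutine per candidate volume, a "pending" copy of the progress tracker that predicts the outcome if every in-flight write succeeds, and a sync.Cond that makes the dispatch loop wait whenever its decision about the next volume depends on writes that have not finished yet. As a rough illustration only, here is a minimal, self-contained Go sketch of that coordination pattern; the volume names, the write stub, and the plain replica counters are hypothetical stand-ins, not the keepstore API.

// Sketch of the concurrent-write coordination used by the new PutBlock:
// start a goroutine per volume, count in-flight writes optimistically,
// and wait on a condition variable whenever the next decision depends
// on writes that have not finished yet.
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
)

// write is a stand-in for a volume write; it fails about a third of the time.
func write(vol string) error {
	if rand.Intn(3) == 0 {
		return errors.New("simulated write failure")
	}
	return nil
}

func main() {
	volumes := []string{"vol-1", "vol-2", "vol-3", "vol-4"}
	const need = 2 // how many successful writes we want

	var wg sync.WaitGroup
	var mtx sync.Mutex
	cond := sync.Cond{L: &mtx}
	confirmed := 0 // writes that have actually succeeded
	predicted := 0 // confirmed plus writes still in flight

	mtx.Lock()
	for _, vol := range volumes {
		// Block while "do we still need this volume?" depends on the
		// outcome of writes that are still in flight.
		for confirmed < need && predicted >= need {
			cond.Wait()
		}
		if confirmed >= need {
			break // enough copies confirmed; skip the remaining volumes
		}
		vol := vol
		predicted++ // assume this write will succeed until proven otherwise
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := write(vol)
			mtx.Lock()
			if err != nil {
				predicted-- // roll back the optimistic prediction
			} else {
				confirmed++
			}
			cond.Broadcast() // wake the dispatch loop to re-evaluate
			mtx.Unlock()
		}()
	}
	mtx.Unlock()
	wg.Wait()
	fmt.Printf("confirmed %d of %d needed writes\n", confirmed, need)
}

The committed code tracks per-storage-class replication with putProgress and its new Copy/Sub helpers rather than plain counters, but the wait, predict, and roll-back structure is the same.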

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 897447dd1..cbc83929d 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -23,6 +23,7 @@ import (
 	"os"
 	"sort"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/lib/config"
@@ -367,6 +368,74 @@ func (s *HandlerSuite) TestReadsOrderedByStorageClassPriority(c *check.C) {
 	}
 }
 
+func (s *HandlerSuite) TestConcurrentWritesToMultipleStorageClasses(c *check.C) {
+	s.cluster.Volumes = map[string]arvados.Volume{
+		"zzzzz-nyw5e-111111111111111": {
+			Driver:         "mock",
+			Replication:    1,
+			StorageClasses: map[string]bool{"class1": true}},
+		"zzzzz-nyw5e-121212121212121": {
+			Driver:         "mock",
+			Replication:    1,
+			StorageClasses: map[string]bool{"class1": true, "class2": true}},
+		"zzzzz-nyw5e-222222222222222": {
+			Driver:         "mock",
+			Replication:    1,
+			StorageClasses: map[string]bool{"class2": true}},
+	}
+
+	for _, trial := range []struct {
+		setCounter uint32 // value to stuff vm.counter, to control offset
+		classes    string // desired classes
+		put111     int    // expected number of "put" ops on 11111... after 2x put reqs
+		put121     int    // expected number of "put" ops on 12121...
+		put222     int    // expected number of "put" ops on 22222...
+		cmp111     int    // expected number of "compare" ops on 11111... after 2x put reqs
+		cmp121     int    // expected number of "compare" ops on 12121...
+		cmp222     int    // expected number of "compare" ops on 22222...
+	}{
+		{0, "class1",
+			1, 0, 0,
+			2, 1, 0}, // first put compares on all vols with class1; second put succeeds after checking 111
+		{0, "class2",
+			0, 1, 0,
+			0, 2, 1}, // first put compares on all vols with class2; second put succeeds after checking 121
+		{0, "class1,class2",
+			1, 1, 0,
+			2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
+		{1, "class1,class2",
+			0, 1, 0, // vm.counter offset is 1 so the first volume attempted is 121
+			2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
+		{0, "class1,class2,class404",
+			1, 1, 0,
+			2, 2, 1}, // first put compares on all vols; second put doesn't compare on 222 because it already satisfied class2 on 121
+	} {
+		c.Logf("%+v", trial)
+		s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+			"class1": {},
+			"class2": {},
+			"class3": {},
+		}
+		c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+		atomic.StoreUint32(&s.handler.volmgr.counter, trial.setCounter)
+		for i := 0; i < 2; i++ {
+			IssueRequest(s.handler,
+				&RequestTester{
+					method:         "PUT",
+					uri:            "/" + TestHash,
+					requestBody:    TestBlock,
+					storageClasses: trial.classes,
+				})
+		}
+		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put111)
+		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put121)
+		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put222)
+		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp111)
+		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp121)
+		c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp222)
+	}
+}
+
 // Test TOUCH requests.
 func (s *HandlerSuite) TestTouchHandler(c *check.C) {
 	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 2b469a13e..81f7fcd12 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -18,6 +18,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -741,6 +742,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 }
 
 type putProgress struct {
+	classNeeded      map[string]bool
 	classTodo        map[string]bool
 	mountUsed        map[*VolumeMount]bool
 	totalReplication int
@@ -769,7 +771,7 @@ func (pr putProgress) ClassReplication() string {
 
 func (pr *putProgress) Add(mnt *VolumeMount) {
 	if pr.mountUsed[mnt] {
-		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
 		return
 	}
 	pr.mountUsed[mnt] = true
@@ -780,6 +782,21 @@ func (pr *putProgress) Add(mnt *VolumeMount) {
 	}
 }
 
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+	if !pr.mountUsed[mnt] {
+		logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+		return
+	}
+	pr.mountUsed[mnt] = false
+	pr.totalReplication -= mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] -= mnt.Replication
+		if pr.classNeeded[class] {
+			pr.classTodo[class] = true
+		}
+	}
+}
+
 func (pr *putProgress) Done() bool {
 	return len(pr.classTodo) == 0 && pr.totalReplication > 0
 }
@@ -800,14 +817,36 @@ func (pr *putProgress) Want(mnt *VolumeMount) bool {
 	return false
 }
 
+func (pr *putProgress) Copy() *putProgress {
+	cp := putProgress{
+		classNeeded:      pr.classNeeded,
+		classTodo:        make(map[string]bool, len(pr.classTodo)),
+		classDone:        make(map[string]int, len(pr.classDone)),
+		mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
+		totalReplication: pr.totalReplication,
+	}
+	for k, v := range pr.classTodo {
+		cp.classTodo[k] = v
+	}
+	for k, v := range pr.classDone {
+		cp.classDone[k] = v
+	}
+	for k, v := range pr.mountUsed {
+		cp.mountUsed[k] = v
+	}
+	return &cp
+}
+
 func newPutResult(classes []string) putProgress {
 	pr := putProgress{
-		classTodo: make(map[string]bool, len(classes)),
-		classDone: map[string]int{},
-		mountUsed: map[*VolumeMount]bool{},
+		classNeeded: make(map[string]bool, len(classes)),
+		classTodo:   make(map[string]bool, len(classes)),
+		classDone:   map[string]int{},
+		mountUsed:   map[*VolumeMount]bool{},
 	}
 	for _, c := range classes {
 		if c != "" {
+			pr.classNeeded[c] = true
 			pr.classTodo[c] = true
 		}
 	}
@@ -856,67 +895,79 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
+	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
 		return result, err
 	}
 	if ctx.Err() != nil {
 		return result, ErrClientDisconnect
 	}
 
-	// Choose a Keep volume to write to.
-	// If this volume fails, try all of the volumes in order.
-	if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
-		// fall through to "try all volumes" below
-	} else if err := mnt.Put(ctx, hash, block); err != nil {
-		log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-	} else {
-		result.Add(mnt)
-		if result.Done() {
-			return result, nil
-		}
-	}
-	if ctx.Err() != nil {
-		return putProgress{}, ErrClientDisconnect
-	}
-
-	writables := volmgr.AllWritable()
+	writables := volmgr.NextWritable()
 	if len(writables) == 0 {
 		log.Error("no writable volumes")
-		return putProgress{}, FullError
-	}
-
-	allFull := true
+		return result, FullError
+	}
+
+	var wg sync.WaitGroup
+	var mtx sync.Mutex
+	cond := sync.Cond{L: &mtx}
+	// pending predicts what result will be if all pending writes
+	// succeed.
+	pending := result.Copy()
+	var allFull atomic.Value
+	allFull.Store(true)
+	mtx.Lock()
 	for _, mnt := range writables {
+		// Wait until our decision to use this mount does not
+		// depend on the outcome of pending writes.
+		for result.Want(mnt) && !pending.Want(mnt) {
+			cond.Wait()
+		}
 		if !result.Want(mnt) {
 			continue
 		}
-		err := mnt.Put(ctx, hash, block)
-		if ctx.Err() != nil {
-			return result, ErrClientDisconnect
-		}
-		switch err {
-		case nil:
-			result.Add(mnt)
-			if result.Done() {
-				return result, nil
+		mnt := mnt
+		pending.Add(mnt)
+		wg.Add(1)
+		go func() {
+			log.Debugf("PutBlock: start write to %s", mnt.UUID)
+			defer wg.Done()
+			err := mnt.Put(ctx, hash, block)
+
+			mtx.Lock()
+			if err != nil {
+				log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+				pending.Sub(mnt)
+			} else {
+				log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+				result.Add(mnt)
 			}
-			continue
-		case FullError:
-			continue
-		default:
-			// The volume is not full but the
-			// write did not succeed.  Report the
-			// error and continue trying.
-			allFull = false
-			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-		}
+			cond.Broadcast()
+			mtx.Unlock()
+
+			if err != nil && err != FullError && ctx.Err() == nil {
+				// The volume is not full but the
+				// write did not succeed.  Report the
+				// error and continue trying.
+				allFull.Store(false)
+				log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+			}
+		}()
+	}
+	mtx.Unlock()
+	wg.Wait()
+	if ctx.Err() != nil {
+		return result, ErrClientDisconnect
+	}
+	if result.Done() {
+		return result, nil
 	}
 
 	if result.totalReplication > 0 {
 		// Some, but not all, of the storage classes were
 		// satisfied. This qualifies as success.
 		return result, nil
-	} else if allFull {
+	} else if allFull.Load().(bool) {
 		log.Error("all volumes with qualifying storage classes are full")
 		return putProgress{}, FullError
 	} else {
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 9bfc6ca3e..878f690c9 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -344,11 +344,11 @@ func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, my
 			vm.writables = append(vm.writables, mnt)
 		}
 	}
-	// pri(i): return highest priority of any storage class
-	// offered by vm.readables[i]
-	pri := func(i int) int {
+	// pri(mnt): return highest priority of any storage class
+	// offered by mnt
+	pri := func(mnt *VolumeMount) int {
 		any, best := false, 0
-		for class := range vm.readables[i].KeepMount.StorageClasses {
+		for class := range mnt.KeepMount.StorageClasses {
 			if p := cluster.StorageClasses[class].Priority; !any || best < p {
 				best = p
 				any = true
@@ -356,14 +356,20 @@ func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, my
 		}
 		return best
 	}
-	// sort vm.readables, first by highest priority of any offered
+	// less(a,b): sort first by highest priority of any offered
 	// storage class (highest->lowest), then by volume UUID
-	sort.Slice(vm.readables, func(i, j int) bool {
-		if pi, pj := pri(i), pri(j); pi != pj {
-			return pi > pj
+	less := func(a, b *VolumeMount) bool {
+		if pa, pb := pri(a), pri(b); pa != pb {
+			return pa > pb
 		} else {
-			return vm.readables[i].KeepMount.UUID < vm.readables[j].KeepMount.UUID
+			return a.KeepMount.UUID < b.KeepMount.UUID
 		}
+	}
+	sort.Slice(vm.readables, func(i, j int) bool {
+		return less(vm.readables[i], vm.readables[j])
+	})
+	sort.Slice(vm.writables, func(i, j int) bool {
+		return less(vm.writables[i], vm.writables[j])
 	})
 	return vm, nil
 }
@@ -384,18 +390,19 @@ func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
 	return vm.readables
 }
 
-// AllWritable returns an array of all writable volumes
+// AllWritable returns writable volumes, sorted by priority/uuid. Used
+// by CompareAndTouch to ensure higher-priority volumes are checked
+// first.
 func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
 	return vm.writables
 }
 
-// NextWritable returns the next writable
-func (vm *RRVolumeManager) NextWritable() *VolumeMount {
-	if len(vm.writables) == 0 {
-		return nil
-	}
-	i := atomic.AddUint32(&vm.counter, 1)
-	return vm.writables[i%uint32(len(vm.writables))]
+// NextWritable returns writable volumes, rotated by vm.counter so
+// each volume gets a turn to be first. Used by PutBlock to distribute
+// new data across available volumes.
+func (vm *RRVolumeManager) NextWritable() []*VolumeMount {
+	offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables)
+	return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...)
 }
 
 // VolumeStats returns an ioStats for the given volume.

-----------------------------------------------------------------------
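
The NextWritable change above returns the full list of writable volumes rotated by an atomic counter, so each volume takes a turn at being tried first while the priority/UUID ordering is otherwise preserved. A small standalone sketch of that rotation, using assumed names rather than the keepstore types:

// Sketch of the rotation NextWritable performs: an atomic counter picks
// a starting offset, and a copy of the slice is returned rotated so each
// volume gets a turn at being first.
package main

import (
	"fmt"
	"sync/atomic"
)

var counter uint32

func nextWritable(writables []string) []string {
	if len(writables) == 0 {
		return nil
	}
	offset := (int(atomic.AddUint32(&counter, 1)) - 1) % len(writables)
	// Build a new slice: tail first, then head, leaving the original untouched.
	return append(append([]string(nil), writables[offset:]...), writables[:offset]...)
}

func main() {
	vols := []string{"vol-a", "vol-b", "vol-c"}
	for i := 0; i < 4; i++ {
		// Successive calls rotate the starting volume:
		// [vol-a vol-b vol-c], [vol-b vol-c vol-a], [vol-c vol-a vol-b], ...
		fmt.Println(nextWritable(vols))
	}
}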

