[ARVADOS] created: 2.1.0-1192-ge0632f47b
Git user git at public.arvados.org
Fri Aug 13 15:22:46 UTC 2021
at e0632f47bb83bda5badccc47cc2d8dbb70d92678 (commit)
commit e0632f47bb83bda5badccc47cc2d8dbb70d92678
Author: Tom Clegg <tom at curii.com>
Date: Fri Aug 13 11:06:34 2021 -0400
17698: Do concurrent writes when multiple volumes are needed.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
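For readers skimming the patch: instead of trying one volume at a time, PutBlock now fans the write out to every wanted volume concurrently and collects results under a mutex, keeping a "pending" copy of the progress state and a sync.Cond so it never starts a write whose usefulness depends on writes still in flight. A minimal standalone sketch of the basic fan-out pattern (illustrative names only, not the actual keepstore types, and omitting the pending/Cond bookkeeping shown in the diff below):

package main

import (
    "fmt"
    "sync"
)

type mount struct{ uuid string }

// put stands in for the real volume write; it always succeeds here.
func put(m mount, data []byte) error {
    return nil
}

// writeAll fans one block out to every mount concurrently and
// reports how many of the writes succeeded.
func writeAll(mounts []mount, data []byte) int {
    var (
        wg        sync.WaitGroup
        mtx       sync.Mutex
        succeeded int
    )
    for _, m := range mounts {
        m := m
        wg.Add(1)
        go func() {
            defer wg.Done()
            err := put(m, data)
            mtx.Lock()
            defer mtx.Unlock()
            if err == nil {
                succeeded++
            }
        }()
    }
    wg.Wait()
    return succeeded
}

func main() {
    n := writeAll([]mount{{"vol-1"}, {"vol-2"}}, []byte("foo"))
    fmt.Println(n, "writes succeeded")
}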
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 897447dd1..cbc83929d 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -23,6 +23,7 @@ import (
"os"
"sort"
"strings"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/lib/config"
@@ -367,6 +368,74 @@ func (s *HandlerSuite) TestReadsOrderedByStorageClassPriority(c *check.C) {
}
}
+func (s *HandlerSuite) TestConcurrentWritesToMultipleStorageClasses(c *check.C) {
+ s.cluster.Volumes = map[string]arvados.Volume{
+ "zzzzz-nyw5e-111111111111111": {
+ Driver: "mock",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class1": true}},
+ "zzzzz-nyw5e-121212121212121": {
+ Driver: "mock",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class1": true, "class2": true}},
+ "zzzzz-nyw5e-222222222222222": {
+ Driver: "mock",
+ Replication: 1,
+ StorageClasses: map[string]bool{"class2": true}},
+ }
+
+ for _, trial := range []struct {
+ setCounter uint32 // value to stuff vm.counter, to control offset
+ classes string // desired classes
+ put111 int // expected number of "put" ops on 11111... after 2x put reqs
+ put121 int // expected number of "put" ops on 12121...
+ put222 int // expected number of "put" ops on 22222...
+ cmp111 int // expected number of "compare" ops on 11111... after 2x put reqs
+ cmp121 int // expected number of "compare" ops on 12121...
+ cmp222 int // expected number of "compare" ops on 22222...
+ }{
+ {0, "class1",
+ 1, 0, 0,
+ 2, 1, 0}, // first put compares on all vols with class1; second put succeeds after checking 111
+ {0, "class2",
+ 0, 1, 0,
+ 0, 2, 1}, // first put compares on all vols with class2; second put succeeds after checking 121
+ {0, "class1,class2",
+ 1, 1, 0,
+ 2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
+ {1, "class1,class2",
+ 0, 1, 0, // vm.counter offset is 1 so the first volume attempted is 121
+ 2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
+ {0, "class1,class2,class404",
+ 1, 1, 0,
+ 2, 2, 1}, // first put compares on all vols; second put doesn't compare on 222 because it already satisfied class2 on 121
+ } {
+ c.Logf("%+v", trial)
+ s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+ "class1": {},
+ "class2": {},
+ "class3": {},
+ }
+ c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+ atomic.StoreUint32(&s.handler.volmgr.counter, trial.setCounter)
+ for i := 0; i < 2; i++ {
+ IssueRequest(s.handler,
+ &RequestTester{
+ method: "PUT",
+ uri: "/" + TestHash,
+ requestBody: TestBlock,
+ storageClasses: trial.classes,
+ })
+ }
+ c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put111)
+ c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put121)
+ c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put222)
+ c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp111)
+ c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp121)
+ c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp222)
+ }
+}
+
// Test TOUCH requests.
func (s *HandlerSuite) TestTouchHandler(c *check.C) {
c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 2b469a13e..81f7fcd12 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -18,6 +18,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -741,6 +742,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
}
type putProgress struct {
+ classNeeded map[string]bool
classTodo map[string]bool
mountUsed map[*VolumeMount]bool
totalReplication int
@@ -769,7 +771,7 @@ func (pr putProgress) ClassReplication() string {
func (pr *putProgress) Add(mnt *VolumeMount) {
if pr.mountUsed[mnt] {
- logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+ logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
return
}
pr.mountUsed[mnt] = true
@@ -780,6 +782,21 @@ func (pr *putProgress) Add(mnt *VolumeMount) {
}
}
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+ if !pr.mountUsed[mnt] {
+ logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+ return
+ }
+ pr.mountUsed[mnt] = false
+ pr.totalReplication -= mnt.Replication
+ for class := range mnt.StorageClasses {
+ pr.classDone[class] -= mnt.Replication
+ if pr.classNeeded[class] {
+ pr.classTodo[class] = true
+ }
+ }
+}
+
func (pr *putProgress) Done() bool {
return len(pr.classTodo) == 0 && pr.totalReplication > 0
}
@@ -800,14 +817,36 @@ func (pr *putProgress) Want(mnt *VolumeMount) bool {
return false
}
+func (pr *putProgress) Copy() *putProgress {
+ cp := putProgress{
+ classNeeded: pr.classNeeded,
+ classTodo: make(map[string]bool, len(pr.classTodo)),
+ classDone: make(map[string]int, len(pr.classDone)),
+ mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
+ totalReplication: pr.totalReplication,
+ }
+ for k, v := range pr.classTodo {
+ cp.classTodo[k] = v
+ }
+ for k, v := range pr.classDone {
+ cp.classDone[k] = v
+ }
+ for k, v := range pr.mountUsed {
+ cp.mountUsed[k] = v
+ }
+ return &cp
+}
+
func newPutResult(classes []string) putProgress {
pr := putProgress{
- classTodo: make(map[string]bool, len(classes)),
- classDone: map[string]int{},
- mountUsed: map[*VolumeMount]bool{},
+ classNeeded: make(map[string]bool, len(classes)),
+ classTodo: make(map[string]bool, len(classes)),
+ classDone: map[string]int{},
+ mountUsed: map[*VolumeMount]bool{},
}
for _, c := range classes {
if c != "" {
+ pr.classNeeded[c] = true
pr.classTodo[c] = true
}
}
@@ -856,67 +895,79 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
+ if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
return result, err
}
if ctx.Err() != nil {
return result, ErrClientDisconnect
}
- // Choose a Keep volume to write to.
- // If this volume fails, try all of the volumes in order.
- if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
- // fall through to "try all volumes" below
- } else if err := mnt.Put(ctx, hash, block); err != nil {
- log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
- } else {
- result.Add(mnt)
- if result.Done() {
- return result, nil
- }
- }
- if ctx.Err() != nil {
- return putProgress{}, ErrClientDisconnect
- }
-
- writables := volmgr.AllWritable()
+ writables := volmgr.NextWritable()
if len(writables) == 0 {
log.Error("no writable volumes")
- return putProgress{}, FullError
- }
-
- allFull := true
+ return result, FullError
+ }
+
+ var wg sync.WaitGroup
+ var mtx sync.Mutex
+ cond := sync.Cond{L: &mtx}
+ // pending predicts what result will be if all pending writes
+ // succeed.
+ pending := result.Copy()
+ var allFull atomic.Value
+ allFull.Store(true)
+ mtx.Lock()
for _, mnt := range writables {
+ // Wait until our decision to use this mount does not
+ // depend on the outcome of pending writes.
+ for result.Want(mnt) && !pending.Want(mnt) {
+ cond.Wait()
+ }
if !result.Want(mnt) {
continue
}
- err := mnt.Put(ctx, hash, block)
- if ctx.Err() != nil {
- return result, ErrClientDisconnect
- }
- switch err {
- case nil:
- result.Add(mnt)
- if result.Done() {
- return result, nil
+ mnt := mnt
+ pending.Add(mnt)
+ wg.Add(1)
+ go func() {
+ log.Debugf("PutBlock: start write to %s", mnt.UUID)
+ defer wg.Done()
+ err := mnt.Put(ctx, hash, block)
+
+ mtx.Lock()
+ if err != nil {
+ log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+ pending.Sub(mnt)
+ } else {
+ log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+ result.Add(mnt)
}
- continue
- case FullError:
- continue
- default:
- // The volume is not full but the
- // write did not succeed. Report the
- // error and continue trying.
- allFull = false
- log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
- }
+ cond.Broadcast()
+ mtx.Unlock()
+
+ if err != nil && err != FullError && ctx.Err() == nil {
+ // The volume is not full but the
+ // write did not succeed. Report the
+ // error and continue trying.
+ allFull.Store(false)
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ }
+ }()
+ }
+ mtx.Unlock()
+ wg.Wait()
+ if ctx.Err() != nil {
+ return result, ErrClientDisconnect
+ }
+ if result.Done() {
+ return result, nil
}
if result.totalReplication > 0 {
// Some, but not all, of the storage classes were
// satisfied. This qualifies as success.
return result, nil
- } else if allFull {
+ } else if allFull.Load().(bool) {
log.Error("all volumes with qualifying storage classes are full")
return putProgress{}, FullError
} else {
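
The new classNeeded field exists so that Sub can undo an optimistic Add: PutBlock adds each mount to the "pending" progress before its goroutine finishes, and if the write then fails, Sub needs to know which storage classes belong back on the todo list. A rough, self-contained illustration of that round trip (simplified types, replication target of 1 assumed, not the real putProgress):

package main

import "fmt"

type mnt struct {
    replication int
    classes     map[string]bool
}

type progress struct {
    classNeeded map[string]bool // classes the request asked for
    classTodo   map[string]bool // classes not yet satisfied
    classDone   map[string]int  // replication achieved per class
}

func newProgress(classes ...string) *progress {
    p := &progress{
        classNeeded: map[string]bool{},
        classTodo:   map[string]bool{},
        classDone:   map[string]int{},
    }
    for _, c := range classes {
        p.classNeeded[c] = true
        p.classTodo[c] = true
    }
    return p
}

// add records a (possibly still pending) write to m. This sketch
// assumes a replication target of 1 per class.
func (p *progress) add(m mnt) {
    for c := range m.classes {
        p.classDone[c] += m.replication
        if p.classDone[c] >= 1 {
            delete(p.classTodo, c)
        }
    }
}

// sub rolls back add after a failed write; classNeeded tells it
// which classes must go back on the todo list.
func (p *progress) sub(m mnt) {
    for c := range m.classes {
        p.classDone[c] -= m.replication
        if p.classNeeded[c] {
            p.classTodo[c] = true
        }
    }
}

func main() {
    p := newProgress("class1", "class2")
    m := mnt{replication: 1, classes: map[string]bool{"class1": true, "class2": true}}
    p.add(m)
    fmt.Println(len(p.classTodo)) // 0: nothing left if the write succeeds
    p.sub(m)
    fmt.Println(len(p.classTodo)) // 2: the failed write put both classes back
}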
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 9bfc6ca3e..878f690c9 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -344,11 +344,11 @@ func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, my
vm.writables = append(vm.writables, mnt)
}
}
- // pri(i): return highest priority of any storage class
- // offered by vm.readables[i]
- pri := func(i int) int {
+ // pri(mnt): return highest priority of any storage class
+ // offered by mnt
+ pri := func(mnt *VolumeMount) int {
any, best := false, 0
- for class := range vm.readables[i].KeepMount.StorageClasses {
+ for class := range mnt.KeepMount.StorageClasses {
if p := cluster.StorageClasses[class].Priority; !any || best < p {
best = p
any = true
@@ -356,14 +356,20 @@ func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, my
}
return best
}
- // sort vm.readables, first by highest priority of any offered
+ // less(a,b): sort first by highest priority of any offered
// storage class (highest->lowest), then by volume UUID
- sort.Slice(vm.readables, func(i, j int) bool {
- if pi, pj := pri(i), pri(j); pi != pj {
- return pi > pj
+ less := func(a, b *VolumeMount) bool {
+ if pa, pb := pri(a), pri(b); pa != pb {
+ return pa > pb
} else {
- return vm.readables[i].KeepMount.UUID < vm.readables[j].KeepMount.UUID
+ return a.KeepMount.UUID < b.KeepMount.UUID
}
+ }
+ sort.Slice(vm.readables, func(i, j int) bool {
+ return less(vm.readables[i], vm.readables[j])
+ })
+ sort.Slice(vm.writables, func(i, j int) bool {
+ return less(vm.writables[i], vm.writables[j])
})
return vm, nil
}
@@ -384,18 +390,19 @@ func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
return vm.readables
}
-// AllWritable returns an array of all writable volumes
+// AllWritable returns writable volumes, sorted by priority/uuid. Used
+// by CompareAndTouch to ensure higher-priority volumes are checked
+// first.
func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
return vm.writables
}
-// NextWritable returns the next writable
-func (vm *RRVolumeManager) NextWritable() *VolumeMount {
- if len(vm.writables) == 0 {
- return nil
- }
- i := atomic.AddUint32(&vm.counter, 1)
- return vm.writables[i%uint32(len(vm.writables))]
+// NextWritable returns writable volumes, rotated by vm.counter so
+// each volume gets a turn to be first. Used by PutBlock to distribute
+// new data across available volumes.
+func (vm *RRVolumeManager) NextWritable() []*VolumeMount {
+ offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables)
+ return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...)
}
// VolumeStats returns an ioStats for the given volume.
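
The rotation in the new NextWritable can be seen in isolation with a small, hypothetical example (plain strings instead of *VolumeMount): each call bumps the counter and returns the same sorted list starting at a different offset, which is how PutBlock spreads fresh blocks across volumes while CompareAndTouch keeps using the priority/uuid order from AllWritable.

package main

import (
    "fmt"
    "sync/atomic"
)

var counter uint32

// rotated returns the volume list rotated by an atomically advancing
// counter, so successive calls start with a different volume.
func rotated(vols []string) []string {
    offset := (int(atomic.AddUint32(&counter, 1)) - 1) % len(vols)
    return append(append([]string(nil), vols[offset:]...), vols[:offset]...)
}

func main() {
    vols := []string{"111", "121", "222"}
    fmt.Println(rotated(vols)) // [111 121 222]
    fmt.Println(rotated(vols)) // [121 222 111]
    fmt.Println(rotated(vols)) // [222 111 121]
}

This also explains the test above: stuffing vm.counter with 1 makes the first rotation start at the second sorted volume, which is why the trial comment says the first volume attempted is ...121212....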