[ARVADOS] updated: 16fe80b0e93ed8c8416b2dcbc0e2ad49bc850738
Git user
git at public.curoverse.com
Mon Jan 23 15:32:24 EST 2017
Summary of changes:
services/keep-balance/balance_run_test.go | 4 +-
services/keep-balance/collection.go | 73 +++++++++++++++++++++++--------
services/keep-balance/collection_test.go | 57 ++++++++++++++++++++++++
3 files changed, 115 insertions(+), 19 deletions(-)
create mode 100644 services/keep-balance/collection_test.go
via 16fe80b0e93ed8c8416b2dcbc0e2ad49bc850738 (commit)
from b8de9b3e62e82b806576b237be5f317bf378169f (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 16fe80b0e93ed8c8416b2dcbc0e2ad49bc850738
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Jan 23 15:31:23 2017 -0500
9998: Handle timestamp collisions in collection index.
diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 30683b4..0208032 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -138,7 +138,9 @@ func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
rt.Add(r)
if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
io.WriteString(w, `{"items_available":3,"items":[]}`)
- } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
+ } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
+ io.WriteString(w, `{"items_available":0,"items":[]}`)
+ } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
io.WriteString(w, `{"items_available":0,"items":[]}`)
} else {
io.WriteString(w, `{"items_available":2,"items":[
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index e8ac30b..a7a4846 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -53,6 +53,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
var last arvados.Collection
var filterTime time.Time
callCount := 0
+ gettingExactTimestamp := false
for {
progress(callCount, expectCount)
var page arvados.CollectionList
@@ -71,26 +72,62 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
}
last = coll
}
- if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
- if page.ItemsAvailable > len(page.Items) {
- // TODO: use "mtime=X && UUID>Y"
- // filters to get all collections with
- // this timestamp, then use "mtime>X"
- // to get the next timestamp.
- return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
- }
+ if len(page.Items) == 0 && !gettingExactTimestamp {
break
+ } else if last.ModifiedAt == nil {
+ return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
+ } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
+ // If we requested time>=X and never got a
+ // time>X then we might not have received all
+ // items with time==X yet. Switch to
+ // gettingExactTimestamp mode (if we're not
+ // there already), advancing our UUID
+ // threshold with each request, until we get
+ // an empty page.
+ gettingExactTimestamp = true
+ params.Filters = []arvados.Filter{{
+ Attr: "modified_at",
+ Operator: "=",
+ Operand: filterTime,
+ }, {
+ Attr: "uuid",
+ Operator: ">",
+ Operand: last.UUID,
+ }}
+ } else if gettingExactTimestamp {
+ // This must be an empty page (in this mode,
+ // an unequal timestamp is impossible) so we
+ // can start getting pages of newer
+ // collections.
+ gettingExactTimestamp = false
+ params.Filters = []arvados.Filter{{
+ Attr: "modified_at",
+ Operator: ">",
+ Operand: filterTime,
+ }}
+ } else {
+ // In the normal case, we know we have seen
+ // all collections with modtime<filterTime,
+ // but we might not have seen all that have
+ // modtime=filterTime. Hence we use >= instead
+ // of > and skip the obvious overlapping item,
+ // i.e., the last item on the previous
+ // page. In some edge cases this can return
+ // collections we have already seen, but
+ // avoiding that would add overhead in the
+ // overwhelmingly common cases, so we don't
+ // bother.
+ filterTime = *last.ModifiedAt
+ params.Filters = []arvados.Filter{{
+ Attr: "modified_at",
+ Operator: ">=",
+ Operand: filterTime,
+ }, {
+ Attr: "uuid",
+ Operator: "!=",
+ Operand: last.UUID,
+ }}
}
- filterTime = *last.ModifiedAt
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: "!=",
- Operand: last.UUID,
- }}
}
progress(callCount, expectCount)
diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go
new file mode 100644
index 0000000..629ff3a
--- /dev/null
+++ b/services/keep-balance/collection_test.go
@@ -0,0 +1,57 @@
+package main
+
+import (
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+// TestIdenticalTimestamps checks that EachCollection yields exactly the
+// same sequence of collection UUIDs for several page sizes -- including
+// sizes small enough that whole pages share one modified_at timestamp,
+// which is meant to exercise the gettingExactTimestamp paging mode.
+func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
+ // pageSize==0 uses the default (large) page size; trial 0 is the reference result for the others.
+ pageSizes := []int{0, 2, 3, 4, 5}
+ got := make([][]string, len(pageSizes)) // got[i]: UUIDs seen with pageSizes[i]; each goroutine writes only its own index
+ var wg sync.WaitGroup
+ for trial, pageSize := range pageSizes {
+ wg.Add(1)
+ go func(trial, pageSize int) { // loop vars passed as args so each goroutine gets its own copy
+ defer wg.Done()
+ streak := 0        // extra distinct collections sharing lastMod, beyond the first
+ longestStreak := 0 // max streak observed during this trial
+ var lastMod time.Time
+ sawUUID := make(map[string]bool)
+ err := EachCollection(&s.config.Client, pageSize, func(c arvados.Collection) error { // NOTE(review): c shadows the *check.C c here
+ got[trial] = append(got[trial], c.UUID) // recorded before dedup, so any repeats must also match across page sizes
+ if c.ModifiedAt == nil {
+ return nil
+ }
+ if sawUUID[c.UUID] {
+ // Already seen (EachCollection may re-send items near page boundaries); don't count it twice.
+ return nil
+ }
+ sawUUID[c.UUID] = true
+ if lastMod == *c.ModifiedAt { // same timestamp as the previous distinct collection: extend the streak
+ streak++
+ if streak > longestStreak {
+ longestStreak = streak
+ }
+ } else {
+ streak = 0
+ lastMod = *c.ModifiedAt
+ }
+ return nil
+ }, nil)
+ c.Check(err, check.IsNil)
+ c.Check(longestStreak > 25, check.Equals, true) // NOTE(review): assumes fixtures contain a long run of identical modified_at values -- confirm
+ }(trial, pageSize)
+ }
+ wg.Wait() // all trials finished; got is now safe to read
+ for trial := 1; trial < len(pageSizes); trial++ { // compare each small page size against trial 0
+ c.Check(got[trial], check.DeepEquals, got[0])
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list