[ARVADOS] updated: 16fe80b0e93ed8c8416b2dcbc0e2ad49bc850738

Git user git at public.curoverse.com
Mon Jan 23 15:32:24 EST 2017


Summary of changes:
 services/keep-balance/balance_run_test.go |  4 +-
 services/keep-balance/collection.go       | 73 +++++++++++++++++++++++--------
 services/keep-balance/collection_test.go  | 57 ++++++++++++++++++++++++
 3 files changed, 115 insertions(+), 19 deletions(-)
 create mode 100644 services/keep-balance/collection_test.go

       via  16fe80b0e93ed8c8416b2dcbc0e2ad49bc850738 (commit)
      from  b8de9b3e62e82b806576b237be5f317bf378169f (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 16fe80b0e93ed8c8416b2dcbc0e2ad49bc850738
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Jan 23 15:31:23 2017 -0500

    9998: Handle timestamp collisions in collection index.

diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go
index 30683b4..0208032 100644
--- a/services/keep-balance/balance_run_test.go
+++ b/services/keep-balance/balance_run_test.go
@@ -138,7 +138,9 @@ func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
 		rt.Add(r)
 		if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
 			io.WriteString(w, `{"items_available":3,"items":[]}`)
-		} else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
+		} else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
+			io.WriteString(w, `{"items_available":0,"items":[]}`)
+		} else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
 			io.WriteString(w, `{"items_available":0,"items":[]}`)
 		} else {
 			io.WriteString(w, `{"items_available":2,"items":[
diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go
index e8ac30b..a7a4846 100644
--- a/services/keep-balance/collection.go
+++ b/services/keep-balance/collection.go
@@ -53,6 +53,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
 	var last arvados.Collection
 	var filterTime time.Time
 	callCount := 0
+	gettingExactTimestamp := false
 	for {
 		progress(callCount, expectCount)
 		var page arvados.CollectionList
@@ -71,26 +72,62 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
 			}
 			last = coll
 		}
-		if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
-			if page.ItemsAvailable > len(page.Items) {
-				// TODO: use "mtime=X && UUID>Y"
-				// filters to get all collections with
-				// this timestamp, then use "mtime>X"
-				// to get the next timestamp.
-				return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
-			}
+		if len(page.Items) == 0 && !gettingExactTimestamp {
 			break
+		} else if last.ModifiedAt == nil {
+			return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
+		} else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
+			// If we requested time>=X and never got a
+			// time>X then we might not have received all
+			// items with time==X yet. Switch to
+			// gettingExactTimestamp mode (if we're not
+			// there already), advancing our UUID
+			// threshold with each request, until we get
+			// an empty page.
+			gettingExactTimestamp = true
+			params.Filters = []arvados.Filter{{
+				Attr:     "modified_at",
+				Operator: "=",
+				Operand:  filterTime,
+			}, {
+				Attr:     "uuid",
+				Operator: ">",
+				Operand:  last.UUID,
+			}}
+		} else if gettingExactTimestamp {
+			// This must be an empty page (in this mode,
+			// an unequal timestamp is impossible) so we
+			// can start getting pages of newer
+			// collections.
+			gettingExactTimestamp = false
+			params.Filters = []arvados.Filter{{
+				Attr:     "modified_at",
+				Operator: ">",
+				Operand:  filterTime,
+			}}
+		} else {
+			// In the normal case, we know we have seen
+			// all collections with modtime<filterTime,
+			// but we might not have seen all that have
+			// modtime=filterTime. Hence we use >= instead
+			// of > and skip the obvious overlapping item,
+			// i.e., the last item on the previous
+			// page. In some edge cases this can return
+			// collections we have already seen, but
+			// avoiding that would add overhead in the
+			// overwhelmingly common cases, so we don't
+			// bother.
+			filterTime = *last.ModifiedAt
+			params.Filters = []arvados.Filter{{
+				Attr:     "modified_at",
+				Operator: ">=",
+				Operand:  filterTime,
+			}, {
+				Attr:     "uuid",
+				Operator: "!=",
+				Operand:  last.UUID,
+			}}
 		}
-		filterTime = *last.ModifiedAt
-		params.Filters = []arvados.Filter{{
-			Attr:     "modified_at",
-			Operator: ">=",
-			Operand:  filterTime,
-		}, {
-			Attr:     "uuid",
-			Operator: "!=",
-			Operand:  last.UUID,
-		}}
 	}
 	progress(callCount, expectCount)
 
diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go
new file mode 100644
index 0000000..629ff3a
--- /dev/null
+++ b/services/keep-balance/collection_test.go
@@ -0,0 +1,57 @@
+package main
+
+import (
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	check "gopkg.in/check.v1"
+)
+
+// TestIdenticalTimestamps ensures EachCollection returns the same
+// list of collections, in the same order, for various page sizes --
+// even page sizes so small that we get entire pages of collections
+// with identical timestamps, exercising the gettingExactTimestamp cases.
+func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
+	// pageSize==0 uses the default (large) page size.
+	pageSizes := []int{0, 2, 3, 4, 5}
+	got := make([][]string, len(pageSizes))
+	var wg sync.WaitGroup
+	for trial, pageSize := range pageSizes {
+		wg.Add(1)
+		go func(trial, pageSize int) {
+			defer wg.Done()
+			streak := 0
+			longestStreak := 0
+			var lastMod time.Time
+			sawUUID := make(map[string]bool)
+			err := EachCollection(&s.config.Client, pageSize, func(c arvados.Collection) error {
+				got[trial] = append(got[trial], c.UUID)
+				if c.ModifiedAt == nil {
+					return nil
+				}
+				if sawUUID[c.UUID] {
+					// dup
+					return nil
+				}
+				sawUUID[c.UUID] = true
+				if lastMod == *c.ModifiedAt {
+					streak++
+					if streak > longestStreak {
+						longestStreak = streak
+					}
+				} else {
+					streak = 0
+					lastMod = *c.ModifiedAt
+				}
+				return nil
+			}, nil)
+			c.Check(err, check.IsNil)
+			c.Check(longestStreak > 25, check.Equals, true)
+		}(trial, pageSize)
+	}
+	wg.Wait()
+	for trial := 1; trial < len(pageSizes); trial++ {
+		c.Check(got[trial], check.DeepEquals, got[0])
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list