[ARVADOS] updated: d89b7ae1f6fa35dd3627ead14c855751f1de2193
git at public.curoverse.com
git at public.curoverse.com
Mon Aug 10 12:46:12 EDT 2015
Summary of changes:
apps/workbench/app/models/arvados_base.rb | 4 +
apps/workbench/app/models/user.rb | 3 +
.../application/_delete_object_button.html.erb | 2 +-
.../views/application/_show_home_button.html.erb | 3 +
.../app/views/application/_show_recent.html.erb | 18 +--
.../controllers/application_controller_test.rb | 41 +++++++
doc/_config.yml | 6 +-
doc/install/install-api-server.html.textile.liquid | 12 +-
.../install-arv-git-httpd.html.textile.liquid | 9 +-
.../install-crunch-dispatch.html.textile.liquid | 5 +-
doc/install/install-keepproxy.html.textile.liquid | 6 +-
doc/install/install-keepstore.html.textile.liquid | 2 +-
...nstall-manual-prerequisites.html.textile.liquid | 19 ++-
.../install-shell-server.html.textile.liquid | 4 +
.../install-workbench-app.html.textile.liquid | 6 +-
doc/sdk/perl/index.html.textile.liquid | 2 +-
.../running-external-program.html.textile.liquid | 17 ++-
sdk/python/arvados/arvfile.py | 4 +-
sdk/python/arvados/commands/arv_copy.py | 36 +++---
sdk/python/arvados/keep.py | 5 +-
sdk/python/arvados/retry.py | 3 +-
sdk/python/tests/test_retry.py | 6 +-
sdk/python/tests/test_websockets.py | 136 ++++++++++-----------
services/api/script/crunch-dispatch.rb | 1 +
services/api/test/fixtures/humans.yml | 1 +
services/arv-git-httpd/auth_handler.go | 14 ++-
services/arv-git-httpd/server_test.go | 15 +++
services/keepstore/handlers.go | 24 ++--
services/keepstore/pull_worker_test.go | 6 +-
services/keepstore/status_test.go | 15 +++
services/keepstore/trash_worker_test.go | 7 +-
services/keepstore/work_queue.go | 82 ++++++-------
services/keepstore/work_queue_test.go | 35 +++---
33 files changed, 336 insertions(+), 213 deletions(-)
create mode 100644 apps/workbench/app/views/application/_show_home_button.html.erb
create mode 100644 services/api/test/fixtures/humans.yml
create mode 100644 services/keepstore/status_test.go
discards b48ad53210709a52574399cb80d6809164530511 (commit)
discards 69573d5dc48d026b5ad49652c6941d97aa3d7784 (commit)
discards 0b96908a7acdb1e4157ed57464de6023032c06cb (commit)
via d89b7ae1f6fa35dd3627ead14c855751f1de2193 (commit)
via 5562d6d556a942b66ea392c1e9bc803f9b9733e7 (commit)
via ab689cf0a5c73e1fa0525416fa12aaf5ba88abc9 (commit)
via 4de0af809ffbef43d89cd1751e5d611a4b5445e9 (commit)
via e0213cbec6a151e077b8cca00700815c3c3d18e7 (commit)
via 2553fde59a3cf872be891a4f689c241055080c35 (commit)
via 63b82f437c0f237c76f460ef71be0bb3ed42f7f4 (commit)
via 2a96c097e5a176018d078a5d6985403072e8672e (commit)
via 970907f28866a09a9fe95da48dffa6cd34ab4dca (commit)
via 3959d7afff8bb3c3b8da9eb7d178919275180f2a (commit)
via bdb850521603561439429c99e414ec702d7f83b1 (commit)
via 50ff63e948234f6f67acce0aec909f7b6f4705b8 (commit)
via 06f774bc61654ea0bb6ddd7b9f61bde1dd56b884 (commit)
via 4357a60b2805af7151e7c24a8e616b36584a22e3 (commit)
via c24168b75a7bfb6813843bd0c1825baae7434cc9 (commit)
via 6cfe8a2abb6121617286c0931ca723cadfc9e98f (commit)
via 9209660a3bd951c3945bce0da2fa9195cb002e44 (commit)
via 0988acb472849dc08d576ee40493e70bde2132ca (commit)
via f46b05c041684ced4cf438ae6cade577156f81a5 (commit)
via fc851b249ea25a40a1fb392906705142113ac5b9 (commit)
via d2d7138c84a55ef87937cdaefd9c58a66916d76f (commit)
via 8fe01fcc1b88a49f1f7eff14d0435e3ac0649721 (commit)
via 8089b2f5c97b1db9bd826a1b6488f1b060830def (commit)
via 97f16e9b6ce0095a40b68781238550d066a15261 (commit)
via 43338e77fc9ac255511395d8a8b1ae4bb8c98577 (commit)
via bbf7272aa2b831102c47fc93f8966ec32e918205 (commit)
via 03d5c4df2f33c5bb2117c45869808fa018e855f1 (commit)
via 261fe4c689858952b19991e0055eda669ab144af (commit)
via 427d9052d59ca7819acba9fb2e5f381d3e44a53e (commit)
via c8bea0c6dd47a9b9a7a892602d9869177d4c231f (commit)
via 93e0931ea059355ffc26add1303a52f13d2964c9 (commit)
via db717120a687b2851e526117a135a45be75cba1a (commit)
via b7c99efa73a3b0ccfec85915f04545d634b235b0 (commit)
via 5f642789f87c1f0cefbb878cbb031c9b71fc7a09 (commit)
via 54ad79868ed16e4a53f943ec9dc104c28dee2343 (commit)
via 092e1b42918850f7166cb4e3bbda25b67049105f (commit)
via 5916ac79faa2384d75cc41ab4af3ff1a881e8d6e (commit)
via 29a54ec4795c707b19858e3e02dcc48bf3d77e75 (commit)
via 467b636f7d1b34f7695f55af972ae90132fc8063 (commit)
via 75eaaaa74a4a7ec6821008fe93dbee598ee24dee (commit)
via d6b2ead0824fe6810917e5281b4feb969528eb46 (commit)
via 15ac44ec46bbbea31bf6a1ccc1842f7703b9f832 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (b48ad53210709a52574399cb80d6809164530511)
\
N -- N -- N (d89b7ae1f6fa35dd3627ead14c855751f1de2193)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit d89b7ae1f6fa35dd3627ead14c855751f1de2193
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Aug 10 12:45:45 2015 -0400
6260: Pull entire status object out of WorkQueue atomically.
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a6665f6..a86bb6a 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -192,12 +192,6 @@ type PoolStatus struct {
Len int `json:"BuffersInUse"`
}
-type WorkQueueStatus struct {
- InProgress int
- Outstanding int
- Queued int
-}
-
type NodeStatus struct {
Volumes []*VolumeStatus `json:"volumes"`
BufferPool PoolStatus
@@ -238,22 +232,20 @@ func readNodeStatus(st *NodeStatus) {
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
- readWorkQueueStatus(&st.PullQueue, pullq)
- readWorkQueueStatus(&st.TrashQueue, trashq)
+ st.PullQueue = getWorkQueueStatus(pullq)
+ st.TrashQueue = getWorkQueueStatus(trashq)
runtime.ReadMemStats(&st.Memory)
}
-// Populate a WorkQueueStatus. This is not atomic, so race conditions
-// can cause InProgress + Queued != Outstanding.
-func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) {
+// return a WorkQueueStatus for the given queue. If q is nil (which
+// should never happen except in test suites), return a zero status
+// value instead of crashing.
+func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
if q == nil {
// This should only happen during tests.
- *st = WorkQueueStatus{}
- return
+ return WorkQueueStatus{}
}
- st.InProgress = q.CountInProgress()
- st.Outstanding = q.CountOutstanding()
- st.Queued = q.CountQueued()
+ return q.Status()
}
// DeleteHandler processes DELETE requests.
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index e8d390a..37d83b3 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -281,14 +281,16 @@ func performTest(testData PullWorkerTestData, c *C) {
}
c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
- c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0))
c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
response := IssueRequest(&testData.req)
c.Assert(response.Code, Equals, testData.response_code)
c.Assert(response.Body.String(), Equals, testData.response_body)
- expectEqualWithin(c, time.Second, 0, func() interface{} { return pullq.CountOutstanding() })
+ expectEqualWithin(c, time.Second, 0, func() interface{} {
+ st := pullq.Status()
+ return st.InProgress + st.Queued
+ })
if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
c.Assert(len(testPullLists), Equals, 2)
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 433eef5..40b291e 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -271,26 +271,23 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
}
assertStatusItem("InProgress", 0)
- assertStatusItem("Outstanding", 0)
assertStatusItem("Queued", 0)
listLen := trashList.Len()
trashq.ReplaceQueue(trashList)
// Wait for worker to take request(s)
- expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
- expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
// Ensure status.json also reports work is happening
assertStatusItem("InProgress", float64(1))
- assertStatusItem("Outstanding", float64(listLen))
assertStatusItem("Queued", float64(listLen-1))
// Let worker proceed
close(gate)
// Wait for worker to finish
- expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
data, _ := GetBlock(testData.Locator1, false)
diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go
index 6ec5274..58e4966 100644
--- a/services/keepstore/work_queue.go
+++ b/services/keepstore/work_queue.go
@@ -85,10 +85,8 @@ package main
import "container/list"
type WorkQueue struct {
- countInProgress chan int
- countOutstanding chan int
- countQueued chan int
- newlist chan *list.List
+ getStatus chan WorkQueueStatus
+ newlist chan *list.List
// Workers get work items by reading from this channel.
NextItem <-chan interface{}
// Each worker must send struct{}{} to ReportDone exactly once
@@ -98,6 +96,11 @@ type WorkQueue struct {
ReportDone chan<- struct{}
}
+type WorkQueueStatus struct {
+ InProgress int
+ Queued int
+}
+
// NewWorkQueue returns a new empty WorkQueue.
//
func NewWorkQueue() *WorkQueue {
@@ -105,33 +108,34 @@ func NewWorkQueue() *WorkQueue {
reportDone := make(chan struct{})
newList := make(chan *list.List)
b := WorkQueue{
- countQueued: make(chan int),
- countInProgress: make(chan int),
- countOutstanding: make(chan int),
- newlist: newList,
- NextItem: nextItem,
- ReportDone: reportDone,
+ getStatus: make(chan WorkQueueStatus),
+ newlist: newList,
+ NextItem: nextItem,
+ ReportDone: reportDone,
}
go func() {
// Read new work lists from the newlist channel.
- // Reply to "length" and "get next item" queries by
- // sending to the countQueued and nextItem channels
+ // Reply to "status" and "get next item" queries by
+ // sending to the getStatus and nextItem channels
// respectively. Return when the newlist channel
// closes.
todo := &list.List{}
- countInProgress := 0
+ status := WorkQueueStatus{}
// When we're done, close the output channel; workers will
// shut down next time they ask for new work.
defer close(nextItem)
- defer close(b.countInProgress)
- defer close(b.countOutstanding)
- defer close(b.countQueued)
+ defer close(b.getStatus)
+ // nextChan and nextVal are both nil when we have
+ // nothing to send; otherwise they are, respectively,
+ // the nextItem channel and the next work item to send
+ // to it.
var nextChan chan interface{}
var nextVal interface{}
- for newList != nil || countInProgress > 0 {
+
+ for newList != nil || status.InProgress > 0 {
select {
case p, ok := <-newList:
if !ok {
@@ -142,7 +146,8 @@ func NewWorkQueue() *WorkQueue {
if todo == nil {
todo = &list.List{}
}
- if todo.Len() == 0 {
+ status.Queued = todo.Len()
+ if status.Queued == 0 {
// Stop sending work
nextChan = nil
nextVal = nil
@@ -151,9 +156,10 @@ func NewWorkQueue() *WorkQueue {
nextVal = todo.Front().Value
}
case nextChan <- nextVal:
- countInProgress++
todo.Remove(todo.Front())
- if todo.Len() == 0 {
+ status.InProgress++
+ status.Queued--
+ if status.Queued == 0 {
// Stop sending work
nextChan = nil
nextVal = nil
@@ -161,10 +167,8 @@ func NewWorkQueue() *WorkQueue {
nextVal = todo.Front().Value
}
case <-reportDone:
- countInProgress--
- case b.countInProgress <- countInProgress:
- case b.countOutstanding <- todo.Len() + countInProgress:
- case b.countQueued <- todo.Len():
+ status.InProgress--
+ case b.getStatus <- status:
}
}
}()
@@ -184,31 +188,19 @@ func (b *WorkQueue) ReplaceQueue(list *list.List) {
// abandons any pending requests, but allows any pull request already
// in progress to continue.
//
-// After Close, CountX methods will return correct values, NextItem
-// will be closed, and ReplaceQueue will panic.
+// After Close, Status will return correct values, NextItem will be
+// closed, and ReplaceQueue will panic.
//
func (b *WorkQueue) Close() {
close(b.newlist)
}
-// CountOutstanding returns the number of items in the queue or in
-// progress. A return value of 0 guarantees all existing work (work
-// that was sent to ReplaceQueue before CountOutstanding was called)
-// has completed.
-//
-func (b *WorkQueue) CountOutstanding() int {
- // If the channel is closed, we get zero, which is correct.
- return <-b.countOutstanding
-}
-
-// CountQueued returns the number of items in the current queue.
-//
-func (b *WorkQueue) CountQueued() int {
- return <-b.countQueued
-}
-
-// Len returns the number of items in progress.
+// Status returns an up-to-date WorkQueueStatus reflecting the current
+// queue status.
//
-func (b *WorkQueue) CountInProgress() int {
- return <-b.countInProgress
+func (b *WorkQueue) Status() WorkQueueStatus {
+ // If the channel is closed, we get the nil value of
+ // WorkQueueStatus, which is an accurate description of a
+ // finished queue.
+ return <-b.getStatus
}
diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go
index df0fa9c..7844a2b 100644
--- a/services/keepstore/work_queue_test.go
+++ b/services/keepstore/work_queue_test.go
@@ -84,9 +84,9 @@ func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f
}
}
-func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
- if l := b.CountQueued(); l != expectCountQueued {
- t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
+ if l := b.Status().Queued; l != expectQueued {
+ t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
}
}
@@ -94,18 +94,25 @@ func TestWorkQueueDoneness(t *testing.T) {
b := NewWorkQueue()
defer b.Close()
b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
- expectCountQueued(t, b, 3)
+ expectQueued(t, b, 3)
+ gate := make(chan struct{})
go func() {
+ <-gate
for _ = range b.NextItem {
- //time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
+ <-gate
time.Sleep(time.Millisecond)
b.ReportDone <- struct{}{}
}
}()
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
- b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
- expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+ b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6}))
+ for i := 1; i <= 3; i++ {
+ gate <- struct{}{}
+ expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+ }
+ close(gate)
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
expectChannelEmpty(t, b.NextItem)
}
@@ -114,10 +121,10 @@ func TestWorkQueueReadWrite(t *testing.T) {
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
- expectCountQueued(t, b, 0)
+ expectQueued(t, b, 0)
b.ReplaceQueue(makeTestWorkList(input))
- expectCountQueued(t, b, len(input))
+ expectQueued(t, b, len(input))
doWorkItems(t, b, input)
expectChannelEmpty(t, b.NextItem)
@@ -148,7 +155,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
// finish.
b.ReplaceQueue(makeTestWorkList(input))
<-done
- expectCountQueued(t, b, 0)
+ expectQueued(t, b, 0)
}
// After Close(), NextItem closes, work finishes, then stats return zero.
@@ -166,10 +173,10 @@ func TestWorkQueueClose(t *testing.T) {
// Wait for worker to take item 1
<-mark
b.Close()
- expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
// Tell worker to report done
mark <- struct{}{}
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
expectChannelClosedWithin(t, time.Second, b.NextItem)
}
commit 5562d6d556a942b66ea392c1e9bc803f9b9733e7
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Aug 10 11:56:48 2015 -0400
6260: Expose queue sizes in /status.json. Fix sleep/race in trash_worker_test.
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index c4ecfb4..a6665f6 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -192,9 +192,17 @@ type PoolStatus struct {
Len int `json:"BuffersInUse"`
}
+type WorkQueueStatus struct {
+ InProgress int
+ Outstanding int
+ Queued int
+}
+
type NodeStatus struct {
Volumes []*VolumeStatus `json:"volumes"`
BufferPool PoolStatus
+ PullQueue WorkQueueStatus
+ TrashQueue WorkQueueStatus
Memory runtime.MemStats
}
@@ -203,7 +211,7 @@ var stLock sync.Mutex
func StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
- ReadNodeStatus(&st)
+ readNodeStatus(&st)
jstat, err := json.Marshal(&st)
stLock.Unlock()
if err == nil {
@@ -215,10 +223,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
}
}
-// ReadNodeStatus populates the given NodeStatus struct with current
-// values.
-//
-func ReadNodeStatus(st *NodeStatus) {
+// populate the given NodeStatus struct with current values.
+func readNodeStatus(st *NodeStatus) {
vols := KeepVM.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*VolumeStatus, len(vols))
@@ -232,9 +238,24 @@ func ReadNodeStatus(st *NodeStatus) {
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
+ readWorkQueueStatus(&st.PullQueue, pullq)
+ readWorkQueueStatus(&st.TrashQueue, trashq)
runtime.ReadMemStats(&st.Memory)
}
+// Populate a WorkQueueStatus. This is not atomic, so race conditions
+// can cause InProgress + Queued != Outstanding.
+func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) {
+ if q == nil {
+ // This should only happen during tests.
+ *st = WorkQueueStatus{}
+ return
+ }
+ st.InProgress = q.CountInProgress()
+ st.Outstanding = q.CountOutstanding()
+ st.Queued = q.CountQueued()
+}
+
// DeleteHandler processes DELETE requests.
//
// DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 0d4f9be..e8d390a 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -236,6 +236,9 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
}
func performTest(testData PullWorkerTestData, c *C) {
+ KeepVM = MakeTestVolumeManager(2)
+ defer KeepVM.Close()
+
RunTestPullWorker(c)
defer pullq.Close()
@@ -249,6 +252,7 @@ func performTest(testData PullWorkerTestData, c *C) {
GetContent = orig
}(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
+ c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
processedPullLists[testData.name] = testData.response_body
if testData.read_error {
err = errors.New("Error getting data")
@@ -276,6 +280,10 @@ func performTest(testData PullWorkerTestData, c *C) {
}
}
+ c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+ c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0))
+ c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+
response := IssueRequest(&testData.req)
c.Assert(response.Code, Equals, testData.response_code)
c.Assert(response.Body.String(), Equals, testData.response_body)
diff --git a/services/keepstore/status_test.go b/services/keepstore/status_test.go
new file mode 100644
index 0000000..134b016
--- /dev/null
+++ b/services/keepstore/status_test.go
@@ -0,0 +1,15 @@
+package main
+
+import (
+ "encoding/json"
+)
+
+func getStatusItem(keys ...string) interface{} {
+ resp := IssueRequest(&RequestTester{"/status.json", "", "GET", nil})
+ var s interface{}
+ json.NewDecoder(resp.Body).Decode(&s)
+ for _, k := range keys {
+ s = s.(map[string]interface{})[k]
+ }
+ return s
+}
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 8268191..433eef5 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -258,8 +258,39 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
}
go RunTrashWorker(trashq)
+ // Install gate so all local operations block until we say go
+ gate := make(chan struct{})
+ for _, v := range vols {
+ v.(*MockVolume).Gate = gate
+ }
+
+ assertStatusItem := func(k string, expect float64) {
+ if v := getStatusItem("TrashQueue", k); v != expect {
+ t.Errorf("Got %s %v, expected %v", k, v, expect)
+ }
+ }
+
+ assertStatusItem("InProgress", 0)
+ assertStatusItem("Outstanding", 0)
+ assertStatusItem("Queued", 0)
+
+ listLen := trashList.Len()
trashq.ReplaceQueue(trashList)
- time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+ // Wait for worker to take request(s)
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+
+ // Ensure status.json also reports work is happening
+ assertStatusItem("InProgress", float64(1))
+ assertStatusItem("Outstanding", float64(listLen))
+ assertStatusItem("Queued", float64(listLen-1))
+
+ // Let worker proceed
+ close(gate)
+
+ // Wait for worker to finish
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
// Verify Locator1 to be un/deleted as expected
data, _ := GetBlock(testData.Locator1, false)
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 2615019..d660017 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -22,13 +22,20 @@ type MockVolume struct {
// Readonly volumes return an error for Put, Delete, and
// Touch.
Readonly bool
- called map[string]int
- mutex sync.Mutex
+ // Every operation (except Status) starts by receiving from
+ // Gate. Send one value to unblock one operation; close the
+ // channel to unblock all. By default, it is a closed channel,
+ // so all operations proceed without blocking.
+ Gate chan struct{}
+ called map[string]int
+ mutex sync.Mutex
}
// CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
// volume.
func CreateMockVolume() *MockVolume {
+ gate := make(chan struct{})
+ close(gate)
return &MockVolume{
Store: make(map[string][]byte),
Timestamps: make(map[string]time.Time),
@@ -36,6 +43,7 @@ func CreateMockVolume() *MockVolume {
Touchable: true,
Readonly: false,
called: map[string]int{},
+ Gate: gate,
}
}
@@ -62,6 +70,7 @@ func (v *MockVolume) gotCall(method string) {
func (v *MockVolume) Get(loc string) ([]byte, error) {
v.gotCall("Get")
+ <-v.Gate
if v.Bad {
return nil, errors.New("Bad volume")
} else if block, ok := v.Store[loc]; ok {
@@ -74,6 +83,7 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
func (v *MockVolume) Put(loc string, block []byte) error {
v.gotCall("Put")
+ <-v.Gate
if v.Bad {
return errors.New("Bad volume")
}
@@ -86,6 +96,7 @@ func (v *MockVolume) Put(loc string, block []byte) error {
func (v *MockVolume) Touch(loc string) error {
v.gotCall("Touch")
+ <-v.Gate
if v.Readonly {
return MethodDisabledError
}
@@ -98,6 +109,7 @@ func (v *MockVolume) Touch(loc string) error {
func (v *MockVolume) Mtime(loc string) (time.Time, error) {
v.gotCall("Mtime")
+ <-v.Gate
var mtime time.Time
var err error
if v.Bad {
@@ -112,6 +124,7 @@ func (v *MockVolume) Mtime(loc string) (time.Time, error) {
func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
v.gotCall("IndexTo")
+ <-v.Gate
for loc, block := range v.Store {
if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
continue
@@ -127,6 +140,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
func (v *MockVolume) Delete(loc string) error {
v.gotCall("Delete")
+ <-v.Gate
if v.Readonly {
return MethodDisabledError
}
commit ab689cf0a5c73e1fa0525416fa12aaf5ba88abc9
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Aug 5 23:38:12 2015 -0400
6260: gofmt
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index d1169d5..c4ecfb4 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -193,13 +193,14 @@ type PoolStatus struct {
}
type NodeStatus struct {
- Volumes []*VolumeStatus `json:"volumes"`
+ Volumes []*VolumeStatus `json:"volumes"`
BufferPool PoolStatus
Memory runtime.MemStats
}
var st NodeStatus
var stLock sync.Mutex
+
func StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
ReadNodeStatus(&st)
diff --git a/services/keepstore/perms_test.go b/services/keepstore/perms_test.go
index 85883b0..e43cb8d 100644
--- a/services/keepstore/perms_test.go
+++ b/services/keepstore/perms_test.go
@@ -48,7 +48,7 @@ func TestVerifySignatureExtraHints(t *testing.T) {
PermissionSecret = []byte(known_key)
defer func() { PermissionSecret = nil }()
- if VerifySignature(known_locator+"+K at xyzzy"+known_sig_hint, known_token) != nil{
+ if VerifySignature(known_locator+"+K at xyzzy"+known_sig_hint, known_token) != nil {
t.Fatal("Verify cannot handle hint before permission signature")
}
diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go
index 762abff..3e57407 100644
--- a/services/keepstore/pull_worker_integration_test.go
+++ b/services/keepstore/pull_worker_integration_test.go
@@ -107,7 +107,7 @@ func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
// Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+ defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
PutContent = func(content []byte, locator string) (err error) {
if string(content) != testData.Content {
t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
@@ -116,7 +116,9 @@ func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pull
}
// Override GetContent to mock keepclient Get functionality
- defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
+ defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
+ GetContent = orig
+ }(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
reader io.ReadCloser, contentLength int64, url string, err error) {
if testData.GetError != "" {
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 822d202..0d4f9be 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -245,7 +245,7 @@ func performTest(testData PullWorkerTestData, c *C) {
processedPullLists := make(map[string]string)
// Override GetContent to mock keepclient Get functionality
- defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) {
+ defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
GetContent = orig
}(GetContent)
GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
@@ -264,7 +264,7 @@ func performTest(testData PullWorkerTestData, c *C) {
}
// Override PutContent to mock PutBlock functionality
- defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+ defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
PutContent = func(content []byte, locator string) (err error) {
if testData.put_error {
err = errors.New("Error putting data")
commit 4de0af809ffbef43d89cd1751e5d611a4b5445e9
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Aug 5 23:38:05 2015 -0400
6260: Fix races in keepstore tests. Expose WorkQueue in-progress/queued stats.
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
index 718e2ca..95d118e 100644
--- a/services/keepstore/bufferpool_test.go
+++ b/services/keepstore/bufferpool_test.go
@@ -21,6 +21,11 @@ func init() {
bufs = newBufferPool(maxBuffers, BLOCKSIZE)
}
+// Restore sane default after bufferpool's own tests
+func (s *BufferPoolSuite) TearDownTest(c *C) {
+ bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+}
+
func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
bufs := newBufferPool(2, 10)
b1 := bufs.Get(1)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 3d67cf2..d55fd32 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -24,6 +24,7 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
for item := range nextItem {
pullRequest := item.(PullRequest)
err := PullItemAndProcess(item.(PullRequest), GenerateRandomApiToken(), keepClient)
+ pullq.ReportDone <- struct{}{}
if err == nil {
log.Printf("Pull %s success", pullRequest)
} else {
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 0833bc6..822d202 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -9,6 +9,7 @@ import (
"io"
"net/http"
"testing"
+ "time"
)
type PullWorkerTestSuite struct{}
@@ -22,7 +23,6 @@ func TestPullWorker(t *testing.T) {
var _ = Suite(&PullWorkerTestSuite{})
var testPullLists map[string]string
-var processedPullLists map[string]string
var readContent string
var readError error
var putContent []byte
@@ -39,7 +39,6 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) {
// This behavior is verified using these two maps in the
// "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
testPullLists = make(map[string]string)
- processedPullLists = make(map[string]string)
}
// Since keepstore does not come into picture in tests,
@@ -238,15 +237,18 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
func performTest(testData PullWorkerTestData, c *C) {
RunTestPullWorker(c)
+ defer pullq.Close()
currentTestData = testData
testPullLists[testData.name] = testData.response_body
- // Override GetContent to mock keepclient Get functionality
- defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
- GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
- reader io.ReadCloser, contentLength int64, url string, err error) {
+ processedPullLists := make(map[string]string)
+ // Override GetContent to mock keepclient Get functionality
+ defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) {
+ GetContent = orig
+ }(GetContent)
+ GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
processedPullLists[testData.name] = testData.response_body
if testData.read_error {
err = errors.New("Error getting data")
@@ -278,9 +280,7 @@ func performTest(testData PullWorkerTestData, c *C) {
c.Assert(response.Code, Equals, testData.response_code)
c.Assert(response.Body.String(), Equals, testData.response_body)
- expectWorkerChannelEmpty(c, pullq.NextItem)
-
- pullq.Close()
+ expectEqualWithin(c, time.Second, 0, func() interface{} { return pullq.CountOutstanding() })
if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
c.Assert(len(testPullLists), Equals, 2)
@@ -311,6 +311,8 @@ func performTest(testData PullWorkerTestData, c *C) {
c.Assert(string(putContent), Equals, testData.read_content)
}
}
+
+ expectChannelEmpty(c, pullq.NextItem)
}
type ClosingBuffer struct {
@@ -320,19 +322,3 @@ type ClosingBuffer struct {
func (cb *ClosingBuffer) Close() (err error) {
return
}
-
-func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
- select {
- case item := <-workerChannel:
- c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
- default:
- }
-}
-
-func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
- select {
- case item := <-workerChannel:
- c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
- default:
- }
-}
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 4fbe4bb..2cf2dc8 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -18,6 +18,7 @@ func RunTrashWorker(trashq *WorkQueue) {
for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
TrashItem(trashRequest)
+ trashq.ReportDone <- struct{}{}
}
}
diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go
index 9509cac..6ec5274 100644
--- a/services/keepstore/work_queue.go
+++ b/services/keepstore/work_queue.go
@@ -85,76 +85,130 @@ package main
import "container/list"
type WorkQueue struct {
- newlist chan *list.List
- NextItem chan interface{}
+ countInProgress chan int
+ countOutstanding chan int
+ countQueued chan int
+ newlist chan *list.List
+ // Workers get work items by reading from this channel.
+ NextItem <-chan interface{}
+ // Each worker must send struct{}{} to ReportDone exactly once
+ // for each work item received from NextItem, when it stops
+ // working on that item (regardless of whether the work was
+ // successful).
+ ReportDone chan<- struct{}
}
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+// NewWorkQueue returns a new empty WorkQueue.
//
func NewWorkQueue() *WorkQueue {
+ nextItem := make(chan interface{})
+ reportDone := make(chan struct{})
+ newList := make(chan *list.List)
b := WorkQueue{
- newlist: make(chan *list.List),
- NextItem: make(chan interface{}),
+ countQueued: make(chan int),
+ countInProgress: make(chan int),
+ countOutstanding: make(chan int),
+ newlist: newList,
+ NextItem: nextItem,
+ ReportDone: reportDone,
}
- go b.listen()
+ go func() {
+ // Read new work lists from the newlist channel.
+ // Reply to "length" and "get next item" queries by
+ // sending to the countQueued and nextItem channels
+ // respectively. Return when the newlist channel
+ // closes.
+
+ todo := &list.List{}
+ countInProgress := 0
+
+ // When we're done, close the output channel; workers will
+ // shut down next time they ask for new work.
+ defer close(nextItem)
+ defer close(b.countInProgress)
+ defer close(b.countOutstanding)
+ defer close(b.countQueued)
+
+ var nextChan chan interface{}
+ var nextVal interface{}
+ for newList != nil || countInProgress > 0 {
+ select {
+ case p, ok := <-newList:
+ if !ok {
+ // Closed, stop receiving
+ newList = nil
+ }
+ todo = p
+ if todo == nil {
+ todo = &list.List{}
+ }
+ if todo.Len() == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextChan = nextItem
+ nextVal = todo.Front().Value
+ }
+ case nextChan <- nextVal:
+ countInProgress++
+ todo.Remove(todo.Front())
+ if todo.Len() == 0 {
+ // Stop sending work
+ nextChan = nil
+ nextVal = nil
+ } else {
+ nextVal = todo.Front().Value
+ }
+ case <-reportDone:
+ countInProgress--
+ case b.countInProgress <- countInProgress:
+ case b.countOutstanding <- todo.Len() + countInProgress:
+ case b.countQueued <- todo.Len():
+ }
+ }
+ }()
return &b
}
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list.
//
func (b *WorkQueue) ReplaceQueue(list *list.List) {
b.newlist <- list
}
// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any pending requests, but allows any pull request already
+// in progress to continue.
+//
+// After Close, CountX methods will return correct values, NextItem
+// will be closed, and ReplaceQueue will panic.
//
func (b *WorkQueue) Close() {
close(b.newlist)
}
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
+// CountOutstanding returns the number of items in the queue or in
+// progress. A return value of 0 guarantees all existing work (work
+// that was sent to ReplaceQueue before CountOutstanding was called)
+// has completed.
//
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
+func (b *WorkQueue) CountOutstanding() int {
+ // If the channel is closed, we get zero, which is correct.
+ return <-b.countOutstanding
+}
+
+// CountQueued returns the number of items in the current queue.
//
-func (b *WorkQueue) listen() {
- var current_item *list.Element
-
- // When we're done, close the output channel to shut down any
- // workers.
- defer close(b.NextItem)
-
- for {
- // If the current list is empty, wait for a new list before
- // even checking if workers are ready.
- if current_item == nil {
- if p, ok := <-b.newlist; ok {
- current_item = p.Front()
- } else {
- // The channel was closed; shut down.
- return
- }
- }
- select {
- case p, ok := <-b.newlist:
- if ok {
- current_item = p.Front()
- } else {
- // The input channel is closed; time to shut down
- return
- }
- case b.NextItem <- current_item.Value:
- current_item = current_item.Next()
- }
- }
+func (b *WorkQueue) CountQueued() int {
+ return <-b.countQueued
+}
+
+// Len returns the number of items in progress.
+//
+func (b *WorkQueue) CountInProgress() int {
+ return <-b.countInProgress
}
diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go
index 144e4c2..df0fa9c 100644
--- a/services/keepstore/work_queue_test.go
+++ b/services/keepstore/work_queue_test.go
@@ -2,9 +2,15 @@ package main
import (
"container/list"
+ "runtime"
"testing"
+ "time"
)
+type fatalfer interface {
+ Fatalf(string, ...interface{})
+}
+
func makeTestWorkList(ary []int) *list.List {
l := list.New()
for _, n := range ary {
@@ -13,50 +19,107 @@ func makeTestWorkList(ary []int) *list.List {
return l
}
-func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
+func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
select {
- case item := <-c:
- t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
+ case item, ok := <-c:
+ if ok {
+ t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
+ }
default:
- // no-op
}
}
-func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
- if item, ok := <-c; !ok {
- t.Fatal("expected data on a closed channel")
- } else if item == nil {
- t.Fatal("expected data on an empty channel")
+func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
+ select {
+ case item, ok := <-c:
+ if !ok {
+ t.Fatalf("expected data on a closed channel")
+ }
+ return item
+ case <-time.After(time.Second):
+ t.Fatalf("expected data on an empty channel")
+ return nil
}
}
-func expectChannelClosed(t *testing.T, c <-chan interface{}) {
- received, ok := <-c
- if ok {
- t.Fatalf("Expected channel to be closed, but received %v instead", received)
+func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
+ select {
+ case received, ok := <-c:
+ if ok {
+ t.Fatalf("Expected channel to be closed, but received %+v instead", received)
+ }
+ case <-time.After(timeout):
+ t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
}
}
-func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
for i := range expected {
- actual, ok := <-c
- t.Logf("received %v", actual)
+ actual, ok := <-q.NextItem
if !ok {
- t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
- } else if actual.(int) != expected[i] {
- t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
+ t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
+ }
+ q.ReportDone <- struct{}{}
+ if actual.(int) != expected[i] {
+ t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
}
}
}
+func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
+ ok := make(chan struct{})
+ giveup := false
+ go func() {
+ for f() != expect && !giveup {
+ time.Sleep(time.Millisecond)
+ }
+ close(ok)
+ }()
+ select {
+ case <-ok:
+ case <-time.After(timeout):
+ giveup = true
+ _, file, line, _ := runtime.Caller(1)
+ t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
+ }
+}
+
+func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
+ if l := b.CountQueued(); l != expectCountQueued {
+ t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
+ }
+}
+
+func TestWorkQueueDoneness(t *testing.T) {
+ b := NewWorkQueue()
+ defer b.Close()
+ b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
+ expectCountQueued(t, b, 3)
+ go func() {
+ for _ = range b.NextItem {
+ //time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
+ time.Sleep(time.Millisecond)
+ b.ReportDone <- struct{}{}
+ }
+ }()
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
+ expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ expectChannelEmpty(t, b.NextItem)
+}
+
// Create a WorkQueue, generate a list for it, and instantiate a worker.
func TestWorkQueueReadWrite(t *testing.T) {
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
+ expectCountQueued(t, b, 0)
+
b.ReplaceQueue(makeTestWorkList(input))
+ expectCountQueued(t, b, len(input))
- expectFromChannel(t, b.NextItem, input)
+ doWorkItems(t, b, input)
expectChannelEmpty(t, b.NextItem)
b.Close()
}
@@ -66,6 +129,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
+ defer b.Close()
// First, demonstrate that nothing is available on the NextItem
// channel.
@@ -76,8 +140,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
//
done := make(chan int)
go func() {
- expectFromChannel(t, b.NextItem, input)
- b.Close()
+ doWorkItems(t, b, input)
done <- 1
}()
@@ -85,8 +148,29 @@ func TestWorkQueueEarlyRead(t *testing.T) {
// finish.
b.ReplaceQueue(makeTestWorkList(input))
<-done
+ expectCountQueued(t, b, 0)
+}
- expectChannelClosed(t, b.NextItem)
+// After Close(), NextItem closes, work finishes, then stats return zero.
+func TestWorkQueueClose(t *testing.T) {
+ b := NewWorkQueue()
+ input := []int{1, 2, 3, 4, 5, 6, 7, 8}
+ mark := make(chan struct{})
+ go func() {
+ <-b.NextItem
+ mark <- struct{}{}
+ <-mark
+ b.ReportDone <- struct{}{}
+ }()
+ b.ReplaceQueue(makeTestWorkList(input))
+ // Wait for worker to take item 1
+ <-mark
+ b.Close()
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
+ // Tell worker to report done
+ mark <- struct{}{}
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ expectChannelClosedWithin(t, time.Second, b.NextItem)
}
// Show that a reader may block when the manager's list is exhausted,
@@ -99,10 +183,11 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
)
b := NewWorkQueue()
+ defer b.Close()
sendmore := make(chan int)
done := make(chan int)
go func() {
- expectFromChannel(t, b.NextItem, inputBeforeBlock)
+ doWorkItems(t, b, inputBeforeBlock)
// Confirm that the channel is empty, so a subsequent read
// on it will block.
@@ -110,8 +195,7 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
// Signal that we're ready for more input.
sendmore <- 1
- expectFromChannel(t, b.NextItem, inputAfterBlock)
- b.Close()
+ doWorkItems(t, b, inputAfterBlock)
done <- 1
}()
@@ -136,14 +220,14 @@ func TestWorkQueueReplaceQueue(t *testing.T) {
// Read just the first five elements from the work list.
// Confirm that the channel is not empty.
- expectFromChannel(t, b.NextItem, firstInput[0:5])
+ doWorkItems(t, b, firstInput[0:5])
expectChannelNotEmpty(t, b.NextItem)
// Replace the work list and read five more elements.
// The old list should have been discarded and all new
// elements come from the new list.
b.ReplaceQueue(makeTestWorkList(replaceInput))
- expectFromChannel(t, b.NextItem, replaceInput[0:5])
+ doWorkItems(t, b, replaceInput[0:5])
b.Close()
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list