[ARVADOS] created: b48ad53210709a52574399cb80d6809164530511

git at public.curoverse.com git at public.curoverse.com
Thu Aug 6 01:10:34 EDT 2015


        at  b48ad53210709a52574399cb80d6809164530511 (commit)


commit b48ad53210709a52574399cb80d6809164530511
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Aug 6 00:56:05 2015 -0400

    6260: Expose queue sizes in /status.json. Fix sleep/race in trash_worker_test.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index c4ecfb4..a6665f6 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -192,9 +192,17 @@ type PoolStatus struct {
 	Len   int    `json:"BuffersInUse"`
 }
 
+type WorkQueueStatus struct {
+	InProgress  int
+	Outstanding int
+	Queued      int
+}
+
 type NodeStatus struct {
 	Volumes    []*VolumeStatus `json:"volumes"`
 	BufferPool PoolStatus
+	PullQueue  WorkQueueStatus
+	TrashQueue WorkQueueStatus
 	Memory     runtime.MemStats
 }
 
@@ -203,7 +211,7 @@ var stLock sync.Mutex
 
 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
 	stLock.Lock()
-	ReadNodeStatus(&st)
+	readNodeStatus(&st)
 	jstat, err := json.Marshal(&st)
 	stLock.Unlock()
 	if err == nil {
@@ -215,10 +223,8 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 }
 
-// ReadNodeStatus populates the given NodeStatus struct with current
-// values.
-//
-func ReadNodeStatus(st *NodeStatus) {
+// populate the given NodeStatus struct with current values.
+func readNodeStatus(st *NodeStatus) {
 	vols := KeepVM.AllReadable()
 	if cap(st.Volumes) < len(vols) {
 		st.Volumes = make([]*VolumeStatus, len(vols))
@@ -232,9 +238,24 @@ func ReadNodeStatus(st *NodeStatus) {
 	st.BufferPool.Alloc = bufs.Alloc()
 	st.BufferPool.Cap = bufs.Cap()
 	st.BufferPool.Len = bufs.Len()
+	readWorkQueueStatus(&st.PullQueue, pullq)
+	readWorkQueueStatus(&st.TrashQueue, trashq)
 	runtime.ReadMemStats(&st.Memory)
 }
 
+// Populate a WorkQueueStatus. This is not atomic, so race conditions
+// can cause InProgress + Queued != Outstanding.
+func readWorkQueueStatus(st *WorkQueueStatus, q *WorkQueue) {
+	if q == nil {
+		// This should only happen during tests.
+		*st = WorkQueueStatus{}
+		return
+	}
+	st.InProgress = q.CountInProgress()
+	st.Outstanding = q.CountOutstanding()
+	st.Queued = q.CountQueued()
+}
+
 // DeleteHandler processes DELETE requests.
 //
 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 0d4f9be..e8d390a 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -236,6 +236,9 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
 }
 
 func performTest(testData PullWorkerTestData, c *C) {
+	KeepVM = MakeTestVolumeManager(2)
+	defer KeepVM.Close()
+
 	RunTestPullWorker(c)
 	defer pullq.Close()
 
@@ -249,6 +252,7 @@ func performTest(testData PullWorkerTestData, c *C) {
 		GetContent = orig
 	}(GetContent)
 	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
+		c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(1))
 		processedPullLists[testData.name] = testData.response_body
 		if testData.read_error {
 			err = errors.New("Error getting data")
@@ -276,6 +280,10 @@ func performTest(testData PullWorkerTestData, c *C) {
 		}
 	}
 
+	c.Assert(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
+	c.Assert(getStatusItem("PullQueue", "Outstanding"), Equals, float64(0))
+	c.Assert(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+
 	response := IssueRequest(&testData.req)
 	c.Assert(response.Code, Equals, testData.response_code)
 	c.Assert(response.Body.String(), Equals, testData.response_body)
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 8268191..433eef5 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -258,8 +258,39 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
 	}
 	go RunTrashWorker(trashq)
 
+	// Install gate so all local operations block until we say go
+	gate := make(chan struct{})
+	for _, v := range vols {
+		v.(*MockVolume).Gate = gate
+	}
+
+	assertStatusItem := func(k string, expect float64) {
+		if v := getStatusItem("TrashQueue", k); v != expect {
+			t.Errorf("Got %s %v, expected %v", k, v, expect)
+		}
+	}
+
+	assertStatusItem("InProgress", 0)
+	assertStatusItem("Outstanding", 0)
+	assertStatusItem("Queued", 0)
+
+	listLen := trashList.Len()
 	trashq.ReplaceQueue(trashList)
-	time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+	// Wait for worker to take request(s)
+	expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
+	expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+
+	// Ensure status.json also reports work is happening
+	assertStatusItem("InProgress", float64(1))
+	assertStatusItem("Outstanding", float64(listLen))
+	assertStatusItem("Queued", float64(listLen-1))
+
+	// Let worker proceed
+	close(gate)
+
+	// Wait for worker to finish
+	expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
 
 	// Verify Locator1 to be un/deleted as expected
 	data, _ := GetBlock(testData.Locator1, false)
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 2615019..d660017 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -22,13 +22,20 @@ type MockVolume struct {
 	// Readonly volumes return an error for Put, Delete, and
 	// Touch.
 	Readonly bool
-	called   map[string]int
-	mutex    sync.Mutex
+	// Every operation (except Status) starts by receiving from
+	// Gate. Send one value to unblock one operation; close the
+	// channel to unblock all. By default, it is a closed channel,
+	// so all operations proceed without blocking.
+	Gate   chan struct{}
+	called map[string]int
+	mutex  sync.Mutex
 }
 
 // CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
 // volume.
 func CreateMockVolume() *MockVolume {
+	gate := make(chan struct{})
+	close(gate)
 	return &MockVolume{
 		Store:      make(map[string][]byte),
 		Timestamps: make(map[string]time.Time),
@@ -36,6 +43,7 @@ func CreateMockVolume() *MockVolume {
 		Touchable:  true,
 		Readonly:   false,
 		called:     map[string]int{},
+		Gate:       gate,
 	}
 }
 
@@ -62,6 +70,7 @@ func (v *MockVolume) gotCall(method string) {
 
 func (v *MockVolume) Get(loc string) ([]byte, error) {
 	v.gotCall("Get")
+	<-v.Gate
 	if v.Bad {
 		return nil, errors.New("Bad volume")
 	} else if block, ok := v.Store[loc]; ok {
@@ -74,6 +83,7 @@ func (v *MockVolume) Get(loc string) ([]byte, error) {
 
 func (v *MockVolume) Put(loc string, block []byte) error {
 	v.gotCall("Put")
+	<-v.Gate
 	if v.Bad {
 		return errors.New("Bad volume")
 	}
@@ -86,6 +96,7 @@ func (v *MockVolume) Put(loc string, block []byte) error {
 
 func (v *MockVolume) Touch(loc string) error {
 	v.gotCall("Touch")
+	<-v.Gate
 	if v.Readonly {
 		return MethodDisabledError
 	}
@@ -98,6 +109,7 @@ func (v *MockVolume) Touch(loc string) error {
 
 func (v *MockVolume) Mtime(loc string) (time.Time, error) {
 	v.gotCall("Mtime")
+	<-v.Gate
 	var mtime time.Time
 	var err error
 	if v.Bad {
@@ -112,6 +124,7 @@ func (v *MockVolume) Mtime(loc string) (time.Time, error) {
 
 func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
 	v.gotCall("IndexTo")
+	<-v.Gate
 	for loc, block := range v.Store {
 		if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
 			continue
@@ -127,6 +140,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
 
 func (v *MockVolume) Delete(loc string) error {
 	v.gotCall("Delete")
+	<-v.Gate
 	if v.Readonly {
 		return MethodDisabledError
 	}

commit 69573d5dc48d026b5ad49652c6941d97aa3d7784
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Aug 5 23:38:12 2015 -0400

    6260: gofmt

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index d1169d5..c4ecfb4 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -193,13 +193,14 @@ type PoolStatus struct {
 }
 
 type NodeStatus struct {
-	Volumes    []*VolumeStatus  `json:"volumes"`
+	Volumes    []*VolumeStatus `json:"volumes"`
 	BufferPool PoolStatus
 	Memory     runtime.MemStats
 }
 
 var st NodeStatus
 var stLock sync.Mutex
+
 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
 	stLock.Lock()
 	ReadNodeStatus(&st)
diff --git a/services/keepstore/perms_test.go b/services/keepstore/perms_test.go
index 85883b0..e43cb8d 100644
--- a/services/keepstore/perms_test.go
+++ b/services/keepstore/perms_test.go
@@ -48,7 +48,7 @@ func TestVerifySignatureExtraHints(t *testing.T) {
 	PermissionSecret = []byte(known_key)
 	defer func() { PermissionSecret = nil }()
 
-	if VerifySignature(known_locator+"+K at xyzzy"+known_sig_hint, known_token) != nil{
+	if VerifySignature(known_locator+"+K at xyzzy"+known_sig_hint, known_token) != nil {
 		t.Fatal("Verify cannot handle hint before permission signature")
 	}
 
diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go
index 762abff..3e57407 100644
--- a/services/keepstore/pull_worker_integration_test.go
+++ b/services/keepstore/pull_worker_integration_test.go
@@ -107,7 +107,7 @@ func TestPullWorkerIntegration_GetExistingLocator(t *testing.T) {
 func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pullRequest PullRequest, t *testing.T) {
 
 	// Override PutContent to mock PutBlock functionality
-	defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+	defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
 	PutContent = func(content []byte, locator string) (err error) {
 		if string(content) != testData.Content {
 			t.Errorf("PutContent invoked with unexpected data. Expected: %s; Found: %s", testData.Content, content)
@@ -116,7 +116,9 @@ func performPullWorkerIntegrationTest(testData PullWorkIntegrationTestData, pull
 	}
 
 	// Override GetContent to mock keepclient Get functionality
-	defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
+	defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
+		GetContent = orig
+	}(GetContent)
 	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
 		reader io.ReadCloser, contentLength int64, url string, err error) {
 		if testData.GetError != "" {
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 822d202..0d4f9be 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -245,7 +245,7 @@ func performTest(testData PullWorkerTestData, c *C) {
 	processedPullLists := make(map[string]string)
 
 	// Override GetContent to mock keepclient Get functionality
-	defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) {
+	defer func(orig func(string, *keepclient.KeepClient) (io.ReadCloser, int64, string, error)) {
 		GetContent = orig
 	}(GetContent)
 	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
@@ -264,7 +264,7 @@ func performTest(testData PullWorkerTestData, c *C) {
 	}
 
 	// Override PutContent to mock PutBlock functionality
-	defer func(orig func([]byte, string)(error)) { PutContent = orig }(PutContent)
+	defer func(orig func([]byte, string) error) { PutContent = orig }(PutContent)
 	PutContent = func(content []byte, locator string) (err error) {
 		if testData.put_error {
 			err = errors.New("Error putting data")

commit 0b96908a7acdb1e4157ed57464de6023032c06cb
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Aug 5 23:38:05 2015 -0400

    6260: Fix races in keepstore tests. Expose WorkQueue in-progress/queued stats.

diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
index 718e2ca..95d118e 100644
--- a/services/keepstore/bufferpool_test.go
+++ b/services/keepstore/bufferpool_test.go
@@ -21,6 +21,11 @@ func init() {
 	bufs = newBufferPool(maxBuffers, BLOCKSIZE)
 }
 
+// Restore sane default after bufferpool's own tests
+func (s *BufferPoolSuite) TearDownTest(c *C) {
+	bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+}
+
 func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
 	bufs := newBufferPool(2, 10)
 	b1 := bufs.Get(1)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 3d67cf2..d55fd32 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -24,6 +24,7 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
 	for item := range nextItem {
 		pullRequest := item.(PullRequest)
 		err := PullItemAndProcess(item.(PullRequest), GenerateRandomApiToken(), keepClient)
+		pullq.ReportDone <- struct{}{}
 		if err == nil {
 			log.Printf("Pull %s success", pullRequest)
 		} else {
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 0833bc6..822d202 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -9,6 +9,7 @@ import (
 	"io"
 	"net/http"
 	"testing"
+	"time"
 )
 
 type PullWorkerTestSuite struct{}
@@ -22,7 +23,6 @@ func TestPullWorker(t *testing.T) {
 var _ = Suite(&PullWorkerTestSuite{})
 
 var testPullLists map[string]string
-var processedPullLists map[string]string
 var readContent string
 var readError error
 var putContent []byte
@@ -39,7 +39,6 @@ func (s *PullWorkerTestSuite) SetUpTest(c *C) {
 	// This behavior is verified using these two maps in the
 	// "TestPullWorker_pull_list_with_two_items_latest_replacing_old"
 	testPullLists = make(map[string]string)
-	processedPullLists = make(map[string]string)
 }
 
 // Since keepstore does not come into picture in tests,
@@ -238,15 +237,18 @@ func (s *PullWorkerTestSuite) TestPullWorker_invalid_data_manager_token(c *C) {
 
 func performTest(testData PullWorkerTestData, c *C) {
 	RunTestPullWorker(c)
+	defer pullq.Close()
 
 	currentTestData = testData
 	testPullLists[testData.name] = testData.response_body
 
-	// Override GetContent to mock keepclient Get functionality
-	defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) { GetContent = orig }(GetContent)
-	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
-		reader io.ReadCloser, contentLength int64, url string, err error) {
+	processedPullLists := make(map[string]string)
 
+	// Override GetContent to mock keepclient Get functionality
+	defer func(orig func(string, *keepclient.KeepClient)(io.ReadCloser, int64, string, error)) {
+		GetContent = orig
+	}(GetContent)
+	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (reader io.ReadCloser, contentLength int64, url string, err error) {
 		processedPullLists[testData.name] = testData.response_body
 		if testData.read_error {
 			err = errors.New("Error getting data")
@@ -278,9 +280,7 @@ func performTest(testData PullWorkerTestData, c *C) {
 	c.Assert(response.Code, Equals, testData.response_code)
 	c.Assert(response.Body.String(), Equals, testData.response_body)
 
-	expectWorkerChannelEmpty(c, pullq.NextItem)
-
-	pullq.Close()
+	expectEqualWithin(c, time.Second, 0, func() interface{} { return pullq.CountOutstanding() })
 
 	if testData.name == "TestPullWorker_pull_list_with_two_items_latest_replacing_old" {
 		c.Assert(len(testPullLists), Equals, 2)
@@ -311,6 +311,8 @@ func performTest(testData PullWorkerTestData, c *C) {
 			c.Assert(string(putContent), Equals, testData.read_content)
 		}
 	}
+
+	expectChannelEmpty(c, pullq.NextItem)
 }
 
 type ClosingBuffer struct {
@@ -320,19 +322,3 @@ type ClosingBuffer struct {
 func (cb *ClosingBuffer) Close() (err error) {
 	return
 }
-
-func expectWorkerChannelEmpty(c *C, workerChannel <-chan interface{}) {
-	select {
-	case item := <-workerChannel:
-		c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
-	default:
-	}
-}
-
-func expectWorkerChannelNotEmpty(c *C, workerChannel <-chan interface{}) {
-	select {
-	case item := <-workerChannel:
-		c.Fatalf("Received value (%v) from channel that was expected to be empty", item)
-	default:
-	}
-}
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 4fbe4bb..2cf2dc8 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -18,6 +18,7 @@ func RunTrashWorker(trashq *WorkQueue) {
 	for item := range trashq.NextItem {
 		trashRequest := item.(TrashRequest)
 		TrashItem(trashRequest)
+		trashq.ReportDone <- struct{}{}
 	}
 }
 
diff --git a/services/keepstore/work_queue.go b/services/keepstore/work_queue.go
index 9509cac..6ec5274 100644
--- a/services/keepstore/work_queue.go
+++ b/services/keepstore/work_queue.go
@@ -85,76 +85,130 @@ package main
 import "container/list"
 
 type WorkQueue struct {
-	newlist  chan *list.List
-	NextItem chan interface{}
+	countInProgress  chan int
+	countOutstanding chan int
+	countQueued      chan int
+	newlist          chan *list.List
+	// Workers get work items by reading from this channel.
+	NextItem <-chan interface{}
+	// Each worker must send struct{}{} to ReportDone exactly once
+	// for each work item received from NextItem, when it stops
+	// working on that item (regardless of whether the work was
+	// successful).
+	ReportDone chan<- struct{}
 }
 
-// NewWorkQueue returns a new worklist, and launches a listener
-// goroutine that waits for work and farms it out to workers.
+// NewWorkQueue returns a new empty WorkQueue.
 //
 func NewWorkQueue() *WorkQueue {
+	nextItem := make(chan interface{})
+	reportDone := make(chan struct{})
+	newList := make(chan *list.List)
 	b := WorkQueue{
-		newlist:  make(chan *list.List),
-		NextItem: make(chan interface{}),
+		countQueued:      make(chan int),
+		countInProgress:  make(chan int),
+		countOutstanding: make(chan int),
+		newlist:          newList,
+		NextItem:         nextItem,
+		ReportDone:       reportDone,
 	}
-	go b.listen()
+	go func() {
+		// Read new work lists from the newlist channel.
+		// Reply to "length" and "get next item" queries by
+		// sending to the countQueued and nextItem channels
+		// respectively. Return when the newlist channel
+		// closes.
+
+		todo := &list.List{}
+		countInProgress := 0
+
+		// When we're done, close the output channel; workers will
+		// shut down next time they ask for new work.
+		defer close(nextItem)
+		defer close(b.countInProgress)
+		defer close(b.countOutstanding)
+		defer close(b.countQueued)
+
+		var nextChan chan interface{}
+		var nextVal interface{}
+		for newList != nil || countInProgress > 0 {
+			select {
+			case p, ok := <-newList:
+				if !ok {
+					// Closed, stop receiving
+					newList = nil
+				}
+				todo = p
+				if todo == nil {
+					todo = &list.List{}
+				}
+				if todo.Len() == 0 {
+					// Stop sending work
+					nextChan = nil
+					nextVal = nil
+				} else {
+					nextChan = nextItem
+					nextVal = todo.Front().Value
+				}
+			case nextChan <- nextVal:
+				countInProgress++
+				todo.Remove(todo.Front())
+				if todo.Len() == 0 {
+					// Stop sending work
+					nextChan = nil
+					nextVal = nil
+				} else {
+					nextVal = todo.Front().Value
+				}
+			case <-reportDone:
+				countInProgress--
+			case b.countInProgress <- countInProgress:
+			case b.countOutstanding <- todo.Len() + countInProgress:
+			case b.countQueued <- todo.Len():
+			}
+		}
+	}()
 	return &b
 }
 
-// ReplaceQueue sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
+// ReplaceQueue abandons any work items left in the existing queue,
+// and starts giving workers items from the given list. After giving
+// it to ReplaceQueue, the caller must not read or write the given
+// list.
 //
 func (b *WorkQueue) ReplaceQueue(list *list.List) {
 	b.newlist <- list
 }
 
 // Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
+// abandons any pending requests, but allows any pull request already
+// in progress to continue.
+//
+// After Close, CountX methods will return correct values, NextItem
+// will be closed, and ReplaceQueue will panic.
 //
 func (b *WorkQueue) Close() {
 	close(b.newlist)
 }
 
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-// listen takes ownership of the list that is passed to it.
+// CountOutstanding returns the number of items in the queue or in
+// progress. A return value of 0 guarantees all existing work (work
+// that was sent to ReplaceQueue before CountOutstanding was called)
+// has completed.
 //
-// Note that the routine does not ever need to access the list
-// itself once the current_item has been initialized, so we do
-// not bother to keep a pointer to the list. Because it is a
-// doubly linked list, holding on to the current item will keep
-// it from garbage collection.
+func (b *WorkQueue) CountOutstanding() int {
+	// If the channel is closed, we get zero, which is correct.
+	return <-b.countOutstanding
+}
+
+// CountQueued returns the number of items in the current queue.
 //
-func (b *WorkQueue) listen() {
-	var current_item *list.Element
-
-	// When we're done, close the output channel to shut down any
-	// workers.
-	defer close(b.NextItem)
-
-	for {
-		// If the current list is empty, wait for a new list before
-		// even checking if workers are ready.
-		if current_item == nil {
-			if p, ok := <-b.newlist; ok {
-				current_item = p.Front()
-			} else {
-				// The channel was closed; shut down.
-				return
-			}
-		}
-		select {
-		case p, ok := <-b.newlist:
-			if ok {
-				current_item = p.Front()
-			} else {
-				// The input channel is closed; time to shut down
-				return
-			}
-		case b.NextItem <- current_item.Value:
-			current_item = current_item.Next()
-		}
-	}
+func (b *WorkQueue) CountQueued() int {
+	return <-b.countQueued
+}
+
+// Len returns the number of items in progress.
+//
+func (b *WorkQueue) CountInProgress() int {
+	return <-b.countInProgress
 }
diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go
index 144e4c2..df0fa9c 100644
--- a/services/keepstore/work_queue_test.go
+++ b/services/keepstore/work_queue_test.go
@@ -2,9 +2,15 @@ package main
 
 import (
 	"container/list"
+	"runtime"
 	"testing"
+	"time"
 )
 
+type fatalfer interface {
+	Fatalf(string, ...interface{})
+}
+
 func makeTestWorkList(ary []int) *list.List {
 	l := list.New()
 	for _, n := range ary {
@@ -13,50 +19,107 @@ func makeTestWorkList(ary []int) *list.List {
 	return l
 }
 
-func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
+func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
 	select {
-	case item := <-c:
-		t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
+	case item, ok := <-c:
+		if ok {
+			t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
+		}
 	default:
-		// no-op
 	}
 }
 
-func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
-	if item, ok := <-c; !ok {
-		t.Fatal("expected data on a closed channel")
-	} else if item == nil {
-		t.Fatal("expected data on an empty channel")
+func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
+	select {
+	case item, ok := <-c:
+		if !ok {
+			t.Fatalf("expected data on a closed channel")
+		}
+		return item
+	case <-time.After(time.Second):
+		t.Fatalf("expected data on an empty channel")
+		return nil
 	}
 }
 
-func expectChannelClosed(t *testing.T, c <-chan interface{}) {
-	received, ok := <-c
-	if ok {
-		t.Fatalf("Expected channel to be closed, but received %v instead", received)
+func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
+	select {
+	case received, ok := <-c:
+		if ok {
+			t.Fatalf("Expected channel to be closed, but received %+v instead", received)
+		}
+	case <-time.After(timeout):
+		t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
 	}
 }
 
-func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
 	for i := range expected {
-		actual, ok := <-c
-		t.Logf("received %v", actual)
+		actual, ok := <-q.NextItem
 		if !ok {
-			t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
-		} else if actual.(int) != expected[i] {
-			t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
+			t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
+		}
+		q.ReportDone <- struct{}{}
+		if actual.(int) != expected[i] {
+			t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
 		}
 	}
 }
 
+func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
+	ok := make(chan struct{})
+	giveup := false
+	go func() {
+		for f() != expect && !giveup {
+			time.Sleep(time.Millisecond)
+		}
+		close(ok)
+	}()
+	select {
+	case <-ok:
+	case <-time.After(timeout):
+		giveup = true
+		_, file, line, _ := runtime.Caller(1)
+		t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
+	}
+}
+
+func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
+	if l := b.CountQueued(); l != expectCountQueued {
+		t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
+	}
+}
+
+func TestWorkQueueDoneness(t *testing.T) {
+	b := NewWorkQueue()
+	defer b.Close()
+	b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
+	expectCountQueued(t, b, 3)
+	go func() {
+		for _ = range b.NextItem {
+			//time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
+			time.Sleep(time.Millisecond)
+			b.ReportDone <- struct{}{}
+		}
+	}()
+	expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+	b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
+	expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
+	expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+	expectChannelEmpty(t, b.NextItem)
+}
+
 // Create a WorkQueue, generate a list for it, and instantiate a worker.
 func TestWorkQueueReadWrite(t *testing.T) {
 	var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
 	b := NewWorkQueue()
+	expectCountQueued(t, b, 0)
+
 	b.ReplaceQueue(makeTestWorkList(input))
+	expectCountQueued(t, b, len(input))
 
-	expectFromChannel(t, b.NextItem, input)
+	doWorkItems(t, b, input)
 	expectChannelEmpty(t, b.NextItem)
 	b.Close()
 }
@@ -66,6 +129,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
 	var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
 
 	b := NewWorkQueue()
+	defer b.Close()
 
 	// First, demonstrate that nothing is available on the NextItem
 	// channel.
@@ -76,8 +140,7 @@ func TestWorkQueueEarlyRead(t *testing.T) {
 	//
 	done := make(chan int)
 	go func() {
-		expectFromChannel(t, b.NextItem, input)
-		b.Close()
+		doWorkItems(t, b, input)
 		done <- 1
 	}()
 
@@ -85,8 +148,29 @@ func TestWorkQueueEarlyRead(t *testing.T) {
 	// finish.
 	b.ReplaceQueue(makeTestWorkList(input))
 	<-done
+	expectCountQueued(t, b, 0)
+}
 
-	expectChannelClosed(t, b.NextItem)
+// After Close(), NextItem closes, work finishes, then stats return zero.
+func TestWorkQueueClose(t *testing.T) {
+	b := NewWorkQueue()
+	input := []int{1, 2, 3, 4, 5, 6, 7, 8}
+	mark := make(chan struct{})
+	go func() {
+		<-b.NextItem
+		mark <- struct{}{}
+		<-mark
+		b.ReportDone <- struct{}{}
+	}()
+	b.ReplaceQueue(makeTestWorkList(input))
+	// Wait for worker to take item 1
+	<-mark
+	b.Close()
+	expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
+	// Tell worker to report done
+	mark <- struct{}{}
+	expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+	expectChannelClosedWithin(t, time.Second, b.NextItem)
 }
 
 // Show that a reader may block when the manager's list is exhausted,
@@ -99,10 +183,11 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
 	)
 
 	b := NewWorkQueue()
+	defer b.Close()
 	sendmore := make(chan int)
 	done := make(chan int)
 	go func() {
-		expectFromChannel(t, b.NextItem, inputBeforeBlock)
+		doWorkItems(t, b, inputBeforeBlock)
 
 		// Confirm that the channel is empty, so a subsequent read
 		// on it will block.
@@ -110,8 +195,7 @@ func TestWorkQueueReaderBlocks(t *testing.T) {
 
 		// Signal that we're ready for more input.
 		sendmore <- 1
-		expectFromChannel(t, b.NextItem, inputAfterBlock)
-		b.Close()
+		doWorkItems(t, b, inputAfterBlock)
 		done <- 1
 	}()
 
@@ -136,14 +220,14 @@ func TestWorkQueueReplaceQueue(t *testing.T) {
 
 	// Read just the first five elements from the work list.
 	// Confirm that the channel is not empty.
-	expectFromChannel(t, b.NextItem, firstInput[0:5])
+	doWorkItems(t, b, firstInput[0:5])
 	expectChannelNotEmpty(t, b.NextItem)
 
 	// Replace the work list and read five more elements.
 	// The old list should have been discarded and all new
 	// elements come from the new list.
 	b.ReplaceQueue(makeTestWorkList(replaceInput))
-	expectFromChannel(t, b.NextItem, replaceInput[0:5])
+	doWorkItems(t, b, replaceInput[0:5])
 
 	b.Close()
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list