[ARVADOS] created: 5f4f8509e8fce0bcfb071188b5773a405b61f976
git at public.curoverse.com
git at public.curoverse.com
Fri Sep 19 16:14:26 EDT 2014
created at 5f4f8509e8fce0bcfb071188b5773a405b61f976 (commit)
commit 5f4f8509e8fce0bcfb071188b5773a405b61f976
Author: Tim Pierce <twp at curoverse.com>
Date: Fri Sep 19 16:13:28 2014 -0400
3705: rename BlockWorkList -> WorkQueue
Per discussion at
https://github.com/curoverse/arvados/pull/8#discussion_r17637500
refs #3705.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index deb1c3d..0cfa1f3 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -591,7 +591,7 @@ func TestDeleteHandler(t *testing.T) {
//
// TODO(twp): test concurrency: launch 100 goroutines to update the
// pull list simultaneously. Make sure that none of them return 400
-// Bad Request and that pullmgr.GetList() returns a valid list.
+// Bad Request and that pullq.GetList() returns a valid list.
//
func TestPullHandler(t *testing.T) {
defer teardown()
@@ -665,7 +665,7 @@ func TestPullHandler(t *testing.T) {
// requests on it.
var output_list = make([]PullRequest, 3)
for i := 0; i < 3; i++ {
- item := <-pullmgr.NextItem
+ item := <-pullq.NextItem
if pr, ok := item.(PullRequest); ok {
output_list[i] = pr
} else {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 8095207..fde6087 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -472,10 +472,10 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
plist.PushBack(p)
}
- if pullmgr == nil {
- pullmgr = NewBlockWorkList()
+ if pullq == nil {
+ pullq = NewWorkQueue()
}
- pullmgr.ReplaceList(plist)
+ pullq.ReplaceQueue(plist)
}
// ==============================
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 06054f5..2437638 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -91,12 +91,12 @@ func (e *KeepError) Error() string {
// Initialized by the --volumes flag (or by FindKeepVolumes).
var KeepVM VolumeManager
-// The pull list manager is a singleton pull list (a list of blocks
+// The pull list queue is a singleton pull list (a list of blocks
// that the current keepstore process should be pulling from remote
// keepstore servers in order to increase data replication) with
// atomic update methods that are safe to use from multiple
// goroutines.
-var pullmgr *BlockWorkList
+var pullq *WorkQueue
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
diff --git a/services/keepstore/block_work_list.go b/services/keepstore/work_queue.go
similarity index 80%
rename from services/keepstore/block_work_list.go
rename to services/keepstore/work_queue.go
index 4c5aeb1..9509cac 100644
--- a/services/keepstore/block_work_list.go
+++ b/services/keepstore/work_queue.go
@@ -1,19 +1,19 @@
package main
-/* A BlockWorkList is an asynchronous thread-safe queue manager. It
+/* A WorkQueue is an asynchronous thread-safe queue manager. It
provides a channel from which items can be read off the queue, and
permits replacing the contents of the queue at any time.
- The overall work flow for a BlockWorkList is as follows:
+ The overall work flow for a WorkQueue is as follows:
- 1. A BlockWorkList is created with NewBlockWorkList(). This
- function instantiates a new BlockWorkList and starts a manager
+ 1. A WorkQueue is created with NewWorkQueue(). This
+ function instantiates a new WorkQueue and starts a manager
goroutine. The manager listens on an input channel
(manager.newlist) and an output channel (manager.NextItem).
2. The manager first waits for a new list of requests on the
newlist channel. When another goroutine calls
- manager.ReplaceList(lst), it sends lst over the newlist
+ manager.ReplaceQueue(lst), it sends lst over the newlist
channel to the manager. The manager goroutine now has
ownership of the list.
@@ -34,24 +34,24 @@ package main
output channel (signalling any workers to quit) and
terminates.
- Tasks currently handled by BlockWorkList:
+ Tasks currently handled by WorkQueue:
* the pull list
* the trash list
Example usage:
// Any kind of user-defined type can be used with the
- // BlockWorkList.
+ // WorkQueue.
type FrobRequest struct {
frob string
}
// Make a work list.
- froblist := NewBlockWorkList()
+ froblist := NewWorkQueue()
// Start a concurrent worker to read items from the NextItem
// channel until it is closed, deleting each one.
- go func(list BlockWorkList) {
+ go func(list WorkQueue) {
for i := range list.NextItem {
req := i.(FrobRequest)
frob.Run(req)
@@ -65,16 +65,16 @@ package main
// of FrobRequests, and give this list to the
// frob manager.
newfrobs := parseBody(req.Body)
- froblist.ReplaceList(newfrobs)
+ froblist.ReplaceQueue(newfrobs)
}).Methods("PUT")
- Methods available on a BlockWorkList:
+ Methods available on a WorkQueue:
- ReplaceList(list)
+ ReplaceQueue(list)
Replaces the current item list with a new one. The list
manager discards any unprocessed items on the existing
list and replaces it with the new one. If the worker is
- processing a list item when ReplaceList is called, it
+ processing a list item when ReplaceQueue is called, it
finishes processing before receiving items from the new
list.
Close()
@@ -84,16 +84,16 @@ package main
import "container/list"
-type BlockWorkList struct {
+type WorkQueue struct {
newlist chan *list.List
NextItem chan interface{}
}
-// NewBlockWorkList returns a new worklist, and launches a listener
+// NewWorkQueue returns a new worklist, and launches a listener
// goroutine that waits for work and farms it out to workers.
//
-func NewBlockWorkList() *BlockWorkList {
- b := BlockWorkList{
+func NewWorkQueue() *WorkQueue {
+ b := WorkQueue{
newlist: make(chan *list.List),
NextItem: make(chan interface{}),
}
@@ -101,11 +101,11 @@ func NewBlockWorkList() *BlockWorkList {
return &b
}
-// ReplaceList sends a new list of pull requests to the manager goroutine.
+// ReplaceQueue sends a new list of pull requests to the manager goroutine.
// The manager will discard any outstanding pull list and begin
// working on the new list.
//
-func (b *BlockWorkList) ReplaceList(list *list.List) {
+func (b *WorkQueue) ReplaceQueue(list *list.List) {
b.newlist <- list
}
@@ -113,7 +113,7 @@ func (b *BlockWorkList) ReplaceList(list *list.List) {
// completes any pull request in progress and abandons any pending
// requests.
//
-func (b *BlockWorkList) Close() {
+func (b *WorkQueue) Close() {
close(b.newlist)
}
@@ -127,7 +127,7 @@ func (b *BlockWorkList) Close() {
// doubly linked list, holding on to the current item will keep
// it from garbage collection.
//
-func (b *BlockWorkList) listen() {
+func (b *WorkQueue) listen() {
var current_item *list.Element
// When we're done, close the output channel to shut down any
diff --git a/services/keepstore/block_work_list_test.go b/services/keepstore/work_queue_test.go
similarity index 83%
rename from services/keepstore/block_work_list_test.go
rename to services/keepstore/work_queue_test.go
index c3df400..144e4c2 100644
--- a/services/keepstore/block_work_list_test.go
+++ b/services/keepstore/work_queue_test.go
@@ -49,12 +49,12 @@ func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
}
}
-// Create a BlockWorkList, generate a list for it, and instantiate a worker.
-func TestBlockWorkListReadWrite(t *testing.T) {
+// Create a WorkQueue, generate a list for it, and instantiate a worker.
+func TestWorkQueueReadWrite(t *testing.T) {
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
- b := NewBlockWorkList()
- b.ReplaceList(makeTestWorkList(input))
+ b := NewWorkQueue()
+ b.ReplaceQueue(makeTestWorkList(input))
expectFromChannel(t, b.NextItem, input)
expectChannelEmpty(t, b.NextItem)
@@ -62,10 +62,10 @@ func TestBlockWorkListReadWrite(t *testing.T) {
}
// Start a worker before the list has any input.
-func TestBlockWorkListEarlyRead(t *testing.T) {
+func TestWorkQueueEarlyRead(t *testing.T) {
var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
- b := NewBlockWorkList()
+ b := NewWorkQueue()
// First, demonstrate that nothing is available on the NextItem
// channel.
@@ -83,7 +83,7 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
// Feed the blocklist a new worklist, and wait for the worker to
// finish.
- b.ReplaceList(makeTestWorkList(input))
+ b.ReplaceQueue(makeTestWorkList(input))
<-done
expectChannelClosed(t, b.NextItem)
@@ -92,13 +92,13 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
// Show that a reader may block when the manager's list is exhausted,
// and that the reader resumes automatically when new data is
// available.
-func TestBlockWorkListReaderBlocks(t *testing.T) {
+func TestWorkQueueReaderBlocks(t *testing.T) {
var (
inputBeforeBlock = []int{1, 2, 3, 4, 5}
inputAfterBlock = []int{6, 7, 8, 9, 10}
)
- b := NewBlockWorkList()
+ b := NewWorkQueue()
sendmore := make(chan int)
done := make(chan int)
go func() {
@@ -117,22 +117,22 @@ func TestBlockWorkListReaderBlocks(t *testing.T) {
// Write a slice of the first five elements and wait for the
// reader to signal that it's ready for us to send more input.
- b.ReplaceList(makeTestWorkList(inputBeforeBlock))
+ b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
<-sendmore
- b.ReplaceList(makeTestWorkList(inputAfterBlock))
+ b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
// Wait for the reader to complete.
<-done
}
// Replace one active work list with another.
-func TestBlockWorkListReplaceList(t *testing.T) {
+func TestWorkQueueReplaceQueue(t *testing.T) {
var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
- b := NewBlockWorkList()
- b.ReplaceList(makeTestWorkList(firstInput))
+ b := NewWorkQueue()
+ b.ReplaceQueue(makeTestWorkList(firstInput))
// Read just the first five elements from the work list.
// Confirm that the channel is not empty.
@@ -142,7 +142,7 @@ func TestBlockWorkListReplaceList(t *testing.T) {
// Replace the work list and read five more elements.
// The old list should have been discarded and all new
// elements come from the new list.
- b.ReplaceList(makeTestWorkList(replaceInput))
+ b.ReplaceQueue(makeTestWorkList(replaceInput))
expectFromChannel(t, b.NextItem, replaceInput[0:5])
b.Close()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list