[ARVADOS] created: 0bf498cd8a240f70517d44e3dab0fa000468531d
git at public.curoverse.com
git at public.curoverse.com
Wed Sep 10 18:27:33 EDT 2014
at 0bf498cd8a240f70517d44e3dab0fa000468531d (commit)
commit 0bf498cd8a240f70517d44e3dab0fa000468531d
Author: Tim Pierce <twp at curoverse.com>
Date: Wed Sep 10 13:38:07 2014 -0400
3705: add BlockWorkList and unit tests
diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
new file mode 100644
index 0000000..704a9ca
--- /dev/null
+++ b/services/keepstore/block_work_list.go
@@ -0,0 +1,135 @@
+package main
+
+/* A BlockWorkList concurrently processes blocks needing attention.
+
+ Tasks currently handled by BlockWorkList:
+ * the pull list
+ * the trash list
+
+ A BlockWorkList is instantiated with NewBlockWorkList(), which
+ launches a manager in a goroutine. The manager listens on a
+ channel for data to be assigned to it via the ReplaceList() method.
+
+ A worker gets items to process from a BlockWorkList by reading the
+ NextItem channel. The list manager continuously writes items to
+ this channel.
+
+ Example (simplified) implementation of a trash collector:
+
+ type DeleteRequest struct {
+ hash string
+ age time.Time
+ }
+
+ // Make a work list.
+ trashList := NewBlockWorkList()
+
+ // Start a concurrent worker to read items from the NextItem
+ // channel until it is closed, deleting each one.
+ go func(list *BlockWorkList) {
+ for i := range list.NextItem {
+ req := i.Value.(DeleteRequest)
+ if time.Now().After(req.age) {
+ deleteBlock(req.hash)
+ }
+ }
+ }(trashList)
+
+ // Set up a HTTP handler for PUT /trash
+ router.HandleFunc(`/trash`,
+ func(w http.ResponseWriter, req *http.Request) {
+ // Parse the request body into a list.List
+ // of DeleteRequests, and give this list to the
+ // trash collector.
+ trash := parseBody(req.Body)
+ trashList.ReplaceList(trash)
+ }).Methods("PUT")
+
+ Methods available on a BlockWorkList:
+
+ ReplaceList(list)
+ Replaces the current item list with a new one. The list
+ manager discards any unprocessed items on the existing
+ list and replaces it with the new one. If the worker is
+ processing a list item when ReplaceList is called, it
+ finishes processing before receiving items from the new
+ list.
+ Close()
+ Shuts down the manager and the worker cleanly.
+*/
+
+import "container/list"
+
+// BlockWorkList holds the channels that connect callers supplying work
+// lists, the manager goroutine (listen), and the workers that consume
+// individual list elements.
+type BlockWorkList struct {
+ // items is set to nil by NewBlockWorkList and is not referenced by
+ // any of the other code shown in this file.
+ items *list.List
+ // newlist carries replacement lists to the manager goroutine:
+ // ReplaceList sends on it and Close closes it.
+ newlist chan *list.List
+ // NextItem is the channel workers receive from. The manager sends
+ // one list element at a time and closes the channel when it returns.
+ NextItem chan *list.Element
+}
+
+// NewBlockWorkList builds a work list with unbuffered input and output
+// channels, starts its manager goroutine (listen), and returns a
+// pointer to it. The manager hands out nothing until a list arrives
+// via ReplaceList.
+//
+func NewBlockWorkList() *BlockWorkList {
+	wl := &BlockWorkList{
+		newlist:  make(chan *list.List),
+		NextItem: make(chan *list.Element),
+	}
+	go wl.listen()
+	return wl
+}
+
+// ReplaceList hands a new work list to the manager goroutine. The
+// manager drops whatever remains of the list it was serving and starts
+// handing out elements from the front of the new one.
+//
+// The list travels over the newlist channel, so this call blocks until
+// the manager receives it. Sending on a closed channel panics, so
+// ReplaceList must not be called after Close.
+//
+func (b *BlockWorkList) ReplaceList(list *list.List) {
+ // Note: the parameter name shadows the container/list package here.
+ b.newlist <- list
+}
+
+// Close closes the manager's input channel. The manager goroutine
+// (listen) returns when it sees the channel closed, and its deferred
+// close of NextItem then ends any worker ranging over that channel.
+// Elements not yet handed to a worker are abandoned.
+//
+// Closing an already-closed channel panics, so Close must be called at
+// most once.
+//
+func (b *BlockWorkList) Close() {
+ close(b.newlist)
+}
+
+// listen is the manager loop; NewBlockWorkList runs it in a goroutine.
+// It receives replacement lists on b.newlist and sends the elements of
+// the most recent list, front to back, on b.NextItem. It returns when
+// b.newlist is closed, closing b.NextItem on the way out so that
+// workers ranging over it stop.
+//
+// An empty or nil list leaves nothing to hand out: the manager waits
+// for the next list rather than sending a nil element to a worker.
+func (b *BlockWorkList) listen() {
+	// When we're done, close the output channel to shut down any
+	// workers.
+	defer close(b.NextItem)
+
+	// next is the element to hand to a worker, or nil when the
+	// current list is exhausted or no list has arrived yet.
+	var next *list.Element
+
+	for {
+		// A send on a nil channel never proceeds, so leaving out nil
+		// disables the send case until there is an item to deliver.
+		var out chan *list.Element
+		if next != nil {
+			out = b.NextItem
+		}
+
+		select {
+		case replacement, ok := <-b.newlist:
+			if !ok {
+				// The input channel is closed; time to shut down.
+				return
+			}
+			// Discard whatever was left of the previous list.
+			next = nil
+			if replacement != nil {
+				next = replacement.Front()
+			}
+		case out <- next:
+			next = next.Next()
+		}
+	}
+}
diff --git a/services/keepstore/block_work_list_test.go b/services/keepstore/block_work_list_test.go
new file mode 100644
index 0000000..a099d16
--- /dev/null
+++ b/services/keepstore/block_work_list_test.go
@@ -0,0 +1,116 @@
+package main
+
+import (
+ "container/list"
+ "testing"
+)
+
+// makeTestWorkList returns a new list.List holding the values of ary
+// in their original order.
+func makeTestWorkList(ary []int) *list.List {
+	worklist := list.New()
+	for idx := 0; idx < len(ary); idx++ {
+		worklist.PushBack(ary[idx])
+	}
+	return worklist
+}
+
+// compareSlice reports whether l1 and l2 have the same length and hold
+// equal values at every index.
+func compareSlice(l1, l2 []int) bool {
+	mismatch := len(l1) != len(l2)
+	// Stop scanning at the first differing position.
+	for idx := 0; !mismatch && idx < len(l1); idx++ {
+		mismatch = l1[idx] != l2[idx]
+	}
+	return !mismatch
+}
+
+// TestBlockWorkListReadWrite loads a list into a fresh BlockWorkList
+// and checks that a reader on NextItem sees every value in order.
+func TestBlockWorkListReadWrite(t *testing.T) {
+	input := []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+	wl := NewBlockWorkList()
+	wl.ReplaceList(makeTestWorkList(input))
+
+	output := make([]int, 0, len(input))
+	for item := range wl.NextItem {
+		output = append(output, item.Value.(int))
+		// Once every expected value has arrived, close the work list
+		// so the manager closes NextItem and this loop ends.
+		if len(output) >= len(input) {
+			wl.Close()
+		}
+	}
+
+	if !compareSlice(output, input) {
+		t.Fatalf("output %v does not match input %v\n", output, input)
+	}
+}
+
+// TestBlockWorkListEarlyRead starts the reader before any list has
+// been supplied and checks that it still receives every value in order
+// once ReplaceList is called.
+func TestBlockWorkListEarlyRead(t *testing.T) {
+	input := []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+	wl := NewBlockWorkList()
+
+	// The reader goroutine blocks on NextItem until the work list has
+	// been given something to hand out.
+	output := make([]int, 0, len(input))
+	finished := make(chan int)
+	go func() {
+		for item := range wl.NextItem {
+			output = append(output, item.Value.(int))
+			if len(output) >= len(input) {
+				wl.Close()
+			}
+		}
+		finished <- 1
+	}()
+
+	// Supply the list, then wait for the reader to drain it.
+	wl.ReplaceList(makeTestWorkList(input))
+	<-finished
+
+	if !compareSlice(output, input) {
+		t.Fatalf("output %v does not match input %v\n", output, input)
+	}
+}
+
+// TestBlockWorkListReplaceList reads part of one list, swaps in a
+// second list, and checks that the remaining reads come from the front
+// of the second list.
+func TestBlockWorkListReplaceList(t *testing.T) {
+	input1 := []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+	input2 := []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+
+	wl := NewBlockWorkList()
+	wl.ReplaceList(makeTestWorkList(input1))
+
+	// Take the first five elements of the first list.
+	output := make([]int, 0, len(input1))
+	for len(output) < 5 {
+		item := <-wl.NextItem
+		output = append(output, item.Value.(int))
+	}
+
+	// Swap in the second list and fill the rest of output from it.
+	wl.ReplaceList(makeTestWorkList(input2))
+	for item := range wl.NextItem {
+		output = append(output, item.Value.(int))
+		if len(output) >= len(input1) {
+			// Stop reading at once: the second list still has
+			// elements the manager could otherwise hand out.
+			wl.Close()
+			break
+		}
+	}
+
+	if !compareSlice(output[0:5], input1[0:5]) {
+		t.Fatal("first half of output does not match")
+	}
+	if !compareSlice(output[5:], input2[0:4]) {
+		t.Fatal("second half of output does not match")
+	}
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list