[ARVADOS] created: 0bf498cd8a240f70517d44e3dab0fa000468531d

git at public.curoverse.com git at public.curoverse.com
Wed Sep 10 18:27:33 EDT 2014


        at  0bf498cd8a240f70517d44e3dab0fa000468531d (commit)


commit 0bf498cd8a240f70517d44e3dab0fa000468531d
Author: Tim Pierce <twp at curoverse.com>
Date:   Wed Sep 10 13:38:07 2014 -0400

    3705: add BlockWorkList and unit tests

diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
new file mode 100644
index 0000000..704a9ca
--- /dev/null
+++ b/services/keepstore/block_work_list.go
@@ -0,0 +1,135 @@
+package main
+
+/* A BlockWorkList concurrently processes blocks needing attention.
+
+   Tasks currently handled by BlockWorkList:
+     * the pull list
+     * the trash list
+
+   A BlockWorkList is instantiated with NewBlockWorkList(), which
+   launches a manager in a goroutine.  The manager listens on a
+   channel for data to be assigned to it via the ReplaceList() method.
+
+   A worker gets items to process from a BlockWorkList by reading the
+   NextItem channel.  The list manager continuously writes items to
+   this channel.
+
+   Example (simplified) implementation of a trash collector:
+
+		type DeleteRequest struct {
+			hash string
+			age time.Time
+		}
+
+		// Make a work list.
+		trashList := NewBlockWorkList()
+
+		// Start a concurrent worker to read items from the NextItem
+		// channel until it is closed, deleting each one.
+		go func(list *BlockWorkList) {
+			for i := range list.NextItem {
+				req := i.Value.(DeleteRequest)
+				if time.Now().After(req.age) {
+					deleteBlock(req.hash)
+				}
+			}
+		}(trashList)
+
+		// Set up a HTTP handler for PUT /trash
+		router.HandleFunc(`/trash`,
+			func(w http.ResponseWriter, req *http.Request) {
+				// Parse the request body into a list.List
+				// of DeleteRequests, and give this list to the
+				// trash collector.
+				trash := parseBody(req.Body)
+				trashList.ReplaceList(trash)
+			}).Methods("PUT")
+
+   Methods available on a BlockWorkList:
+
+		ReplaceList(list)
+			Replaces the current item list with a new one.  The list
+			manager discards any unprocessed items on the existing
+			list and replaces it with the new one. If the worker is
+			processing a list item when ReplaceList is called, it
+			finishes processing before receiving items from the new
+			list.
+		Close()
+			Shuts down the manager and the worker cleanly.
+*/
+
+import "container/list"
+
+type BlockWorkList struct {
+	items    *list.List         // NOTE(review): set to nil by NewBlockWorkList; no method in this file reads it
+	newlist  chan *list.List    // replacement lists, sent by ReplaceList and received by listen; closed by Close
+	NextItem chan *list.Element // elements of the current list, sent by listen; closed when listen returns
+}
+
+// NewBlockWorkList returns a new worklist with unbuffered newlist and
+// NextItem channels, and launches its listen method in a goroutine
+// that waits for lists and hands their elements out on NextItem.
+func NewBlockWorkList() *BlockWorkList {
+	b := BlockWorkList{
+		items:    nil,
+		newlist:  make(chan *list.List),
+		NextItem: make(chan *list.Element),
+	}
+	go b.listen() // returns once Close closes b.newlist
+	return &b
+}
+
+// ReplaceList sends a new list to the manager goroutine. The manager
+// discards whatever remains of its current list and starts handing
+// out items from the front of the new one. The send is unbuffered, so
+// this call blocks until the manager has received the list.
+func (b *BlockWorkList) ReplaceList(list *list.List) {
+	b.newlist <- list // panics if Close has already closed b.newlist
+}
+
+// Close closes the channel that feeds the manager goroutine. The
+// manager then returns, abandoning any items it has not yet handed
+// out, and closes NextItem so that workers ranging over it stop.
+// Call it at most once: closing an already-closed channel panics.
+func (b *BlockWorkList) Close() {
+	close(b.newlist)
+}
+
+// listen is the manager loop, run in its own goroutine. It hands out
+// the current list's elements on NextItem until newlist is closed.
+func (b *BlockWorkList) listen() {
+	var (
+		current_list *list.List
+		current_item *list.Element
+	)
+
+	// When we're done, close the output channel so that workers
+	// ranging over NextItem terminate.
+	defer close(b.NextItem)
+
+	for {
+		// Nothing to hand out: wait for a list. Loop rather than test
+		// once, so an empty list is never offered to workers as nil.
+		for current_item == nil {
+			if p, ok := <-b.newlist; ok {
+				current_list = p
+			} else {
+				// The channel was closed; shut down.
+				return
+			}
+			current_item = current_list.Front() // nil when the list is empty
+		}
+		select {
+		case p, ok := <-b.newlist:
+			if ok {
+				current_list = p
+				current_item = current_list.Front()
+			} else {
+				// The input channel is closed; time to shut down
+				return
+			}
+		case b.NextItem <- current_item:
+			current_item = current_item.Next()
+		}
+	}
+}
diff --git a/services/keepstore/block_work_list_test.go b/services/keepstore/block_work_list_test.go
new file mode 100644
index 0000000..a099d16
--- /dev/null
+++ b/services/keepstore/block_work_list_test.go
@@ -0,0 +1,116 @@
+package main
+
+import (
+	"container/list"
+	"testing"
+)
+
+func makeTestWorkList(ary []int) *list.List { // returns a new list.List holding the values of ary in order
+	l := list.New()
+	for _, n := range ary {
+		l.PushBack(n) // stored as an int; the tests read it back with item.Value.(int)
+	}
+	return l
+}
+
+func compareSlice(l1, l2 []int) bool { // reports whether l1 and l2 have the same length and the same elements in order
+	if len(l1) != len(l2) {
+		return false
+	}
+	for i := range l1 {
+		if l1[i] != l2[i] {
+			return false // the first mismatch decides
+		}
+	}
+	return true
+}
+
+// Give a new BlockWorkList one list and check that NextItem yields its values in order.
+func TestBlockWorkListReadWrite(t *testing.T) {
+	var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+	b := NewBlockWorkList()
+	b.ReplaceList(makeTestWorkList(input))
+
+	output := make([]int, len(input))
+	var i = 0
+	for item := range b.NextItem {
+		output[i] = item.Value.(int)
+		i++
+		if i >= len(output) {
+			b.Close() // the manager then closes NextItem, which ends this range loop
+		}
+	}
+
+	if !compareSlice(output, input) {
+		t.Fatalf("output %v does not match input %v\n", output, input)
+	}
+}
+
+// Start a worker before the list has any input, then supply the input.
+func TestBlockWorkListEarlyRead(t *testing.T) {
+	var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+	b := NewBlockWorkList()
+
+	// Start a reader in a goroutine. Its receive on NextItem blocks
+	// until ReplaceList below gives the manager something to send.
+	output := make([]int, len(input))
+	done := make(chan int)
+	go func() {
+		var i = 0
+		for item := range b.NextItem {
+			output[i] = item.Value.(int)
+			i++
+			if i >= len(output) {
+				b.Close() // the manager then closes NextItem, which ends this range loop
+			}
+		}
+		done <- 1
+	}()
+
+	// Feed the blocklist a new worklist, and wait for the worker to
+	// signal on done before inspecting output.
+	b.ReplaceList(makeTestWorkList(input))
+	<-done
+
+	if !compareSlice(output, input) {
+		t.Fatalf("output %v does not match input %v\n", output, input)
+	}
+}
+
+// Replace one active work list with another, partway through the first.
+func TestBlockWorkListReplaceList(t *testing.T) {
+	var input1 = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+	var input2 = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+
+	b := NewBlockWorkList()
+	b.ReplaceList(makeTestWorkList(input1))
+
+	// Read the first five elements from the work list, leaving the
+	// last four elements of input1 unread.
+	output := make([]int, len(input1))
+	for i := 0; i < 5; i++ {
+		item := <-b.NextItem
+		output[i] = item.Value.(int)
+	}
+
+	// Replace the work list; the remaining four reads must come from the front of the new list.
+	b.ReplaceList(makeTestWorkList(input2))
+	i := 5
+	for item := range b.NextItem {
+		output[i] = item.Value.(int)
+		i++
+		if i >= len(output) {
+			b.Close()
+			break
+		}
+	}
+
+	if !compareSlice(output[0:5], input1[0:5]) {
+		t.Fatal("first half of output does not match")
+	}
+	if !compareSlice(output[5:], input2[0:4]) {
+		t.Fatal("second half of output does not match")
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list