[ARVADOS] updated: 4cfb296612f7b483b56c36f119ca175def706d2f

git at public.curoverse.com git at public.curoverse.com
Tue Sep 16 22:41:48 EDT 2014


Summary of changes:
 services/keepstore/block_work_list.go      | 160 +++++++++++++++++++++++++++++
 services/keepstore/block_work_list_test.go | 149 +++++++++++++++++++++++++++
 services/keepstore/handler_test.go         |  13 ++-
 services/keepstore/handlers.go             |  24 +++--
 services/keepstore/keepstore.go            |   3 +-
 services/keepstore/pull_list/pull_list.go  |  81 ---------------
 6 files changed, 335 insertions(+), 95 deletions(-)
 create mode 100644 services/keepstore/block_work_list.go
 create mode 100644 services/keepstore/block_work_list_test.go
 delete mode 100644 services/keepstore/pull_list/pull_list.go

       via  4cfb296612f7b483b56c36f119ca175def706d2f (commit)
       via  87163a6ec678b841ced3824cbbe40ac20544821b (commit)
       via  82c5dcae45765d61f9af973bb3c02f878aa40804 (commit)
       via  391c3551e763834bdc4c18db8051905d2c338f83 (commit)
       via  c49db117daf8993185cb271ea90d89e841e01117 (commit)
       via  5df584b3e49ccb52419449da016d86b08510f959 (commit)
       via  7faaccf29475d419f55fe43e53dffd40392f48fa (commit)
       via  577bd728e410d6a9b57b11578264b61220a50d76 (commit)
       via  0bf498cd8a240f70517d44e3dab0fa000468531d (commit)
      from  452baf1ec1b55c6c4613972ee5f6b5ebf28e8ed7 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 4cfb296612f7b483b56c36f119ca175def706d2f
Merge: 452baf1 87163a6
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Sep 16 22:40:16 2014 -0400

    Merge branch '3705-keep-blockworklist'
    
    Closes #3705.


commit 87163a6ec678b841ced3824cbbe40ac20544821b
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Sep 16 22:34:20 2014 -0400

    3705: use %v to format interface types

diff --git a/services/keepstore/block_work_list_test.go b/services/keepstore/block_work_list_test.go
index 862dc59..c3df400 100644
--- a/services/keepstore/block_work_list_test.go
+++ b/services/keepstore/block_work_list_test.go
@@ -33,7 +33,7 @@ func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
 func expectChannelClosed(t *testing.T, c <-chan interface{}) {
 	received, ok := <-c
 	if ok {
-		t.Fatalf("Expected channel to be closed, but received %s instead", received)
+		t.Fatalf("Expected channel to be closed, but received %v instead", received)
 	}
 }
 
@@ -44,7 +44,7 @@ func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
 		if !ok {
 			t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
 		} else if actual.(int) != expected[i] {
-			t.Fatalf("Expected %v but received '%d' after receiving the first %d elements correctly.", expected[i], actual.(int), i)
+			t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
 		}
 	}
 }

commit 82c5dcae45765d61f9af973bb3c02f878aa40804
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Sep 16 22:09:34 2014 -0400

    3705: drop unnecessary 'items' from BlockWorkList

diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
index 2bd1583..4c5aeb1 100644
--- a/services/keepstore/block_work_list.go
+++ b/services/keepstore/block_work_list.go
@@ -85,7 +85,6 @@ package main
 import "container/list"
 
 type BlockWorkList struct {
-	items    *list.List
 	newlist  chan *list.List
 	NextItem chan interface{}
 }
@@ -95,7 +94,6 @@ type BlockWorkList struct {
 //
 func NewBlockWorkList() *BlockWorkList {
 	b := BlockWorkList{
-		items:    nil,
 		newlist:  make(chan *list.List),
 		NextItem: make(chan interface{}),
 	}

commit 391c3551e763834bdc4c18db8051905d2c338f83
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Sep 16 21:56:55 2014 -0400

    3705: use helper functions in tests

diff --git a/services/keepstore/block_work_list_test.go b/services/keepstore/block_work_list_test.go
index 1d4f836..862dc59 100644
--- a/services/keepstore/block_work_list_test.go
+++ b/services/keepstore/block_work_list_test.go
@@ -13,14 +13,39 @@ func makeTestWorkList(ary []int) *list.List {
 	return l
 }
 
-// peek returns the next item available from the channel, or
-// nil if the channel is empty or closed.
-func peek(c <-chan interface{}) interface{} {
+func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
 	select {
 	case item := <-c:
-		return item
+		t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
 	default:
-		return nil
+		// no-op
+	}
+}
+
+func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
+	if item, ok := <-c; !ok {
+		t.Fatal("expected data on a closed channel")
+	} else if item == nil {
+		t.Fatal("expected data on an empty channel")
+	}
+}
+
+func expectChannelClosed(t *testing.T, c <-chan interface{}) {
+	received, ok := <-c
+	if ok {
+		t.Fatalf("Expected channel to be closed, but received %s instead", received)
+	}
+}
+
+func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+	for i := range expected {
+		actual, ok := <-c
+		t.Logf("received %v", actual)
+		if !ok {
+			t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
+		} else if actual.(int) != expected[i] {
+			t.Fatalf("Expected %v but received '%d' after receiving the first %d elements correctly.", expected[i], actual.(int), i)
+		}
 	}
 }
 
@@ -31,20 +56,9 @@ func TestBlockWorkListReadWrite(t *testing.T) {
 	b := NewBlockWorkList()
 	b.ReplaceList(makeTestWorkList(input))
 
-	var i = 0
-	for item := range b.NextItem {
-		if item.(int) != input[i] {
-			t.Fatalf("expected %d, got %d", input[i], item.(int))
-		}
-		i++
-		if i >= len(input) {
-			break
-		}
-	}
-
-	if item := peek(b.NextItem); item != nil {
-		t.Fatalf("unexpected output %v", item)
-	}
+	expectFromChannel(t, b.NextItem, input)
+	expectChannelEmpty(t, b.NextItem)
+	b.Close()
 }
 
 // Start a worker before the list has any input.
@@ -55,29 +69,16 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
 
 	// First, demonstrate that nothing is available on the NextItem
 	// channel.
-	if item := peek(b.NextItem); item != nil {
-		t.Fatalf("unexpected output %v", item)
-	}
+	expectChannelEmpty(t, b.NextItem)
 
 	// Start a reader in a goroutine. The reader will block until the
 	// block work list has been initialized.
-	// Note that the worker closes itself: once it has read as many
-	// elements as it expects, it calls b.Close(), which causes the
-	// manager to close the b.NextItem channel.
 	//
 	done := make(chan int)
 	go func() {
-		var i = 0
-		defer func() { done <- 1 }()
-		for item := range b.NextItem {
-			if item.(int) != input[i] {
-				t.Fatalf("expected %d, got %d", input[i], item.(int))
-			}
-			i++
-			if i >= len(input) {
-				b.Close()
-			}
-		}
+		expectFromChannel(t, b.NextItem, input)
+		b.Close()
+		done <- 1
 	}()
 
 	// Feed the blocklist a new worklist, and wait for the worker to
@@ -85,49 +86,41 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
 	b.ReplaceList(makeTestWorkList(input))
 	<-done
 
-	if item := peek(b.NextItem); item != nil {
-		t.Fatalf("unexpected output %v", item)
-	}
+	expectChannelClosed(t, b.NextItem)
 }
 
 // Show that a reader may block when the manager's list is exhausted,
 // and that the reader resumes automatically when new data is
 // available.
 func TestBlockWorkListReaderBlocks(t *testing.T) {
-	var input = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
+	var (
+		inputBeforeBlock = []int{1, 2, 3, 4, 5}
+		inputAfterBlock  = []int{6, 7, 8, 9, 10}
+	)
 
 	b := NewBlockWorkList()
 	sendmore := make(chan int)
 	done := make(chan int)
 	go func() {
-		i := 0
-		for item := range b.NextItem {
-			if item.(int) != input[i] {
-				t.Fatalf("expected %d, got %d", input[i], item.(int))
-			}
-			i++
-			if i == 5 {
-				sendmore <- 1
-			}
-			if i == 10 {
-				b.Close()
-			}
-		}
+		expectFromChannel(t, b.NextItem, inputBeforeBlock)
+
+		// Confirm that the channel is empty, so a subsequent read
+		// on it will block.
+		expectChannelEmpty(t, b.NextItem)
+
+		// Signal that we're ready for more input.
+		sendmore <- 1
+		expectFromChannel(t, b.NextItem, inputAfterBlock)
+		b.Close()
 		done <- 1
 	}()
 
-	// Write a slice of the first five elements and wait for a signal
-	// from the reader.
-	b.ReplaceList(makeTestWorkList(input[0:5]))
+	// Write a slice of the first five elements and wait for the
+	// reader to signal that it's ready for us to send more input.
+	b.ReplaceList(makeTestWorkList(inputBeforeBlock))
 	<-sendmore
 
-	// Confirm that no more data is available on the NextItem channel
-	// (and therefore any readers are blocked) before writing the
-	// final five elements.
-	if item := peek(b.NextItem); item != nil {
-		t.Fatalf("unexpected output %v", item)
-	}
-	b.ReplaceList(makeTestWorkList(input[5:]))
+	b.ReplaceList(makeTestWorkList(inputAfterBlock))
 
 	// Wait for the reader to complete.
 	<-done
@@ -135,27 +128,22 @@ func TestBlockWorkListReaderBlocks(t *testing.T) {
 
 // Replace one active work list with another.
 func TestBlockWorkListReplaceList(t *testing.T) {
-	var input1 = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
-	var input2 = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+	var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+	var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
 
 	b := NewBlockWorkList()
-	b.ReplaceList(makeTestWorkList(input1))
+	b.ReplaceList(makeTestWorkList(firstInput))
 
-	// Read the first five elements from the work list.
-	//
-	for i := 0; i < 5; i++ {
-		item := <-b.NextItem
-		if item.(int) != input1[i] {
-			t.Fatalf("expected %d, got %d", input1[i], item.(int))
-		}
-	}
+	// Read just the first five elements from the work list.
+	// Confirm that the channel is not empty.
+	expectFromChannel(t, b.NextItem, firstInput[0:5])
+	expectChannelNotEmpty(t, b.NextItem)
 
 	// Replace the work list and read five more elements.
-	b.ReplaceList(makeTestWorkList(input2))
-	for i := 0; i < 5; i++ {
-		item := <-b.NextItem
-		if item.(int) != input2[i] {
-			t.Fatalf("expected %d, got %d", input2[i], item.(int))
-		}
-	}
+	// The old list should have been discarded and all new
+	// elements come from the new list.
+	b.ReplaceList(makeTestWorkList(replaceInput))
+	expectFromChannel(t, b.NextItem, replaceInput[0:5])
+
+	b.Close()
 }

commit c49db117daf8993185cb271ea90d89e841e01117
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Sep 16 17:54:40 2014 -0400

    3705: drop unnecessary current_list pointer

diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
index 595455d..2bd1583 100644
--- a/services/keepstore/block_work_list.go
+++ b/services/keepstore/block_work_list.go
@@ -122,11 +122,15 @@ func (b *BlockWorkList) Close() {
 // listen is run in a goroutine. It reads new pull lists from its
 // input queue until the queue is closed.
 // listen takes ownership of the list that is passed to it.
+//
+// Note that the routine does not ever need to access the list
+// itself once the current_item has been initialized, so we do
+// not bother to keep a pointer to the list. Because it is a
+// doubly linked list, holding on to the current item will keep
+// it from garbage collection.
+//
 func (b *BlockWorkList) listen() {
-	var (
-		current_list *list.List
-		current_item *list.Element
-	)
+	var current_item *list.Element
 
 	// When we're done, close the output channel to shut down any
 	// workers.
@@ -137,18 +141,16 @@ func (b *BlockWorkList) listen() {
 		// even checking if workers are ready.
 		if current_item == nil {
 			if p, ok := <-b.newlist; ok {
-				current_list = p
+				current_item = p.Front()
 			} else {
 				// The channel was closed; shut down.
 				return
 			}
-			current_item = current_list.Front()
 		}
 		select {
 		case p, ok := <-b.newlist:
 			if ok {
-				current_list = p
-				current_item = current_list.Front()
+				current_item = p.Front()
 			} else {
 				// The input channel is closed; time to shut down
 				return

commit 5df584b3e49ccb52419449da016d86b08510f959
Author: Tim Pierce <twp at curoverse.com>
Date:   Tue Sep 16 02:15:13 2014 -0400

    3705: update for code review
    
    * Changed the type of the NextItem channel from *list.Element to
      interface{} (i.e. a list.Element.Value)
    * Better comments
    * Cleaner testing framework
    * TestBlockWorkListReaderBlocks to exercise the case where readers block
      when the list is exhausted, and resume when new data is supplied.

diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
index a79e4e7..595455d 100644
--- a/services/keepstore/block_work_list.go
+++ b/services/keepstore/block_work_list.go
@@ -1,11 +1,10 @@
 package main
 
-/* A BlockWorkList concurrently processes blocks needing attention.
+/* A BlockWorkList is an asynchronous thread-safe queue manager.  It
+   provides a channel from which items can be read off the queue, and
+   permits replacing the contents of the queue at any time.
 
-   The BlockWorkList object itself manages a list of generic objects,
-   replacing the list when new data is available, and delivering items
-   from the list to consumers when requested.  The overall work flow
-   is as follows:
+   The overall work flow for a BlockWorkList is as follows:
 
      1. A BlockWorkList is created with NewBlockWorkList().  This
         function instantiates a new BlockWorkList and starts a manager
@@ -39,37 +38,34 @@ package main
      * the pull list
      * the trash list
 
-   Example (simplified) implementation of a trash collector:
+   Example usage:
 
-		type DeleteRequest struct {
-			hash string
-			age time.Time
+        // Any kind of user-defined type can be used with the
+        // BlockWorkList.
+		type FrobRequest struct {
+			frob string
 		}
 
 		// Make a work list.
-		trashList := NewBlockWorkList()
+		froblist := NewBlockWorkList()
 
 		// Start a concurrent worker to read items from the NextItem
 		// channel until it is closed, deleting each one.
-		if diskFull() {
-			go func(list BlockWorkList) {
-				for i := range list.NextItem {
-					req := i.(DeleteRequest)
-					if time.Now() > req.age {
-						deleteBlock(req.hash)
-					}
-				}
-			}(trashList)
-		}
+		go func(list BlockWorkList) {
+			for i := range list.NextItem {
+				req := i.(FrobRequest)
+				frob.Run(req)
+			}
+		}(froblist)
 
-		// Set up a HTTP handler for PUT /trash
-		router.HandleFunc(`/trash`,
+		// Set up a HTTP handler for PUT /frob
+		router.HandleFunc(`/frob`,
 			func(w http.ResponseWriter, req *http.Request) {
 				// Parse the request body into a list.List
-				// of DeleteRequests, and give this list to the
-				// trash collector.
-				trash := parseBody(req.Body)
-				trashList.ReplaceList(trash)
+				// of FrobRequests, and give this list to the
+				// frob manager.
+				newfrobs := parseBody(req.Body)
+				froblist.ReplaceList(newfrobs)
 			}).Methods("PUT")
 
    Methods available on a BlockWorkList:
@@ -82,7 +78,8 @@ package main
             finishes processing before receiving items from the new
             list.
 		Close()
-			Shuts down the manager and the worker cleanly.
+			Shuts down the manager goroutine. When Close is called,
+			the manager closes the NextItem channel.
 */
 
 import "container/list"
@@ -90,7 +87,7 @@ import "container/list"
 type BlockWorkList struct {
 	items    *list.List
 	newlist  chan *list.List
-	NextItem chan *list.Element
+	NextItem chan interface{}
 }
 
 // NewBlockWorkList returns a new worklist, and launches a listener
@@ -100,7 +97,7 @@ func NewBlockWorkList() *BlockWorkList {
 	b := BlockWorkList{
 		items:    nil,
 		newlist:  make(chan *list.List),
-		NextItem: make(chan *list.Element),
+		NextItem: make(chan interface{}),
 	}
 	go b.listen()
 	return &b
@@ -124,6 +121,7 @@ func (b *BlockWorkList) Close() {
 
 // listen is run in a goroutine. It reads new pull lists from its
 // input queue until the queue is closed.
+// listen takes ownership of the list that is passed to it.
 func (b *BlockWorkList) listen() {
 	var (
 		current_list *list.List
@@ -155,7 +153,7 @@ func (b *BlockWorkList) listen() {
 				// The input channel is closed; time to shut down
 				return
 			}
-		case b.NextItem <- current_item:
+		case b.NextItem <- current_item.Value:
 			current_item = current_item.Next()
 		}
 	}
diff --git a/services/keepstore/block_work_list_test.go b/services/keepstore/block_work_list_test.go
index a099d16..1d4f836 100644
--- a/services/keepstore/block_work_list_test.go
+++ b/services/keepstore/block_work_list_test.go
@@ -13,16 +13,15 @@ func makeTestWorkList(ary []int) *list.List {
 	return l
 }
 
-func compareSlice(l1, l2 []int) bool {
-	if len(l1) != len(l2) {
-		return false
+// peek returns the next item available from the channel, or
+// nil if the channel is empty or closed.
+func peek(c <-chan interface{}) interface{} {
+	select {
+	case item := <-c:
+		return item
+	default:
+		return nil
 	}
-	for i := range l1 {
-		if l1[i] != l2[i] {
-			return false
-		}
-	}
-	return true
 }
 
 // Create a BlockWorkList, generate a list for it, and instantiate a worker.
@@ -32,18 +31,19 @@ func TestBlockWorkListReadWrite(t *testing.T) {
 	b := NewBlockWorkList()
 	b.ReplaceList(makeTestWorkList(input))
 
-	output := make([]int, len(input))
 	var i = 0
 	for item := range b.NextItem {
-		output[i] = item.Value.(int)
+		if item.(int) != input[i] {
+			t.Fatalf("expected %d, got %d", input[i], item.(int))
+		}
 		i++
-		if i >= len(output) {
-			b.Close()
+		if i >= len(input) {
+			break
 		}
 	}
 
-	if !compareSlice(output, input) {
-		t.Fatalf("output %v does not match input %v\n", output, input)
+	if item := peek(b.NextItem); item != nil {
+		t.Fatalf("unexpected output %v", item)
 	}
 }
 
@@ -53,20 +53,31 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
 
 	b := NewBlockWorkList()
 
+	// First, demonstrate that nothing is available on the NextItem
+	// channel.
+	if item := peek(b.NextItem); item != nil {
+		t.Fatalf("unexpected output %v", item)
+	}
+
 	// Start a reader in a goroutine. The reader will block until the
 	// block work list has been initialized.
-	output := make([]int, len(input))
+	// Note that the worker closes itself: once it has read as many
+	// elements as it expects, it calls b.Close(), which causes the
+	// manager to close the b.NextItem channel.
+	//
 	done := make(chan int)
 	go func() {
 		var i = 0
+		defer func() { done <- 1 }()
 		for item := range b.NextItem {
-			output[i] = item.Value.(int)
+			if item.(int) != input[i] {
+				t.Fatalf("expected %d, got %d", input[i], item.(int))
+			}
 			i++
-			if i >= len(output) {
+			if i >= len(input) {
 				b.Close()
 			}
 		}
-		done <- 1
 	}()
 
 	// Feed the blocklist a new worklist, and wait for the worker to
@@ -74,9 +85,52 @@ func TestBlockWorkListEarlyRead(t *testing.T) {
 	b.ReplaceList(makeTestWorkList(input))
 	<-done
 
-	if !compareSlice(output, input) {
-		t.Fatalf("output %v does not match input %v\n", output, input)
+	if item := peek(b.NextItem); item != nil {
+		t.Fatalf("unexpected output %v", item)
+	}
+}
+
+// Show that a reader may block when the manager's list is exhausted,
+// and that the reader resumes automatically when new data is
+// available.
+func TestBlockWorkListReaderBlocks(t *testing.T) {
+	var input = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
+
+	b := NewBlockWorkList()
+	sendmore := make(chan int)
+	done := make(chan int)
+	go func() {
+		i := 0
+		for item := range b.NextItem {
+			if item.(int) != input[i] {
+				t.Fatalf("expected %d, got %d", input[i], item.(int))
+			}
+			i++
+			if i == 5 {
+				sendmore <- 1
+			}
+			if i == 10 {
+				b.Close()
+			}
+		}
+		done <- 1
+	}()
+
+	// Write a slice of the first five elements and wait for a signal
+	// from the reader.
+	b.ReplaceList(makeTestWorkList(input[0:5]))
+	<-sendmore
+
+	// Confirm that no more data is available on the NextItem channel
+	// (and therefore any readers are blocked) before writing the
+	// final five elements.
+	if item := peek(b.NextItem); item != nil {
+		t.Fatalf("unexpected output %v", item)
 	}
+	b.ReplaceList(makeTestWorkList(input[5:]))
+
+	// Wait for the reader to complete.
+	<-done
 }
 
 // Replace one active work list with another.
@@ -89,28 +143,19 @@ func TestBlockWorkListReplaceList(t *testing.T) {
 
 	// Read the first five elements from the work list.
 	//
-	output := make([]int, len(input1))
 	for i := 0; i < 5; i++ {
 		item := <-b.NextItem
-		output[i] = item.Value.(int)
+		if item.(int) != input1[i] {
+			t.Fatalf("expected %d, got %d", input1[i], item.(int))
+		}
 	}
 
-	// Replace the work list and read the remaining elements.
+	// Replace the work list and read five more elements.
 	b.ReplaceList(makeTestWorkList(input2))
-	i := 5
-	for item := range b.NextItem {
-		output[i] = item.Value.(int)
-		i++
-		if i >= len(output) {
-			b.Close()
-			break
+	for i := 0; i < 5; i++ {
+		item := <-b.NextItem
+		if item.(int) != input2[i] {
+			t.Fatalf("expected %d, got %d", input2[i], item.(int))
 		}
 	}
-
-	if !compareSlice(output[0:5], input1[0:5]) {
-		t.Fatal("first half of output does not match")
-	}
-	if !compareSlice(output[5:], input2[0:4]) {
-		t.Fatal("second half of output does not match")
-	}
 }
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index e246b5e..deb1c3d 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -666,7 +666,7 @@ func TestPullHandler(t *testing.T) {
 	var output_list = make([]PullRequest, 3)
 	for i := 0; i < 3; i++ {
 		item := <-pullmgr.NextItem
-		if pr, ok := item.Value.(PullRequest); ok {
+		if pr, ok := item.(PullRequest); ok {
 			output_list[i] = pr
 		} else {
 			t.Errorf("item %v could not be parsed as a PullRequest", item)

commit 7faaccf29475d419f55fe43e53dffd40392f48fa
Author: Tim Pierce <twp at curoverse.com>
Date:   Mon Sep 15 18:19:40 2014 -0400

    3705: describe BlockWorkList flow more explicitly
    
    Updated file comments to describe the overall data flow for a
    BlockWorkList manager and worker more explicitly.

diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
index 704a9ca..a79e4e7 100644
--- a/services/keepstore/block_work_list.go
+++ b/services/keepstore/block_work_list.go
@@ -2,18 +2,43 @@ package main
 
 /* A BlockWorkList concurrently processes blocks needing attention.
 
+   The BlockWorkList object itself manages a list of generic objects,
+   replacing the list when new data is available, and delivering items
+   from the list to consumers when requested.  The overall work flow
+   is as follows:
+
+     1. A BlockWorkList is created with NewBlockWorkList().  This
+        function instantiates a new BlockWorkList and starts a manager
+        goroutine.  The manager listens on an input channel
+        (manager.newlist) and an output channel (manager.NextItem).
+
+     2. The manager first waits for a new list of requests on the
+        newlist channel.  When another goroutine calls
+        manager.ReplaceList(lst), it sends lst over the newlist
+        channel to the manager.  The manager goroutine now has
+        ownership of the list.
+
+     3. Once the manager has this initial list, it listens on both the
+        input and output channels for one of the following to happen:
+
+          a. A worker attempts to read an item from the NextItem
+             channel.  The manager sends the next item from the list
+             over this channel to the worker, and loops.
+
+          b. New data is sent to the manager on the newlist channel.
+             This happens when another goroutine calls
+             manager.ReplaceItem() with a new list.  The manager
+             discards the current list, replaces it with the new one,
+             and begins looping again.
+
+          c. The input channel is closed.  The manager closes its
+             output channel (signalling any workers to quit) and
+             terminates.
+
    Tasks currently handled by BlockWorkList:
      * the pull list
      * the trash list
 
-   A BlockWorkList is instantiated with NewBlockWorkList(), which
-   launches a manager in a goroutine.  The manager listens on a
-   channel for data to be assigned to it via the ReplaceList() method.
-
-   A worker gets items to process from a BlockWorkList by reading the
-   NextItem channel.  The list manager continuously writes items to
-   this channel.
-
    Example (simplified) implementation of a trash collector:
 
 		type DeleteRequest struct {
@@ -26,14 +51,16 @@ package main
 
 		// Start a concurrent worker to read items from the NextItem
 		// channel until it is closed, deleting each one.
-		go func(list BlockWorkList) {
-			for i := range list.NextItem {
-				req := i.(DeleteRequest)
-				if time.Now() > req.age {
-					deleteBlock(req.hash)
+		if diskFull() {
+			go func(list BlockWorkList) {
+				for i := range list.NextItem {
+					req := i.(DeleteRequest)
+					if time.Now() > req.age {
+						deleteBlock(req.hash)
+					}
 				}
-			}
-		}(trashList)
+			}(trashList)
+		}
 
 		// Set up a HTTP handler for PUT /trash
 		router.HandleFunc(`/trash`,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list