[ARVADOS] updated: b41bfad067592e82ef999967d6e2a565c794b805

git at public.curoverse.com git at public.curoverse.com
Wed Sep 10 13:38:25 EDT 2014


Summary of changes:
 services/keepstore/block_work_list.go | 135 ++++++++++++++++++++++++++++++++++
 1 file changed, 135 insertions(+)
 create mode 100644 services/keepstore/block_work_list.go

       via  b41bfad067592e82ef999967d6e2a565c794b805 (commit)
      from  f467b469109e27bc18635c8952892e4c23fabd60 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b41bfad067592e82ef999967d6e2a565c794b805
Author: Tim Pierce <twp at curoverse.com>
Date:   Wed Sep 10 13:38:07 2014 -0400

    3705: add BlockWorkList

diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
new file mode 100644
index 0000000..33a8ff8
--- /dev/null
+++ b/services/keepstore/block_work_list.go
@@ -0,0 +1,135 @@
+package main
+
+/* A BlockWorkList concurrently processes blocks needing attention.
+
+   Tasks currently handled by BlockWorkList:
+     * the pull list
+     * the trash list
+
+   A BlockWorkList is instantiated with NewBlockWorkList(), which
+   launches a manager in a goroutine.  The manager listens on a
+   channel for data to be assigned to it via the ReplaceList() method.
+
+   A worker gets items to process from a BlockWorkList by reading the
+   NextItem channel.  The list manager continuously writes items to
+   this channel.
+
+   Example (simplified) implementation of a trash collector:
+
+		type DeleteRequest struct {
+			hash string
+			age time.Time
+		}
+
+		// Make a work list.
+		trashList := NewBlockWorkList()
+
+		// Start a concurrent worker to read items from the NextItem
+		// channel until it is closed, deleting each one.
+		go func(list BlockWorkList) {
+			for i := range list.NextItem {
+				req := i.(DeleteRequest)
+				if time.Now() > req.age {
+					deleteBlock(req.hash)
+				}
+			}
+		}(trashList)
+
+		// Set up a HTTP handler for PUT /trash
+		router.HandleFunc(`/trash`,
+			func(w http.ResponseWriter, req *http.Request) {
+				// Parse the request body into a container.List
+				// of DeleteRequests, and give this list to the
+				// trash collector.
+				trash := parseBody(req.Body)
+				trashList.ReplaceList(trash)
+			}).Methods("PUT")
+
+   Methods available on a BlockWorkList:
+
+		ReplaceList(list)
+			Replaces the current item list with a new one.  The list
+            manager discards any unprocessed items on the existing
+            list and replaces it with the new one. If the worker is
+            processing a list item when ReplaceList is called, it
+            finishes processing before receiving items from the new
+            list.
+		Close()
+			Shuts down the manager and the worker cleanly.
+*/
+
+import "container/list"
+
+type BlockWorkList struct {
+	items    *container.List
+	newlist  chan *container.List
+	NextItem chan *container.Element
+}
+
+// NewBlockWorkList returns a new worklist, and launches a listener
+// goroutine that waits for work and farms it out to workers.
+//
+func NewBlockWorkList() *BlockWorkList {
+	b := BlockWorkList{
+		items:    nil,
+		newlist:  make(chan *container.List),
+		NextItem: make(chan *container.Element),
+	}
+	go b.listen()
+	return &b
+}
+
+// ReplaceList sends a new list of pull requests to the manager goroutine.
+// The manager will discard any outstanding pull list and begin
+// working on the new list.
+//
+func (b *BlockWorkList) ReplaceList(list *container.List) {
+	b.newlist <- list
+}
+
+// Close shuts down the manager and terminates the goroutine, which
+// completes any pull request in progress and abandons any pending
+// requests.
+//
+func (b *BlockWorkList) Close() {
+	close(b.newlist)
+}
+
+// listen is run in a goroutine. It reads new pull lists from its
+// input queue until the queue is closed.
+func (b *BlockWorkList) listen() {
+	var (
+		current_list *container.List
+		current_item *container.Element
+	)
+
+	// When we're done, close the output channel to shut down any
+	// workers.
+	defer close(b.NextItem)
+
+	for {
+		// If the current list is empty, wait for a new list before
+		// even checking if workers are ready.
+		if current_item == nil {
+			if p, ok := <-b.newlist; ok {
+				current_list = p
+			} else {
+				// The channel was closed; shut down.
+				return
+			}
+			current_item = current_list.Front()
+		}
+		select {
+		case p, ok := <-b.newlist:
+			if ok {
+				current_list = p
+				current_item = current_list.Front()
+			} else {
+				// The input channel is closed; time to shut down
+				return
+			}
+		case r.NextItem <- current_item:
+			current_item = current_item.Next()
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list