[ARVADOS] created: cb9fbffa7f480dae5f17eb44f27d0b3523da0f0a

git at public.curoverse.com git at public.curoverse.com
Fri Feb 27 11:28:39 EST 2015


        at  cb9fbffa7f480dae5f17eb44f27d0b3523da0f0a (commit)


commit cb9fbffa7f480dae5f17eb44f27d0b3523da0f0a
Author: Radhika Chippada <radhika at curoverse.com>
Date:   Fri Feb 27 11:12:29 2015 -0500

    3761: Run pull list worker, which processes pull reqests from the list.

diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 75b6c40..b6ab8fa 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -274,6 +274,10 @@ func main() {
 		log.Fatal(err)
 	}
 
+	// Initialize Pull queue and worker
+	pullq = NewWorkQueue()
+	go RunPullWorker(pullq.NextItem)
+
 	// Shut down the server gracefully (by closing the listener)
 	// if SIGTERM is received.
 	term := make(chan os.Signal, 1)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
new file mode 100644
index 0000000..2342fd2
--- /dev/null
+++ b/services/keepstore/pull_worker.go
@@ -0,0 +1,118 @@
+package main
+
+import (
+	"crypto/rand"
+	"errors"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"io/ioutil"
+	"log"
+	"os"
+	"strconv"
+	"time"
+)
+
+var arv arvadosclient.ArvadosClient
+var keepClient keepclient.KeepClient
+
+/*
+	Keepstore initiates pull worker channel goroutine.
+	The channel will process pull list.
+		For each (next) pull request:
+			For each locator listed, execute Pull on the server(s) listed
+			Skip the rest of the servers if no errors
+		Repeat
+*/
+func RunPullWorker(nextItem <-chan interface{}) {
+	var err error
+	arv, err = arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatalf("Error setting up arvados client %s", err.Error())
+	}
+	arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
+
+	keepClient, err = keepclient.MakeKeepClient(&arv)
+	if err != nil {
+		log.Fatalf("Error setting up keep client %s", err.Error())
+	}
+
+	for item := range nextItem {
+		pullReq := item.(PullRequest)
+		for _, addr := range pullReq.Servers {
+			err := Pull(addr, pullReq.Locator)
+			if err == nil {
+				break
+			}
+		}
+	}
+}
+
+/*
+	For each Pull request:
+		Generate a random API token.
+		Generate a permission signature using this token, timestamp ~60 seconds in the future, and desired block hash.
+		Using this token & signature, retrieve the given block.
+		Write to storage
+*/
+func Pull(addr string, locator string) (err error) {
+	log.Printf("Pull %s/%s starting", addr, locator)
+
+	defer func() {
+		if err == nil {
+			log.Printf("Pull %s/%s success", addr, locator)
+		} else {
+			log.Printf("Pull %s/%s error: %s", addr, locator, err)
+		}
+	}()
+
+	service_roots := make(map[string]string)
+	service_roots[locator] = addr
+	keepClient.SetServiceRoots(service_roots)
+
+	read_content, err := GetContent(addr, locator)
+	log.Print(read_content, err)
+	if err != nil {
+		return
+	}
+
+	err = PutBlock(read_content, locator)
+	return
+}
+
+// Fetch the content for the given locator using keepclient.
+var GetContent = func(addr string, locator string) ([]byte, error) {
+	// Generate signature with a random token
+	expires_at := time.Now().Unix() + 60 // now + 1 min in seconds
+	hints := "+A" + GenerateRandomApiToken() + "@" + strconv.FormatInt(expires_at, 16)
+	signature := keepclient.MakeLocator2(locator, hints)
+
+	reader, blocklen, _, err := keepClient.AuthorizedGet(locator, signature.Signature, signature.Timestamp)
+	defer reader.Close()
+	if err != nil {
+		return nil, err
+	}
+
+	read_content, err := ioutil.ReadAll(reader)
+	log.Print(read_content, err)
+	if err != nil {
+		return nil, err
+	}
+
+	if (read_content == nil) || (int64(len(read_content)) != blocklen) {
+		return nil, errors.New(fmt.Sprintf("Content not found for: %s/%s", addr, locator))
+	}
+
+	return read_content, nil
+}
+
+const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
+
+func GenerateRandomApiToken() string {
+	var bytes = make([]byte, 36)
+	rand.Read(bytes)
+	for i, b := range bytes {
+		bytes[i] = ALPHA_NUMERIC[b%byte(len(ALPHA_NUMERIC))]
+	}
+	return (string(bytes))
+}
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
new file mode 100644
index 0000000..907bfac
--- /dev/null
+++ b/services/keepstore/pull_worker_test.go
@@ -0,0 +1,99 @@
+package main
+
+import (
+	"errors"
+	"net/http"
+	"testing"
+	"time"
+)
+
+func TestPullWorker(t *testing.T) {
+	defer teardown()
+
+	// Since keepstore does not come into picture in tests,
+	// we need to explicitly start the goroutine in tests.
+	go RunPullWorker(pullq.NextItem)
+
+	data_manager_token = "DATA MANAGER TOKEN"
+
+	first_pull_list := []byte(`[
+		{
+			"locator":"locator1_to_verify_first_pull_list",
+			"servers":[
+				"server_1",
+				"server_2"
+		 	]
+		},
+    {
+			"locator":"locator2_to_verify_first_pull_list",
+			"servers":[
+				"server_1"
+		 	]
+		}
+	]`)
+
+	second_pull_list := []byte(`[
+		{
+			"locator":"locator_to_verify_second_pull_list",
+			"servers":[
+				"server_1",
+        "server_2"
+		 	]
+		}
+	]`)
+
+	type PullWorkerTestData struct {
+		name          string
+		req           RequestTester
+		response_code int
+		response_body string
+		read_content  string
+		read_error    bool
+	}
+	var testcases = []PullWorkerTestData{
+		{
+			"Pull request 1 from the data manager in worker",
+			RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
+			http.StatusOK,
+			"Received 2 pull requests\n",
+			"hello",
+			false,
+		},
+		{
+			"Pull request 2 from the data manager in worker",
+			RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
+			http.StatusOK,
+			"Received 1 pull requests\n",
+			"hola",
+			false,
+		},
+		{
+			"Pull request with error on get",
+			RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
+			http.StatusOK,
+			"Received 1 pull requests\n",
+			"unused",
+			true,
+		},
+	}
+
+	for _, testData := range testcases {
+		// Override GetContent to mock keepclient functionality
+		GetContent = func(addr string, locator string) ([]byte, error) {
+			if testData.read_error {
+				return nil, errors.New("Error getting data")
+			} else {
+				return []byte(testData.read_content), nil
+			}
+		}
+
+		response := IssueRequest(&testData.req)
+		ExpectStatusCode(t, testData.name, testData.response_code, response)
+		ExpectBody(t, testData.name, testData.response_body, response)
+
+		// give the channel a second to read and process all pull list entries
+		time.Sleep(1000 * time.Millisecond)
+
+		expectChannelEmpty(t, pullq.NextItem)
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list