[ARVADOS] created: cb9fbffa7f480dae5f17eb44f27d0b3523da0f0a
git at public.curoverse.com
git at public.curoverse.com
Fri Feb 27 11:28:39 EST 2015
at cb9fbffa7f480dae5f17eb44f27d0b3523da0f0a (commit)
commit cb9fbffa7f480dae5f17eb44f27d0b3523da0f0a
Author: Radhika Chippada <radhika at curoverse.com>
Date: Fri Feb 27 11:12:29 2015 -0500
3761: Run pull list worker, which processes pull requests from the list.
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 75b6c40..b6ab8fa 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -274,6 +274,10 @@ func main() {
log.Fatal(err)
}
+ // Initialize Pull queue and worker
+ pullq = NewWorkQueue()
+ go RunPullWorker(pullq.NextItem)
+
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
term := make(chan os.Signal, 1)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
new file mode 100644
index 0000000..2342fd2
--- /dev/null
+++ b/services/keepstore/pull_worker.go
@@ -0,0 +1,118 @@
+package main
+
+import (
+ "crypto/rand"
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io/ioutil"
+ "log"
+ "os"
+ "strconv"
+ "time"
+)
+
+var arv arvadosclient.ArvadosClient // API client; assigned by RunPullWorker before any pull request is processed
+var keepClient keepclient.KeepClient // built from arv in RunPullWorker; used by Pull and GetContent
+
+// RunPullWorker is the pull worker loop; keepstore starts it in a goroutine.
+//
+// It first sets up the package-level arv and keepClient clients (terminating
+// the process through log.Fatalf if either cannot be created), then consumes
+// pull requests from nextItem until that channel is closed. For every request
+// it tries Pull against each listed server in turn and moves on to the next
+// request as soon as one server succeeds.
+func RunPullWorker(nextItem <-chan interface{}) {
+	var setupErr error
+
+	if arv, setupErr = arvadosclient.MakeArvadosClient(); setupErr != nil {
+		log.Fatalf("Error setting up arvados client %s", setupErr.Error())
+	}
+	arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
+
+	if keepClient, setupErr = keepclient.MakeKeepClient(&arv); setupErr != nil {
+		log.Fatalf("Error setting up keep client %s", setupErr.Error())
+	}
+
+	for {
+		item, open := <-nextItem
+		if !open {
+			return
+		}
+
+		request := item.(PullRequest)
+		for _, server := range request.Servers {
+			if Pull(server, request.Locator) == nil {
+				// One successful pull is enough; skip the remaining servers.
+				break
+			}
+		}
+	}
+}
+
+// Pull fetches the block identified by locator from the keep server at addr
+// and hands it to PutBlock.
+//
+// It points keepClient at addr for this locator, retrieves the block through
+// GetContent and passes the bytes to PutBlock. The start and the outcome of
+// the attempt are logged. The returned error is nil only when both the fetch
+// and the PutBlock call succeeded.
+func Pull(addr string, locator string) (err error) {
+	log.Printf("Pull %s/%s starting", addr, locator)
+
+	// err is a named result so that this deferred report sees the final outcome.
+	defer func() {
+		if err == nil {
+			log.Printf("Pull %s/%s success", addr, locator)
+		} else {
+			log.Printf("Pull %s/%s error: %s", addr, locator, err)
+		}
+	}()
+
+	serviceRoots := map[string]string{locator: addr}
+	keepClient.SetServiceRoots(serviceRoots)
+
+	// The block payload is deliberately not logged: it can be large, and any
+	// error is already reported by the deferred logger above.
+	content, err := GetContent(addr, locator)
+	if err != nil {
+		return err
+	}
+
+	return PutBlock(content, locator)
+}
+
+// GetContent fetches the content for the given locator using keepClient.
+//
+// It signs the request with a freshly generated random token and an expiry
+// one minute in the future, reads the whole response, and returns it only if
+// its length matches the block length reported by keepClient. addr is used
+// only in the error message.
+//
+// GetContent is a package variable rather than a plain function so that tests
+// can substitute a mock.
+var GetContent = func(addr string, locator string) ([]byte, error) {
+	// Generate signature with a random token
+	expiresAt := time.Now().Unix() + 60 // now + 1 min in seconds
+	hints := "+A" + GenerateRandomApiToken() + "@" + strconv.FormatInt(expiresAt, 16)
+	signature := keepclient.MakeLocator2(locator, hints)
+
+	reader, blocklen, _, err := keepClient.AuthorizedGet(locator, signature.Signature, signature.Timestamp)
+	if err != nil {
+		return nil, err
+	}
+	// Register Close only after the error check: on failure the reader is not
+	// guaranteed to be usable, and calling Close on it could panic.
+	defer reader.Close()
+
+	content, err := ioutil.ReadAll(reader)
+	if err != nil {
+		return nil, err
+	}
+
+	if (content == nil) || (int64(len(content)) != blocklen) {
+		return nil, errors.New(fmt.Sprintf("Content not found for: %s/%s", addr, locator))
+	}
+
+	return content, nil
+}
+
+// ALPHA_NUMERIC is the alphabet from which random API tokens are drawn.
+const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
+
+// GenerateRandomApiToken returns a 36-character token whose characters are
+// drawn uniformly from ALPHA_NUMERIC using crypto/rand.
+//
+// It panics if the random source reports an error, because continuing would
+// produce a fixed, predictable token.
+func GenerateRandomApiToken() string {
+	const tokenLen = 36
+	// Bytes at or above this bound are discarded: 256 is not a multiple of
+	// len(ALPHA_NUMERIC), so reducing every byte modulo the alphabet size
+	// would make the first few characters slightly more likely than the rest.
+	const bound = 256 - 256%len(ALPHA_NUMERIC)
+
+	token := make([]byte, 0, tokenLen)
+	buf := make([]byte, tokenLen)
+	for len(token) < tokenLen {
+		if _, err := rand.Read(buf); err != nil {
+			panic("GenerateRandomApiToken: reading random bytes: " + err.Error())
+		}
+		for _, b := range buf {
+			if int(b) < bound && len(token) < tokenLen {
+				token = append(token, ALPHA_NUMERIC[int(b)%len(ALPHA_NUMERIC)])
+			}
+		}
+	}
+	return string(token)
+}
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
new file mode 100644
index 0000000..907bfac
--- /dev/null
+++ b/services/keepstore/pull_worker_test.go
@@ -0,0 +1,99 @@
+package main
+
+import (
+ "errors"
+ "net/http"
+ "testing"
+ "time"
+)
+
+// TestPullWorker sends pull lists to the /pull endpoint using the data
+// manager token, checks the status code and body of each response, and then
+// verifies that the running pull worker has drained pullq. GetContent is
+// mocked for each test case, so no keep server is contacted for the fetch.
+func TestPullWorker(t *testing.T) {
+	defer teardown()
+
+	// GetContent is replaced with a mock below; restore the real
+	// implementation afterwards so the override does not leak into other tests.
+	realGetContent := GetContent
+	defer func() { GetContent = realGetContent }()
+
+	// Since keepstore does not come into picture in tests,
+	// we need to explicitly start the goroutine in tests.
+	go RunPullWorker(pullq.NextItem)
+
+	data_manager_token = "DATA MANAGER TOKEN"
+
+	first_pull_list := []byte(`[
+ {
+ "locator":"locator1_to_verify_first_pull_list",
+ "servers":[
+ "server_1",
+ "server_2"
+ ]
+ },
+ {
+ "locator":"locator2_to_verify_first_pull_list",
+ "servers":[
+ "server_1"
+ ]
+ }
+ ]`)
+
+	second_pull_list := []byte(`[
+ {
+ "locator":"locator_to_verify_second_pull_list",
+ "servers":[
+ "server_1",
+ "server_2"
+ ]
+ }
+ ]`)
+
+	type PullWorkerTestData struct {
+		name          string
+		req           RequestTester
+		response_code int
+		response_body string
+		read_content  string
+		read_error    bool
+	}
+	var testcases = []PullWorkerTestData{
+		{
+			"Pull request 1 from the data manager in worker",
+			RequestTester{"/pull", data_manager_token, "PUT", first_pull_list},
+			http.StatusOK,
+			"Received 2 pull requests\n",
+			"hello",
+			false,
+		},
+		{
+			"Pull request 2 from the data manager in worker",
+			RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
+			http.StatusOK,
+			"Received 1 pull requests\n",
+			"hola",
+			false,
+		},
+		{
+			"Pull request with error on get",
+			RequestTester{"/pull", data_manager_token, "PUT", second_pull_list},
+			http.StatusOK,
+			"Received 1 pull requests\n",
+			"unused",
+			true,
+		},
+	}
+
+	for _, testData := range testcases {
+		// Per-iteration copy: the mock below is called later from the worker
+		// goroutine and must keep seeing this test case, not the shared
+		// range variable.
+		testData := testData
+
+		// Override GetContent to mock keepclient functionality
+		GetContent = func(addr string, locator string) ([]byte, error) {
+			if testData.read_error {
+				return nil, errors.New("Error getting data")
+			}
+			return []byte(testData.read_content), nil
+		}
+
+		response := IssueRequest(&testData.req)
+		ExpectStatusCode(t, testData.name, testData.response_code, response)
+		ExpectBody(t, testData.name, testData.response_body, response)
+
+		// give the channel a second to read and process all pull list entries
+		time.Sleep(1000 * time.Millisecond)
+
+		expectChannelEmpty(t, pullq.NextItem)
+	}
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list