[ARVADOS] updated: 753d0ef652e1593c0fbd204a3973554794215c2f

git at public.curoverse.com git at public.curoverse.com
Mon Mar 2 21:12:49 EST 2015


Summary of changes:
 services/keepstore/pull_worker.go      | 63 +++++++++++++++-------------------
 services/keepstore/pull_worker_test.go | 21 +++++++++---
 2 files changed, 45 insertions(+), 39 deletions(-)

       via  753d0ef652e1593c0fbd204a3973554794215c2f (commit)
      from  a5b7c827e89fb082a51c6ab48ab98261f85d2a24 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 753d0ef652e1593c0fbd204a3973554794215c2f
Author: Radhika Chippada <radhika at curoverse.com>
Date:   Mon Mar 2 21:09:42 2015 -0500

    3761: code refactoring

diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 5082897..6d64677 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"io"
 	"io/ioutil"
 	"log"
 	"os"
@@ -29,7 +30,6 @@ func RunPullWorker(nextItem <-chan interface{}) {
 	if err != nil {
 		log.Fatalf("Error setting up arvados client %s", err.Error())
 	}
-	arv.ApiToken = os.Getenv("ARVADOS_API_TOKEN")
 
 	keepClient, err = keepclient.MakeKeepClient(&arv)
 	if err != nil {
@@ -37,13 +37,7 @@ func RunPullWorker(nextItem <-chan interface{}) {
 	}
 
 	for item := range nextItem {
-		pullReq := item.(PullRequest)
-		for _, addr := range pullReq.Servers {
-			err := Pull(addr, pullReq.Locator)
-			if err == nil {
-				break
-			}
-		}
+		Pull(item.(PullRequest))
 	}
 }
 
@@ -54,54 +48,53 @@ func RunPullWorker(nextItem <-chan interface{}) {
 		Using this token & signature, retrieve the given block.
 		Write to storage
 */
-func Pull(addr string, locator string) (err error) {
-	log.Printf("Pull %s/%s starting", addr, locator)
-
+func Pull(pullRequest PullRequest) (err error) {
 	defer func() {
 		if err == nil {
-			log.Printf("Pull %s/%s success", addr, locator)
+			log.Printf("Pull %s success", pullRequest)
 		} else {
-			log.Printf("Pull %s/%s error: %s", addr, locator, err)
+			log.Printf("Pull %s error: %s", pullRequest, err)
 		}
 	}()
 
 	service_roots := make(map[string]string)
-	service_roots[locator] = addr
-	keepClient.SetServiceRoots(service_roots)
-
-	read_content, err := GetContent(addr, locator)
-	log.Print(read_content, err)
-	if err != nil {
-		return
+	for _, addr := range pullRequest.Servers {
+		service_roots[addr] = addr
 	}
+	keepClient.SetServiceRoots(service_roots)
 
-	err = PutContent(read_content, locator)
-	return
-}
-
-// Fetch the content for the given locator using keepclient.
-var GetContent = func(addr string, locator string) ([]byte, error) {
 	// Generate signature with a random token
 	PermissionSecret = []byte(os.Getenv("ARVADOS_API_TOKEN"))
 	expires_at := time.Now().Add(60 * time.Second)
-	signedLocator := SignLocator(locator, GenerateRandomApiToken(), expires_at)
-	reader, blocklen, _, err := keepClient.Get(signedLocator)
-	defer reader.Close()
+	signedLocator := SignLocator(pullRequest.Locator, GenerateRandomApiToken(), expires_at)
+
+	reader, contentLen, _, err := GetContent(signedLocator)
+
 	if err != nil {
-		return nil, err
+		return
+	}
+	if reader == nil {
+		return errors.New(fmt.Sprintf("No reader found for : %s", signedLocator))
 	}
+	defer reader.Close()
 
 	read_content, err := ioutil.ReadAll(reader)
-	log.Print(read_content, err)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
-	if (read_content == nil) || (int64(len(read_content)) != blocklen) {
-		return nil, errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
+	if (read_content == nil) || (int64(len(read_content)) != contentLen) {
+		return errors.New(fmt.Sprintf("Content not found for: %s", signedLocator))
 	}
 
-	return read_content, nil
+	err = PutContent(read_content, pullRequest.Locator)
+	return
+}
+
+// Fetch the content for the given locator using keepclient.
+var GetContent = func(signedLocator string) (reader io.ReadCloser, contentLength int64, url string, err error) {
+	reader, blocklen, url, err := keepClient.Get(signedLocator)
+	return reader, blocklen, url, err
 }
 
 const ALPHA_NUMERIC = "0123456789abcdefghijklmnopqrstuvwxyz"
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 0bdb886..8e6241f 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -1,7 +1,9 @@
 package main
 
 import (
+	"bytes"
 	"errors"
+	"io"
 	"net/http"
 	"testing"
 	"time"
@@ -27,7 +29,7 @@ func TestPullWorker(t *testing.T) {
     {
 			"locator":"locator2_to_verify_first_pull_list",
 			"servers":[
-				"server_1"
+				"server_3"
 		 	]
 		}
 	]`)
@@ -92,11 +94,14 @@ func TestPullWorker(t *testing.T) {
 
 	for _, testData := range testcases {
 		// Override GetContent to mock keepclient functionality
-		GetContent = func(addr string, locator string) ([]byte, error) {
+		GetContent = func(signedLocator string) (reader io.ReadCloser, contentLength int64, url string, err error) {
 			if testData.read_error {
-				return nil, errors.New("Error getting data")
+				return nil, 0, "", errors.New("Error getting data")
 			} else {
-				return []byte(testData.read_content), nil
+				cb := &ClosingBuffer{bytes.NewBufferString("Hi!")}
+				var rc io.ReadCloser
+				rc = cb
+				return rc, 3, "", nil
 			}
 		}
 
@@ -119,3 +124,11 @@ func TestPullWorker(t *testing.T) {
 		expectChannelEmpty(t, pullq.NextItem)
 	}
 }
+
+type ClosingBuffer struct {
+	*bytes.Buffer
+}
+
+func (cb *ClosingBuffer) Close() (err error) {
+	return
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list