[ARVADOS] created: 8653e6d34f5d895ee56fe3657c7fc3ca3cf615f4

git at public.curoverse.com git at public.curoverse.com
Fri Oct 9 12:17:06 EDT 2015


        at  8653e6d34f5d895ee56fe3657c7fc3ca3cf615f4 (commit)


commit 8653e6d34f5d895ee56fe3657c7fc3ca3cf615f4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 9 12:18:57 2015 -0400

    7491: Go keepclient retries GET on either a network error or a 5xx error
    from the server.  The retry count is KeepClient.Retries.  Also updated the
    KeepClient constructor to use "defaultCollectionReplication" from the
    discovery document if available.

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 53dfb2b..cc6efcb 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -29,6 +29,7 @@ var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.It
 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
 var InvalidLocatorError = errors.New("Invalid locator")
+var KeepServerError = errors.New("One or more keep servers returned an error")
 
 // ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server
 var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
@@ -49,6 +50,7 @@ type KeepClient struct {
 	gatewayRoots       *map[string]string
 	lock               sync.RWMutex
 	Client             *http.Client
+	Retries            int
 
 	// set to 1 if all writable services are of disk type, otherwise 0
 	replicasPerService int
@@ -59,12 +61,23 @@ type KeepClient struct {
 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
 	var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
 	insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+	defaultReplicationLevel := 2
+	value, err := arv.Discovery("defaultCollectionReplication")
+	if err == nil {
+		v, ok := value.(float64)
+		if ok && v > 0 {
+			defaultReplicationLevel = int(v)
+		}
+	}
+
 	kc := &KeepClient{
 		Arvados:       arv,
-		Want_replicas: 2,
+		Want_replicas: defaultReplicationLevel,
 		Using_proxy:   false,
 		Client: &http.Client{Transport: &http.Transport{
 			TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+		Retries: 2,
 	}
 	return kc, kc.DiscoverKeepServers()
 }
@@ -136,35 +149,63 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error
 // instead of EOF.
 func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
 	var errs []string
+	server_error := false
+
 	for _, host := range kc.getSortedRoots(locator) {
 		url := host + "/" + locator
-		req, err := http.NewRequest("GET", url, nil)
-		if err != nil {
-			continue
-		}
-		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
-		resp, err := kc.Client.Do(req)
-		if err != nil || resp.StatusCode != http.StatusOK {
-			if resp != nil {
-				var respbody []byte
-				if resp.Body != nil {
-					respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+		tries_remaining := 1 + kc.Retries
+		for tries_remaining > 0 {
+			tries_remaining -= 1
+			req, err := http.NewRequest("GET", url, nil)
+			if err != nil {
+				continue
+			}
+			req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+			resp, err := kc.Client.Do(req)
+			if err != nil || resp.StatusCode != http.StatusOK {
+				if resp != nil {
+					var respbody []byte
+					if resp.Body != nil {
+						respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+					}
+					errs = append(errs, fmt.Sprintf("%s: %d %s",
+						url, resp.StatusCode, strings.TrimSpace(string(respbody))))
+
+					if resp.StatusCode >= 500 {
+						// Server side failure, may be
+						// transient, can try again.
+						server_error = true
+					} else {
+						// Some other error (4xx),
+						// typically 403 or 404, don't
+						// try again.
+						tries_remaining = 0
+					}
+				} else {
+					// Probably a network error, may be
+					// transient, can try again.
+					server_error = true
+					errs = append(errs, fmt.Sprintf("%s: %v", url, err))
 				}
-				errs = append(errs, fmt.Sprintf("%s: %d %s",
-					url, resp.StatusCode, strings.TrimSpace(string(respbody))))
 			} else {
-				errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+				// Success.
+				return HashCheckingReader{
+					Reader: resp.Body,
+					Hash:   md5.New(),
+					Check:  locator[0:32],
+				}, resp.ContentLength, url, nil
 			}
-			continue
 		}
-		return HashCheckingReader{
-			Reader: resp.Body,
-			Hash:   md5.New(),
-			Check:  locator[0:32],
-		}, resp.ContentLength, url, nil
 	}
 	log.Printf("DEBUG: GET %s failed: %v", locator, errs)
-	return nil, 0, "", BlockNotFound
+
+	if server_error {
+		// There was at least one failure to get a final answer
+		return nil, 0, "", KeepServerError
+	} else {
+		// Every server returned a 4xx error
+		return nil, 0, "", BlockNotFound
+	}
 }
 
 // Ask() verifies that a block with the given hash is available and
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index ee60d28..e2c4d90 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -184,6 +184,31 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	fh.handled <- fmt.Sprintf("http://%s", req.Host)
 }
 
+type FailThenSucceedHandler struct {
+	handled        chan string
+	count          int
+	successhandler StubGetHandler
+}
+
+func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	if fh.count == 0 {
+		resp.WriteHeader(500)
+		fh.count += 1
+		fh.handled <- fmt.Sprintf("http://%s", req.Host)
+	} else {
+		fh.successhandler.ServeHTTP(resp, req)
+	}
+}
+
+type Error404Handler struct {
+	handled chan string
+}
+
+func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	resp.WriteHeader(404)
+	fh.handled <- fmt.Sprintf("http://%s", req.Host)
+}
+
 func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 	log.Printf("TestFailedUploadToStubKeepServer")
 
@@ -479,6 +504,26 @@ func (s *StandaloneSuite) TestGet(c *C) {
 	log.Printf("TestGet done")
 }
 
+func (s *StandaloneSuite) TestGet404(c *C) {
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	st := Error404Handler{make(chan string, 1)}
+
+	ks := RunFakeKeepServer(st)
+	defer ks.listener.Close()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+	r, n, url2, err := kc.Get(hash)
+	c.Check(err, Equals, BlockNotFound)
+	c.Check(n, Equals, int64(0))
+	c.Check(url2, Equals, "")
+	c.Check(r, Equals, nil)
+}
+
 func (s *StandaloneSuite) TestGetFail(c *C) {
 	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
@@ -493,7 +538,52 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 	kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
 
 	r, n, url2, err := kc.Get(hash)
-	c.Check(err, Equals, BlockNotFound)
+	c.Check(err, Equals, KeepServerError)
+	c.Check(n, Equals, int64(0))
+	c.Check(url2, Equals, "")
+	c.Check(r, Equals, nil)
+}
+
+func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	st := &FailThenSucceedHandler{make(chan string, 1), 0,
+		StubGetHandler{
+			c,
+			hash,
+			"abc123",
+			http.StatusOK,
+			[]byte("foo")}}
+
+	ks := RunFakeKeepServer(st)
+	defer ks.listener.Close()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+	r, n, url2, err := kc.Get(hash)
+	defer r.Close()
+	c.Check(err, Equals, nil)
+	c.Check(n, Equals, int64(3))
+	c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
+
+	content, err2 := ioutil.ReadAll(r)
+	c.Check(err2, Equals, nil)
+	c.Check(content, DeepEquals, []byte("foo"))
+}
+
+func (s *StandaloneSuite) TestGetNetError(c *C) {
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, map[string]string{"http://localhost:62222": ""}, nil)
+
+	r, n, url2, err := kc.Get(hash)
+	c.Check(err, Equals, KeepServerError)
 	c.Check(n, Equals, int64(0))
 	c.Check(url2, Equals, "")
 	c.Check(r, Equals, nil)
@@ -675,7 +765,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 	content := []byte("waz")
 	hash := fmt.Sprintf("%x", md5.Sum(content))
 
-	fh := FailHandler{
+	fh := Error404Handler{
 		make(chan string, 4)}
 
 	st := StubGetHandler{

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list