[ARVADOS] created: 8653e6d34f5d895ee56fe3657c7fc3ca3cf615f4
git at public.curoverse.com
git at public.curoverse.com
Fri Oct 9 12:17:06 EDT 2015
at 8653e6d34f5d895ee56fe3657c7fc3ca3cf615f4 (commit)
commit 8653e6d34f5d895ee56fe3657c7fc3ca3cf615f4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 9 12:18:57 2015 -0400
7491: Go keepclient retries GET on either a network error or a 5xx error from
the server. Retry count is KeepClient.Retries. Also updated KeepClient
constructor to use "defaultCollectionReplication" from discovery document if
available.
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 53dfb2b..cc6efcb 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -29,6 +29,7 @@ var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.It
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
var InvalidLocatorError = errors.New("Invalid locator")
+var KeepServerError = errors.New("One or more keep servers returned an error")
// ErrNoSuchKeepServer is returned when GetIndex is invoked with a UUID with no matching keep server
var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is found")
@@ -49,6 +50,7 @@ type KeepClient struct {
gatewayRoots *map[string]string
lock sync.RWMutex
Client *http.Client
+ Retries int
// set to 1 if all writable services are of disk type, otherwise 0
replicasPerService int
@@ -59,12 +61,23 @@ type KeepClient struct {
func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+ defaultReplicationLevel := 2
+ value, err := arv.Discovery("defaultCollectionReplication")
+ if err == nil {
+ v, ok := value.(float64)
+ if ok && v > 0 {
+ defaultReplicationLevel = int(v)
+ }
+ }
+
kc := &KeepClient{
Arvados: arv,
- Want_replicas: 2,
+ Want_replicas: defaultReplicationLevel,
Using_proxy: false,
Client: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
+ Retries: 2,
}
return kc, kc.DiscoverKeepServers()
}
@@ -136,35 +149,63 @@ func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error
// instead of EOF.
func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
var errs []string
+ server_error := false
+
for _, host := range kc.getSortedRoots(locator) {
url := host + "/" + locator
- req, err := http.NewRequest("GET", url, nil)
- if err != nil {
- continue
- }
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
- resp, err := kc.Client.Do(req)
- if err != nil || resp.StatusCode != http.StatusOK {
- if resp != nil {
- var respbody []byte
- if resp.Body != nil {
- respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ tries_remaining := 1 + kc.Retries
+ for tries_remaining > 0 {
+ tries_remaining -= 1
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ continue
+ }
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil || resp.StatusCode != http.StatusOK {
+ if resp != nil {
+ var respbody []byte
+ if resp.Body != nil {
+ respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ }
+ errs = append(errs, fmt.Sprintf("%s: %d %s",
+ url, resp.StatusCode, strings.TrimSpace(string(respbody))))
+
+ if resp.StatusCode >= 500 {
+ // Server side failure, may be
+ // transient, can try again.
+ server_error = true
+ } else {
+ // Some other error (4xx),
+ // typically 403 or 404, don't
+ // try again.
+ tries_remaining = 0
+ }
+ } else {
+ // Probably a network error, may be
+ // transient, can try again.
+ server_error = true
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
}
- errs = append(errs, fmt.Sprintf("%s: %d %s",
- url, resp.StatusCode, strings.TrimSpace(string(respbody))))
} else {
- errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+ // Success.
+ return HashCheckingReader{
+ Reader: resp.Body,
+ Hash: md5.New(),
+ Check: locator[0:32],
+ }, resp.ContentLength, url, nil
}
- continue
}
- return HashCheckingReader{
- Reader: resp.Body,
- Hash: md5.New(),
- Check: locator[0:32],
- }, resp.ContentLength, url, nil
}
log.Printf("DEBUG: GET %s failed: %v", locator, errs)
- return nil, 0, "", BlockNotFound
+
+ if server_error {
+ // There was at least one failure to get a final answer
+ return nil, 0, "", KeepServerError
+ } else {
+ // Every server returned a 4xx error
+ return nil, 0, "", BlockNotFound
+ }
}
// Ask() verifies that a block with the given hash is available and
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index ee60d28..e2c4d90 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -184,6 +184,31 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
fh.handled <- fmt.Sprintf("http://%s", req.Host)
}
+type FailThenSucceedHandler struct {
+ handled chan string
+ count int
+ successhandler StubGetHandler
+}
+
+func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if fh.count == 0 {
+ resp.WriteHeader(500)
+ fh.count += 1
+ fh.handled <- fmt.Sprintf("http://%s", req.Host)
+ } else {
+ fh.successhandler.ServeHTTP(resp, req)
+ }
+}
+
+type Error404Handler struct {
+ handled chan string
+}
+
+func (fh Error404Handler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ resp.WriteHeader(404)
+ fh.handled <- fmt.Sprintf("http://%s", req.Host)
+}
+
func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
log.Printf("TestFailedUploadToStubKeepServer")
@@ -479,6 +504,26 @@ func (s *StandaloneSuite) TestGet(c *C) {
log.Printf("TestGet done")
}
+func (s *StandaloneSuite) TestGet404(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := Error404Handler{make(chan string, 1)}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+ r, n, url2, err := kc.Get(hash)
+ c.Check(err, Equals, BlockNotFound)
+ c.Check(n, Equals, int64(0))
+ c.Check(url2, Equals, "")
+ c.Check(r, Equals, nil)
+}
+
func (s *StandaloneSuite) TestGetFail(c *C) {
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
@@ -493,7 +538,52 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
r, n, url2, err := kc.Get(hash)
- c.Check(err, Equals, BlockNotFound)
+ c.Check(err, Equals, KeepServerError)
+ c.Check(n, Equals, int64(0))
+ c.Check(url2, Equals, "")
+ c.Check(r, Equals, nil)
+}
+
+func (s *StandaloneSuite) TestGetFailRetry(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ st := &FailThenSucceedHandler{make(chan string, 1), 0,
+ StubGetHandler{
+ c,
+ hash,
+ "abc123",
+ http.StatusOK,
+ []byte("foo")}}
+
+ ks := RunFakeKeepServer(st)
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, map[string]string{ks.url: ""}, nil)
+
+ r, n, url2, err := kc.Get(hash)
+ defer r.Close()
+ c.Check(err, Equals, nil)
+ c.Check(n, Equals, int64(3))
+ c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
+
+ content, err2 := ioutil.ReadAll(r)
+ c.Check(err2, Equals, nil)
+ c.Check(content, DeepEquals, []byte("foo"))
+}
+
+func (s *StandaloneSuite) TestGetNetError(c *C) {
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, map[string]string{"http://localhost:62222": ""}, nil)
+
+ r, n, url2, err := kc.Get(hash)
+ c.Check(err, Equals, KeepServerError)
c.Check(n, Equals, int64(0))
c.Check(url2, Equals, "")
c.Check(r, Equals, nil)
@@ -675,7 +765,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
content := []byte("waz")
hash := fmt.Sprintf("%x", md5.Sum(content))
- fh := FailHandler{
+ fh := Error404Handler{
make(chan string, 4)}
st := StubGetHandler{
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list