[ARVADOS] created: 2.1.0-629-g757aa11ab
Git user
git at public.arvados.org
Mon Apr 12 14:52:29 UTC 2021
at 757aa11abba0455c122430755a0c28b81f61ab36 (commit)
commit 757aa11abba0455c122430755a0c28b81f61ab36
Author: Tom Clegg <tom at curii.com>
Date: Mon Apr 12 10:52:22 2021 -0400
17392: Test satisfying storage classes via multiple servers.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 94cfece8c..f59d16fd3 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -16,12 +16,14 @@ import (
"net/http"
"os"
"strings"
+ "sync"
"testing"
"time"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
. "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
// Gocheck boilerplate
@@ -102,12 +104,19 @@ type StubPutHandler struct {
expectStorageClass string
returnStorageClasses string
handled chan string
+ requests []*http.Request
+ mtx sync.Mutex
}
-func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ sph.mtx.Lock()
+ sph.requests = append(sph.requests, req)
+ sph.mtx.Unlock()
sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
- sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
+ if sph.expectStorageClass != "*" {
+ sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
+ }
body, err := ioutil.ReadAll(req.Body)
sph.c.Check(err, Equals, nil)
sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
@@ -152,13 +161,15 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
log.Printf("TestUploadToStubKeepServer")
- st := StubPutHandler{
- c,
- "acbd18db4cc2f85cedef654fccc4a4d8",
- "abc123",
- "foo",
- "", "default=1",
- make(chan string)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "default=1",
+ handled: make(chan string),
+ }
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
@@ -174,13 +185,15 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
}
func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
- st := StubPutHandler{
- c,
- "acbd18db4cc2f85cedef654fccc4a4d8",
- "abc123",
- "foo",
- "", "default=1",
- make(chan string)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "default=1",
+ handled: make(chan string),
+ }
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
@@ -204,13 +217,15 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
{" =foo=1 ", nil},
{"foo", nil},
} {
- st := StubPutHandler{
- c,
- "acbd18db4cc2f85cedef654fccc4a4d8",
- "abc123",
- "foo",
- "", trial.respHeader,
- make(chan string)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: trial.respHeader,
+ handled: make(chan string),
+ }
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
@@ -226,6 +241,65 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
}
}
+func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
+ nServers := 5
+ for _, trial := range []struct {
+ replicas int
+ classes []string
+ minRequests int
+ maxRequests int
+ success bool
+ }{
+ {1, []string{"class1"}, 1, 1, true},
+ {2, []string{"class1"}, 1, 2, true},
+ {3, []string{"class1"}, 2, 3, true},
+ {1, []string{"class1", "class2"}, 1, 1, true},
+ {nServers*2 + 1, []string{"class1"}, nServers, nServers, false},
+ {1, []string{"class404"}, nServers, nServers, false},
+ {1, []string{"class1", "class404"}, nServers, nServers, false},
+ } {
+ c.Logf("%+v", trial)
+ st := &StubPutHandler{
+ c: c,
+ expectPath: "acbd18db4cc2f85cedef654fccc4a4d8",
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "*",
+ returnStorageClasses: "class1=2, class2=2",
+ handled: make(chan string, 100),
+ }
+ ks := RunSomeFakeKeepServers(st, nServers)
+ arv, _ := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(arv)
+ kc.Want_replicas = trial.replicas
+ kc.StorageClasses = trial.classes
+ arv.ApiToken = "abc123"
+ localRoots := make(map[string]string)
+ writableLocalRoots := make(map[string]string)
+ for i, k := range ks {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close()
+ }
+ kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+
+ _, _, err := kc.PutB([]byte("foo"))
+ if trial.success {
+ c.Check(err, check.IsNil)
+ } else {
+ c.Check(err, check.NotNil)
+ }
+ c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
+ c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
+ if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) {
+ // Max concurrency should be 1. First request
+ // should have succeeded for class1. Second
+ // request should only ask for class404.
+ c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404")
+ }
+ }
+}
+
type FailHandler struct {
handled chan string
}
@@ -303,14 +377,15 @@ func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
func (s *StandaloneSuite) TestPutB(c *C) {
hash := Md5String("foo")
- st := StubPutHandler{
- c,
- hash,
- "abc123",
- "foo",
- "",
- "",
- make(chan string, 5)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: hash,
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "",
+ handled: make(chan string, 5),
+ }
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(arv)
@@ -346,14 +421,15 @@ func (s *StandaloneSuite) TestPutB(c *C) {
func (s *StandaloneSuite) TestPutHR(c *C) {
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- st := StubPutHandler{
- c,
- hash,
- "abc123",
- "foo",
- "",
- "",
- make(chan string, 5)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: hash,
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "",
+ handled: make(chan string, 5),
+ }
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(arv)
@@ -396,14 +472,15 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
func (s *StandaloneSuite) TestPutWithFail(c *C) {
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- st := StubPutHandler{
- c,
- hash,
- "abc123",
- "foo",
- "",
- "",
- make(chan string, 4)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: hash,
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "",
+ handled: make(chan string, 4),
+ }
fh := FailHandler{
make(chan string, 1)}
@@ -457,14 +534,15 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- st := StubPutHandler{
- c,
- hash,
- "abc123",
- "foo",
- "",
- "",
- make(chan string, 1)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: hash,
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "",
+ handled: make(chan string, 1),
+ }
fh := FailHandler{
make(chan string, 4)}
@@ -1066,14 +1144,15 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
hash := Md5String("foo")
- st := StubPutHandler{
- c,
- hash,
- "abc123",
- "foo",
- "",
- "",
- make(chan string, 5)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: hash,
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "",
+ handled: make(chan string, 5),
+ }
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(arv)
@@ -1106,14 +1185,15 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
hash := Md5String("foo")
- st := StubPutHandler{
- c,
- hash,
- "abc123",
- "foo",
- "",
- "",
- make(chan string, 5)}
+ st := &StubPutHandler{
+ c: c,
+ expectPath: hash,
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "",
+ handled: make(chan string, 5),
+ }
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(arv)
@@ -1283,14 +1363,16 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
func (s *StandaloneSuite) TestPutBRetry(c *C) {
st := &FailThenSucceedHandler{
handled: make(chan string, 1),
- successhandler: StubPutHandler{
- c,
- Md5String("foo"),
- "abc123",
- "foo",
- "",
- "",
- make(chan string, 5)}}
+ successhandler: &StubPutHandler{
+ c: c,
+ expectPath: Md5String("foo"),
+ expectAPIToken: "abc123",
+ expectBody: "foo",
+ expectStorageClass: "",
+ returnStorageClasses: "",
+ handled: make(chan string, 5),
+ },
+ }
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(arv)
commit c3e41ce1ad90a041d380a834e1b699685f0a5658
Author: Tom Clegg <tom at curii.com>
Date: Tue Apr 6 11:28:14 2021 -0400
17392: Ensure requested storage classes are satisfied on write.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 21913ff96..454181265 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -83,8 +83,12 @@ var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is
// ErrIncompleteIndex is returned when the Index response does not end with a new empty line
var ErrIncompleteIndex = errors.New("Got incomplete index")
-const XKeepDesiredReplicas = "X-Keep-Desired-Replicas"
-const XKeepReplicasStored = "X-Keep-Replicas-Stored"
+const (
+ XKeepDesiredReplicas = "X-Keep-Desired-Replicas"
+ XKeepReplicasStored = "X-Keep-Replicas-Stored"
+ XKeepStorageClasses = "X-Keep-Storage-Classes"
+ XKeepStorageClassesConfirmed = "X-Keep-Storage-Classes-Confirmed"
+)
type HTTPClient interface {
Do(*http.Request) (*http.Response, error)
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 57a89b50a..94cfece8c 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -95,12 +95,13 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
}
type StubPutHandler struct {
- c *C
- expectPath string
- expectAPIToken string
- expectBody string
- expectStorageClass string
- handled chan string
+ c *C
+ expectPath string
+ expectAPIToken string
+ expectBody string
+ expectStorageClass string
+ returnStorageClasses string
+ handled chan string
}
func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -110,6 +111,10 @@ func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
body, err := ioutil.ReadAll(req.Body)
sph.c.Check(err, Equals, nil)
sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
+ resp.Header().Set("X-Keep-Replicas-Stored", "1")
+ if sph.returnStorageClasses != "" {
+ resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
+ }
resp.WriteHeader(200)
sph.handled <- fmt.Sprintf("http://%s", req.Host)
}
@@ -152,20 +157,19 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
"acbd18db4cc2f85cedef654fccc4a4d8",
"abc123",
"foo",
- "hot",
+ "", "default=1",
make(chan string)}
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
- kc.StorageClasses = []string{"hot"}
- go kc.uploadToKeepServer(url, st.expectPath, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+ go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
writer.Write([]byte("foo"))
writer.Close()
<-st.handled
status := <-uploadStatusChan
- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
})
}
@@ -175,20 +179,53 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
"acbd18db4cc2f85cedef654fccc4a4d8",
"abc123",
"foo",
- "",
+ "", "default=1",
make(chan string)}
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
- go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
+ go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
<-st.handled
status := <-uploadStatusChan
- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
})
}
+func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
+ for _, trial := range []struct {
+ respHeader string
+ expectMap map[string]int
+ }{
+ {"", nil},
+ {"foo=1", map[string]int{"foo": 1}},
+ {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
+ {" =foo=1 ", nil},
+ {"foo", nil},
+ } {
+ st := StubPutHandler{
+ c,
+ "acbd18db4cc2f85cedef654fccc4a4d8",
+ "abc123",
+ "foo",
+ "", trial.respHeader,
+ make(chan string)}
+
+ UploadToStubHelper(c, st,
+ func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
+ go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+
+ writer.Write([]byte("foo"))
+ writer.Close()
+
+ <-st.handled
+ status := <-uploadStatusChan
+ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
+ })
+ }
+}
+
type FailHandler struct {
handled chan string
}
@@ -235,7 +272,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
func(kc *KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
- go kc.uploadToKeepServer(url, hash, reader, uploadStatusChan, 3, kc.getRequestID())
+ go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
writer.Write([]byte("foo"))
writer.Close()
@@ -272,6 +309,7 @@ func (s *StandaloneSuite) TestPutB(c *C) {
"abc123",
"foo",
"",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
@@ -314,6 +352,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
"abc123",
"foo",
"",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
@@ -363,6 +402,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
"abc123",
"foo",
"",
+ "",
make(chan string, 4)}
fh := FailHandler{
@@ -423,6 +463,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
"abc123",
"foo",
"",
+ "",
make(chan string, 1)}
fh := FailHandler{
@@ -1031,6 +1072,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
"abc123",
"foo",
"",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
@@ -1070,6 +1112,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
"abc123",
"foo",
"",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
@@ -1246,6 +1289,7 @@ func (s *StandaloneSuite) TestPutBRetry(c *C) {
"abc123",
"foo",
"",
+ "",
make(chan string, 5)}}
arv, _ := arvadosclient.MakeArvadosClient()
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 3b1afe1e2..7b2e47ff8 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -13,6 +13,7 @@ import (
"log"
"net/http"
"os"
+ "strconv"
"strings"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -52,10 +53,11 @@ type uploadStatus struct {
url string
statusCode int
replicasStored int
+ classesStored map[string]int
response string
}
-func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
+func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
var req *http.Request
@@ -63,7 +65,7 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error())
- uploadStatusChan <- uploadStatus{err, url, 0, 0, ""}
+ uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, ""}
return
}
@@ -80,14 +82,14 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
req.Header.Add("Content-Type", "application/octet-stream")
req.Header.Add(XKeepDesiredReplicas, fmt.Sprint(kc.Want_replicas))
- if len(kc.StorageClasses) > 0 {
- req.Header.Add("X-Keep-Storage-Classes", strings.Join(kc.StorageClasses, ", "))
+ if len(classesTodo) > 0 {
+ req.Header.Add(XKeepStorageClasses, strings.Join(classesTodo, ", "))
}
var resp *http.Response
if resp, err = kc.httpClient().Do(req); err != nil {
DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error())
- uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()}
+ uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, err.Error()}
return
}
@@ -95,6 +97,11 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
if xr := resp.Header.Get(XKeepReplicasStored); xr != "" {
fmt.Sscanf(xr, "%d", &rep)
}
+ scc := resp.Header.Get(XKeepStorageClassesConfirmed)
+ classesStored, err := parseStorageClassesConfirmedHeader(scc)
+ if err != nil {
+ DebugPrintf("DEBUG: [%s] Ignoring invalid %s header %q: %s", reqid, XKeepStorageClassesConfirmed, scc, err)
+ }
defer resp.Body.Close()
defer io.Copy(ioutil.Discard, resp.Body)
@@ -103,16 +110,16 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
response := strings.TrimSpace(string(respbody))
if err2 != nil && err2 != io.EOF {
DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response)
- uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+ uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, classesStored, response}
} else if resp.StatusCode == http.StatusOK {
DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url)
- uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response}
+ uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, classesStored, response}
} else {
if resp.StatusCode >= 300 && response == "" {
response = resp.Status
}
DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response)
- uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
+ uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, classesStored, response}
}
}
@@ -146,30 +153,55 @@ func (kc *KeepClient) putReplicas(
}()
}()
+ replicasWanted := kc.Want_replicas
+ replicasTodo := map[string]int{}
+ for _, c := range kc.StorageClasses {
+ replicasTodo[c] = replicasWanted
+ }
replicasDone := 0
- replicasTodo := kc.Want_replicas
replicasPerThread := kc.replicasPerService
if replicasPerThread < 1 {
// unlimited or unknown
- replicasPerThread = replicasTodo
+ replicasPerThread = replicasWanted
}
retriesRemaining := 1 + kc.Retries
var retryServers []string
lastError := make(map[string]string)
+ trackingClasses := len(replicasTodo) > 0
for retriesRemaining > 0 {
retriesRemaining--
nextServer = 0
retryServers = []string{}
- for replicasTodo > 0 {
- for active*replicasPerThread < replicasTodo {
+ for {
+ var classesTodo []string
+ var maxConcurrency int
+ for sc, r := range replicasTodo {
+ classesTodo = append(classesTodo, sc)
+ if maxConcurrency == 0 || maxConcurrency > r {
+ // Having more than r
+ // writes in flight
+ // would overreplicate
+ // class sc.
+ maxConcurrency = r
+ }
+ }
+ if !trackingClasses {
+ maxConcurrency = replicasWanted - replicasDone
+ }
+ if maxConcurrency < 1 {
+ // If there are no non-zero entries in
+ // replicasTodo, we're done.
+ break
+ }
+ for active*replicasPerThread < maxConcurrency {
// Start some upload requests
if nextServer < len(sv) {
DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
- go kc.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid)
+ go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
nextServer++
active++
} else {
@@ -184,36 +216,48 @@ func (kc *KeepClient) putReplicas(
break
}
}
- DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v",
- reqid, replicasTodo, active)
-
- // Now wait for something to happen.
- if active > 0 {
- status := <-uploadStatusChan
- active--
-
- if status.statusCode == 200 {
- // good news!
- replicasDone += status.replicasStored
- replicasTodo -= status.replicasStored
- locator = status.response
- delete(lastError, status.url)
- } else {
- msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
- if len(msg) > 100 {
- msg = msg[:100]
- }
- lastError[status.url] = msg
- }
- if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
- (status.statusCode >= 500 && status.statusCode != 503) {
- // Timeout, too many requests, or other server side failure
- // Do not retry when status code is 503, which means the keep server is full
- retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
+ DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+ if active < 1 {
+ break
+ }
+
+ // Wait for something to happen.
+ status := <-uploadStatusChan
+ active--
+
+ if status.statusCode == http.StatusOK {
+ delete(lastError, status.url)
+ replicasDone += status.replicasStored
+ if len(status.classesStored) == 0 {
+ // Server doesn't report
+ // storage classes. Give up
+ // trying to track which ones
+ // are satisfied; just rely on
+ // total # replicas.
+ trackingClasses = false
}
+ for className, replicas := range status.classesStored {
+ if replicasTodo[className] > replicas {
+ replicasTodo[className] -= replicas
+ } else {
+ delete(replicasTodo, className)
+ }
+ }
+ locator = status.response
} else {
- break
+ msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
+ if len(msg) > 100 {
+ msg = msg[:100]
+ }
+ lastError[status.url] = msg
+ }
+
+ if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+ (status.statusCode >= 500 && status.statusCode != 503) {
+ // Timeout, too many requests, or other server side failure
+ // Do not retry when status code is 503, which means the keep server is full
+ retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
}
}
@@ -222,3 +266,30 @@ func (kc *KeepClient) putReplicas(
return locator, replicasDone, nil
}
+
+func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {
+ if hdr == "" {
+ return nil, nil
+ }
+ classesStored := map[string]int{}
+ for _, cr := range strings.Split(hdr, ",") {
+ cr = strings.TrimSpace(cr)
+ if cr == "" {
+ continue
+ }
+ fields := strings.SplitN(cr, "=", 2)
+ if len(fields) != 2 {
+ return nil, fmt.Errorf("expected exactly one '=' char in entry %q", cr)
+ }
+ className := fields[0]
+ if className == "" {
+ return nil, fmt.Errorf("empty class name in entry %q", cr)
+ }
+ replicas, err := strconv.Atoi(fields[1])
+ if err != nil || replicas < 1 {
+ return nil, fmt.Errorf("invalid replica count %q", fields[1])
+ }
+ classesStored[className] = replicas
+ }
+ return classesStored, nil
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list