[ARVADOS] created: 2.1.0-629-g757aa11ab

Git user git at public.arvados.org
Mon Apr 12 14:52:29 UTC 2021


        at  757aa11abba0455c122430755a0c28b81f61ab36 (commit)


commit 757aa11abba0455c122430755a0c28b81f61ab36
Author: Tom Clegg <tom at curii.com>
Date:   Mon Apr 12 10:52:22 2021 -0400

    17392: Test satisfying storage classes via multiple servers.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 94cfece8c..f59d16fd3 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -16,12 +16,14 @@ import (
 	"net/http"
 	"os"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/arvadostest"
 	. "gopkg.in/check.v1"
+	check "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -102,12 +104,19 @@ type StubPutHandler struct {
 	expectStorageClass   string
 	returnStorageClasses string
 	handled              chan string
+	requests             []*http.Request
+	mtx                  sync.Mutex
 }
 
-func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	sph.mtx.Lock()
+	sph.requests = append(sph.requests, req)
+	sph.mtx.Unlock()
 	sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
 	sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
-	sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
+	if sph.expectStorageClass != "*" {
+		sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
+	}
 	body, err := ioutil.ReadAll(req.Body)
 	sph.c.Check(err, Equals, nil)
 	sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
@@ -152,13 +161,15 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 	log.Printf("TestUploadToStubKeepServer")
 
-	st := StubPutHandler{
-		c,
-		"acbd18db4cc2f85cedef654fccc4a4d8",
-		"abc123",
-		"foo",
-		"", "default=1",
-		make(chan string)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "default=1",
+		handled:              make(chan string),
+	}
 
 	UploadToStubHelper(c, st,
 		func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
@@ -174,13 +185,15 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
-	st := StubPutHandler{
-		c,
-		"acbd18db4cc2f85cedef654fccc4a4d8",
-		"abc123",
-		"foo",
-		"", "default=1",
-		make(chan string)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "default=1",
+		handled:              make(chan string),
+	}
 
 	UploadToStubHelper(c, st,
 		func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
@@ -204,13 +217,15 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
 		{" =foo=1 ", nil},
 		{"foo", nil},
 	} {
-		st := StubPutHandler{
-			c,
-			"acbd18db4cc2f85cedef654fccc4a4d8",
-			"abc123",
-			"foo",
-			"", trial.respHeader,
-			make(chan string)}
+		st := &StubPutHandler{
+			c:                    c,
+			expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+			expectAPIToken:       "abc123",
+			expectBody:           "foo",
+			expectStorageClass:   "",
+			returnStorageClasses: trial.respHeader,
+			handled:              make(chan string),
+		}
 
 		UploadToStubHelper(c, st,
 			func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
@@ -226,6 +241,65 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
 	}
 }
 
+func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
+	nServers := 5
+	for _, trial := range []struct {
+		replicas    int
+		classes     []string
+		minRequests int
+		maxRequests int
+		success     bool
+	}{
+		{1, []string{"class1"}, 1, 1, true},
+		{2, []string{"class1"}, 1, 2, true},
+		{3, []string{"class1"}, 2, 3, true},
+		{1, []string{"class1", "class2"}, 1, 1, true},
+		{nServers*2 + 1, []string{"class1"}, nServers, nServers, false},
+		{1, []string{"class404"}, nServers, nServers, false},
+		{1, []string{"class1", "class404"}, nServers, nServers, false},
+	} {
+		c.Logf("%+v", trial)
+		st := &StubPutHandler{
+			c:                    c,
+			expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+			expectAPIToken:       "abc123",
+			expectBody:           "foo",
+			expectStorageClass:   "*",
+			returnStorageClasses: "class1=2, class2=2",
+			handled:              make(chan string, 100),
+		}
+		ks := RunSomeFakeKeepServers(st, nServers)
+		arv, _ := arvadosclient.MakeArvadosClient()
+		kc, _ := MakeKeepClient(arv)
+		kc.Want_replicas = trial.replicas
+		kc.StorageClasses = trial.classes
+		arv.ApiToken = "abc123"
+		localRoots := make(map[string]string)
+		writableLocalRoots := make(map[string]string)
+		for i, k := range ks {
+			localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+			writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+			defer k.listener.Close()
+		}
+		kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+
+		_, _, err := kc.PutB([]byte("foo"))
+		if trial.success {
+			c.Check(err, check.IsNil)
+		} else {
+			c.Check(err, check.NotNil)
+		}
+		c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
+		c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
+		if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) {
+			// Max concurrency should be 1. First request
+			// should have succeeded for class1. Second
+			// request should only ask for class404.
+			c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404")
+		}
+	}
+}
+
 type FailHandler struct {
 	handled chan string
 }
@@ -303,14 +377,15 @@ func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
 func (s *StandaloneSuite) TestPutB(c *C) {
 	hash := Md5String("foo")
 
-	st := StubPutHandler{
-		c,
-		hash,
-		"abc123",
-		"foo",
-		"",
-		"",
-		make(chan string, 5)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           hash,
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "",
+		handled:              make(chan string, 5),
+	}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(arv)
@@ -346,14 +421,15 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 func (s *StandaloneSuite) TestPutHR(c *C) {
 	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-	st := StubPutHandler{
-		c,
-		hash,
-		"abc123",
-		"foo",
-		"",
-		"",
-		make(chan string, 5)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           hash,
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "",
+		handled:              make(chan string, 5),
+	}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(arv)
@@ -396,14 +472,15 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 func (s *StandaloneSuite) TestPutWithFail(c *C) {
 	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-	st := StubPutHandler{
-		c,
-		hash,
-		"abc123",
-		"foo",
-		"",
-		"",
-		make(chan string, 4)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           hash,
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "",
+		handled:              make(chan string, 4),
+	}
 
 	fh := FailHandler{
 		make(chan string, 1)}
@@ -457,14 +534,15 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-	st := StubPutHandler{
-		c,
-		hash,
-		"abc123",
-		"foo",
-		"",
-		"",
-		make(chan string, 1)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           hash,
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "",
+		handled:              make(chan string, 1),
+	}
 
 	fh := FailHandler{
 		make(chan string, 4)}
@@ -1066,14 +1144,15 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
 	hash := Md5String("foo")
 
-	st := StubPutHandler{
-		c,
-		hash,
-		"abc123",
-		"foo",
-		"",
-		"",
-		make(chan string, 5)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           hash,
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "",
+		handled:              make(chan string, 5),
+	}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(arv)
@@ -1106,14 +1185,15 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
 	hash := Md5String("foo")
 
-	st := StubPutHandler{
-		c,
-		hash,
-		"abc123",
-		"foo",
-		"",
-		"",
-		make(chan string, 5)}
+	st := &StubPutHandler{
+		c:                    c,
+		expectPath:           hash,
+		expectAPIToken:       "abc123",
+		expectBody:           "foo",
+		expectStorageClass:   "",
+		returnStorageClasses: "",
+		handled:              make(chan string, 5),
+	}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(arv)
@@ -1283,14 +1363,16 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
 func (s *StandaloneSuite) TestPutBRetry(c *C) {
 	st := &FailThenSucceedHandler{
 		handled: make(chan string, 1),
-		successhandler: StubPutHandler{
-			c,
-			Md5String("foo"),
-			"abc123",
-			"foo",
-			"",
-			"",
-			make(chan string, 5)}}
+		successhandler: &StubPutHandler{
+			c:                    c,
+			expectPath:           Md5String("foo"),
+			expectAPIToken:       "abc123",
+			expectBody:           "foo",
+			expectStorageClass:   "",
+			returnStorageClasses: "",
+			handled:              make(chan string, 5),
+		},
+	}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(arv)

commit c3e41ce1ad90a041d380a834e1b699685f0a5658
Author: Tom Clegg <tom at curii.com>
Date:   Tue Apr 6 11:28:14 2021 -0400

    17392: Ensure requested storage classes are satisfied on write.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 21913ff96..454181265 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -83,8 +83,12 @@ var ErrNoSuchKeepServer = errors.New("No keep server matching the given UUID is
 // ErrIncompleteIndex is returned when the Index response does not end with a new empty line
 var ErrIncompleteIndex = errors.New("Got incomplete index")
 
-const XKeepDesiredReplicas = "X-Keep-Desired-Replicas"
-const XKeepReplicasStored = "X-Keep-Replicas-Stored"
+const (
+	XKeepDesiredReplicas         = "X-Keep-Desired-Replicas"
+	XKeepReplicasStored          = "X-Keep-Replicas-Stored"
+	XKeepStorageClasses          = "X-Keep-Storage-Classes"
+	XKeepStorageClassesConfirmed = "X-Keep-Storage-Classes-Confirmed"
+)
 
 type HTTPClient interface {
 	Do(*http.Request) (*http.Response, error)
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 57a89b50a..94cfece8c 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -95,12 +95,13 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
 }
 
 type StubPutHandler struct {
-	c                  *C
-	expectPath         string
-	expectAPIToken     string
-	expectBody         string
-	expectStorageClass string
-	handled            chan string
+	c                    *C
+	expectPath           string
+	expectAPIToken       string
+	expectBody           string
+	expectStorageClass   string
+	returnStorageClasses string
+	handled              chan string
 }
 
 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -110,6 +111,10 @@ func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 	body, err := ioutil.ReadAll(req.Body)
 	sph.c.Check(err, Equals, nil)
 	sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
+	resp.Header().Set("X-Keep-Replicas-Stored", "1")
+	if sph.returnStorageClasses != "" {
+		resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
+	}
 	resp.WriteHeader(200)
 	sph.handled <- fmt.Sprintf("http://%s", req.Host)
 }
@@ -152,20 +157,19 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 		"acbd18db4cc2f85cedef654fccc4a4d8",
 		"abc123",
 		"foo",
-		"hot",
+		"", "default=1",
 		make(chan string)}
 
 	UploadToStubHelper(c, st,
 		func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
-			kc.StorageClasses = []string{"hot"}
-			go kc.uploadToKeepServer(url, st.expectPath, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+			go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
 
 			writer.Write([]byte("foo"))
 			writer.Close()
 
 			<-st.handled
 			status := <-uploadStatusChan
-			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
 		})
 }
 
@@ -175,20 +179,53 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 		"acbd18db4cc2f85cedef654fccc4a4d8",
 		"abc123",
 		"foo",
-		"",
+		"", "default=1",
 		make(chan string)}
 
 	UploadToStubHelper(c, st,
 		func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
-			go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
+			go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID())
 
 			<-st.handled
 
 			status := <-uploadStatusChan
-			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+			c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""})
 		})
 }
 
+func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
+	for _, trial := range []struct {
+		respHeader string
+		expectMap  map[string]int
+	}{
+		{"", nil},
+		{"foo=1", map[string]int{"foo": 1}},
+		{" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}},
+		{" =foo=1 ", nil},
+		{"foo", nil},
+	} {
+		st := StubPutHandler{
+			c,
+			"acbd18db4cc2f85cedef654fccc4a4d8",
+			"abc123",
+			"foo",
+			"", trial.respHeader,
+			make(chan string)}
+
+		UploadToStubHelper(c, st,
+			func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
+				go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+
+				writer.Write([]byte("foo"))
+				writer.Close()
+
+				<-st.handled
+				status := <-uploadStatusChan
+				c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
+			})
+	}
+}
+
 type FailHandler struct {
 	handled chan string
 }
@@ -235,7 +272,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 		func(kc *KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
 
-			go kc.uploadToKeepServer(url, hash, reader, uploadStatusChan, 3, kc.getRequestID())
+			go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
 
 			writer.Write([]byte("foo"))
 			writer.Close()
@@ -272,6 +309,7 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 		"abc123",
 		"foo",
 		"",
+		"",
 		make(chan string, 5)}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
@@ -314,6 +352,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 		"abc123",
 		"foo",
 		"",
+		"",
 		make(chan string, 5)}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
@@ -363,6 +402,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 		"abc123",
 		"foo",
 		"",
+		"",
 		make(chan string, 4)}
 
 	fh := FailHandler{
@@ -423,6 +463,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 		"abc123",
 		"foo",
 		"",
+		"",
 		make(chan string, 1)}
 
 	fh := FailHandler{
@@ -1031,6 +1072,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
 		"abc123",
 		"foo",
 		"",
+		"",
 		make(chan string, 5)}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
@@ -1070,6 +1112,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
 		"abc123",
 		"foo",
 		"",
+		"",
 		make(chan string, 5)}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
@@ -1246,6 +1289,7 @@ func (s *StandaloneSuite) TestPutBRetry(c *C) {
 			"abc123",
 			"foo",
 			"",
+			"",
 			make(chan string, 5)}}
 
 	arv, _ := arvadosclient.MakeArvadosClient()
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 3b1afe1e2..7b2e47ff8 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -13,6 +13,7 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"strconv"
 	"strings"
 
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -52,10 +53,11 @@ type uploadStatus struct {
 	url            string
 	statusCode     int
 	replicasStored int
+	classesStored  map[string]int
 	response       string
 }
 
-func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reader,
+func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
 	uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
 
 	var req *http.Request
@@ -63,7 +65,7 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
 	var url = fmt.Sprintf("%s/%s", host, hash)
 	if req, err = http.NewRequest("PUT", url, nil); err != nil {
 		DebugPrintf("DEBUG: [%s] Error creating request PUT %v error: %v", reqid, url, err.Error())
-		uploadStatusChan <- uploadStatus{err, url, 0, 0, ""}
+		uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, ""}
 		return
 	}
 
@@ -80,14 +82,14 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
 	req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
 	req.Header.Add("Content-Type", "application/octet-stream")
 	req.Header.Add(XKeepDesiredReplicas, fmt.Sprint(kc.Want_replicas))
-	if len(kc.StorageClasses) > 0 {
-		req.Header.Add("X-Keep-Storage-Classes", strings.Join(kc.StorageClasses, ", "))
+	if len(classesTodo) > 0 {
+		req.Header.Add(XKeepStorageClasses, strings.Join(classesTodo, ", "))
 	}
 
 	var resp *http.Response
 	if resp, err = kc.httpClient().Do(req); err != nil {
 		DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error())
-		uploadStatusChan <- uploadStatus{err, url, 0, 0, err.Error()}
+		uploadStatusChan <- uploadStatus{err, url, 0, 0, nil, err.Error()}
 		return
 	}
 
@@ -95,6 +97,11 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
 	if xr := resp.Header.Get(XKeepReplicasStored); xr != "" {
 		fmt.Sscanf(xr, "%d", &rep)
 	}
+	scc := resp.Header.Get(XKeepStorageClassesConfirmed)
+	classesStored, err := parseStorageClassesConfirmedHeader(scc)
+	if err != nil {
+		DebugPrintf("DEBUG: [%s] Ignoring invalid %s header %q: %s", reqid, XKeepStorageClassesConfirmed, scc, err)
+	}
 
 	defer resp.Body.Close()
 	defer io.Copy(ioutil.Discard, resp.Body)
@@ -103,16 +110,16 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, body io.Reade
 	response := strings.TrimSpace(string(respbody))
 	if err2 != nil && err2 != io.EOF {
 		DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, err2.Error(), response)
-		uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+		uploadStatusChan <- uploadStatus{err2, url, resp.StatusCode, rep, classesStored, response}
 	} else if resp.StatusCode == http.StatusOK {
 		DebugPrintf("DEBUG: [%s] Upload %v success", reqid, url)
-		uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, response}
+		uploadStatusChan <- uploadStatus{nil, url, resp.StatusCode, rep, classesStored, response}
 	} else {
 		if resp.StatusCode >= 300 && response == "" {
 			response = resp.Status
 		}
 		DebugPrintf("DEBUG: [%s] Upload %v error: %v response: %v", reqid, url, resp.StatusCode, response)
-		uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
+		uploadStatusChan <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, classesStored, response}
 	}
 }
 
@@ -146,30 +153,55 @@ func (kc *KeepClient) putReplicas(
 		}()
 	}()
 
+	replicasWanted := kc.Want_replicas
+	replicasTodo := map[string]int{}
+	for _, c := range kc.StorageClasses {
+		replicasTodo[c] = replicasWanted
+	}
 	replicasDone := 0
-	replicasTodo := kc.Want_replicas
 
 	replicasPerThread := kc.replicasPerService
 	if replicasPerThread < 1 {
 		// unlimited or unknown
-		replicasPerThread = replicasTodo
+		replicasPerThread = replicasWanted
 	}
 
 	retriesRemaining := 1 + kc.Retries
 	var retryServers []string
 
 	lastError := make(map[string]string)
+	trackingClasses := len(replicasTodo) > 0
 
 	for retriesRemaining > 0 {
 		retriesRemaining--
 		nextServer = 0
 		retryServers = []string{}
-		for replicasTodo > 0 {
-			for active*replicasPerThread < replicasTodo {
+		for {
+			var classesTodo []string
+			var maxConcurrency int
+			for sc, r := range replicasTodo {
+				classesTodo = append(classesTodo, sc)
+				if maxConcurrency == 0 || maxConcurrency > r {
+					// Having more than r
+					// writes in flight
+					// would overreplicate
+					// class sc.
+					maxConcurrency = r
+				}
+			}
+			if !trackingClasses {
+				maxConcurrency = replicasWanted - replicasDone
+			}
+			if maxConcurrency < 1 {
+				// If there are no non-zero entries in
+				// replicasTodo, we're done.
+				break
+			}
+			for active*replicasPerThread < maxConcurrency {
 				// Start some upload requests
 				if nextServer < len(sv) {
 					DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
-					go kc.uploadToKeepServer(sv[nextServer], hash, getReader(), uploadStatusChan, expectedLength, reqid)
+					go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
 					nextServer++
 					active++
 				} else {
@@ -184,36 +216,48 @@ func (kc *KeepClient) putReplicas(
 					break
 				}
 			}
-			DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v",
-				reqid, replicasTodo, active)
-
-			// Now wait for something to happen.
-			if active > 0 {
-				status := <-uploadStatusChan
-				active--
-
-				if status.statusCode == 200 {
-					// good news!
-					replicasDone += status.replicasStored
-					replicasTodo -= status.replicasStored
-					locator = status.response
-					delete(lastError, status.url)
-				} else {
-					msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
-					if len(msg) > 100 {
-						msg = msg[:100]
-					}
-					lastError[status.url] = msg
-				}
 
-				if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
-					(status.statusCode >= 500 && status.statusCode != 503) {
-					// Timeout, too many requests, or other server side failure
-					// Do not retry when status code is 503, which means the keep server is full
-					retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
+			DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+			if active < 1 {
+				break
+			}
+
+			// Wait for something to happen.
+			status := <-uploadStatusChan
+			active--
+
+			if status.statusCode == http.StatusOK {
+				delete(lastError, status.url)
+				replicasDone += status.replicasStored
+				if len(status.classesStored) == 0 {
+					// Server doesn't report
+					// storage classes. Give up
+					// trying to track which ones
+					// are satisfied; just rely on
+					// total # replicas.
+					trackingClasses = false
 				}
+				for className, replicas := range status.classesStored {
+					if replicasTodo[className] > replicas {
+						replicasTodo[className] -= replicas
+					} else {
+						delete(replicasTodo, className)
+					}
+				}
+				locator = status.response
 			} else {
-				break
+				msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
+				if len(msg) > 100 {
+					msg = msg[:100]
+				}
+				lastError[status.url] = msg
+			}
+
+			if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+				(status.statusCode >= 500 && status.statusCode != 503) {
+				// Timeout, too many requests, or other server side failure
+				// Do not retry when status code is 503, which means the keep server is full
+				retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
 			}
 		}
 
@@ -222,3 +266,30 @@ func (kc *KeepClient) putReplicas(
 
 	return locator, replicasDone, nil
 }
+
+func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {
+	if hdr == "" {
+		return nil, nil
+	}
+	classesStored := map[string]int{}
+	for _, cr := range strings.Split(hdr, ",") {
+		cr = strings.TrimSpace(cr)
+		if cr == "" {
+			continue
+		}
+		fields := strings.SplitN(cr, "=", 2)
+		if len(fields) != 2 {
+			return nil, fmt.Errorf("expected exactly one '=' char in entry %q", cr)
+		}
+		className := fields[0]
+		if className == "" {
+			return nil, fmt.Errorf("empty class name in entry %q", cr)
+		}
+		replicas, err := strconv.Atoi(fields[1])
+		if err != nil || replicas < 1 {
+			return nil, fmt.Errorf("invalid replica count %q", fields[1])
+		}
+		classesStored[className] = replicas
+	}
+	return classesStored, nil
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list