[ARVADOS] created: 2.1.0-617-ga77b278e2

Git user git at public.arvados.org
Wed Mar 31 21:56:32 UTC 2021


        at  a77b278e24487974232543c56e5cef0304aeee01 (commit)


commit a77b278e24487974232543c56e5cef0304aeee01
Author: Tom Clegg <tom at curii.com>
Date:   Wed Mar 31 16:56:14 2021 -0400

    13382: Use caller-specified storage classes when writing.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 76cacc569..a9d7330c0 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -22,6 +22,7 @@ import (
 	"net/http/httptest"
 	"os"
 	"regexp"
+	"sort"
 	"strings"
 	"time"
 
@@ -71,10 +72,11 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
 // A RequestTester represents the parameters for an HTTP request to
 // be issued on behalf of a unit test.
 type RequestTester struct {
-	uri         string
-	apiToken    string
-	method      string
-	requestBody []byte
+	uri            string
+	apiToken       string
+	method         string
+	requestBody    []byte
+	storageClasses string
 }
 
 // Test GetBlockHandler on the following situations:
@@ -754,25 +756,25 @@ func (s *HandlerSuite) TestPullHandler(c *check.C) {
 	var testcases = []pullTest{
 		{
 			"Valid pull list from an ordinary user",
-			RequestTester{"/pull", userToken, "PUT", goodJSON},
+			RequestTester{"/pull", userToken, "PUT", goodJSON, ""},
 			http.StatusUnauthorized,
 			"Unauthorized\n",
 		},
 		{
 			"Invalid pull request from an ordinary user",
-			RequestTester{"/pull", userToken, "PUT", badJSON},
+			RequestTester{"/pull", userToken, "PUT", badJSON, ""},
 			http.StatusUnauthorized,
 			"Unauthorized\n",
 		},
 		{
 			"Valid pull request from the data manager",
-			RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON},
+			RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
 			http.StatusOK,
 			"Received 3 pull requests\n",
 		},
 		{
 			"Invalid pull request from the data manager",
-			RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON},
+			RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", badJSON, ""},
 			http.StatusBadRequest,
 			"",
 		},
@@ -866,25 +868,25 @@ func (s *HandlerSuite) TestTrashHandler(c *check.C) {
 	var testcases = []trashTest{
 		{
 			"Valid trash list from an ordinary user",
-			RequestTester{"/trash", userToken, "PUT", goodJSON},
+			RequestTester{"/trash", userToken, "PUT", goodJSON, ""},
 			http.StatusUnauthorized,
 			"Unauthorized\n",
 		},
 		{
 			"Invalid trash list from an ordinary user",
-			RequestTester{"/trash", userToken, "PUT", badJSON},
+			RequestTester{"/trash", userToken, "PUT", badJSON, ""},
 			http.StatusUnauthorized,
 			"Unauthorized\n",
 		},
 		{
 			"Valid trash list from the data manager",
-			RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON},
+			RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", goodJSON, ""},
 			http.StatusOK,
 			"Received 3 trash requests\n",
 		},
 		{
 			"Invalid trash list from the data manager",
-			RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON},
+			RequestTester{"/trash", s.cluster.SystemRootToken, "PUT", badJSON, ""},
 			http.StatusBadRequest,
 			"",
 		},
@@ -921,6 +923,9 @@ func IssueRequest(handler http.Handler, rt *RequestTester) *httptest.ResponseRec
 	if rt.apiToken != "" {
 		req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
 	}
+	if rt.storageClasses != "" {
+		req.Header.Set("X-Keep-Storage-Classes", rt.storageClasses)
+	}
 	handler.ServeHTTP(response, req)
 	return response
 }
@@ -1113,6 +1118,46 @@ func (s *HandlerSuite) TestGetHandlerNoBufferLeak(c *check.C) {
 	}
 }
 
+func (s *HandlerSuite) TestPutStorageClasses(c *check.C) {
+	s.cluster.Volumes = map[string]arvados.Volume{
+		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "mock"}, // "default" is implicit
+		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"special": true, "extra": true}},
+		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "mock", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
+	}
+	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+	rt := RequestTester{
+		method:      "PUT",
+		uri:         "/" + TestHash,
+		requestBody: TestBlock,
+	}
+	for _, trial := range []struct {
+		ask    string
+		expect string
+	}{
+		{"", ""},
+		{"default", "default=1"},
+		{"special", "extra=1, special=1"},
+		{"extra, special", "extra=1, special=1"},
+		{"default, special", "default=1, extra=1, special=1"},
+	} {
+		c.Logf("%#v", trial)
+		rt.storageClasses = trial.ask
+		resp := IssueRequest(s.handler, &rt)
+		if trial.expect == "" {
+			// any non-empty value is correct
+			c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), check.Not(check.Equals), "")
+		} else {
+			c.Check(sortCommaSeparated(resp.Header().Get("X-Keep-Storage-Classes-Confirmed")), check.Equals, trial.expect)
+		}
+	}
+}
+
+func sortCommaSeparated(s string) string {
+	slice := strings.Split(s, ", ")
+	sort.Strings(slice)
+	return strings.Join(slice, ", ")
+}
+
 func (s *HandlerSuite) TestPutResponseHeader(c *check.C) {
 	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
 
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a0e7fd0e0..07c294666 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -246,6 +246,14 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
+	var wantStorageClasses []string
+	if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+		wantStorageClasses = strings.Split(hdr, ",")
+		for i, sc := range wantStorageClasses {
+			wantStorageClasses[i] = strings.TrimSpace(sc)
+		}
+	}
+
 	buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
@@ -259,7 +267,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+	result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
 	bufs.Put(buf)
 
 	if err != nil {
@@ -726,8 +734,10 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 }
 
 type putResult struct {
+	classTodo        map[string]bool
+	mountUsed        map[*VolumeMount]bool
 	totalReplication int
-	classReplication map[string]int
+	classDone        map[string]int
 }
 
 // Number of distinct replicas stored. "2" can mean the block was
@@ -741,7 +751,7 @@ func (pr putResult) TotalReplication() string {
 // "default=2, special=1".
 func (pr putResult) ClassReplication() string {
 	s := ""
-	for k, v := range pr.classReplication {
+	for k, v := range pr.classDone {
 		if len(s) > 0 {
 			s += ", "
 		}
@@ -750,15 +760,51 @@ func (pr putResult) ClassReplication() string {
 	return s
 }
 
-func newPutResult(mnt *VolumeMount) putResult {
-	result := putResult{
-		totalReplication: mnt.Replication,
-		classReplication: map[string]int{},
+func (pr *putResult) Add(mnt *VolumeMount) {
+	if pr.mountUsed[mnt] {
+		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+		return
+	}
+	pr.mountUsed[mnt] = true
+	pr.totalReplication += mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] += mnt.Replication
+		delete(pr.classTodo, class)
+	}
+}
+
+func (pr *putResult) Done() bool {
+	return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
+func (pr *putResult) Want(mnt *VolumeMount) bool {
+	if pr.Done() || pr.mountUsed[mnt] {
+		return false
+	}
+	if len(pr.classTodo) == 0 {
+		// none specified == "any"
+		return true
 	}
 	for class := range mnt.StorageClasses {
-		result.classReplication[class] += mnt.Replication
+		if pr.classTodo[class] {
+			return true
+		}
+	}
+	return false
+}
+
+func newPutResult(classes []string) putResult {
+	pr := putResult{
+		classTodo: make(map[string]bool, len(classes)),
+		classDone: map[string]int{},
+		mountUsed: map[*VolumeMount]bool{},
+	}
+	for _, c := range classes {
+		if c != "" {
+			pr.classTodo[c] = true
+		}
 	}
-	return result
+	return pr
 }
 
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
@@ -788,7 +834,7 @@ func newPutResult(mnt *VolumeMount) putResult {
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putResult, error) {
 	log := ctxlog.FromContext(ctx)
 
 	// Check that BLOCK's checksum matches HASH.
@@ -798,22 +844,28 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 		return putResult{}, RequestHashError
 	}
 
+	result := newPutResult(wantStorageClasses)
+
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
 		return result, err
-	} else if ctx.Err() != nil {
-		return putResult{}, ErrClientDisconnect
+	}
+	if ctx.Err() != nil {
+		return result, ErrClientDisconnect
 	}
 
 	// Choose a Keep volume to write to.
 	// If this volume fails, try all of the volumes in order.
-	if mnt := volmgr.NextWritable(); mnt != nil {
-		if err := mnt.Put(ctx, hash, block); err != nil {
-			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-		} else {
-			return newPutResult(mnt), nil // success!
+	if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
+		// fall through to "try all volumes" below
+	} else if err := mnt.Put(ctx, hash, block); err != nil {
+		log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+	} else {
+		result.Add(mnt)
+		if result.Done() {
+			return result, nil
 		}
 	}
 	if ctx.Err() != nil {
@@ -828,13 +880,20 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 
 	allFull := true
 	for _, mnt := range writables {
+		if !result.Want(mnt) {
+			continue
+		}
 		err := mnt.Put(ctx, hash, block)
 		if ctx.Err() != nil {
-			return putResult{}, ErrClientDisconnect
+			return result, ErrClientDisconnect
 		}
 		switch err {
 		case nil:
-			return newPutResult(mnt), nil // success!
+			result.Add(mnt)
+			if result.Done() {
+				return result, nil
+			}
+			continue
 		case FullError:
 			continue
 		default:
@@ -846,26 +905,33 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 		}
 	}
 
-	if allFull {
-		log.Error("all volumes are full")
+	if result.totalReplication > 0 {
+		// Some, but not all, of the storage classes were
+		// satisfied. This qualifies as success.
+		return result, nil
+	} else if allFull {
+		log.Error("all volumes with qualifying storage classes are full")
 		return putResult{}, FullError
+	} else {
+		// Already logged the non-full errors.
+		return putResult{}, GenericError
 	}
-	// Already logged the non-full errors.
-	return putResult{}, GenericError
 }
 
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putResult) error {
 	log := ctxlog.FromContext(ctx)
-	var bestErr error = NotFoundError
 	for _, mnt := range volmgr.AllWritable() {
+		if !result.Want(mnt) {
+			continue
+		}
 		err := mnt.Compare(ctx, hash, buf)
 		if ctx.Err() != nil {
-			return putResult{}, ctx.Err()
+			return nil
 		} else if err == CollisionError {
 			// Stop if we have a block with same hash but
 			// different content. (It will be impossible
@@ -873,7 +939,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 			// both, so there's no point writing it even
 			// on a different volume.)
 			log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-			return putResult{}, err
+			return CollisionError
 		} else if os.IsNotExist(err) {
 			// Block does not exist. This is the only
 			// "normal" error: we don't log anything.
@@ -887,13 +953,15 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 		}
 		if err := mnt.Touch(hash); err != nil {
 			log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
-			bestErr = err
 			continue
 		}
 		// Compare and Touch both worked --> done.
-		return newPutResult(mnt), nil
+		result.Add(mnt)
+		if result.Done() {
+			return nil
+		}
 	}
-	return putResult{}, bestErr
+	return nil
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
index 171dee3c4..8c88a406f 100644
--- a/services/keepstore/proxy_remote.go
+++ b/services/keepstore/proxy_remote.go
@@ -177,7 +177,7 @@ func (rrc *remoteResponseCacher) Close() error {
 		rrc.ResponseWriter.Write(rrc.Buffer)
 		return nil
 	}
-	_, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32])
+	_, err := PutBlock(rrc.Context, rrc.VolumeManager, rrc.Buffer, rrc.Locator[:32], nil)
 	if rrc.Context.Err() != nil {
 		// If caller hung up, log that instead of subsequent/misleading errors.
 		http.Error(rrc.ResponseWriter, rrc.Context.Err().Error(), http.StatusGatewayTimeout)
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index 670fa1a41..57b946924 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -89,6 +89,6 @@ var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte,
 	if volume != nil {
 		return volume.Put(context.Background(), locator, data)
 	}
-	_, err := PutBlock(context.Background(), volmgr, data, locator)
+	_, err := PutBlock(context.Background(), volmgr, data, locator, nil)
 	return err
 }
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index dfc94b676..2013c8c04 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -136,7 +136,7 @@ func (s *PullWorkerTestSuite) TestSpecifyMountUUID(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
 	testData := PullWorkerTestData{
 		name:         "TestPullWorkerPullList_with_two_locators",
-		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
 		responseCode: http.StatusOK,
 		responseBody: "Received 2 pull requests\n",
 		readContent:  "hello",
@@ -150,7 +150,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
 	testData := PullWorkerTestData{
 		name:         "TestPullWorkerPullList_with_one_locator",
-		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
 		responseCode: http.StatusOK,
 		responseBody: "Received 1 pull requests\n",
 		readContent:  "hola",
@@ -164,7 +164,7 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 	testData := PullWorkerTestData{
 		name:         "TestPullWorker_error_on_get_one_locator",
-		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
 		responseCode: http.StatusOK,
 		responseBody: "Received 1 pull requests\n",
 		readContent:  "unused",
@@ -178,7 +178,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 	testData := PullWorkerTestData{
 		name:         "TestPullWorker_error_on_get_two_locators",
-		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
 		responseCode: http.StatusOK,
 		responseBody: "Received 2 pull requests\n",
 		readContent:  "unused",
@@ -192,7 +192,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 	testData := PullWorkerTestData{
 		name:         "TestPullWorker_error_on_put_one_locator",
-		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList},
+		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", secondPullList, ""},
 		responseCode: http.StatusOK,
 		responseBody: "Received 1 pull requests\n",
 		readContent:  "hello hello",
@@ -206,7 +206,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
 	testData := PullWorkerTestData{
 		name:         "TestPullWorker_error_on_put_two_locators",
-		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList},
+		req:          RequestTester{"/pull", s.cluster.SystemRootToken, "PUT", firstPullList, ""},
 		responseCode: http.StatusOK,
 		responseBody: "Received 2 pull requests\n",
 		readContent:  "hello again",
@@ -221,7 +221,7 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
 func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
 	testData := PullWorkerTestData{
 		name:         "TestPullWorkerPullList_with_two_locators",
-		req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
+		req:          RequestTester{"/pull", "invalidToken", "PUT", firstPullList, ""},
 		responseCode: http.StatusUnauthorized,
 		responseBody: "Unauthorized\n",
 		readContent:  "hello",
diff --git a/services/keepstore/status_test.go b/services/keepstore/status_test.go
index 7bff2584e..cafe9f72f 100644
--- a/services/keepstore/status_test.go
+++ b/services/keepstore/status_test.go
@@ -15,7 +15,7 @@ import (
 // getStatusItem("foo","bar","baz") retrieves /status.json, decodes
 // the response body into resp, and returns resp["foo"]["bar"]["baz"].
 func getStatusItem(h *handler, keys ...string) interface{} {
-	resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil})
+	resp := IssueRequest(h, &RequestTester{"/status.json", "", "GET", nil, ""})
 	var s interface{}
 	json.NewDecoder(resp.Body).Decode(&s)
 	for _, k := range keys {
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 2de21edde..cc2d21e5a 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -148,7 +148,7 @@ func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error
 		}
 		return nil
 	} else {
-		return NotFoundError
+		return os.ErrNotExist
 	}
 }
 

commit 27f4da7ad22e867869decd1908dbfa0347b20113
Author: Tom Clegg <tom at curii.com>
Date:   Thu Mar 25 10:57:52 2021 -0400

    13382: Report storage class(es) in headers after successful write.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 17ed6402c..76cacc569 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -1113,7 +1113,7 @@ func (s *HandlerSuite) TestGetHandlerNoBufferLeak(c *check.C) {
 	}
 }
 
-func (s *HandlerSuite) TestPutReplicationHeader(c *check.C) {
+func (s *HandlerSuite) TestPutResponseHeader(c *check.C) {
 	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
 
 	resp := IssueRequest(s.handler, &RequestTester{
@@ -1121,10 +1121,9 @@ func (s *HandlerSuite) TestPutReplicationHeader(c *check.C) {
 		uri:         "/" + TestHash,
 		requestBody: TestBlock,
 	})
-	if r := resp.Header().Get("X-Keep-Replicas-Stored"); r != "1" {
-		c.Logf("%#v", resp)
-		c.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
-	}
+	c.Logf("%#v", resp)
+	c.Check(resp.Header().Get("X-Keep-Replicas-Stored"), check.Equals, "1")
+	c.Check(resp.Header().Get("X-Keep-Storage-Classes-Confirmed"), check.Equals, "default=1")
 }
 
 func (s *HandlerSuite) TestUntrashHandler(c *check.C) {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index eb0ea5ad2..a0e7fd0e0 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -259,7 +259,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+	result, err := PutBlock(ctx, rtr.volmgr, buf, hash)
 	bufs.Put(buf)
 
 	if err != nil {
@@ -279,7 +279,8 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
 		returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
 	}
-	resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+	resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+	resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
 	resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -724,6 +725,42 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 	return 0, errorToCaller
 }
 
+type putResult struct {
+	totalReplication int
+	classReplication map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putResult) TotalReplication() string {
+	return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2, special=1".
+func (pr putResult) ClassReplication() string {
+	s := ""
+	for k, v := range pr.classReplication {
+		if len(s) > 0 {
+			s += ", "
+		}
+		s += k + "=" + strconv.Itoa(v)
+	}
+	return s
+}
+
+func newPutResult(mnt *VolumeMount) putResult {
+	result := putResult{
+		totalReplication: mnt.Replication,
+		classReplication: map[string]int{},
+	}
+	for class := range mnt.StorageClasses {
+		result.classReplication[class] += mnt.Replication
+	}
+	return result
+}
+
 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
 //
 // PutBlock(ctx, block, hash)
@@ -751,23 +788,23 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (putResult, error) {
 	log := ctxlog.FromContext(ctx)
 
 	// Check that BLOCK's checksum matches HASH.
 	blockhash := fmt.Sprintf("%x", md5.Sum(block))
 	if blockhash != hash {
 		log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-		return 0, RequestHashError
+		return putResult{}, RequestHashError
 	}
 
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
-		return n, err
+	if result, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
+		return result, err
 	} else if ctx.Err() != nil {
-		return 0, ErrClientDisconnect
+		return putResult{}, ErrClientDisconnect
 	}
 
 	// Choose a Keep volume to write to.
@@ -776,28 +813,28 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 		if err := mnt.Put(ctx, hash, block); err != nil {
 			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
 		} else {
-			return mnt.Replication, nil // success!
+			return newPutResult(mnt), nil // success!
 		}
 	}
 	if ctx.Err() != nil {
-		return 0, ErrClientDisconnect
+		return putResult{}, ErrClientDisconnect
 	}
 
 	writables := volmgr.AllWritable()
 	if len(writables) == 0 {
 		log.Error("no writable volumes")
-		return 0, FullError
+		return putResult{}, FullError
 	}
 
 	allFull := true
-	for _, vol := range writables {
-		err := vol.Put(ctx, hash, block)
+	for _, mnt := range writables {
+		err := mnt.Put(ctx, hash, block)
 		if ctx.Err() != nil {
-			return 0, ErrClientDisconnect
+			return putResult{}, ErrClientDisconnect
 		}
 		switch err {
 		case nil:
-			return vol.Replication, nil // success!
+			return newPutResult(mnt), nil // success!
 		case FullError:
 			continue
 		default:
@@ -805,16 +842,16 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 			// write did not succeed.  Report the
 			// error and continue trying.
 			allFull = false
-			log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
+			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
 		}
 	}
 
 	if allFull {
 		log.Error("all volumes are full")
-		return 0, FullError
+		return putResult{}, FullError
 	}
 	// Already logged the non-full errors.
-	return 0, GenericError
+	return putResult{}, GenericError
 }
 
 // CompareAndTouch returns the current replication level if one of the
@@ -822,13 +859,13 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (putResult, error) {
 	log := ctxlog.FromContext(ctx)
 	var bestErr error = NotFoundError
 	for _, mnt := range volmgr.AllWritable() {
 		err := mnt.Compare(ctx, hash, buf)
 		if ctx.Err() != nil {
-			return 0, ctx.Err()
+			return putResult{}, ctx.Err()
 		} else if err == CollisionError {
 			// Stop if we have a block with same hash but
 			// different content. (It will be impossible
@@ -836,7 +873,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 			// both, so there's no point writing it even
 			// on a different volume.)
 			log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-			return 0, err
+			return putResult{}, err
 		} else if os.IsNotExist(err) {
 			// Block does not exist. This is the only
 			// "normal" error: we don't log anything.
@@ -854,9 +891,9 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 			continue
 		}
 		// Compare and Touch both worked --> done.
-		return mnt.Replication, nil
+		return newPutResult(mnt), nil
 	}
-	return 0, bestErr
+	return putResult{}, bestErr
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list