[ARVADOS] created: 2.1.0-1053-g3f38a38ae
Git user
git at public.arvados.org
Wed Jul 14 20:52:22 UTC 2021
at 3f38a38ae453390856b3d5cf2b4d5705df06ee13 (commit)
commit 3f38a38ae453390856b3d5cf2b4d5705df06ee13
Author: Tom Clegg <tom at curii.com>
Date: Wed Jul 14 16:52:12 2021 -0400
17394: Propagate storage classes when writing data via collectionfs.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go
index 4e0348c08..a57f2a683 100644
--- a/sdk/go/arvados/api.go
+++ b/sdk/go/arvados/api.go
@@ -8,6 +8,7 @@ import (
"bufio"
"context"
"encoding/json"
+ "io"
"net"
"github.com/sirupsen/logrus"
@@ -205,6 +206,22 @@ type LogoutOptions struct {
ReturnTo string `json:"return_to"` // Redirect to this URL after logging out
}
+type BlockWriteOptions struct { // BlockWriteOptions holds the parameters for writing one Keep block via keepClient.BlockWrite.
+ Hash string // Hex MD5 of the block; keepclient's BlockWrite computes it when empty. NOTE(review): on the Reader path keepclient checks the stream against Hash via HashCheckingReader, so an empty Hash there presumably fails — confirm.
+ Data []byte // Block content; when non-nil it is used and Reader is ignored.
+ Reader io.Reader // Content source, used only when Data is nil.
+ DataSize int // Must be set if Data is nil. With Data, 0 means len(Data) and DataSize > len(Data) is an error; keepclient rejects negative values and values > BLOCKSIZE.
+ RequestID string // keepclient: empty means a freshly generated request ID (kc.getRequestID()).
+ StorageClasses []string // keepclient: nil means kc.StorageClasses.
+ Replicas int // keepclient: 0 means kc.Want_replicas.
+ Attempts int // keepclient: 0 means 1 + kc.Retries.
+}
+
+type BlockWriteResponse struct { // BlockWriteResponse reports the outcome of a block write.
+ Locator string // keepclient: the locator from the most recent successful server response.
+ Replicas int // keepclient: total replicas reported stored; also returned (partial) alongside an InsufficientReplicasError.
+}
+
type API interface {
ConfigGet(ctx context.Context) (json.RawMessage, error)
Login(ctx context.Context, options LoginOptions) (LoginResponse, error)
diff --git a/sdk/go/arvados/fs_backend.go b/sdk/go/arvados/fs_backend.go
index c8308aea5..32365a531 100644
--- a/sdk/go/arvados/fs_backend.go
+++ b/sdk/go/arvados/fs_backend.go
@@ -4,7 +4,10 @@
package arvados
-import "io"
+import (
+ "context"
+ "io"
+)
type fsBackend interface {
keepClient
@@ -20,7 +23,7 @@ type keepBackend struct {
type keepClient interface {
ReadAt(locator string, p []byte, off int) (int, error)
- PutB(p []byte) (string, int, error)
+ BlockWrite(context.Context, BlockWriteOptions) (BlockWriteResponse, error) // replaces PutB so collectionfs can pass replicas and storage classes per write
LocalLocator(locator string) (string, error)
}
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index b743ab368..4d9db421f 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -42,7 +42,9 @@ type CollectionFileSystem interface {
type collectionFileSystem struct {
fileSystem
- uuid string
+ uuid string
+ replicas int // from Collection.ReplicationDesired (0 when unset); passed as BlockWriteOptions.Replicas on every block write
+ storageClasses []string // from Collection.StorageClassesDesired; passed as BlockWriteOptions.StorageClasses. NOTE(review): shares the Collection's slice (not copied)
}
// FileSystem returns a CollectionFileSystem for the collection.
@@ -52,12 +54,16 @@ func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFile
modTime = time.Now()
}
fs := &collectionFileSystem{
- uuid: c.UUID,
+ uuid: c.UUID,
+ storageClasses: c.StorageClassesDesired,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
thr: newThrottle(concurrentWriters),
},
}
+ if r := c.ReplicationDesired; r != nil {
+ fs.replicas = *r
+ }
root := &dirnode{
fs: fs,
treenode: treenode{
@@ -321,7 +327,7 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
// filenode implements inode.
type filenode struct {
parent inode
- fs FileSystem
+ fs *collectionFileSystem
fileinfo fileinfo
segments []segment
// number of times `segments` has changed in a
@@ -610,7 +616,11 @@ func (fn *filenode) pruneMemSegments() {
fn.fs.throttle().Acquire()
go func() {
defer close(done)
- locator, _, err := fn.FS().PutB(buf)
+ resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+ Data: buf,
+ Replicas: fn.fs.replicas,
+ StorageClasses: fn.fs.storageClasses,
+ })
fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
@@ -631,7 +641,7 @@ func (fn *filenode) pruneMemSegments() {
fn.memsize -= int64(len(buf))
fn.segments[idx] = storedSegment{
kc: fn.FS(),
- locator: locator,
+ locator: resp.Locator,
size: len(buf),
offset: 0,
length: len(buf),
@@ -748,7 +758,11 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
go func() {
defer close(done)
defer close(errs)
- locator, _, err := dn.fs.PutB(block)
+ resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: block,
+ Replicas: dn.fs.replicas,
+ StorageClasses: dn.fs.storageClasses,
+ })
dn.fs.throttle().Release()
if err != nil {
errs <- err
@@ -780,7 +794,7 @@ func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize
data := ref.fn.segments[ref.idx].(*memSegment).buf
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
- locator: locator,
+ locator: resp.Locator,
size: blocksize,
offset: offsets[idx],
length: len(data),
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index 05c8ea61a..74757bf7c 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -6,6 +6,7 @@ package arvados
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
@@ -50,17 +51,25 @@ func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error
return copy(p, buf[off:]), nil
}
-func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
- locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
- buf := make([]byte, len(p))
- copy(buf, p)
+func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) { // test stub: stores a copy of opts.Data in kcs.blocks and reports one replica
+ if opts.Data == nil { // Reader-based writes are not supported by the stub
+ panic("oops, stub is not made for this")
+ }
+ locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+ buf := make([]byte, len(opts.Data))
+ copy(buf, opts.Data)
if kcs.onPut != nil {
kcs.onPut(buf)
}
+ for _, sc := range opts.StorageClasses { // only "default" is writable; NOTE(review): this check runs after onPut, so onPut also sees blocks that are then rejected
+ if sc != "default" {
+ return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
+ }
+ }
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
- return locator, 1, nil
+ return BlockWriteResponse{Locator: locator, Replicas: 1}, nil // opts.Replicas is ignored
}
var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
@@ -112,6 +121,22 @@ func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
c.Check(ok, check.Equals, true)
}
+func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) { // a storage class the keep client cannot write must surface as an error from MarshalManifest
+ fs, err := (&Collection{
+ StorageClassesDesired: []string{"unobtainium"}, // the stub accepts only "default"
+ }).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("food"))
+ c.Assert(err, check.IsNil)
+ err = f.Close() // Write and Close are both expected to succeed
+ c.Assert(err, check.IsNil)
+ _, err = fs.MarshalManifest(".") // the rejected BlockWrite error is expected to surface here
+ c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
+}
+
func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
fs, err := (&Collection{
ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 2b560cff5..2cd6bb4d4 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -8,6 +8,7 @@ package keepclient
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
@@ -21,8 +22,8 @@ import (
"sync"
"time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
- "git.arvados.org/arvados.git/sdk/go/asyncbuf"
"git.arvados.org/arvados.git/sdk/go/httpserver"
)
@@ -153,23 +154,12 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient {
// Returns an InsufficientReplicasError if 0 <= replicas <
// kc.Wants_replicas.
func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
- // Buffer for reads from 'r'
- var bufsize int
- if dataBytes > 0 {
- if dataBytes > BLOCKSIZE {
- return "", 0, ErrOversizeBlock
- }
- bufsize = int(dataBytes)
- } else {
- bufsize = BLOCKSIZE
- }
-
- buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
- go func() {
- _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
- buf.CloseWithError(err)
- }()
- return kc.putReplicas(hash, buf.NewReader, dataBytes)
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{ // delegates to BlockWrite with the client's default storage classes, replicas and retries
+ Hash: hash, // NOTE(review): BlockWrite checks the stream against this via HashCheckingReader; an empty hash presumably yields a checksum error — confirm
+ Reader: r,
+ DataSize: int(dataBytes), // NOTE(review): dataBytes <= 0 previously got a BLOCKSIZE buffer; BlockWrite now rejects negative sizes and allocates a zero-capacity buffer for 0
+ })
+ return resp.Locator, resp.Replicas, err
}
// PutHB writes a block to Keep. The hash of the bytes is given in
@@ -177,16 +167,21 @@ func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string,
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- newReader := func() io.Reader { return bytes.NewBuffer(buf) }
- return kc.putReplicas(hash, newReader, int64(len(buf)))
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Hash: hash,
+ Data: buf, // NOTE(review): a nil buf now fails with "Data and Reader are both nil" instead of storing the empty block; confirm no caller passes nil
+ })
+ return resp.Locator, resp.Replicas, err
}
// PutB writes a block to Keep. It computes the hash itself.
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
- hash := fmt.Sprintf("%x", md5.Sum(buffer))
- return kc.PutHB(hash, buffer)
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Data: buffer, // Hash left empty so BlockWrite computes the MD5. NOTE(review): a nil buffer now errors instead of storing the empty block
+ })
+ return resp.Locator, resp.Replicas, err
}
// PutR writes a block to Keep. It first reads all data from r into a buffer
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index f59d16fd3..724c66e13 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -173,7 +173,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
- go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+ go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
writer.Write([]byte("foo"))
writer.Close()
@@ -229,7 +229,7 @@ func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) {
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
- go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+ go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
writer.Write([]byte("foo"))
writer.Close()
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 7b2e47ff8..a8c82aac0 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -5,6 +5,8 @@
package keepclient
import (
+ "bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
@@ -16,7 +18,9 @@ import (
"strconv"
"strings"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/asyncbuf"
)
// DebugPrintf emits debug messages. The easiest way to enable
@@ -58,7 +62,7 @@ type uploadStatus struct {
}
func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
- uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
+ uploadStatusChan chan<- uploadStatus, expectedLength int, reqid string) {
var req *http.Request
var err error
@@ -69,7 +73,7 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo [
return
}
- req.ContentLength = expectedLength
+ req.ContentLength = int64(expectedLength)
if expectedLength > 0 {
req.Body = ioutil.NopCloser(body)
} else {
@@ -123,15 +127,57 @@ func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo [
}
}
-func (kc *KeepClient) putReplicas(
- hash string,
- getReader func() io.Reader,
- expectedLength int64) (locator string, replicas int, err error) {
-
- reqid := kc.getRequestID()
+func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ var resp arvados.BlockWriteResponse
+ var getReader func() io.Reader
+ if req.Data == nil && req.Reader == nil {
+ return resp, errors.New("invalid BlockWriteOptions: Data and Reader are both nil")
+ }
+ if req.DataSize < 0 {
+ return resp, fmt.Errorf("invalid BlockWriteOptions: negative DataSize %d", req.DataSize)
+ }
+ if req.DataSize > BLOCKSIZE || len(req.Data) > BLOCKSIZE {
+ return resp, ErrOversizeBlock
+ }
+ if req.Data != nil {
+ if req.DataSize > len(req.Data) {
+ return resp, errors.New("invalid BlockWriteOptions: DataSize > len(Data)")
+ }
+ if req.DataSize == 0 {
+ req.DataSize = len(req.Data)
+ }
+ getReader = func() io.Reader { return bytes.NewReader(req.Data[:req.DataSize]) }
+ } else {
+ buf := asyncbuf.NewBuffer(make([]byte, 0, req.DataSize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{req.Reader, md5.New(), req.Hash})
+ buf.CloseWithError(err)
+ }()
+ getReader = buf.NewReader
+ }
+ if req.Hash == "" {
+ m := md5.New()
+ _, err := io.Copy(m, getReader())
+ if err != nil {
+ return resp, err
+ }
+ req.Hash = fmt.Sprintf("%x", m.Sum(nil))
+ }
+ if req.StorageClasses == nil {
+ req.StorageClasses = kc.StorageClasses
+ }
+ if req.Replicas == 0 {
+ req.Replicas = kc.Want_replicas
+ }
+ if req.RequestID == "" {
+ req.RequestID = kc.getRequestID()
+ }
+ if req.Attempts == 0 {
+ req.Attempts = 1 + kc.Retries
+ }
// Calculate the ordering for uploading to servers
- sv := NewRootSorter(kc.WritableLocalRoots(), hash).GetSortedRoots()
+ sv := NewRootSorter(kc.WritableLocalRoots(), req.Hash).GetSortedRoots()
// The next server to try contacting
nextServer := 0
@@ -153,20 +199,18 @@ func (kc *KeepClient) putReplicas(
}()
}()
- replicasWanted := kc.Want_replicas
replicasTodo := map[string]int{}
- for _, c := range kc.StorageClasses {
- replicasTodo[c] = replicasWanted
+ for _, c := range req.StorageClasses {
+ replicasTodo[c] = req.Replicas
}
- replicasDone := 0
replicasPerThread := kc.replicasPerService
if replicasPerThread < 1 {
// unlimited or unknown
- replicasPerThread = replicasWanted
+ replicasPerThread = req.Replicas
}
- retriesRemaining := 1 + kc.Retries
+ retriesRemaining := req.Attempts
var retryServers []string
lastError := make(map[string]string)
@@ -190,7 +234,7 @@ func (kc *KeepClient) putReplicas(
}
}
if !trackingClasses {
- maxConcurrency = replicasWanted - replicasDone
+ maxConcurrency = req.Replicas - resp.Replicas
}
if maxConcurrency < 1 {
// If there are no non-zero entries in
@@ -200,8 +244,8 @@ func (kc *KeepClient) putReplicas(
for active*replicasPerThread < maxConcurrency {
// Start some upload requests
if nextServer < len(sv) {
- DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
- go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
+ DebugPrintf("DEBUG: [%s] Begin upload %s to %s", req.RequestID, req.Hash, sv[nextServer])
+ go kc.uploadToKeepServer(sv[nextServer], req.Hash, classesTodo, getReader(), uploadStatusChan, req.DataSize, req.RequestID)
nextServer++
active++
} else {
@@ -211,13 +255,13 @@ func (kc *KeepClient) putReplicas(
msg += resp + "; "
}
msg = msg[:len(msg)-2]
- return locator, replicasDone, InsufficientReplicasError(errors.New(msg))
+ return resp, InsufficientReplicasError(errors.New(msg))
}
break
}
}
- DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+ DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", req.RequestID, replicasTodo, active)
if active < 1 {
break
}
@@ -228,7 +272,7 @@ func (kc *KeepClient) putReplicas(
if status.statusCode == http.StatusOK {
delete(lastError, status.url)
- replicasDone += status.replicasStored
+ resp.Replicas += status.replicasStored
if len(status.classesStored) == 0 {
// Server doesn't report
// storage classes. Give up
@@ -244,7 +288,7 @@ func (kc *KeepClient) putReplicas(
delete(replicasTodo, className)
}
}
- locator = status.response
+ resp.Locator = status.response
} else {
msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
if len(msg) > 100 {
@@ -264,7 +308,7 @@ func (kc *KeepClient) putReplicas(
sv = retryServers
}
- return locator, replicasDone, nil
+ return resp, nil
}
func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list