[ARVADOS] created: 1.3.0-2777-gb291fc626
Git user
git at public.arvados.org
Tue Jul 21 15:18:03 UTC 2020
at b291fc6265aa6a0f60d51cc49bf32b9aea847d1b (commit)
commit b291fc6265aa6a0f60d51cc49bf32b9aea847d1b
Author: Tom Clegg <tom at tomclegg.ca>
Date: Tue Jul 21 11:17:49 2020 -0400
16535: Support delimiter/rollup option in list API.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 55d85c69a..caacd3108 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -134,9 +134,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
}
}
-func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os.FileInfo) error) error {
+func walkFS(fs arvados.CustomFileSystem, path string, ignoreNotFound bool, fn func(path string, fi os.FileInfo) error) error {
f, err := fs.Open(path)
- if err != nil {
+ if os.IsNotExist(err) && ignoreNotFound {
+ return nil
+ } else if err != nil {
return fmt.Errorf("open %q: %w", path, err)
}
defer f.Close()
@@ -156,7 +158,7 @@ func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os
return err
}
if fi.IsDir() {
- err = walkFS(fs, path+"/"+fi.Name(), fn)
+ err = walkFS(fs, path+"/"+fi.Name(), false, fn)
if err != nil {
return err
}
@@ -194,45 +196,80 @@ func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.Cust
// prefix "foo" => walkpath ""
// prefix "" => walkpath ""
walkpath := params.prefix
- if !strings.HasSuffix(walkpath, "/") {
- walkpath, _ = filepath.Split(walkpath)
+ if cut := strings.LastIndex(walkpath, "/"); cut >= 0 {
+ walkpath = walkpath[:cut]
+ } else {
+ walkpath = ""
}
- walkpath = strings.TrimSuffix(walkpath, "/")
- type commonPrefix struct {
- Prefix string
- }
- type serverListResponse struct {
- s3.ListResp
- CommonPrefixes []commonPrefix
- }
- resp := serverListResponse{ListResp: s3.ListResp{
+ resp := s3.ListResp{
Name: strings.SplitN(r.URL.Path[1:], "/", 2)[0],
Prefix: params.prefix,
Delimiter: params.delimiter,
Marker: params.marker,
MaxKeys: params.maxKeys,
- }}
- err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), func(path string, fi os.FileInfo) error {
+ }
+ commonPrefixes := map[string]bool{}
+ err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
path = path[len(bucketdir)+1:]
- if !strings.HasPrefix(path, params.prefix) {
- return filepath.SkipDir
+ if len(path) <= len(params.prefix) {
+ if path > params.prefix[:len(path)] {
+ // with prefix "foobar", walking "fooz" means we're done
+ return errDone
+ }
+ if path < params.prefix[:len(path)] {
+ // with prefix "foobar", walking "foobag" is pointless
+ return filepath.SkipDir
+ }
+ if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path+"/") {
+ // with prefix "foo/bar", walking "fo"
+ // is pointless (but walking "foo" or
+ // "foo/bar" is necessary)
+ return filepath.SkipDir
+ }
+ if len(path) < len(params.prefix) {
+ // can't skip anything, and this entry
+ // isn't in the results, so just
+ // continue descent
+ return nil
+ }
+ } else {
+ if path[:len(params.prefix)] > params.prefix {
+ // with prefix "foobar", nothing we
+ // see after "foozzz" is relevant
+ return errDone
+ }
}
- if fi.IsDir() {
+ if path < params.marker || path < params.prefix {
return nil
}
- if path < params.marker {
+ if fi.IsDir() {
return nil
}
- // TODO: check delimiter, roll up common prefixes
- if len(resp.Contents)+len(resp.CommonPrefixes) >= params.maxKeys {
+ if params.delimiter != "" {
+ idx := strings.Index(path[len(params.prefix):], params.delimiter)
+ if idx >= 0 {
+ // with prefix "foobar" and delimiter
+ // "z", when we hit "foobar/baz", we
+ // add "/baz" to commonPrefixes and
+ // stop descending (note that even if
+ // delimiter is "/" we don't add
+ // anything to commonPrefixes when
+ // seeing a dir: we wait until we see
+ // a file, so we don't incorrectly
+ // return results for empty dirs)
+ commonPrefixes[path[:len(params.prefix)+idx+1]] = true
+ return filepath.SkipDir
+ }
+ }
+ if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
resp.IsTruncated = true
- if params.delimiter == "" {
+ if params.delimiter != "" {
resp.NextMarker = path
}
return errDone
}
- resp.ListResp.Contents = append(resp.ListResp.Contents, s3.Key{
+ resp.Contents = append(resp.Contents, s3.Key{
Key: path,
})
return nil
@@ -241,6 +278,12 @@ func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.Cust
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+ if params.delimiter != "" {
+ for prefix := range commonPrefixes {
+ resp.CommonPrefixes = append(resp.CommonPrefixes, prefix)
+ sort.Strings(resp.CommonPrefixes)
+ }
+ }
if err := xml.NewEncoder(w).Encode(resp); err != nil {
ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
}
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index 5fab3607f..d88e3b5ed 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -10,6 +10,7 @@ import (
"fmt"
"io/ioutil"
"os"
+ "strings"
"sync"
"time"
@@ -243,15 +244,11 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
wg.Wait()
}
-func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
- stage := s.s3setup(c)
- defer stage.teardown(c)
-
- filesPerDir := 1001
-
+func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
c.Assert(err, check.IsNil)
- for _, dir := range []string{"dir1", "dir2"} {
+ for d := 0; d < dirs; d++ {
+ dir := fmt.Sprintf("dir%d", d)
c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
for i := 0; i < filesPerDir; i++ {
f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
@@ -260,9 +257,17 @@ func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
}
}
c.Assert(fs.Sync(), check.IsNil)
+}
+
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+
+ filesPerDir := 1001
+ stage.writeBigDirs(c, 2, filesPerDir)
s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
- s.testS3List(c, stage.collbucket, "dir1/", 71, filesPerDir)
+ s.testS3List(c, stage.collbucket, "dir0/", 71, filesPerDir)
}
func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
gotKeys := map[string]s3.Key{}
@@ -291,3 +296,99 @@ func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix stri
}
c.Check(len(gotKeys), check.Equals, expectFiles)
}
+
+func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+
+ dirs := 2
+ filesPerDir := 500
+ stage.writeBigDirs(c, dirs, filesPerDir)
+ err := stage.collbucket.PutReader("dingbats", &bytes.Buffer{}, 0, "application/octet-stream", s3.Private, s3.Options{})
+ c.Assert(err, check.IsNil)
+ resp, err := stage.collbucket.List("", "", "", 20000)
+ c.Check(err, check.IsNil)
+ var allfiles []string
+ for _, key := range resp.Contents {
+ allfiles = append(allfiles, key.Key)
+ }
+ c.Check(allfiles, check.HasLen, dirs*filesPerDir+3)
+
+ for _, trial := range []struct {
+ prefix string
+ delimiter string
+ marker string
+ }{
+ {"di", "/", ""},
+ {"di", "r", ""},
+ {"di", "n", ""},
+ {"dir0", "/", ""},
+ {"dir0", "/", "dir0/file14.txt"}, // no commonprefixes
+ {"", "", "dir0/file14.txt"}, // middle page, skip walking dir1
+ {"", "", "dir1/file14.txt"}, // middle page, skip walking dir0
+ {"", "", "dir1/file498.txt"}, // last page of results
+ {"dir1/file", "", "dir1/file498.txt"}, // last page of results, with prefix
+ {"dir1/file", "/", "dir1/file498.txt"}, // last page of results, with prefix + delimiter
+ {"dir1", "Z", "dir1/file498.txt"}, // delimiter "Z" never appears
+ {"dir2", "/", ""}, // prefix "dir2" does not exist
+ {"", "/", ""},
+ } {
+ c.Logf("\n\n=== trial %+v", trial)
+
+ maxKeys := 20
+ resp, err := stage.collbucket.List(trial.prefix, trial.delimiter, trial.marker, maxKeys)
+ c.Check(err, check.IsNil)
+ if resp.IsTruncated && trial.delimiter == "" {
+ // goamz List method fills in the missing
+ // NextMarker field if resp.IsTruncated, so
+ // now we can't really tell whether it was
+ // sent by the server or by goamz. In cases
+ // where it should be empty but isn't, assume
+ // it's goamz's fault.
+ resp.NextMarker = ""
+ }
+
+ var expectKeys []string
+ var expectPrefixes []string
+ var expectNextMarker string
+ var expectTruncated bool
+ for _, key := range allfiles {
+ full := len(expectKeys)+len(expectPrefixes) >= maxKeys
+ if !strings.HasPrefix(key, trial.prefix) || key < trial.marker {
+ continue
+ } else if idx := strings.Index(key[len(trial.prefix):], trial.delimiter); trial.delimiter != "" && idx >= 0 {
+ prefix := key[:len(trial.prefix)+idx+1]
+ if len(expectPrefixes) > 0 && expectPrefixes[len(expectPrefixes)-1] == prefix {
+ // same prefix as previous key
+ } else if full {
+ expectNextMarker = key
+ expectTruncated = true
+ } else {
+ expectPrefixes = append(expectPrefixes, prefix)
+ }
+ } else if full {
+ if trial.delimiter != "" {
+ expectNextMarker = key
+ }
+ expectTruncated = true
+ break
+ } else {
+ expectKeys = append(expectKeys, key)
+ }
+ }
+
+ var gotKeys []string
+ for _, key := range resp.Contents {
+ gotKeys = append(gotKeys, key.Key)
+ }
+ var gotPrefixes []string
+ for _, prefix := range resp.CommonPrefixes {
+ gotPrefixes = append(gotPrefixes, prefix)
+ }
+ c.Check(gotKeys, check.DeepEquals, expectKeys)
+ c.Check(gotPrefixes, check.DeepEquals, expectPrefixes)
+ c.Check(resp.NextMarker, check.Equals, expectNextMarker)
+ c.Check(resp.IsTruncated, check.Equals, expectTruncated)
+ c.Logf("=== trial %+v keys %q prefixes %q nextMarker %q", trial, gotKeys, gotPrefixes, resp.NextMarker)
+ }
+}
commit d94095c1457273a49907d00d06a7d802ba979509
Author: Tom Clegg <tom at tomclegg.ca>
Date: Fri Jul 17 16:06:18 2020 -0400
16535: Add ListObjects API.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 5043c65ec..55d85c69a 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -5,12 +5,20 @@
package main
import (
+ "encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"os"
+ "path/filepath"
+ "sort"
+ "strconv"
"strings"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/tmp/GOPATH/src/github.com/AdRoll/goamz/s3"
)
// serveS3 handles r and returns true if r is a request from an S3
@@ -39,38 +47,46 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
}
defer release()
- r.URL.Path = "/by_id" + r.URL.Path
-
fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
- fi, err := fs.Stat(r.URL.Path)
- switch r.Method {
- case "GET":
+ switch {
+ case r.Method == "GET" && strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") == 1:
+ // Path is "/{uuid}" or "/{uuid}/", has no object name
+ h.s3list(w, r, fs)
+ return true
+ case r.Method == "GET":
+ fspath := "/by_id" + r.URL.Path
+ fi, err := fs.Stat(fspath)
if os.IsNotExist(err) ||
(err != nil && err.Error() == "not a directory") ||
(fi != nil && fi.IsDir()) {
http.Error(w, "not found", http.StatusNotFound)
return true
}
- http.FileServer(fs).ServeHTTP(w, r)
+ // shallow copy r, and change URL path
+ r := *r
+ r.URL.Path = fspath
+ http.FileServer(fs).ServeHTTP(w, &r)
return true
- case "PUT":
+ case r.Method == "PUT":
if strings.HasSuffix(r.URL.Path, "/") {
http.Error(w, "invalid object name (trailing '/' char)", http.StatusBadRequest)
return true
}
+ fspath := "by_id" + r.URL.Path
+ _, err = fs.Stat(fspath)
if err != nil && err.Error() == "not a directory" {
// requested foo/bar, but foo is a file
http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
return true
}
- f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ f, err := fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
if os.IsNotExist(err) {
// create missing intermediate directories, then try again
- for i, c := range r.URL.Path {
+ for i, c := range fspath {
if i > 0 && c == '/' {
- dir := r.URL.Path[:i]
+ dir := fspath[:i]
if strings.HasSuffix(dir, "/") {
err = errors.New("invalid object name (consecutive '/' chars)")
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -84,7 +100,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
}
}
}
- f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ f, err = fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
}
if err != nil {
err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
@@ -117,3 +133,115 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
return true
}
}
+
+func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os.FileInfo) error) error {
+ f, err := fs.Open(path)
+ if err != nil {
+ return fmt.Errorf("open %q: %w", path, err)
+ }
+ defer f.Close()
+ if path == "/" {
+ path = ""
+ }
+ fis, err := f.Readdir(-1)
+ if err != nil {
+ return err
+ }
+ sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
+ for _, fi := range fis {
+ err = fn(path+"/"+fi.Name(), fi)
+ if err == filepath.SkipDir {
+ continue
+ } else if err != nil {
+ return err
+ }
+ if fi.IsDir() {
+ err = walkFS(fs, path+"/"+fi.Name(), fn)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+var errDone = errors.New("done")
+
+func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.CustomFileSystem) {
+ var params struct {
+ bucket string
+ delimiter string
+ marker string
+ maxKeys int
+ prefix string
+ }
+ params.bucket = strings.SplitN(r.URL.Path[1:], "/", 2)[0]
+ params.delimiter = r.FormValue("delimiter")
+ params.marker = r.FormValue("marker")
+ if mk, _ := strconv.ParseInt(r.FormValue("max-keys"), 10, 64); mk > 0 {
+ params.maxKeys = int(mk)
+ } else {
+ params.maxKeys = 100
+ }
+ params.prefix = r.FormValue("prefix")
+
+ bucketdir := "by_id/" + params.bucket
+ // walkpath is the directory (relative to bucketdir) we need
+ // to walk: the innermost directory that is guaranteed to
+ // contain all paths that have the requested prefix. Examples:
+ // prefix "foo/bar" => walkpath "foo"
+ // prefix "foo/bar/" => walkpath "foo/bar"
+ // prefix "foo" => walkpath ""
+ // prefix "" => walkpath ""
+ walkpath := params.prefix
+ if !strings.HasSuffix(walkpath, "/") {
+ walkpath, _ = filepath.Split(walkpath)
+ }
+ walkpath = strings.TrimSuffix(walkpath, "/")
+
+ type commonPrefix struct {
+ Prefix string
+ }
+ type serverListResponse struct {
+ s3.ListResp
+ CommonPrefixes []commonPrefix
+ }
+ resp := serverListResponse{ListResp: s3.ListResp{
+ Name: strings.SplitN(r.URL.Path[1:], "/", 2)[0],
+ Prefix: params.prefix,
+ Delimiter: params.delimiter,
+ Marker: params.marker,
+ MaxKeys: params.maxKeys,
+ }}
+ err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), func(path string, fi os.FileInfo) error {
+ path = path[len(bucketdir)+1:]
+ if !strings.HasPrefix(path, params.prefix) {
+ return filepath.SkipDir
+ }
+ if fi.IsDir() {
+ return nil
+ }
+ if path < params.marker {
+ return nil
+ }
+ // TODO: check delimiter, roll up common prefixes
+ if len(resp.Contents)+len(resp.CommonPrefixes) >= params.maxKeys {
+ resp.IsTruncated = true
+ if params.delimiter == "" {
+ resp.NextMarker = path
+ }
+ return errDone
+ }
+ resp.ListResp.Contents = append(resp.ListResp.Contents, s3.Key{
+ Key: path,
+ })
+ return nil
+ })
+ if err != nil && err != errDone {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ if err := xml.NewEncoder(w).Encode(resp); err != nil {
+ ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
+ }
+}
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index c4c216c8b..5fab3607f 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -7,6 +7,7 @@ package main
import (
"bytes"
"crypto/rand"
+ "fmt"
"io/ioutil"
"os"
"sync"
@@ -23,6 +24,8 @@ import (
type s3stage struct {
arv *arvados.Client
+ ac *arvadosclient.ArvadosClient
+ kc *keepclient.KeepClient
proj arvados.Group
projbucket *s3.Bucket
coll arvados.Collection
@@ -62,6 +65,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
c.Assert(err, check.IsNil)
err = fs.Sync()
c.Assert(err, check.IsNil)
+ err = arv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Assert(err, check.IsNil)
auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
region := aws.Region{
@@ -71,6 +76,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
client := s3.New(*auth, region)
return s3stage{
arv: arv,
+ ac: ac,
+ kc: kc,
proj: proj,
projbucket: &s3.Bucket{
S3: client,
@@ -227,9 +234,60 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
return
}
- _, err = bucket.GetReader(objname)
- c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+ if objname != "" && objname != "/" {
+ _, err = bucket.GetReader(objname)
+ c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+ }
}()
}
wg.Wait()
}
+
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+
+ filesPerDir := 1001
+
+ fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
+ c.Assert(err, check.IsNil)
+ for _, dir := range []string{"dir1", "dir2"} {
+ c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+ for i := 0; i < filesPerDir; i++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
+ c.Assert(err, check.IsNil)
+ c.Assert(f.Close(), check.IsNil)
+ }
+ }
+ c.Assert(fs.Sync(), check.IsNil)
+ s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
+ s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
+ s.testS3List(c, stage.collbucket, "dir1/", 71, filesPerDir)
+}
+func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+ gotKeys := map[string]s3.Key{}
+ nextMarker := ""
+ pages := 0
+ for {
+ resp, err := bucket.List(prefix, "", nextMarker, pageSize)
+ if !c.Check(err, check.IsNil) {
+ break
+ }
+ c.Check(len(resp.Contents) <= pageSize, check.Equals, true)
+ if pages++; !c.Check(pages <= (expectFiles/pageSize)+1, check.Equals, true) {
+ break
+ }
+ for _, key := range resp.Contents {
+ gotKeys[key.Key] = key
+ }
+ if !resp.IsTruncated {
+ c.Check(resp.NextMarker, check.Equals, "")
+ break
+ }
+ if !c.Check(resp.NextMarker, check.Not(check.Equals), "") {
+ break
+ }
+ nextMarker = resp.NextMarker
+ }
+ c.Check(len(gotKeys), check.Equals, expectFiles)
+}
commit bdddd6c310ce73958b251e977faab7bbadd75452
Author: Tom Clegg <tom at tomclegg.ca>
Date: Wed Jul 15 12:15:38 2020 -0400
16535: Support Sync() on customfilesystem.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index f01369a88..59a6a6ba8 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -7,7 +7,6 @@ package arvados
import (
"bytes"
"crypto/md5"
- "crypto/sha1"
"errors"
"fmt"
"io"
@@ -33,6 +32,9 @@ type keepClientStub struct {
blocks map[string][]byte
refreshable map[string]bool
onPut func(bufcopy []byte) // called from PutB, before acquiring lock
+ authToken string // client's auth token (used for signing locators)
+ sigkey string // blob signing key
+ sigttl time.Duration // blob signing ttl
sync.RWMutex
}
@@ -49,7 +51,7 @@ func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error
}
func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
- locator := fmt.Sprintf("%x+%d+A12345 at abcde", md5.Sum(p), len(p))
+ locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
buf := make([]byte, len(p))
copy(buf, p)
if kcs.onPut != nil {
@@ -61,9 +63,12 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
return locator, 1, nil
}
-var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+ if strings.Contains(locator, "+A") {
+ return locator, nil
+ }
kcs.Lock()
defer kcs.Unlock()
if strings.Contains(locator, "+R") {
@@ -74,8 +79,9 @@ func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
}
}
- fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
- return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+ locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
+ locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+ return locator, nil
}
type CollectionFSSuite struct {
@@ -92,7 +98,11 @@ func (s *CollectionFSSuite) SetUpTest(c *check.C) {
s.kc = &keepClientStub{
blocks: map[string][]byte{
"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
- }}
+ },
+ sigkey: fixtureBlobSigningKey,
+ sigttl: fixtureBlobSigningTTL,
+ authToken: fixtureActiveToken,
+ }
s.fs, err = s.coll.FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
diff --git a/sdk/go/arvados/fs_lookup.go b/sdk/go/arvados/fs_lookup.go
index ad44ef22d..cb4ccfcf9 100644
--- a/sdk/go/arvados/fs_lookup.go
+++ b/sdk/go/arvados/fs_lookup.go
@@ -26,6 +26,20 @@ type lookupnode struct {
staleOne map[string]time.Time
}
+// Sync flushes pending writes for loaded children and, if successful,
+// triggers a reload on next lookup.
+func (ln *lookupnode) Sync() error {
+ err := ln.treenode.Sync()
+ if err != nil {
+ return err
+ }
+ ln.staleLock.Lock()
+ ln.staleAll = time.Time{}
+ ln.staleOne = nil
+ ln.staleLock.Unlock()
+ return nil
+}
+
func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
ln.staleLock.Lock()
defer ln.staleLock.Unlock()
diff --git a/sdk/go/arvados/fs_project_test.go b/sdk/go/arvados/fs_project_test.go
index 5628dcc9c..dd35323b7 100644
--- a/sdk/go/arvados/fs_project_test.go
+++ b/sdk/go/arvados/fs_project_test.go
@@ -199,6 +199,22 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
err = wf.Close()
c.Check(err, check.IsNil)
+ err = project.Sync()
+ c.Check(err, check.IsNil)
+ _, err = s.fs.Open("/home/A Project/oob/test.txt")
+ c.Check(err, check.IsNil)
+
+ // Sync again to mark the project dir as stale, so the
+ // collection gets reloaded from the controller on next
+ // lookup.
+ err = project.Sync()
+ c.Check(err, check.IsNil)
+
+ // Ensure collection was flushed by Sync
+ var latest Collection
+ err = s.client.RequestAndDecode(&latest, "GET", "arvados/v1/collections/"+oob.UUID, nil, nil)
+ c.Check(latest.ManifestText, check.Matches, `.*:test.txt.*\n`)
+
// Delete test.txt behind s.fs's back by updating the
// collection record with an empty ManifestText.
err = s.client.RequestAndDecode(nil, "PATCH", "arvados/v1/collections/"+oob.UUID, nil, map[string]interface{}{
@@ -209,8 +225,6 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
})
c.Assert(err, check.IsNil)
- err = project.Sync()
- c.Check(err, check.IsNil)
_, err = s.fs.Open("/home/A Project/oob/test.txt")
c.Check(err, check.NotNil)
_, err = s.fs.Open("/home/A Project/oob")
@@ -220,7 +234,7 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
c.Assert(err, check.IsNil)
err = project.Sync()
- c.Check(err, check.IsNil)
+ c.Check(err, check.NotNil) // can't update the deleted collection
_, err = s.fs.Open("/home/A Project/oob")
- c.Check(err, check.NotNil)
+ c.Check(err, check.IsNil) // parent dir still has old collection -- didn't reload, because Sync failed
}
diff --git a/sdk/go/arvados/fs_site_test.go b/sdk/go/arvados/fs_site_test.go
index fff0b7e01..96ed74de8 100644
--- a/sdk/go/arvados/fs_site_test.go
+++ b/sdk/go/arvados/fs_site_test.go
@@ -7,6 +7,7 @@ package arvados
import (
"net/http"
"os"
+ "time"
check "gopkg.in/check.v1"
)
@@ -22,6 +23,8 @@ const (
fixtureFooCollectionPDH = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
fixtureFooCollection = "zzzzz-4zz18-fy296fx3hot09f7"
fixtureNonexistentCollection = "zzzzz-4zz18-totallynotexist"
+ fixtureBlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+ fixtureBlobSigningTTL = 336 * time.Hour
)
var _ = check.Suite(&SiteFSSuite{})
@@ -41,7 +44,11 @@ func (s *SiteFSSuite) SetUpTest(c *check.C) {
s.kc = &keepClientStub{
blocks: map[string][]byte{
"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
- }}
+ },
+ sigkey: fixtureBlobSigningKey,
+ sigttl: fixtureBlobSigningTTL,
+ authToken: fixtureActiveToken,
+ }
s.fs = s.client.SiteFileSystem(s.kc)
}
commit ae6e78f726bfcfd2534e6732839a6a254cb330db
Author: Tom Clegg <tom at tomclegg.ca>
Date: Tue Jul 14 10:22:09 2020 -0400
16535: Fix error response codes for invalid names (4xx, not 5xx).
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index c6f501e49..5043c65ec 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -5,6 +5,7 @@
package main
import (
+ "errors"
"fmt"
"io"
"net/http"
@@ -43,27 +44,38 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+ fi, err := fs.Stat(r.URL.Path)
switch r.Method {
case "GET":
- fi, err := fs.Stat(r.URL.Path)
- if os.IsNotExist(err) {
- http.Error(w, err.Error(), http.StatusNotFound)
- return true
- } else if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return true
- } else if fi.IsDir() {
+ if os.IsNotExist(err) ||
+ (err != nil && err.Error() == "not a directory") ||
+ (fi != nil && fi.IsDir()) {
http.Error(w, "not found", http.StatusNotFound)
+ return true
}
http.FileServer(fs).ServeHTTP(w, r)
return true
case "PUT":
+ if strings.HasSuffix(r.URL.Path, "/") {
+ http.Error(w, "invalid object name (trailing '/' char)", http.StatusBadRequest)
+ return true
+ }
+ if err != nil && err.Error() == "not a directory" {
+ // requested foo/bar, but foo is a file
+ http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
+ return true
+ }
f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
if os.IsNotExist(err) {
// create missing intermediate directories, then try again
for i, c := range r.URL.Path {
if i > 0 && c == '/' {
dir := r.URL.Path[:i]
+ if strings.HasSuffix(dir, "/") {
+ err = errors.New("invalid object name (consecutive '/' chars)")
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
err := fs.Mkdir(dir, 0755)
if err != nil && err != os.ErrExist {
err = fmt.Errorf("mkdir %q failed: %w", dir, err)
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index fbfef7b91..c4c216c8b 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -9,6 +9,7 @@ import (
"crypto/rand"
"io/ioutil"
"os"
+ "sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -110,7 +111,7 @@ func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix
c.Check(err, check.IsNil)
rdr, err = bucket.GetReader(prefix + "missingfile")
- c.Check(err, check.NotNil)
+ c.Check(err, check.ErrorMatches, `404 Not Found`)
rdr, err = bucket.GetReader(prefix + "sailboat.txt")
c.Assert(err, check.IsNil)
@@ -152,7 +153,7 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
objname := prefix + trial.path
_, err := bucket.GetReader(objname)
- c.Assert(err, check.NotNil)
+ c.Assert(err, check.ErrorMatches, `404 Not Found`)
buf := make([]byte, trial.size)
rand.Read(buf)
@@ -167,7 +168,7 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
buf2, err := ioutil.ReadAll(rdr)
c.Check(err, check.IsNil)
c.Check(buf2, check.HasLen, len(buf))
- c.Check(buf2, check.DeepEquals, buf)
+ c.Check(bytes.Equal(buf, buf2), check.Equals, true)
}
}
@@ -182,6 +183,7 @@ func (s *IntegrationSuite) TestS3ProjectPutObjectFailure(c *check.C) {
s.testS3PutObjectFailure(c, stage.projbucket, stage.coll.Name+"/")
}
func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket, prefix string) {
+ var wg sync.WaitGroup
for _, trial := range []struct {
path string
}{
@@ -209,19 +211,25 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
path: "",
},
} {
- c.Logf("=== %v", trial)
+ trial := trial
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ c.Logf("=== %v", trial)
- objname := prefix + trial.path
+ objname := prefix + trial.path
- buf := make([]byte, 1234)
- rand.Read(buf)
+ buf := make([]byte, 1234)
+ rand.Read(buf)
- err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
- if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", objname)) {
- continue
- }
+ err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+ if !c.Check(err, check.ErrorMatches, `400 Bad.*`, check.Commentf("PUT %q should fail", objname)) {
+ return
+ }
- _, err = bucket.GetReader(objname)
- c.Check(err, check.NotNil)
+ _, err = bucket.GetReader(objname)
+ c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+ }()
}
+ wg.Wait()
}
commit 9b07afa9e26a364bb7f72d5741db0c276a90e495
Author: Tom Clegg <tom at tomclegg.ca>
Date: Tue Jul 14 09:23:21 2020 -0400
16535: Refactor Sync implementation.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index d06aba369..5e57fed3b 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -31,6 +31,10 @@ var (
ErrPermission = os.ErrPermission
)
+type syncer interface {
+ Sync() error
+}
+
// A File is an *os.File-like interface for reading and writing files
// in a FileSystem.
type File interface {
@@ -299,6 +303,22 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
return
}
+func (n *treenode) Sync() error {
+ n.RLock()
+ defer n.RUnlock()
+ for _, inode := range n.inodes {
+ syncer, ok := inode.(syncer)
+ if !ok {
+ return ErrInvalidOperation
+ }
+ err := syncer.Sync()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
type fileSystem struct {
root inode
fsBackend
@@ -576,8 +596,11 @@ func (fs *fileSystem) remove(name string, recursive bool) error {
}
func (fs *fileSystem) Sync() error {
- log.Printf("TODO: sync fileSystem")
- return ErrInvalidOperation
+ if syncer, ok := fs.root.(syncer); ok {
+ return syncer.Sync()
+ } else {
+ return ErrInvalidOperation
+ }
}
func (fs *fileSystem) Flush(string, bool) error {
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 1f888a4e8..254b90c81 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -87,15 +87,16 @@ func (dn *deferrednode) Child(name string, replace func(inode) (inode, error)) (
return dn.realinode().Child(name, replace)
}
-// Sync is currently unimplemented, except when it's a no-op because
-// the real inode hasn't been created.
+// Sync is a no-op if the real inode hasn't even been created yet.
func (dn *deferrednode) Sync() error {
dn.mtx.Lock()
defer dn.mtx.Unlock()
- if dn.created {
- return ErrInvalidArgument
- } else {
+ if !dn.created {
return nil
+ } else if syncer, ok := dn.wrapped.(syncer); ok {
+ return syncer.Sync()
+ } else {
+ return ErrInvalidOperation
}
}
diff --git a/sdk/go/arvados/fs_lookup.go b/sdk/go/arvados/fs_lookup.go
index 42322a14a..ad44ef22d 100644
--- a/sdk/go/arvados/fs_lookup.go
+++ b/sdk/go/arvados/fs_lookup.go
@@ -15,7 +15,7 @@ import (
//
// See (*customFileSystem)MountUsers for example usage.
type lookupnode struct {
- inode
+ treenode
loadOne func(parent inode, name string) (inode, error)
loadAll func(parent inode) ([]inode, error)
stale func(time.Time) bool
@@ -36,7 +36,7 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
return nil, err
}
for _, child := range all {
- _, err = ln.inode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
+ _, err = ln.treenode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
return child, nil
})
if err != nil {
@@ -49,7 +49,7 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
// newer than ln.staleAll. Reclaim memory.
ln.staleOne = nil
}
- return ln.inode.Readdir()
+ return ln.treenode.Readdir()
}
func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
@@ -57,7 +57,7 @@ func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (in
defer ln.staleLock.Unlock()
checkTime := time.Now()
if ln.stale(ln.staleAll) && ln.stale(ln.staleOne[name]) {
- _, err := ln.inode.Child(name, func(inode) (inode, error) {
+ _, err := ln.treenode.Child(name, func(inode) (inode, error) {
return ln.loadOne(ln, name)
})
if err != nil {
@@ -69,5 +69,5 @@ func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (in
ln.staleOne[name] = checkTime
}
}
- return ln.inode.Child(name, replace)
+ return ln.treenode.Child(name, replace)
}
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 8b61ccfd6..95b2f71c2 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -5,7 +5,6 @@
package arvados
import (
- "fmt"
"os"
"strings"
"sync"
@@ -41,7 +40,7 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
thr: newThrottle(concurrentWriters),
},
}
- root.inode = &treenode{
+ root.treenode = treenode{
fs: fs,
parent: root,
fileinfo: fileinfo{
@@ -55,9 +54,9 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
}
func (fs *customFileSystem) MountByID(mount string) {
- fs.root.inode.Child(mount, func(inode) (inode, error) {
+ fs.root.treenode.Child(mount, func(inode) (inode, error) {
return &vdirnode{
- inode: &treenode{
+ treenode: treenode{
fs: fs,
parent: fs.root,
inodes: make(map[string]inode),
@@ -73,18 +72,18 @@ func (fs *customFileSystem) MountByID(mount string) {
}
func (fs *customFileSystem) MountProject(mount, uuid string) {
- fs.root.inode.Child(mount, func(inode) (inode, error) {
+ fs.root.treenode.Child(mount, func(inode) (inode, error) {
return fs.newProjectNode(fs.root, mount, uuid), nil
})
}
func (fs *customFileSystem) MountUsers(mount string) {
- fs.root.inode.Child(mount, func(inode) (inode, error) {
+ fs.root.treenode.Child(mount, func(inode) (inode, error) {
return &lookupnode{
stale: fs.Stale,
loadOne: fs.usersLoadOne,
loadAll: fs.usersLoadAll,
- inode: &treenode{
+ treenode: treenode{
fs: fs,
parent: fs.root,
inodes: make(map[string]inode),
@@ -116,43 +115,7 @@ func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
}
func (fs *customFileSystem) Sync() error {
- fs.staleLock.Lock()
- fs.staleThreshold = time.Now()
- fs.staleLock.Unlock()
- return fs.syncTree("/", fs.root.inode)
-}
-
-// syncTree calls node.Sync() if it has its own Sync method, otherwise
-// it calls syncTree() on all of node's children.
-//
-// Returns ErrInvalidArgument if node does not implement Sync() and
-// isn't a directory (or if Readdir() fails for any other reason).
-func (fs *customFileSystem) syncTree(path string, node inode) error {
- if vn, ok := node.(*vdirnode); ok {
- node = vn.inode
- }
- if syncer, ok := node.(interface{ Sync() error }); ok {
- err := syncer.Sync()
- if err != nil {
- return fmt.Errorf("%s: %T: %w", path, syncer, err)
- }
- return nil
- }
- fis, err := node.Readdir()
- if err != nil {
- return fmt.Errorf("%s: %T: %w", path, node, ErrInvalidArgument)
- }
- for _, fi := range fis {
- child, err := node.Child(fi.Name(), nil)
- if err != nil {
- continue
- }
- err = fs.syncTree(path+"/"+fi.Name(), child)
- if err != nil {
- return err
- }
- }
- return nil
+ return fs.root.Sync()
}
// Stale returns true if information obtained at time t should be
@@ -197,7 +160,7 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode
stale: fs.Stale,
loadOne: func(parent inode, name string) (inode, error) { return fs.projectsLoadOne(parent, uuid, name) },
loadAll: func(parent inode) ([]inode, error) { return fs.projectsLoadAll(parent, uuid) },
- inode: &treenode{
+ treenode: treenode{
fs: fs,
parent: root,
inodes: make(map[string]inode),
@@ -217,17 +180,17 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode
// create() can return either a new node, which will be added to the
// treenode, or nil for ENOENT.
type vdirnode struct {
- inode
+ treenode
create func(parent inode, name string) inode
}
func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
- return vn.inode.Child(name, func(existing inode) (inode, error) {
+ return vn.treenode.Child(name, func(existing inode) (inode, error) {
if existing == nil && vn.create != nil {
existing = vn.create(vn, name)
if existing != nil {
existing.SetParent(vn, name)
- vn.inode.(*treenode).fileinfo.modTime = time.Now()
+ vn.treenode.fileinfo.modTime = time.Now()
}
}
if replace == nil {
commit 2e2bb873da25e0f7a25a9a9532911bfb5ad69668
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Jul 13 11:57:37 2020 -0400
16535: collectionFileSystem implements inode, eliminating wrapper.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 5970ec612..0edc48162 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -121,6 +121,62 @@ func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime t
}
}
+func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+ return fs.rootnode().Child(name, replace)
+}
+
+func (fs *collectionFileSystem) FS() FileSystem {
+ return fs
+}
+
+func (fs *collectionFileSystem) FileInfo() os.FileInfo {
+ return fs.rootnode().FileInfo()
+}
+
+func (fs *collectionFileSystem) IsDir() bool {
+ return true
+}
+
+func (fs *collectionFileSystem) Lock() {
+ fs.rootnode().Lock()
+}
+
+func (fs *collectionFileSystem) Unlock() {
+ fs.rootnode().Unlock()
+}
+
+func (fs *collectionFileSystem) RLock() {
+ fs.rootnode().RLock()
+}
+
+func (fs *collectionFileSystem) RUnlock() {
+ fs.rootnode().RUnlock()
+}
+
+func (fs *collectionFileSystem) Parent() inode {
+ return fs.rootnode().Parent()
+}
+
+func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
+ return fs.rootnode().Readdir()
+}
+
+func (fs *collectionFileSystem) SetParent(parent inode, name string) {
+ fs.rootnode().SetParent(parent, name)
+}
+
+func (fs *collectionFileSystem) Truncate(int64) error {
+ return ErrInvalidOperation
+}
+
func (fs *collectionFileSystem) Sync() error {
if fs.uuid == "" {
return nil
@@ -193,25 +249,6 @@ func (fs *collectionFileSystem) Size() int64 {
return fs.fileSystem.root.(*dirnode).TreeSize()
}
-// asChildNode() repackages fs as an inode that can be used as a child
-// node in a different fs. Not goroutine-safe.
-//
-// After calling asChildNode(), the caller should not use fs directly.
-func (fs *collectionFileSystem) asChildNode(parent inode, name string) *collectionfsnode {
- root := fs.rootnode().(*dirnode)
- root.SetParent(parent, name)
- return &collectionfsnode{dirnode: root, fs: fs}
-}
-
-type collectionfsnode struct {
- *dirnode
- fs *collectionFileSystem
-}
-
-func (cn *collectionfsnode) Sync() error {
- return cn.fs.Sync()
-}
-
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 4eb48b2f7..1f888a4e8 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -32,12 +32,14 @@ func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
log.Printf("BUG: unhandled error: %s", err)
return placeholder
}
- cfs, err := coll.FileSystem(fs, fs)
+ newfs, err := coll.FileSystem(fs, fs)
if err != nil {
log.Printf("BUG: unhandled error: %s", err)
return placeholder
}
- return cfs.(*collectionFileSystem).asChildNode(parent, coll.Name)
+ cfs := newfs.(*collectionFileSystem)
+ cfs.SetParent(parent, coll.Name)
+ return cfs
}}
}
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 2e131c33f..8b61ccfd6 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -183,11 +183,13 @@ func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
if err != nil {
return nil
}
- cfs, err := coll.FileSystem(fs, fs)
+ newfs, err := coll.FileSystem(fs, fs)
if err != nil {
return nil
}
- return cfs.(*collectionFileSystem).asChildNode(parent, id)
+ cfs := newfs.(*collectionFileSystem)
+ cfs.SetParent(parent, id)
+ return cfs
}
func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
commit 7e31b72e90734b7ababd254e0d38999d86dad758
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Jul 13 10:46:09 2020 -0400
16535: Refactor test suite to accommodate "project bucket" tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index 600b83496..fbfef7b91 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -20,11 +20,30 @@ import (
check "gopkg.in/check.v1"
)
-func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collection, *s3.Bucket) {
+type s3stage struct {
+ arv *arvados.Client
+ proj arvados.Group
+ projbucket *s3.Bucket
+ coll arvados.Collection
+ collbucket *s3.Bucket
+}
+
+func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
+ var proj arvados.Group
var coll arvados.Collection
arv := arvados.NewClientFromEnv()
arv.AuthToken = arvadostest.ActiveToken
- err := arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+ err := arv.RequestAndDecode(&proj, "POST", "arvados/v1/groups", nil, map[string]interface{}{
+ "group": map[string]interface{}{
+ "group_class": "project",
+ "name": "keep-web s3 test",
+ },
+ "ensure_unique_name": true,
+ })
+ c.Assert(err, check.IsNil)
+ err = arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+ "owner_uuid": proj.UUID,
+ "name": "keep-web s3 test collection",
"manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
}})
c.Assert(err, check.IsNil)
@@ -49,23 +68,40 @@ func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collect
S3Endpoint: "http://" + s.testServer.Addr,
}
client := s3.New(*auth, region)
- bucket := &s3.Bucket{
- S3: client,
- Name: coll.UUID,
+ return s3stage{
+ arv: arv,
+ proj: proj,
+ projbucket: &s3.Bucket{
+ S3: client,
+ Name: proj.UUID,
+ },
+ coll: coll,
+ collbucket: &s3.Bucket{
+ S3: client,
+ Name: coll.UUID,
+ },
}
- return arv, coll, bucket
}
-func (s *IntegrationSuite) s3teardown(c *check.C, arv *arvados.Client, coll arvados.Collection) {
- err := arv.RequestAndDecode(&coll, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
- c.Check(err, check.IsNil)
+func (stage s3stage) teardown(c *check.C) {
+ if stage.coll.UUID != "" {
+ err := stage.arv.RequestAndDecode(&stage.coll, "DELETE", "arvados/v1/collections/"+stage.coll.UUID, nil, nil)
+ c.Check(err, check.IsNil)
+ }
}
-func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
- arv, coll, bucket := s.s3setup(c)
- defer s.s3teardown(c, arv, coll)
-
- rdr, err := bucket.GetReader("emptyfile")
+func (s *IntegrationSuite) TestS3CollectionGetObject(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ s.testS3GetObject(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectGetObject(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ s.testS3GetObject(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix string) {
+ rdr, err := bucket.GetReader(prefix + "emptyfile")
c.Assert(err, check.IsNil)
buf, err := ioutil.ReadAll(rdr)
c.Check(err, check.IsNil)
@@ -73,11 +109,11 @@ func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
err = rdr.Close()
c.Check(err, check.IsNil)
- rdr, err = bucket.GetReader("missingfile")
+ rdr, err = bucket.GetReader(prefix + "missingfile")
c.Check(err, check.NotNil)
- rdr, err = bucket.GetReader("sailboat.txt")
- c.Check(err, check.IsNil)
+ rdr, err = bucket.GetReader(prefix + "sailboat.txt")
+ c.Assert(err, check.IsNil)
buf, err = ioutil.ReadAll(rdr)
c.Check(err, check.IsNil)
c.Check(buf, check.DeepEquals, []byte("⛵\n"))
@@ -85,37 +121,46 @@ func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
c.Check(err, check.IsNil)
}
-func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
- arv, coll, bucket := s.s3setup(c)
- defer s.s3teardown(c, arv, coll)
-
+func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ s.testS3PutObjectSuccess(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
for _, trial := range []struct {
- objname string
- size int
+ path string
+ size int
}{
{
- objname: "newfile",
- size: 128000000,
+ path: "newfile",
+ size: 128000000,
}, {
- objname: "newdir/newfile",
- size: 1 << 26,
+ path: "newdir/newfile",
+ size: 1 << 26,
}, {
- objname: "newdir1/newdir2/newfile",
- size: 0,
+ path: "newdir1/newdir2/newfile",
+ size: 0,
},
} {
c.Logf("=== %v", trial)
- _, err := bucket.GetReader(trial.objname)
+ objname := prefix + trial.path
+
+ _, err := bucket.GetReader(objname)
c.Assert(err, check.NotNil)
buf := make([]byte, trial.size)
rand.Read(buf)
- err = bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+ err = bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
c.Check(err, check.IsNil)
- rdr, err := bucket.GetReader(trial.objname)
+ rdr, err := bucket.GetReader(objname)
if !c.Check(err, check.IsNil) {
continue
}
@@ -126,48 +171,57 @@ func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
}
}
-func (s *IntegrationSuite) TestS3PutObjectFailure(c *check.C) {
- arv, coll, bucket := s.s3setup(c)
- defer s.s3teardown(c, arv, coll)
-
+func (s *IntegrationSuite) TestS3CollectionPutObjectFailure(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ s.testS3PutObjectFailure(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectPutObjectFailure(c *check.C) {
+ stage := s.s3setup(c)
+ defer stage.teardown(c)
+ s.testS3PutObjectFailure(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket, prefix string) {
for _, trial := range []struct {
- objname string
+ path string
}{
{
- objname: "emptyfile/newname", // emptyfile exists, see s3setup()
+ path: "emptyfile/newname", // emptyfile exists, see s3setup()
}, {
- objname: "emptyfile/", // emptyfile exists, see s3setup()
+ path: "emptyfile/", // emptyfile exists, see s3setup()
}, {
- objname: "emptydir", // dir already exists, see s3setup()
+ path: "emptydir", // dir already exists, see s3setup()
}, {
- objname: "emptydir/",
+ path: "emptydir/",
}, {
- objname: "emptydir//",
+ path: "emptydir//",
}, {
- objname: "newdir/",
+ path: "newdir/",
}, {
- objname: "newdir//",
+ path: "newdir//",
}, {
- objname: "/",
+ path: "/",
}, {
- objname: "//",
+ path: "//",
}, {
- objname: "foo//bar",
+ path: "foo//bar",
}, {
- objname: "",
+ path: "",
},
} {
c.Logf("=== %v", trial)
+ objname := prefix + trial.path
+
buf := make([]byte, 1234)
rand.Read(buf)
- err := bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
- if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", trial.objname)) {
+ err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+ if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", objname)) {
continue
}
- _, err = bucket.GetReader(trial.objname)
+ _, err = bucket.GetReader(objname)
c.Check(err, check.NotNil)
}
}
commit a0aebd60d543eb9bafb95ff21a695a50e473ca56
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Jul 13 10:45:47 2020 -0400
16535: Move s3 handler to s3.go.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index b8071b914..915924e28 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -6,7 +6,6 @@ package main
import (
"encoding/json"
- "fmt"
"html"
"html/template"
"io"
@@ -228,17 +227,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
}
- if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
- split := strings.SplitN(auth[4:], ":", 2)
- if len(split) < 2 {
- w.WriteHeader(http.StatusUnauthorized)
- return
- }
- h.serveS3(w, r, split[0])
- return
- } else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
- w.WriteHeader(http.StatusBadRequest)
- fmt.Println(w, "V4 signature is not supported")
+ if h.serveS3(w, r) {
return
}
@@ -545,82 +534,6 @@ func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosCli
return
}
-func (h *handler) serveS3(w http.ResponseWriter, r *http.Request, token string) {
- _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
- if err != nil {
- http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
- return
- }
- defer release()
-
- r.URL.Path = "/by_id" + r.URL.Path
-
- fs := client.SiteFileSystem(kc)
- fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
-
- switch r.Method {
- case "GET":
- fi, err := fs.Stat(r.URL.Path)
- if os.IsNotExist(err) {
- http.Error(w, err.Error(), http.StatusNotFound)
- return
- } else if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- } else if fi.IsDir() {
- http.Error(w, "not found", http.StatusNotFound)
- }
- http.FileServer(fs).ServeHTTP(w, r)
- return
- case "PUT":
- f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
- if os.IsNotExist(err) {
- // create missing intermediate directories, then try again
- for i, c := range r.URL.Path {
- if i > 0 && c == '/' {
- dir := r.URL.Path[:i]
- err := fs.Mkdir(dir, 0755)
- if err != nil && err != os.ErrExist {
- err = fmt.Errorf("mkdir %q failed: %w", dir, err)
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- }
- }
- f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
- }
- if err != nil {
- err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
- http.Error(w, err.Error(), http.StatusBadRequest)
- return
- }
- defer f.Close()
- _, err = io.Copy(f, r.Body)
- if err != nil {
- err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
- http.Error(w, err.Error(), http.StatusBadGateway)
- return
- }
- err = f.Close()
- if err != nil {
- err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
- http.Error(w, err.Error(), http.StatusBadGateway)
- return
- }
- err = fs.Sync()
- if err != nil {
- err = fmt.Errorf("sync failed: %w", err)
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- w.WriteHeader(http.StatusOK)
- return
- default:
- http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
-}
-
func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
if len(tokens) == 0 {
w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
new file mode 100644
index 000000000..c6f501e49
--- /dev/null
+++ b/services/keep-web/s3.go
@@ -0,0 +1,107 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "strings"
+)
+
+// serveS3 handles r and returns true if r is a request from an S3
+// client, otherwise it returns false.
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
+ var token string
+ if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+ split := strings.SplitN(auth[4:], ":", 2)
+ if len(split) < 2 {
+ w.WriteHeader(http.StatusUnauthorized)
+ return true
+ }
+ token = split[0]
+ } else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Println(w, "V4 signature is not supported")
+ return true
+ } else {
+ return false
+ }
+
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+ if err != nil {
+ http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+ return true
+ }
+ defer release()
+
+ r.URL.Path = "/by_id" + r.URL.Path
+
+ fs := client.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+ switch r.Method {
+ case "GET":
+ fi, err := fs.Stat(r.URL.Path)
+ if os.IsNotExist(err) {
+ http.Error(w, err.Error(), http.StatusNotFound)
+ return true
+ } else if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return true
+ } else if fi.IsDir() {
+ http.Error(w, "not found", http.StatusNotFound)
+ }
+ http.FileServer(fs).ServeHTTP(w, r)
+ return true
+ case "PUT":
+ f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ if os.IsNotExist(err) {
+ // create missing intermediate directories, then try again
+ for i, c := range r.URL.Path {
+ if i > 0 && c == '/' {
+ dir := r.URL.Path[:i]
+ err := fs.Mkdir(dir, 0755)
+ if err != nil && err != os.ErrExist {
+ err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return true
+ }
+ }
+ }
+ f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ }
+ if err != nil {
+ err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+ defer f.Close()
+ _, err = io.Copy(f, r.Body)
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return true
+ }
+ err = f.Close()
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return true
+ }
+ err = fs.Sync()
+ if err != nil {
+ err = fmt.Errorf("sync failed: %w", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return true
+ }
+ w.WriteHeader(http.StatusOK)
+ return true
+ default:
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return true
+ }
+}
commit 86f966c2b2a14d9f30135cf2fc2b9cb08270d1b1
Author: Tom Clegg <tom at tomclegg.ca>
Date: Mon Jul 13 09:55:57 2020 -0400
16535: Add s3 endpoint.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 37bd49491..5970ec612 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -193,6 +193,25 @@ func (fs *collectionFileSystem) Size() int64 {
return fs.fileSystem.root.(*dirnode).TreeSize()
}
+// asChildNode() repackages fs as an inode that can be used as a child
+// node in a different fs. Not goroutine-safe.
+//
+// After calling asChildNode(), the caller should not use fs directly.
+func (fs *collectionFileSystem) asChildNode(parent inode, name string) *collectionfsnode {
+ root := fs.rootnode().(*dirnode)
+ root.SetParent(parent, name)
+ return &collectionfsnode{dirnode: root, fs: fs}
+}
+
+type collectionfsnode struct {
+ *dirnode
+ fs *collectionFileSystem
+}
+
+func (cn *collectionfsnode) Sync() error {
+ return cn.fs.Sync()
+}
+
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 439eaec7c..4eb48b2f7 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -37,9 +37,7 @@ func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
log.Printf("BUG: unhandled error: %s", err)
return placeholder
}
- root := cfs.rootnode()
- root.SetParent(parent, coll.Name)
- return root
+ return cfs.(*collectionFileSystem).asChildNode(parent, coll.Name)
}}
}
@@ -87,6 +85,18 @@ func (dn *deferrednode) Child(name string, replace func(inode) (inode, error)) (
return dn.realinode().Child(name, replace)
}
+// Sync is currently unimplemented, except when it's a no-op because
+// the real inode hasn't been created.
+func (dn *deferrednode) Sync() error {
+ dn.mtx.Lock()
+ defer dn.mtx.Unlock()
+ if dn.created {
+ return ErrInvalidArgument
+ } else {
+ return nil
+ }
+}
+
func (dn *deferrednode) Truncate(size int64) error { return dn.realinode().Truncate(size) }
func (dn *deferrednode) SetParent(p inode, name string) { dn.realinode().SetParent(p, name) }
func (dn *deferrednode) IsDir() bool { return dn.currentinode().IsDir() }
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 7826d335c..2e131c33f 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -5,6 +5,7 @@
package arvados
import (
+ "fmt"
"os"
"strings"
"sync"
@@ -116,8 +117,41 @@ func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
func (fs *customFileSystem) Sync() error {
fs.staleLock.Lock()
- defer fs.staleLock.Unlock()
fs.staleThreshold = time.Now()
+ fs.staleLock.Unlock()
+ return fs.syncTree("/", fs.root.inode)
+}
+
+// syncTree calls node.Sync() if it has its own Sync method, otherwise
+// it calls syncTree() on all of node's children.
+//
+// Returns ErrInvalidArgument if node does not implement Sync() and
+// isn't a directory (or if Readdir() fails for any other reason).
+func (fs *customFileSystem) syncTree(path string, node inode) error {
+ if vn, ok := node.(*vdirnode); ok {
+ node = vn.inode
+ }
+ if syncer, ok := node.(interface{ Sync() error }); ok {
+ err := syncer.Sync()
+ if err != nil {
+ return fmt.Errorf("%s: %T: %w", path, syncer, err)
+ }
+ return nil
+ }
+ fis, err := node.Readdir()
+ if err != nil {
+ return fmt.Errorf("%s: %T: %w", path, node, ErrInvalidArgument)
+ }
+ for _, fi := range fis {
+ child, err := node.Child(fi.Name(), nil)
+ if err != nil {
+ continue
+ }
+ err = fs.syncTree(path+"/"+fi.Name(), child)
+ if err != nil {
+ return err
+ }
+ }
return nil
}
@@ -153,9 +187,7 @@ func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
if err != nil {
return nil
}
- root := cfs.rootnode()
- root.SetParent(parent, id)
- return root
+ return cfs.(*collectionFileSystem).asChildNode(parent, id)
}
func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 643ca4f58..b8071b914 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -6,6 +6,7 @@ package main
import (
"encoding/json"
+ "fmt"
"html"
"html/template"
"io"
@@ -227,6 +228,20 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
}
+ if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+ split := strings.SplitN(auth[4:], ":", 2)
+ if len(split) < 2 {
+ w.WriteHeader(http.StatusUnauthorized)
+ return
+ }
+ h.serveS3(w, r, split[0])
+ return
+ } else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Println(w, "V4 signature is not supported")
+ return
+ }
+
pathParts := strings.Split(r.URL.Path[1:], "/")
var stripParts int
@@ -509,6 +524,103 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
}
}
+func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
+ arv = h.clientPool.Get()
+ if arv == nil {
+ return nil, nil, nil, nil, err
+ }
+ release = func() { h.clientPool.Put(arv) }
+ arv.ApiToken = token
+ kc, err = keepclient.MakeKeepClient(arv)
+ if err != nil {
+ release()
+ return
+ }
+ kc.RequestID = reqID
+ client = (&arvados.Client{
+ APIHost: arv.ApiServer,
+ AuthToken: arv.ApiToken,
+ Insecure: arv.ApiInsecure,
+ }).WithRequestID(reqID)
+ return
+}
+
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request, token string) {
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+ if err != nil {
+ http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+ return
+ }
+ defer release()
+
+ r.URL.Path = "/by_id" + r.URL.Path
+
+ fs := client.SiteFileSystem(kc)
+ fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+ switch r.Method {
+ case "GET":
+ fi, err := fs.Stat(r.URL.Path)
+ if os.IsNotExist(err) {
+ http.Error(w, err.Error(), http.StatusNotFound)
+ return
+ } else if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ } else if fi.IsDir() {
+ http.Error(w, "not found", http.StatusNotFound)
+ }
+ http.FileServer(fs).ServeHTTP(w, r)
+ return
+ case "PUT":
+ f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ if os.IsNotExist(err) {
+ // create missing intermediate directories, then try again
+ for i, c := range r.URL.Path {
+ if i > 0 && c == '/' {
+ dir := r.URL.Path[:i]
+ err := fs.Mkdir(dir, 0755)
+ if err != nil && err != os.ErrExist {
+ err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ }
+ }
+ f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+ }
+ if err != nil {
+ err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+ defer f.Close()
+ _, err = io.Copy(f, r.Body)
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return
+ }
+ err = f.Close()
+ if err != nil {
+ err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ return
+ }
+ err = fs.Sync()
+ if err != nil {
+ err = fmt.Errorf("sync failed: %w", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ return
+ default:
+ http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+ return
+ }
+}
+
func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
if len(tokens) == 0 {
w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
@@ -519,25 +631,13 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
return
}
- arv := h.clientPool.Get()
- if arv == nil {
+ _, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+ if err != nil {
http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
return
}
- defer h.clientPool.Put(arv)
- arv.ApiToken = tokens[0]
+ defer release()
- kc, err := keepclient.MakeKeepClient(arv)
- if err != nil {
- http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
- return
- }
- kc.RequestID = r.Header.Get("X-Request-Id")
- client := (&arvados.Client{
- APIHost: arv.ApiServer,
- AuthToken: arv.ApiToken,
- Insecure: arv.ApiInsecure,
- }).WithRequestID(r.Header.Get("X-Request-Id"))
fs := client.SiteFileSystem(kc)
fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
f, err := fs.Open(r.URL.Path)
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
new file mode 100644
index 000000000..600b83496
--- /dev/null
+++ b/services/keep-web/s3_test.go
@@ -0,0 +1,173 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "crypto/rand"
+ "io/ioutil"
+ "os"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/AdRoll/goamz/aws"
+ "github.com/AdRoll/goamz/s3"
+ check "gopkg.in/check.v1"
+)
+
+// s3setup creates a fresh test collection and returns an S3 bucket
+// client pointed at the keep-web test server. The collection's manifest
+// provides "emptyfile" and "emptydir"; "sailboat.txt" is then written
+// through a collection filesystem and synced. The collection UUID is
+// used as the S3 bucket name, and the active token (v2 form) serves as
+// both the S3 access key and secret key.
+func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collection, *s3.Bucket) {
+ var coll arvados.Collection
+ arv := arvados.NewClientFromEnv()
+ arv.AuthToken = arvadostest.ActiveToken
+ err := arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+ "manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
+ }})
+ c.Assert(err, check.IsNil)
+ ac, err := arvadosclient.New(arv)
+ c.Assert(err, check.IsNil)
+ kc, err := keepclient.MakeKeepClient(ac)
+ c.Assert(err, check.IsNil)
+ // Add a non-empty file so GET tests can verify real content.
+ fs, err := coll.FileSystem(arv, kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("sailboat.txt", os.O_CREATE|os.O_WRONLY, 0644)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("⛵\n"))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ err = fs.Sync()
+ c.Assert(err, check.IsNil)
+
+ // Credentials expire in an hour — more than enough for a test run.
+ auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
+ region := aws.Region{
+ Name: s.testServer.Addr,
+ S3Endpoint: "http://" + s.testServer.Addr,
+ }
+ client := s3.New(*auth, region)
+ bucket := &s3.Bucket{
+ S3: client,
+ Name: coll.UUID,
+ }
+ return arv, coll, bucket
+}
+
+// s3teardown deletes the collection created by s3setup. Uses c.Check
+// (not Assert) so cleanup failure doesn't mask the test's own result.
+func (s *IntegrationSuite) s3teardown(c *check.C, arv *arvados.Client, coll arvados.Collection) {
+ err := arv.RequestAndDecode(&coll, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Check(err, check.IsNil)
+}
+
+// TestS3GetObject exercises S3 GET against the test server: an
+// existing zero-length object, a nonexistent key (must error), and a
+// non-empty object whose exact bytes are verified.
+func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
+ arv, coll, bucket := s.s3setup(c)
+ defer s.s3teardown(c, arv, coll)
+
+ // "emptyfile" exists in the manifest created by s3setup().
+ rdr, err := bucket.GetReader("emptyfile")
+ c.Assert(err, check.IsNil)
+ buf, err := ioutil.ReadAll(rdr)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf), check.Equals, 0)
+ err = rdr.Close()
+ c.Check(err, check.IsNil)
+
+ // A nonexistent key must return an error.
+ rdr, err = bucket.GetReader("missingfile")
+ c.Check(err, check.NotNil)
+
+ // "sailboat.txt" was written in s3setup(); check exact content.
+ rdr, err = bucket.GetReader("sailboat.txt")
+ c.Check(err, check.IsNil)
+ buf, err = ioutil.ReadAll(rdr)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.DeepEquals, []byte("⛵\n"))
+ err = rdr.Close()
+ c.Check(err, check.IsNil)
+}
+
+// TestS3PutObjectSuccess uploads objects of various sizes (including a
+// large ~128 MB body, a 64 MiB body, and a zero-byte body) at paths
+// that require creating one or two intermediate directories, then reads
+// each object back and verifies the bytes round-trip intact.
+func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
+ arv, coll, bucket := s.s3setup(c)
+ defer s.s3teardown(c, arv, coll)
+
+ for _, trial := range []struct {
+ objname string
+ size int
+ }{
+ {
+ objname: "newfile",
+ size: 128000000,
+ }, {
+ // Requires creating one intermediate directory.
+ objname: "newdir/newfile",
+ size: 1 << 26,
+ }, {
+ // Requires creating two nested intermediate directories;
+ // also covers the zero-length-object case.
+ objname: "newdir1/newdir2/newfile",
+ size: 0,
+ },
+ } {
+ c.Logf("=== %v", trial)
+
+ // Object must not exist before the PUT.
+ _, err := bucket.GetReader(trial.objname)
+ c.Assert(err, check.NotNil)
+
+ buf := make([]byte, trial.size)
+ rand.Read(buf)
+
+ err = bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+ c.Check(err, check.IsNil)
+
+ // Read it back and verify the content round-trips.
+ rdr, err := bucket.GetReader(trial.objname)
+ if !c.Check(err, check.IsNil) {
+ continue
+ }
+ buf2, err := ioutil.ReadAll(rdr)
+ c.Check(err, check.IsNil)
+ c.Check(buf2, check.HasLen, len(buf))
+ c.Check(buf2, check.DeepEquals, buf)
+ }
+}
+
+// TestS3PutObjectFailure confirms that PUT rejects invalid object
+// names: paths that traverse an existing file as if it were a
+// directory, names that collide with an existing directory, names
+// ending in "/", empty path segments ("//"), and the empty name. After
+// each rejected PUT, a GET on the same name must also fail.
+func (s *IntegrationSuite) TestS3PutObjectFailure(c *check.C) {
+ arv, coll, bucket := s.s3setup(c)
+ defer s.s3teardown(c, arv, coll)
+
+ for _, trial := range []struct {
+ objname string
+ }{
+ {
+ objname: "emptyfile/newname", // emptyfile exists, see s3setup()
+ }, {
+ objname: "emptyfile/", // emptyfile exists, see s3setup()
+ }, {
+ objname: "emptydir", // dir already exists, see s3setup()
+ }, {
+ objname: "emptydir/",
+ }, {
+ objname: "emptydir//",
+ }, {
+ objname: "newdir/",
+ }, {
+ objname: "newdir//",
+ }, {
+ objname: "/",
+ }, {
+ objname: "//",
+ }, {
+ objname: "foo//bar",
+ }, {
+ objname: "",
+ },
+ } {
+ c.Logf("=== %v", trial)
+
+ buf := make([]byte, 1234)
+ rand.Read(buf)
+
+ err := bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+ if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", trial.objname)) {
+ continue
+ }
+
+ // A rejected PUT must not have created anything readable.
+ _, err = bucket.GetReader(trial.objname)
+ c.Check(err, check.NotNil)
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list