[ARVADOS] created: 1.3.0-2777-gb291fc626

Git user git at public.arvados.org
Tue Jul 21 15:18:03 UTC 2020


        at  b291fc6265aa6a0f60d51cc49bf32b9aea847d1b (commit)


commit b291fc6265aa6a0f60d51cc49bf32b9aea847d1b
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Tue Jul 21 11:17:49 2020 -0400

    16535: Support delimiter/rollup option in list API.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 55d85c69a..caacd3108 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -134,9 +134,11 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 	}
 }
 
-func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os.FileInfo) error) error {
+func walkFS(fs arvados.CustomFileSystem, path string, ignoreNotFound bool, fn func(path string, fi os.FileInfo) error) error {
 	f, err := fs.Open(path)
-	if err != nil {
+	if os.IsNotExist(err) && ignoreNotFound {
+		return nil
+	} else if err != nil {
 		return fmt.Errorf("open %q: %w", path, err)
 	}
 	defer f.Close()
@@ -156,7 +158,7 @@ func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os
 			return err
 		}
 		if fi.IsDir() {
-			err = walkFS(fs, path+"/"+fi.Name(), fn)
+			err = walkFS(fs, path+"/"+fi.Name(), false, fn)
 			if err != nil {
 				return err
 			}
@@ -194,45 +196,80 @@ func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.Cust
 	// prefix "foo"      => walkpath ""
 	// prefix ""         => walkpath ""
 	walkpath := params.prefix
-	if !strings.HasSuffix(walkpath, "/") {
-		walkpath, _ = filepath.Split(walkpath)
+	if cut := strings.LastIndex(walkpath, "/"); cut >= 0 {
+		walkpath = walkpath[:cut]
+	} else {
+		walkpath = ""
 	}
-	walkpath = strings.TrimSuffix(walkpath, "/")
 
-	type commonPrefix struct {
-		Prefix string
-	}
-	type serverListResponse struct {
-		s3.ListResp
-		CommonPrefixes []commonPrefix
-	}
-	resp := serverListResponse{ListResp: s3.ListResp{
+	resp := s3.ListResp{
 		Name:      strings.SplitN(r.URL.Path[1:], "/", 2)[0],
 		Prefix:    params.prefix,
 		Delimiter: params.delimiter,
 		Marker:    params.marker,
 		MaxKeys:   params.maxKeys,
-	}}
-	err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), func(path string, fi os.FileInfo) error {
+	}
+	commonPrefixes := map[string]bool{}
+	err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), true, func(path string, fi os.FileInfo) error {
 		path = path[len(bucketdir)+1:]
-		if !strings.HasPrefix(path, params.prefix) {
-			return filepath.SkipDir
+		if len(path) <= len(params.prefix) {
+			if path > params.prefix[:len(path)] {
+				// with prefix "foobar", walking "fooz" means we're done
+				return errDone
+			}
+			if path < params.prefix[:len(path)] {
+				// with prefix "foobar", walking "foobag" is pointless
+				return filepath.SkipDir
+			}
+			if fi.IsDir() && !strings.HasPrefix(params.prefix+"/", path+"/") {
+				// with prefix "foo/bar", walking "fo"
+				// is pointless (but walking "foo" or
+				// "foo/bar" is necessary)
+				return filepath.SkipDir
+			}
+			if len(path) < len(params.prefix) {
+				// can't skip anything, and this entry
+				// isn't in the results, so just
+				// continue descent
+				return nil
+			}
+		} else {
+			if path[:len(params.prefix)] > params.prefix {
+				// with prefix "foobar", nothing we
+				// see after "foozzz" is relevant
+				return errDone
+			}
 		}
-		if fi.IsDir() {
+		if path < params.marker || path < params.prefix {
 			return nil
 		}
-		if path < params.marker {
+		if fi.IsDir() {
 			return nil
 		}
-		// TODO: check delimiter, roll up common prefixes
-		if len(resp.Contents)+len(resp.CommonPrefixes) >= params.maxKeys {
+		if params.delimiter != "" {
+			idx := strings.Index(path[len(params.prefix):], params.delimiter)
+			if idx >= 0 {
+				// with prefix "foobar" and delimiter
+				// "z", when we hit "foobar/baz", we
+				// add "/baz" to commonPrefixes and
+				// stop descending (note that even if
+				// delimiter is "/" we don't add
+				// anything to commonPrefixes when
+				// seeing a dir: we wait until we see
+				// a file, so we don't incorrectly
+				// return results for empty dirs)
+				commonPrefixes[path[:len(params.prefix)+idx+1]] = true
+				return filepath.SkipDir
+			}
+		}
+		if len(resp.Contents)+len(commonPrefixes) >= params.maxKeys {
 			resp.IsTruncated = true
-			if params.delimiter == "" {
+			if params.delimiter != "" {
 				resp.NextMarker = path
 			}
 			return errDone
 		}
-		resp.ListResp.Contents = append(resp.ListResp.Contents, s3.Key{
+		resp.Contents = append(resp.Contents, s3.Key{
 			Key: path,
 		})
 		return nil
@@ -241,6 +278,12 @@ func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.Cust
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
+	if params.delimiter != "" {
+		for prefix := range commonPrefixes {
+			resp.CommonPrefixes = append(resp.CommonPrefixes, prefix)
+		}
+		sort.Strings(resp.CommonPrefixes)
+	}
 	if err := xml.NewEncoder(w).Encode(resp); err != nil {
 		ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
 	}
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index 5fab3607f..d88e3b5ed 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"strings"
 	"sync"
 	"time"
 
@@ -243,15 +244,11 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
 	wg.Wait()
 }
 
-func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
-	stage := s.s3setup(c)
-	defer stage.teardown(c)
-
-	filesPerDir := 1001
-
+func (stage *s3stage) writeBigDirs(c *check.C, dirs int, filesPerDir int) {
 	fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
 	c.Assert(err, check.IsNil)
-	for _, dir := range []string{"dir1", "dir2"} {
+	for d := 0; d < dirs; d++ {
+		dir := fmt.Sprintf("dir%d", d)
 		c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
 		for i := 0; i < filesPerDir; i++ {
 			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
@@ -260,9 +257,17 @@ func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
 		}
 	}
 	c.Assert(fs.Sync(), check.IsNil)
+}
+
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	filesPerDir := 1001
+	stage.writeBigDirs(c, 2, filesPerDir)
 	s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
 	s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
-	s.testS3List(c, stage.collbucket, "dir1/", 71, filesPerDir)
+	s.testS3List(c, stage.collbucket, "dir0/", 71, filesPerDir)
 }
 func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
 	gotKeys := map[string]s3.Key{}
@@ -291,3 +296,99 @@ func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix stri
 	}
 	c.Check(len(gotKeys), check.Equals, expectFiles)
 }
+
+func (s *IntegrationSuite) TestS3CollectionListRollup(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	dirs := 2
+	filesPerDir := 500
+	stage.writeBigDirs(c, dirs, filesPerDir)
+	err := stage.collbucket.PutReader("dingbats", &bytes.Buffer{}, 0, "application/octet-stream", s3.Private, s3.Options{})
+	c.Assert(err, check.IsNil)
+	resp, err := stage.collbucket.List("", "", "", 20000)
+	c.Check(err, check.IsNil)
+	var allfiles []string
+	for _, key := range resp.Contents {
+		allfiles = append(allfiles, key.Key)
+	}
+	c.Check(allfiles, check.HasLen, dirs*filesPerDir+3)
+
+	for _, trial := range []struct {
+		prefix    string
+		delimiter string
+		marker    string
+	}{
+		{"di", "/", ""},
+		{"di", "r", ""},
+		{"di", "n", ""},
+		{"dir0", "/", ""},
+		{"dir0", "/", "dir0/file14.txt"},       // no commonprefixes
+		{"", "", "dir0/file14.txt"},            // middle page, skip walking dir1
+		{"", "", "dir1/file14.txt"},            // middle page, skip walking dir0
+		{"", "", "dir1/file498.txt"},           // last page of results
+		{"dir1/file", "", "dir1/file498.txt"},  // last page of results, with prefix
+		{"dir1/file", "/", "dir1/file498.txt"}, // last page of results, with prefix + delimiter
+		{"dir1", "Z", "dir1/file498.txt"},      // delimiter "Z" never appears
+		{"dir2", "/", ""},                      // prefix "dir2" does not exist
+		{"", "/", ""},
+	} {
+		c.Logf("\n\n=== trial %+v", trial)
+
+		maxKeys := 20
+		resp, err := stage.collbucket.List(trial.prefix, trial.delimiter, trial.marker, maxKeys)
+		c.Check(err, check.IsNil)
+		if resp.IsTruncated && trial.delimiter == "" {
+			// goamz List method fills in the missing
+			// NextMarker field if resp.IsTruncated, so
+			// now we can't really tell whether it was
+			// sent by the server or by goamz. In cases
+			// where it should be empty but isn't, assume
+			// it's goamz's fault.
+			resp.NextMarker = ""
+		}
+
+		var expectKeys []string
+		var expectPrefixes []string
+		var expectNextMarker string
+		var expectTruncated bool
+		for _, key := range allfiles {
+			full := len(expectKeys)+len(expectPrefixes) >= maxKeys
+			if !strings.HasPrefix(key, trial.prefix) || key < trial.marker {
+				continue
+			} else if idx := strings.Index(key[len(trial.prefix):], trial.delimiter); trial.delimiter != "" && idx >= 0 {
+				prefix := key[:len(trial.prefix)+idx+1]
+				if len(expectPrefixes) > 0 && expectPrefixes[len(expectPrefixes)-1] == prefix {
+					// same prefix as previous key
+				} else if full {
+					expectNextMarker = key
+					expectTruncated = true
+				} else {
+					expectPrefixes = append(expectPrefixes, prefix)
+				}
+			} else if full {
+				if trial.delimiter != "" {
+					expectNextMarker = key
+				}
+				expectTruncated = true
+				break
+			} else {
+				expectKeys = append(expectKeys, key)
+			}
+		}
+
+		var gotKeys []string
+		for _, key := range resp.Contents {
+			gotKeys = append(gotKeys, key.Key)
+		}
+		var gotPrefixes []string
+		for _, prefix := range resp.CommonPrefixes {
+			gotPrefixes = append(gotPrefixes, prefix)
+		}
+		c.Check(gotKeys, check.DeepEquals, expectKeys)
+		c.Check(gotPrefixes, check.DeepEquals, expectPrefixes)
+		c.Check(resp.NextMarker, check.Equals, expectNextMarker)
+		c.Check(resp.IsTruncated, check.Equals, expectTruncated)
+		c.Logf("=== trial %+v keys %q prefixes %q nextMarker %q", trial, gotKeys, gotPrefixes, resp.NextMarker)
+	}
+}

commit d94095c1457273a49907d00d06a7d802ba979509
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Fri Jul 17 16:06:18 2020 -0400

    16535: Add ListObjects API.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index 5043c65ec..55d85c69a 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -5,12 +5,20 @@
 package main
 
 import (
+	"encoding/xml"
 	"errors"
 	"fmt"
 	"io"
 	"net/http"
 	"os"
+	"path/filepath"
+	"sort"
+	"strconv"
 	"strings"
+
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"github.com/AdRoll/goamz/s3"
 )
 
 // serveS3 handles r and returns true if r is a request from an S3
@@ -39,38 +47,46 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 	}
 	defer release()
 
-	r.URL.Path = "/by_id" + r.URL.Path
-
 	fs := client.SiteFileSystem(kc)
 	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 
-	fi, err := fs.Stat(r.URL.Path)
-	switch r.Method {
-	case "GET":
+	switch {
+	case r.Method == "GET" && strings.Count(strings.TrimSuffix(r.URL.Path, "/"), "/") == 1:
+		// Path is "/{uuid}" or "/{uuid}/", has no object name
+		h.s3list(w, r, fs)
+		return true
+	case r.Method == "GET":
+		fspath := "/by_id" + r.URL.Path
+		fi, err := fs.Stat(fspath)
 		if os.IsNotExist(err) ||
 			(err != nil && err.Error() == "not a directory") ||
 			(fi != nil && fi.IsDir()) {
 			http.Error(w, "not found", http.StatusNotFound)
 			return true
 		}
-		http.FileServer(fs).ServeHTTP(w, r)
+		// shallow copy r, and change URL path
+		r := *r
+		r.URL.Path = fspath
+		http.FileServer(fs).ServeHTTP(w, &r)
 		return true
-	case "PUT":
+	case r.Method == "PUT":
 		if strings.HasSuffix(r.URL.Path, "/") {
 			http.Error(w, "invalid object name (trailing '/' char)", http.StatusBadRequest)
 			return true
 		}
+		fspath := "by_id" + r.URL.Path
+		_, err = fs.Stat(fspath)
 		if err != nil && err.Error() == "not a directory" {
 			// requested foo/bar, but foo is a file
 			http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
 			return true
 		}
-		f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+		f, err := fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
 		if os.IsNotExist(err) {
 			// create missing intermediate directories, then try again
-			for i, c := range r.URL.Path {
+			for i, c := range fspath {
 				if i > 0 && c == '/' {
-					dir := r.URL.Path[:i]
+					dir := fspath[:i]
 					if strings.HasSuffix(dir, "/") {
 						err = errors.New("invalid object name (consecutive '/' chars)")
 						http.Error(w, err.Error(), http.StatusBadRequest)
@@ -84,7 +100,7 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 					}
 				}
 			}
-			f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+			f, err = fs.OpenFile(fspath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
 		}
 		if err != nil {
 			err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
@@ -117,3 +133,115 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 		return true
 	}
 }
+
+func walkFS(fs arvados.CustomFileSystem, path string, fn func(path string, fi os.FileInfo) error) error {
+	f, err := fs.Open(path)
+	if err != nil {
+		return fmt.Errorf("open %q: %w", path, err)
+	}
+	defer f.Close()
+	if path == "/" {
+		path = ""
+	}
+	fis, err := f.Readdir(-1)
+	if err != nil {
+		return err
+	}
+	sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
+	for _, fi := range fis {
+		err = fn(path+"/"+fi.Name(), fi)
+		if err == filepath.SkipDir {
+			continue
+		} else if err != nil {
+			return err
+		}
+		if fi.IsDir() {
+			err = walkFS(fs, path+"/"+fi.Name(), fn)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+var errDone = errors.New("done")
+
+func (h *handler) s3list(w http.ResponseWriter, r *http.Request, fs arvados.CustomFileSystem) {
+	var params struct {
+		bucket    string
+		delimiter string
+		marker    string
+		maxKeys   int
+		prefix    string
+	}
+	params.bucket = strings.SplitN(r.URL.Path[1:], "/", 2)[0]
+	params.delimiter = r.FormValue("delimiter")
+	params.marker = r.FormValue("marker")
+	if mk, _ := strconv.ParseInt(r.FormValue("max-keys"), 10, 64); mk > 0 {
+		params.maxKeys = int(mk)
+	} else {
+		params.maxKeys = 100
+	}
+	params.prefix = r.FormValue("prefix")
+
+	bucketdir := "by_id/" + params.bucket
+	// walkpath is the directory (relative to bucketdir) we need
+	// to walk: the innermost directory that is guaranteed to
+	// contain all paths that have the requested prefix. Examples:
+	// prefix "foo/bar"  => walkpath "foo"
+	// prefix "foo/bar/" => walkpath "foo/bar"
+	// prefix "foo"      => walkpath ""
+	// prefix ""         => walkpath ""
+	walkpath := params.prefix
+	if !strings.HasSuffix(walkpath, "/") {
+		walkpath, _ = filepath.Split(walkpath)
+	}
+	walkpath = strings.TrimSuffix(walkpath, "/")
+
+	type commonPrefix struct {
+		Prefix string
+	}
+	type serverListResponse struct {
+		s3.ListResp
+		CommonPrefixes []commonPrefix
+	}
+	resp := serverListResponse{ListResp: s3.ListResp{
+		Name:      strings.SplitN(r.URL.Path[1:], "/", 2)[0],
+		Prefix:    params.prefix,
+		Delimiter: params.delimiter,
+		Marker:    params.marker,
+		MaxKeys:   params.maxKeys,
+	}}
+	err := walkFS(fs, strings.TrimSuffix(bucketdir+"/"+walkpath, "/"), func(path string, fi os.FileInfo) error {
+		path = path[len(bucketdir)+1:]
+		if !strings.HasPrefix(path, params.prefix) {
+			return filepath.SkipDir
+		}
+		if fi.IsDir() {
+			return nil
+		}
+		if path < params.marker {
+			return nil
+		}
+		// TODO: check delimiter, roll up common prefixes
+		if len(resp.Contents)+len(resp.CommonPrefixes) >= params.maxKeys {
+			resp.IsTruncated = true
+			if params.delimiter == "" {
+				resp.NextMarker = path
+			}
+			return errDone
+		}
+		resp.ListResp.Contents = append(resp.ListResp.Contents, s3.Key{
+			Key: path,
+		})
+		return nil
+	})
+	if err != nil && err != errDone {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	if err := xml.NewEncoder(w).Encode(resp); err != nil {
+		ctxlog.FromContext(r.Context()).WithError(err).Error("error writing xml response")
+	}
+}
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index c4c216c8b..5fab3607f 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -7,6 +7,7 @@ package main
 import (
 	"bytes"
 	"crypto/rand"
+	"fmt"
 	"io/ioutil"
 	"os"
 	"sync"
@@ -23,6 +24,8 @@ import (
 
 type s3stage struct {
 	arv        *arvados.Client
+	ac         *arvadosclient.ArvadosClient
+	kc         *keepclient.KeepClient
 	proj       arvados.Group
 	projbucket *s3.Bucket
 	coll       arvados.Collection
@@ -62,6 +65,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
 	c.Assert(err, check.IsNil)
 	err = fs.Sync()
 	c.Assert(err, check.IsNil)
+	err = arv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+	c.Assert(err, check.IsNil)
 
 	auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
 	region := aws.Region{
@@ -71,6 +76,8 @@ func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
 	client := s3.New(*auth, region)
 	return s3stage{
 		arv:  arv,
+		ac:   ac,
+		kc:   kc,
 		proj: proj,
 		projbucket: &s3.Bucket{
 			S3:   client,
@@ -227,9 +234,60 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
 				return
 			}
 
-			_, err = bucket.GetReader(objname)
-			c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+			if objname != "" && objname != "/" {
+				_, err = bucket.GetReader(objname)
+				c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+			}
 		}()
 	}
 	wg.Wait()
 }
+
+func (s *IntegrationSuite) TestS3CollectionList(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+
+	filesPerDir := 1001
+
+	fs, err := stage.coll.FileSystem(stage.arv, stage.kc)
+	c.Assert(err, check.IsNil)
+	for _, dir := range []string{"dir1", "dir2"} {
+		c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+		for i := 0; i < filesPerDir; i++ {
+			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d.txt", dir, i), os.O_CREATE|os.O_WRONLY, 0644)
+			c.Assert(err, check.IsNil)
+			c.Assert(f.Close(), check.IsNil)
+		}
+	}
+	c.Assert(fs.Sync(), check.IsNil)
+	s.testS3List(c, stage.collbucket, "", 4000, 2+filesPerDir*2)
+	s.testS3List(c, stage.collbucket, "", 131, 2+filesPerDir*2)
+	s.testS3List(c, stage.collbucket, "dir1/", 71, filesPerDir)
+}
+func (s *IntegrationSuite) testS3List(c *check.C, bucket *s3.Bucket, prefix string, pageSize, expectFiles int) {
+	gotKeys := map[string]s3.Key{}
+	nextMarker := ""
+	pages := 0
+	for {
+		resp, err := bucket.List(prefix, "", nextMarker, pageSize)
+		if !c.Check(err, check.IsNil) {
+			break
+		}
+		c.Check(len(resp.Contents) <= pageSize, check.Equals, true)
+		if pages++; !c.Check(pages <= (expectFiles/pageSize)+1, check.Equals, true) {
+			break
+		}
+		for _, key := range resp.Contents {
+			gotKeys[key.Key] = key
+		}
+		if !resp.IsTruncated {
+			c.Check(resp.NextMarker, check.Equals, "")
+			break
+		}
+		if !c.Check(resp.NextMarker, check.Not(check.Equals), "") {
+			break
+		}
+		nextMarker = resp.NextMarker
+	}
+	c.Check(len(gotKeys), check.Equals, expectFiles)
+}

commit bdddd6c310ce73958b251e977faab7bbadd75452
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Wed Jul 15 12:15:38 2020 -0400

    16535: Support Sync() on customfilesystem.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index f01369a88..59a6a6ba8 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -7,7 +7,6 @@ package arvados
 import (
 	"bytes"
 	"crypto/md5"
-	"crypto/sha1"
 	"errors"
 	"fmt"
 	"io"
@@ -33,6 +32,9 @@ type keepClientStub struct {
 	blocks      map[string][]byte
 	refreshable map[string]bool
 	onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
+	authToken   string               // client's auth token (used for signing locators)
+	sigkey      string               // blob signing key
+	sigttl      time.Duration        // blob signing ttl
 	sync.RWMutex
 }
 
@@ -49,7 +51,7 @@ func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error
 }
 
 func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
-	locator := fmt.Sprintf("%x+%d+A12345 at abcde", md5.Sum(p), len(p))
+	locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
 	buf := make([]byte, len(p))
 	copy(buf, p)
 	if kcs.onPut != nil {
@@ -61,9 +63,12 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
 	return locator, 1, nil
 }
 
-var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
 
 func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+	if strings.Contains(locator, "+A") {
+		return locator, nil
+	}
 	kcs.Lock()
 	defer kcs.Unlock()
 	if strings.Contains(locator, "+R") {
@@ -74,8 +79,9 @@ func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
 			return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
 		}
 	}
-	fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
-	return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+	locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
+	locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+	return locator, nil
 }
 
 type CollectionFSSuite struct {
@@ -92,7 +98,11 @@ func (s *CollectionFSSuite) SetUpTest(c *check.C) {
 	s.kc = &keepClientStub{
 		blocks: map[string][]byte{
 			"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
-		}}
+		},
+		sigkey:    fixtureBlobSigningKey,
+		sigttl:    fixtureBlobSigningTTL,
+		authToken: fixtureActiveToken,
+	}
 	s.fs, err = s.coll.FileSystem(s.client, s.kc)
 	c.Assert(err, check.IsNil)
 }
diff --git a/sdk/go/arvados/fs_lookup.go b/sdk/go/arvados/fs_lookup.go
index ad44ef22d..cb4ccfcf9 100644
--- a/sdk/go/arvados/fs_lookup.go
+++ b/sdk/go/arvados/fs_lookup.go
@@ -26,6 +26,20 @@ type lookupnode struct {
 	staleOne  map[string]time.Time
 }
 
+// Sync flushes pending writes for loaded children and, if successful,
+// triggers a reload on next lookup.
+func (ln *lookupnode) Sync() error {
+	err := ln.treenode.Sync()
+	if err != nil {
+		return err
+	}
+	ln.staleLock.Lock()
+	ln.staleAll = time.Time{}
+	ln.staleOne = nil
+	ln.staleLock.Unlock()
+	return nil
+}
+
 func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
 	ln.staleLock.Lock()
 	defer ln.staleLock.Unlock()
diff --git a/sdk/go/arvados/fs_project_test.go b/sdk/go/arvados/fs_project_test.go
index 5628dcc9c..dd35323b7 100644
--- a/sdk/go/arvados/fs_project_test.go
+++ b/sdk/go/arvados/fs_project_test.go
@@ -199,6 +199,22 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
 	err = wf.Close()
 	c.Check(err, check.IsNil)
 
+	err = project.Sync()
+	c.Check(err, check.IsNil)
+	_, err = s.fs.Open("/home/A Project/oob/test.txt")
+	c.Check(err, check.IsNil)
+
+	// Sync again to mark the project dir as stale, so the
+	// collection gets reloaded from the controller on next
+	// lookup.
+	err = project.Sync()
+	c.Check(err, check.IsNil)
+
+	// Ensure collection was flushed by Sync
+	var latest Collection
+	err = s.client.RequestAndDecode(&latest, "GET", "arvados/v1/collections/"+oob.UUID, nil, nil)
+	c.Check(latest.ManifestText, check.Matches, `.*:test.txt.*\n`)
+
 	// Delete test.txt behind s.fs's back by updating the
 	// collection record with an empty ManifestText.
 	err = s.client.RequestAndDecode(nil, "PATCH", "arvados/v1/collections/"+oob.UUID, nil, map[string]interface{}{
@@ -209,8 +225,6 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
 	})
 	c.Assert(err, check.IsNil)
 
-	err = project.Sync()
-	c.Check(err, check.IsNil)
 	_, err = s.fs.Open("/home/A Project/oob/test.txt")
 	c.Check(err, check.NotNil)
 	_, err = s.fs.Open("/home/A Project/oob")
@@ -220,7 +234,7 @@ func (s *SiteFSSuite) TestProjectUpdatedByOther(c *check.C) {
 	c.Assert(err, check.IsNil)
 
 	err = project.Sync()
-	c.Check(err, check.IsNil)
+	c.Check(err, check.NotNil) // can't update the deleted collection
 	_, err = s.fs.Open("/home/A Project/oob")
-	c.Check(err, check.NotNil)
+	c.Check(err, check.IsNil) // parent dir still has old collection -- didn't reload, because Sync failed
 }
diff --git a/sdk/go/arvados/fs_site_test.go b/sdk/go/arvados/fs_site_test.go
index fff0b7e01..96ed74de8 100644
--- a/sdk/go/arvados/fs_site_test.go
+++ b/sdk/go/arvados/fs_site_test.go
@@ -7,6 +7,7 @@ package arvados
 import (
 	"net/http"
 	"os"
+	"time"
 
 	check "gopkg.in/check.v1"
 )
@@ -22,6 +23,8 @@ const (
 	fixtureFooCollectionPDH        = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
 	fixtureFooCollection           = "zzzzz-4zz18-fy296fx3hot09f7"
 	fixtureNonexistentCollection   = "zzzzz-4zz18-totallynotexist"
+	fixtureBlobSigningKey          = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+	fixtureBlobSigningTTL          = 336 * time.Hour
 )
 
 var _ = check.Suite(&SiteFSSuite{})
@@ -41,7 +44,11 @@ func (s *SiteFSSuite) SetUpTest(c *check.C) {
 	s.kc = &keepClientStub{
 		blocks: map[string][]byte{
 			"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
-		}}
+		},
+		sigkey:    fixtureBlobSigningKey,
+		sigttl:    fixtureBlobSigningTTL,
+		authToken: fixtureActiveToken,
+	}
 	s.fs = s.client.SiteFileSystem(s.kc)
 }
 

commit ae6e78f726bfcfd2534e6732839a6a254cb330db
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Tue Jul 14 10:22:09 2020 -0400

    16535: Fix error response codes for invalid names (4xx, not 5xx).
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
index c6f501e49..5043c65ec 100644
--- a/services/keep-web/s3.go
+++ b/services/keep-web/s3.go
@@ -5,6 +5,7 @@
 package main
 
 import (
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -43,27 +44,38 @@ func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
 	fs := client.SiteFileSystem(kc)
 	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 
+	fi, err := fs.Stat(r.URL.Path)
 	switch r.Method {
 	case "GET":
-		fi, err := fs.Stat(r.URL.Path)
-		if os.IsNotExist(err) {
-			http.Error(w, err.Error(), http.StatusNotFound)
-			return true
-		} else if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return true
-		} else if fi.IsDir() {
+		if os.IsNotExist(err) ||
+			(err != nil && err.Error() == "not a directory") ||
+			(fi != nil && fi.IsDir()) {
 			http.Error(w, "not found", http.StatusNotFound)
+			return true
 		}
 		http.FileServer(fs).ServeHTTP(w, r)
 		return true
 	case "PUT":
+		if strings.HasSuffix(r.URL.Path, "/") {
+			http.Error(w, "invalid object name (trailing '/' char)", http.StatusBadRequest)
+			return true
+		}
+		if err != nil && err.Error() == "not a directory" {
+			// requested foo/bar, but foo is a file
+			http.Error(w, "object name conflicts with existing object", http.StatusBadRequest)
+			return true
+		}
 		f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
 		if os.IsNotExist(err) {
 			// create missing intermediate directories, then try again
 			for i, c := range r.URL.Path {
 				if i > 0 && c == '/' {
 					dir := r.URL.Path[:i]
+					if strings.HasSuffix(dir, "/") {
+						err = errors.New("invalid object name (consecutive '/' chars)")
+						http.Error(w, err.Error(), http.StatusBadRequest)
+						return true
+					}
 					err := fs.Mkdir(dir, 0755)
 					if err != nil && err != os.ErrExist {
 						err = fmt.Errorf("mkdir %q failed: %w", dir, err)
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index fbfef7b91..c4c216c8b 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -9,6 +9,7 @@ import (
 	"crypto/rand"
 	"io/ioutil"
 	"os"
+	"sync"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -110,7 +111,7 @@ func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix
 	c.Check(err, check.IsNil)
 
 	rdr, err = bucket.GetReader(prefix + "missingfile")
-	c.Check(err, check.NotNil)
+	c.Check(err, check.ErrorMatches, `404 Not Found`)
 
 	rdr, err = bucket.GetReader(prefix + "sailboat.txt")
 	c.Assert(err, check.IsNil)
@@ -152,7 +153,7 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
 		objname := prefix + trial.path
 
 		_, err := bucket.GetReader(objname)
-		c.Assert(err, check.NotNil)
+		c.Assert(err, check.ErrorMatches, `404 Not Found`)
 
 		buf := make([]byte, trial.size)
 		rand.Read(buf)
@@ -167,7 +168,7 @@ func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket,
 		buf2, err := ioutil.ReadAll(rdr)
 		c.Check(err, check.IsNil)
 		c.Check(buf2, check.HasLen, len(buf))
-		c.Check(buf2, check.DeepEquals, buf)
+		c.Check(bytes.Equal(buf, buf2), check.Equals, true)
 	}
 }
 
@@ -182,6 +183,7 @@ func (s *IntegrationSuite) TestS3ProjectPutObjectFailure(c *check.C) {
 	s.testS3PutObjectFailure(c, stage.projbucket, stage.coll.Name+"/")
 }
 func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket, prefix string) {
+	var wg sync.WaitGroup
 	for _, trial := range []struct {
 		path string
 	}{
@@ -209,19 +211,25 @@ func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket,
 			path: "",
 		},
 	} {
-		c.Logf("=== %v", trial)
+		trial := trial
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			c.Logf("=== %v", trial)
 
-		objname := prefix + trial.path
+			objname := prefix + trial.path
 
-		buf := make([]byte, 1234)
-		rand.Read(buf)
+			buf := make([]byte, 1234)
+			rand.Read(buf)
 
-		err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
-		if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", objname)) {
-			continue
-		}
+			err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+			if !c.Check(err, check.ErrorMatches, `400 Bad.*`, check.Commentf("PUT %q should fail", objname)) {
+				return
+			}
 
-		_, err = bucket.GetReader(objname)
-		c.Check(err, check.NotNil)
+			_, err = bucket.GetReader(objname)
+			c.Check(err, check.ErrorMatches, `404 Not Found`, check.Commentf("GET %q should return 404", objname))
+		}()
 	}
+	wg.Wait()
 }

commit 9b07afa9e26a364bb7f72d5741db0c276a90e495
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Tue Jul 14 09:23:21 2020 -0400

    16535: Refactor Sync implementation.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index d06aba369..5e57fed3b 100644
--- a/sdk/go/arvados/fs_base.go
+++ b/sdk/go/arvados/fs_base.go
@@ -31,6 +31,10 @@ var (
 	ErrPermission        = os.ErrPermission
 )
 
+type syncer interface {
+	Sync() error
+}
+
 // A File is an *os.File-like interface for reading and writing files
 // in a FileSystem.
 type File interface {
@@ -299,6 +303,22 @@ func (n *treenode) Readdir() (fi []os.FileInfo, err error) {
 	return
 }
 
+func (n *treenode) Sync() error {
+	n.RLock()
+	defer n.RUnlock()
+	for _, inode := range n.inodes {
+		syncer, ok := inode.(syncer)
+		if !ok {
+			return ErrInvalidOperation
+		}
+		err := syncer.Sync()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 type fileSystem struct {
 	root inode
 	fsBackend
@@ -576,8 +596,11 @@ func (fs *fileSystem) remove(name string, recursive bool) error {
 }
 
 func (fs *fileSystem) Sync() error {
-	log.Printf("TODO: sync fileSystem")
-	return ErrInvalidOperation
+	if syncer, ok := fs.root.(syncer); ok {
+		return syncer.Sync()
+	} else {
+		return ErrInvalidOperation
+	}
 }
 
 func (fs *fileSystem) Flush(string, bool) error {
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 1f888a4e8..254b90c81 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -87,15 +87,16 @@ func (dn *deferrednode) Child(name string, replace func(inode) (inode, error)) (
 	return dn.realinode().Child(name, replace)
 }
 
-// Sync is currently unimplemented, except when it's a no-op because
-// the real inode hasn't been created.
+// Sync is a no-op if the real inode hasn't even been created yet.
 func (dn *deferrednode) Sync() error {
 	dn.mtx.Lock()
 	defer dn.mtx.Unlock()
-	if dn.created {
-		return ErrInvalidArgument
-	} else {
+	if !dn.created {
 		return nil
+	} else if syncer, ok := dn.wrapped.(syncer); ok {
+		return syncer.Sync()
+	} else {
+		return ErrInvalidOperation
 	}
 }
 
diff --git a/sdk/go/arvados/fs_lookup.go b/sdk/go/arvados/fs_lookup.go
index 42322a14a..ad44ef22d 100644
--- a/sdk/go/arvados/fs_lookup.go
+++ b/sdk/go/arvados/fs_lookup.go
@@ -15,7 +15,7 @@ import (
 //
 // See (*customFileSystem)MountUsers for example usage.
 type lookupnode struct {
-	inode
+	treenode
 	loadOne func(parent inode, name string) (inode, error)
 	loadAll func(parent inode) ([]inode, error)
 	stale   func(time.Time) bool
@@ -36,7 +36,7 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
 			return nil, err
 		}
 		for _, child := range all {
-			_, err = ln.inode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
+			_, err = ln.treenode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
 				return child, nil
 			})
 			if err != nil {
@@ -49,7 +49,7 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
 		// newer than ln.staleAll. Reclaim memory.
 		ln.staleOne = nil
 	}
-	return ln.inode.Readdir()
+	return ln.treenode.Readdir()
 }
 
 func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
@@ -57,7 +57,7 @@ func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (in
 	defer ln.staleLock.Unlock()
 	checkTime := time.Now()
 	if ln.stale(ln.staleAll) && ln.stale(ln.staleOne[name]) {
-		_, err := ln.inode.Child(name, func(inode) (inode, error) {
+		_, err := ln.treenode.Child(name, func(inode) (inode, error) {
 			return ln.loadOne(ln, name)
 		})
 		if err != nil {
@@ -69,5 +69,5 @@ func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (in
 			ln.staleOne[name] = checkTime
 		}
 	}
-	return ln.inode.Child(name, replace)
+	return ln.treenode.Child(name, replace)
 }
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 8b61ccfd6..95b2f71c2 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -5,7 +5,6 @@
 package arvados
 
 import (
-	"fmt"
 	"os"
 	"strings"
 	"sync"
@@ -41,7 +40,7 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
 			thr:       newThrottle(concurrentWriters),
 		},
 	}
-	root.inode = &treenode{
+	root.treenode = treenode{
 		fs:     fs,
 		parent: root,
 		fileinfo: fileinfo{
@@ -55,9 +54,9 @@ func (c *Client) CustomFileSystem(kc keepClient) CustomFileSystem {
 }
 
 func (fs *customFileSystem) MountByID(mount string) {
-	fs.root.inode.Child(mount, func(inode) (inode, error) {
+	fs.root.treenode.Child(mount, func(inode) (inode, error) {
 		return &vdirnode{
-			inode: &treenode{
+			treenode: treenode{
 				fs:     fs,
 				parent: fs.root,
 				inodes: make(map[string]inode),
@@ -73,18 +72,18 @@ func (fs *customFileSystem) MountByID(mount string) {
 }
 
 func (fs *customFileSystem) MountProject(mount, uuid string) {
-	fs.root.inode.Child(mount, func(inode) (inode, error) {
+	fs.root.treenode.Child(mount, func(inode) (inode, error) {
 		return fs.newProjectNode(fs.root, mount, uuid), nil
 	})
 }
 
 func (fs *customFileSystem) MountUsers(mount string) {
-	fs.root.inode.Child(mount, func(inode) (inode, error) {
+	fs.root.treenode.Child(mount, func(inode) (inode, error) {
 		return &lookupnode{
 			stale:   fs.Stale,
 			loadOne: fs.usersLoadOne,
 			loadAll: fs.usersLoadAll,
-			inode: &treenode{
+			treenode: treenode{
 				fs:     fs,
 				parent: fs.root,
 				inodes: make(map[string]inode),
@@ -116,43 +115,7 @@ func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
 }
 
 func (fs *customFileSystem) Sync() error {
-	fs.staleLock.Lock()
-	fs.staleThreshold = time.Now()
-	fs.staleLock.Unlock()
-	return fs.syncTree("/", fs.root.inode)
-}
-
-// syncTree calls node.Sync() if it has its own Sync method, otherwise
-// it calls syncTree() on all of node's children.
-//
-// Returns ErrInvalidArgument if node does not implement Sync() and
-// isn't a directory (or if Readdir() fails for any other reason).
-func (fs *customFileSystem) syncTree(path string, node inode) error {
-	if vn, ok := node.(*vdirnode); ok {
-		node = vn.inode
-	}
-	if syncer, ok := node.(interface{ Sync() error }); ok {
-		err := syncer.Sync()
-		if err != nil {
-			return fmt.Errorf("%s: %T: %w", path, syncer, err)
-		}
-		return nil
-	}
-	fis, err := node.Readdir()
-	if err != nil {
-		return fmt.Errorf("%s: %T: %w", path, node, ErrInvalidArgument)
-	}
-	for _, fi := range fis {
-		child, err := node.Child(fi.Name(), nil)
-		if err != nil {
-			continue
-		}
-		err = fs.syncTree(path+"/"+fi.Name(), child)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
+	return fs.root.Sync()
 }
 
 // Stale returns true if information obtained at time t should be
@@ -197,7 +160,7 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode
 		stale:   fs.Stale,
 		loadOne: func(parent inode, name string) (inode, error) { return fs.projectsLoadOne(parent, uuid, name) },
 		loadAll: func(parent inode) ([]inode, error) { return fs.projectsLoadAll(parent, uuid) },
-		inode: &treenode{
+		treenode: treenode{
 			fs:     fs,
 			parent: root,
 			inodes: make(map[string]inode),
@@ -217,17 +180,17 @@ func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode
 // create() can return either a new node, which will be added to the
 // treenode, or nil for ENOENT.
 type vdirnode struct {
-	inode
+	treenode
 	create func(parent inode, name string) inode
 }
 
 func (vn *vdirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
-	return vn.inode.Child(name, func(existing inode) (inode, error) {
+	return vn.treenode.Child(name, func(existing inode) (inode, error) {
 		if existing == nil && vn.create != nil {
 			existing = vn.create(vn, name)
 			if existing != nil {
 				existing.SetParent(vn, name)
-				vn.inode.(*treenode).fileinfo.modTime = time.Now()
+				vn.treenode.fileinfo.modTime = time.Now()
 			}
 		}
 		if replace == nil {

commit 2e2bb873da25e0f7a25a9a9532911bfb5ad69668
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Jul 13 11:57:37 2020 -0400

    16535: collectionFileSystem implements inode, eliminating wrapper.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 5970ec612..0edc48162 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -121,6 +121,62 @@ func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime t
 	}
 }
 
+func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+	return fs.rootnode().Child(name, replace)
+}
+
+func (fs *collectionFileSystem) FS() FileSystem {
+	return fs
+}
+
+func (fs *collectionFileSystem) FileInfo() os.FileInfo {
+	return fs.rootnode().FileInfo()
+}
+
+func (fs *collectionFileSystem) IsDir() bool {
+	return true
+}
+
+func (fs *collectionFileSystem) Lock() {
+	fs.rootnode().Lock()
+}
+
+func (fs *collectionFileSystem) Unlock() {
+	fs.rootnode().Unlock()
+}
+
+func (fs *collectionFileSystem) RLock() {
+	fs.rootnode().RLock()
+}
+
+func (fs *collectionFileSystem) RUnlock() {
+	fs.rootnode().RUnlock()
+}
+
+func (fs *collectionFileSystem) Parent() inode {
+	return fs.rootnode().Parent()
+}
+
+func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+	return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
+	return 0, ptr, ErrInvalidOperation
+}
+
+func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
+	return fs.rootnode().Readdir()
+}
+
+func (fs *collectionFileSystem) SetParent(parent inode, name string) {
+	fs.rootnode().SetParent(parent, name)
+}
+
+func (fs *collectionFileSystem) Truncate(int64) error {
+	return ErrInvalidOperation
+}
+
 func (fs *collectionFileSystem) Sync() error {
 	if fs.uuid == "" {
 		return nil
@@ -193,25 +249,6 @@ func (fs *collectionFileSystem) Size() int64 {
 	return fs.fileSystem.root.(*dirnode).TreeSize()
 }
 
-// asChildNode() repackages fs as an inode that can be used as a child
-// node in a different fs. Not goroutine-safe.
-//
-// After calling asChildNode(), the caller should not use fs directly.
-func (fs *collectionFileSystem) asChildNode(parent inode, name string) *collectionfsnode {
-	root := fs.rootnode().(*dirnode)
-	root.SetParent(parent, name)
-	return &collectionfsnode{dirnode: root, fs: fs}
-}
-
-type collectionfsnode struct {
-	*dirnode
-	fs *collectionFileSystem
-}
-
-func (cn *collectionfsnode) Sync() error {
-	return cn.fs.Sync()
-}
-
 // filenodePtr is an offset into a file that is (usually) efficient to
 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
 // then
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 4eb48b2f7..1f888a4e8 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -32,12 +32,14 @@ func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
 			log.Printf("BUG: unhandled error: %s", err)
 			return placeholder
 		}
-		cfs, err := coll.FileSystem(fs, fs)
+		newfs, err := coll.FileSystem(fs, fs)
 		if err != nil {
 			log.Printf("BUG: unhandled error: %s", err)
 			return placeholder
 		}
-		return cfs.(*collectionFileSystem).asChildNode(parent, coll.Name)
+		cfs := newfs.(*collectionFileSystem)
+		cfs.SetParent(parent, coll.Name)
+		return cfs
 	}}
 }
 
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 2e131c33f..8b61ccfd6 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -183,11 +183,13 @@ func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
 	if err != nil {
 		return nil
 	}
-	cfs, err := coll.FileSystem(fs, fs)
+	newfs, err := coll.FileSystem(fs, fs)
 	if err != nil {
 		return nil
 	}
-	return cfs.(*collectionFileSystem).asChildNode(parent, id)
+	cfs := newfs.(*collectionFileSystem)
+	cfs.SetParent(parent, id)
+	return cfs
 }
 
 func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {

commit 7e31b72e90734b7ababd254e0d38999d86dad758
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Jul 13 10:46:09 2020 -0400

    16535: Refactor test suite to accommodate "project bucket" tests.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
index 600b83496..fbfef7b91 100644
--- a/services/keep-web/s3_test.go
+++ b/services/keep-web/s3_test.go
@@ -20,11 +20,30 @@ import (
 	check "gopkg.in/check.v1"
 )
 
-func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collection, *s3.Bucket) {
+type s3stage struct {
+	arv        *arvados.Client
+	proj       arvados.Group
+	projbucket *s3.Bucket
+	coll       arvados.Collection
+	collbucket *s3.Bucket
+}
+
+func (s *IntegrationSuite) s3setup(c *check.C) s3stage {
+	var proj arvados.Group
 	var coll arvados.Collection
 	arv := arvados.NewClientFromEnv()
 	arv.AuthToken = arvadostest.ActiveToken
-	err := arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+	err := arv.RequestAndDecode(&proj, "POST", "arvados/v1/groups", nil, map[string]interface{}{
+		"group": map[string]interface{}{
+			"group_class": "project",
+			"name":        "keep-web s3 test",
+		},
+		"ensure_unique_name": true,
+	})
+	c.Assert(err, check.IsNil)
+	err = arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{"collection": map[string]interface{}{
+		"owner_uuid":    proj.UUID,
+		"name":          "keep-web s3 test collection",
 		"manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
 	}})
 	c.Assert(err, check.IsNil)
@@ -49,23 +68,40 @@ func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collect
 		S3Endpoint: "http://" + s.testServer.Addr,
 	}
 	client := s3.New(*auth, region)
-	bucket := &s3.Bucket{
-		S3:   client,
-		Name: coll.UUID,
+	return s3stage{
+		arv:  arv,
+		proj: proj,
+		projbucket: &s3.Bucket{
+			S3:   client,
+			Name: proj.UUID,
+		},
+		coll: coll,
+		collbucket: &s3.Bucket{
+			S3:   client,
+			Name: coll.UUID,
+		},
 	}
-	return arv, coll, bucket
 }
 
-func (s *IntegrationSuite) s3teardown(c *check.C, arv *arvados.Client, coll arvados.Collection) {
-	err := arv.RequestAndDecode(&coll, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
-	c.Check(err, check.IsNil)
+func (stage s3stage) teardown(c *check.C) {
+	if stage.coll.UUID != "" {
+		err := stage.arv.RequestAndDecode(&stage.coll, "DELETE", "arvados/v1/collections/"+stage.coll.UUID, nil, nil)
+		c.Check(err, check.IsNil)
+	}
 }
 
-func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
-	arv, coll, bucket := s.s3setup(c)
-	defer s.s3teardown(c, arv, coll)
-
-	rdr, err := bucket.GetReader("emptyfile")
+func (s *IntegrationSuite) TestS3CollectionGetObject(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3GetObject(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectGetObject(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3GetObject(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3GetObject(c *check.C, bucket *s3.Bucket, prefix string) {
+	rdr, err := bucket.GetReader(prefix + "emptyfile")
 	c.Assert(err, check.IsNil)
 	buf, err := ioutil.ReadAll(rdr)
 	c.Check(err, check.IsNil)
@@ -73,11 +109,11 @@ func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
 	err = rdr.Close()
 	c.Check(err, check.IsNil)
 
-	rdr, err = bucket.GetReader("missingfile")
+	rdr, err = bucket.GetReader(prefix + "missingfile")
 	c.Check(err, check.NotNil)
 
-	rdr, err = bucket.GetReader("sailboat.txt")
-	c.Check(err, check.IsNil)
+	rdr, err = bucket.GetReader(prefix + "sailboat.txt")
+	c.Assert(err, check.IsNil)
 	buf, err = ioutil.ReadAll(rdr)
 	c.Check(err, check.IsNil)
 	c.Check(buf, check.DeepEquals, []byte("⛵\n"))
@@ -85,37 +121,46 @@ func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
 	c.Check(err, check.IsNil)
 }
 
-func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
-	arv, coll, bucket := s.s3setup(c)
-	defer s.s3teardown(c, arv, coll)
-
+func (s *IntegrationSuite) TestS3CollectionPutObjectSuccess(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectSuccess(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectPutObjectSuccess(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectSuccess(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3PutObjectSuccess(c *check.C, bucket *s3.Bucket, prefix string) {
 	for _, trial := range []struct {
-		objname string
-		size    int
+		path string
+		size int
 	}{
 		{
-			objname: "newfile",
-			size:    128000000,
+			path: "newfile",
+			size: 128000000,
 		}, {
-			objname: "newdir/newfile",
-			size:    1 << 26,
+			path: "newdir/newfile",
+			size: 1 << 26,
 		}, {
-			objname: "newdir1/newdir2/newfile",
-			size:    0,
+			path: "newdir1/newdir2/newfile",
+			size: 0,
 		},
 	} {
 		c.Logf("=== %v", trial)
 
-		_, err := bucket.GetReader(trial.objname)
+		objname := prefix + trial.path
+
+		_, err := bucket.GetReader(objname)
 		c.Assert(err, check.NotNil)
 
 		buf := make([]byte, trial.size)
 		rand.Read(buf)
 
-		err = bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+		err = bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
 		c.Check(err, check.IsNil)
 
-		rdr, err := bucket.GetReader(trial.objname)
+		rdr, err := bucket.GetReader(objname)
 		if !c.Check(err, check.IsNil) {
 			continue
 		}
@@ -126,48 +171,57 @@ func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
 	}
 }
 
-func (s *IntegrationSuite) TestS3PutObjectFailure(c *check.C) {
-	arv, coll, bucket := s.s3setup(c)
-	defer s.s3teardown(c, arv, coll)
-
+func (s *IntegrationSuite) TestS3CollectionPutObjectFailure(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectFailure(c, stage.collbucket, "")
+}
+func (s *IntegrationSuite) TestS3ProjectPutObjectFailure(c *check.C) {
+	stage := s.s3setup(c)
+	defer stage.teardown(c)
+	s.testS3PutObjectFailure(c, stage.projbucket, stage.coll.Name+"/")
+}
+func (s *IntegrationSuite) testS3PutObjectFailure(c *check.C, bucket *s3.Bucket, prefix string) {
 	for _, trial := range []struct {
-		objname string
+		path string
 	}{
 		{
-			objname: "emptyfile/newname", // emptyfile exists, see s3setup()
+			path: "emptyfile/newname", // emptyfile exists, see s3setup()
 		}, {
-			objname: "emptyfile/", // emptyfile exists, see s3setup()
+			path: "emptyfile/", // emptyfile exists, see s3setup()
 		}, {
-			objname: "emptydir", // dir already exists, see s3setup()
+			path: "emptydir", // dir already exists, see s3setup()
 		}, {
-			objname: "emptydir/",
+			path: "emptydir/",
 		}, {
-			objname: "emptydir//",
+			path: "emptydir//",
 		}, {
-			objname: "newdir/",
+			path: "newdir/",
 		}, {
-			objname: "newdir//",
+			path: "newdir//",
 		}, {
-			objname: "/",
+			path: "/",
 		}, {
-			objname: "//",
+			path: "//",
 		}, {
-			objname: "foo//bar",
+			path: "foo//bar",
 		}, {
-			objname: "",
+			path: "",
 		},
 	} {
 		c.Logf("=== %v", trial)
 
+		objname := prefix + trial.path
+
 		buf := make([]byte, 1234)
 		rand.Read(buf)
 
-		err := bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
-		if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", trial.objname)) {
+		err := bucket.PutReader(objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+		if !c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", objname)) {
 			continue
 		}
 
-		_, err = bucket.GetReader(trial.objname)
+		_, err = bucket.GetReader(objname)
 		c.Check(err, check.NotNil)
 	}
 }

commit a0aebd60d543eb9bafb95ff21a695a50e473ca56
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Jul 13 10:45:47 2020 -0400

    16535: Move s3 handler to s3.go.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index b8071b914..915924e28 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -6,7 +6,6 @@ package main
 
 import (
 	"encoding/json"
-	"fmt"
 	"html"
 	"html/template"
 	"io"
@@ -228,17 +227,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 		w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
 	}
 
-	if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
-		split := strings.SplitN(auth[4:], ":", 2)
-		if len(split) < 2 {
-			w.WriteHeader(http.StatusUnauthorized)
-			return
-		}
-		h.serveS3(w, r, split[0])
-		return
-	} else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
-		w.WriteHeader(http.StatusBadRequest)
-		fmt.Println(w, "V4 signature is not supported")
+	if h.serveS3(w, r) {
 		return
 	}
 
@@ -545,82 +534,6 @@ func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosCli
 	return
 }
 
-func (h *handler) serveS3(w http.ResponseWriter, r *http.Request, token string) {
-	_, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
-	if err != nil {
-		http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
-		return
-	}
-	defer release()
-
-	r.URL.Path = "/by_id" + r.URL.Path
-
-	fs := client.SiteFileSystem(kc)
-	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
-
-	switch r.Method {
-	case "GET":
-		fi, err := fs.Stat(r.URL.Path)
-		if os.IsNotExist(err) {
-			http.Error(w, err.Error(), http.StatusNotFound)
-			return
-		} else if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		} else if fi.IsDir() {
-			http.Error(w, "not found", http.StatusNotFound)
-		}
-		http.FileServer(fs).ServeHTTP(w, r)
-		return
-	case "PUT":
-		f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
-		if os.IsNotExist(err) {
-			// create missing intermediate directories, then try again
-			for i, c := range r.URL.Path {
-				if i > 0 && c == '/' {
-					dir := r.URL.Path[:i]
-					err := fs.Mkdir(dir, 0755)
-					if err != nil && err != os.ErrExist {
-						err = fmt.Errorf("mkdir %q failed: %w", dir, err)
-						http.Error(w, err.Error(), http.StatusInternalServerError)
-						return
-					}
-				}
-			}
-			f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
-		}
-		if err != nil {
-			err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
-			http.Error(w, err.Error(), http.StatusBadRequest)
-			return
-		}
-		defer f.Close()
-		_, err = io.Copy(f, r.Body)
-		if err != nil {
-			err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
-			http.Error(w, err.Error(), http.StatusBadGateway)
-			return
-		}
-		err = f.Close()
-		if err != nil {
-			err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
-			http.Error(w, err.Error(), http.StatusBadGateway)
-			return
-		}
-		err = fs.Sync()
-		if err != nil {
-			err = fmt.Errorf("sync failed: %w", err)
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-		w.WriteHeader(http.StatusOK)
-		return
-	default:
-		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
-		return
-	}
-}
-
 func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
 	if len(tokens) == 0 {
 		w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
diff --git a/services/keep-web/s3.go b/services/keep-web/s3.go
new file mode 100644
index 000000000..c6f501e49
--- /dev/null
+++ b/services/keep-web/s3.go
@@ -0,0 +1,107 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"strings"
+)
+
+// serveS3 handles r and returns true if r is a request from an S3
+// client, otherwise it returns false.
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request) bool {
+	var token string
+	if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+		split := strings.SplitN(auth[4:], ":", 2)
+		if len(split) < 2 {
+			w.WriteHeader(http.StatusUnauthorized)
+			return true
+		}
+		token = split[0]
+	} else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Println(w, "V4 signature is not supported")
+		return true
+	} else {
+		return false
+	}
+
+	_, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+	if err != nil {
+		http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
+		return true
+	}
+	defer release()
+
+	r.URL.Path = "/by_id" + r.URL.Path
+
+	fs := client.SiteFileSystem(kc)
+	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+	switch r.Method {
+	case "GET":
+		fi, err := fs.Stat(r.URL.Path)
+		if os.IsNotExist(err) {
+			http.Error(w, err.Error(), http.StatusNotFound)
+			return true
+		} else if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return true
+		} else if fi.IsDir() {
+			http.Error(w, "not found", http.StatusNotFound)
+		}
+		http.FileServer(fs).ServeHTTP(w, r)
+		return true
+	case "PUT":
+		f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+		if os.IsNotExist(err) {
+			// create missing intermediate directories, then try again
+			for i, c := range r.URL.Path {
+				if i > 0 && c == '/' {
+					dir := r.URL.Path[:i]
+					err := fs.Mkdir(dir, 0755)
+					if err != nil && err != os.ErrExist {
+						err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+						http.Error(w, err.Error(), http.StatusInternalServerError)
+						return true
+					}
+				}
+			}
+			f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+		}
+		if err != nil {
+			err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return true
+		}
+		defer f.Close()
+		_, err = io.Copy(f, r.Body)
+		if err != nil {
+			err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+			http.Error(w, err.Error(), http.StatusBadGateway)
+			return true
+		}
+		err = f.Close()
+		if err != nil {
+			err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+			http.Error(w, err.Error(), http.StatusBadGateway)
+			return true
+		}
+		err = fs.Sync()
+		if err != nil {
+			err = fmt.Errorf("sync failed: %w", err)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return true
+		}
+		w.WriteHeader(http.StatusOK)
+		return true
+	default:
+		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+		return true
+	}
+}

commit 86f966c2b2a14d9f30135cf2fc2b9cb08270d1b1
Author: Tom Clegg <tom at tomclegg.ca>
Date:   Mon Jul 13 09:55:57 2020 -0400

    16535: Add s3 endpoint.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at tomclegg.ca>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 37bd49491..5970ec612 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -193,6 +193,25 @@ func (fs *collectionFileSystem) Size() int64 {
 	return fs.fileSystem.root.(*dirnode).TreeSize()
 }
 
+// asChildNode() repackages fs as an inode that can be used as a child
+// node in a different fs. Not goroutine-safe.
+//
+// After calling asChildNode(), the caller should not use fs directly.
+func (fs *collectionFileSystem) asChildNode(parent inode, name string) *collectionfsnode {
+	root := fs.rootnode().(*dirnode)
+	root.SetParent(parent, name)
+	return &collectionfsnode{dirnode: root, fs: fs}
+}
+
+type collectionfsnode struct {
+	*dirnode
+	fs *collectionFileSystem
+}
+
+func (cn *collectionfsnode) Sync() error {
+	return cn.fs.Sync()
+}
+
 // filenodePtr is an offset into a file that is (usually) efficient to
 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
 // then
diff --git a/sdk/go/arvados/fs_deferred.go b/sdk/go/arvados/fs_deferred.go
index 439eaec7c..4eb48b2f7 100644
--- a/sdk/go/arvados/fs_deferred.go
+++ b/sdk/go/arvados/fs_deferred.go
@@ -37,9 +37,7 @@ func deferredCollectionFS(fs FileSystem, parent inode, coll Collection) inode {
 			log.Printf("BUG: unhandled error: %s", err)
 			return placeholder
 		}
-		root := cfs.rootnode()
-		root.SetParent(parent, coll.Name)
-		return root
+		return cfs.(*collectionFileSystem).asChildNode(parent, coll.Name)
 	}}
 }
 
@@ -87,6 +85,18 @@ func (dn *deferrednode) Child(name string, replace func(inode) (inode, error)) (
 	return dn.realinode().Child(name, replace)
 }
 
+// Sync is currently unimplemented, except when it's a no-op because
+// the real inode hasn't been created.
+func (dn *deferrednode) Sync() error {
+	dn.mtx.Lock()
+	defer dn.mtx.Unlock()
+	if dn.created {
+		return ErrInvalidArgument
+	} else {
+		return nil
+	}
+}
+
 func (dn *deferrednode) Truncate(size int64) error       { return dn.realinode().Truncate(size) }
 func (dn *deferrednode) SetParent(p inode, name string)  { dn.realinode().SetParent(p, name) }
 func (dn *deferrednode) IsDir() bool                     { return dn.currentinode().IsDir() }
diff --git a/sdk/go/arvados/fs_site.go b/sdk/go/arvados/fs_site.go
index 7826d335c..2e131c33f 100644
--- a/sdk/go/arvados/fs_site.go
+++ b/sdk/go/arvados/fs_site.go
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+	"fmt"
 	"os"
 	"strings"
 	"sync"
@@ -116,8 +117,41 @@ func (c *Client) SiteFileSystem(kc keepClient) CustomFileSystem {
 
 func (fs *customFileSystem) Sync() error {
 	fs.staleLock.Lock()
-	defer fs.staleLock.Unlock()
 	fs.staleThreshold = time.Now()
+	fs.staleLock.Unlock()
+	return fs.syncTree("/", fs.root.inode)
+}
+
+// syncTree calls node.Sync() if it has its own Sync method, otherwise
+// it calls syncTree() on all of node's children.
+//
+// Returns ErrInvalidArgument if node does not implement Sync() and
+// isn't a directory (or if Readdir() fails for any other reason).
+func (fs *customFileSystem) syncTree(path string, node inode) error {
+	if vn, ok := node.(*vdirnode); ok {
+		node = vn.inode
+	}
+	if syncer, ok := node.(interface{ Sync() error }); ok {
+		err := syncer.Sync()
+		if err != nil {
+			return fmt.Errorf("%s: %T: %w", path, syncer, err)
+		}
+		return nil
+	}
+	fis, err := node.Readdir()
+	if err != nil {
+		return fmt.Errorf("%s: %T: %w", path, node, ErrInvalidArgument)
+	}
+	for _, fi := range fis {
+		child, err := node.Child(fi.Name(), nil)
+		if err != nil {
+			continue
+		}
+		err = fs.syncTree(path+"/"+fi.Name(), child)
+		if err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
@@ -153,9 +187,7 @@ func (fs *customFileSystem) mountCollection(parent inode, id string) inode {
 	if err != nil {
 		return nil
 	}
-	root := cfs.rootnode()
-	root.SetParent(parent, id)
-	return root
+	return cfs.(*collectionFileSystem).asChildNode(parent, id)
 }
 
 func (fs *customFileSystem) newProjectNode(root inode, name, uuid string) inode {
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 643ca4f58..b8071b914 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -6,6 +6,7 @@ package main
 
 import (
 	"encoding/json"
+	"fmt"
 	"html"
 	"html/template"
 	"io"
@@ -227,6 +228,20 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 		w.Header().Set("Access-Control-Expose-Headers", "Content-Range")
 	}
 
+	if auth := r.Header.Get("Authorization"); strings.HasPrefix(auth, "AWS ") {
+		split := strings.SplitN(auth[4:], ":", 2)
+		if len(split) < 2 {
+			w.WriteHeader(http.StatusUnauthorized)
+			return
+		}
+		h.serveS3(w, r, split[0])
+		return
+	} else if strings.HasPrefix(auth, "AWS4-HMAC-SHA256 ") {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Println(w, "V4 signature is not supported")
+		return
+	}
+
 	pathParts := strings.Split(r.URL.Path[1:], "/")
 
 	var stripParts int
@@ -509,6 +524,103 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 	}
 }
 
+// getClients borrows an Arvados API client from h.clientPool, sets its
+// token, and builds the matching keep client and arvados.Client, both
+// tagged with reqID.
+//
+// On success the caller must call release() when finished, which puts
+// the API client back in the pool. On failure err is non-nil, all other
+// results are nil, and nothing needs to be released.
+func (h *handler) getClients(reqID, token string) (arv *arvadosclient.ArvadosClient, kc *keepclient.KeepClient, client *arvados.Client, release func(), err error) {
+	arv = h.clientPool.Get()
+	if arv == nil {
+		// Get returns nil when the pool cannot supply a client. Report
+		// that as an error so callers don't go on to use nil clients.
+		err = h.clientPool.Err()
+		if err == nil {
+			err = fmt.Errorf("client pool returned no client")
+		}
+		return nil, nil, nil, nil, err
+	}
+	release = func() { h.clientPool.Put(arv) }
+	arv.ApiToken = token
+	kc, err = keepclient.MakeKeepClient(arv)
+	if err != nil {
+		// arv has been returned to the pool, so don't hand it out.
+		release()
+		return nil, nil, nil, nil, fmt.Errorf("error setting up keep client: %w", err)
+	}
+	kc.RequestID = reqID
+	client = (&arvados.Client{
+		APIHost:   arv.ApiServer,
+		AuthToken: arv.ApiToken,
+		Insecure:  arv.ApiInsecure,
+	}).WithRequestID(reqID)
+	return arv, kc, client, release, nil
+}
+
+// serveS3 handles a request from an S3 client, authenticating to
+// Arvados with token. The request path ("/bucket/object") is looked up
+// under /by_id in the site filesystem, so the bucket name must be an ID
+// that /by_id accepts (presumably a collection UUID or PDH -- confirm).
+//
+// GET serves an existing file; missing paths and directories get 404.
+// PUT writes the request body to the named file, creating missing
+// parent directories, then syncs the filesystem. Any other method gets
+// 405.
+func (h *handler) serveS3(w http.ResponseWriter, r *http.Request, token string) {
+	_, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), token)
+	if err != nil {
+		// Report err itself: h.clientPool.Err() may be nil when the
+		// failure came from keep client setup, and calling Error()
+		// on a nil error would panic.
+		http.Error(w, "Pool failed: "+err.Error(), http.StatusInternalServerError)
+		return
+	}
+	defer release()
+
+	r.URL.Path = "/by_id" + r.URL.Path
+
+	fs := client.SiteFileSystem(kc)
+	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
+
+	switch r.Method {
+	case "GET":
+		fi, err := fs.Stat(r.URL.Path)
+		if os.IsNotExist(err) {
+			http.Error(w, err.Error(), http.StatusNotFound)
+			return
+		} else if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		} else if fi.IsDir() {
+			// A directory is not an object. Return now so
+			// FileServer doesn't write a second response.
+			http.Error(w, "not found", http.StatusNotFound)
+			return
+		}
+		http.FileServer(fs).ServeHTTP(w, r)
+		return
+	case "PUT":
+		f, err := fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+		if os.IsNotExist(err) {
+			// Create missing intermediate directories, then try
+			// again. A Mkdir that reports the directory already
+			// exists (possibly wrapped) is not an error.
+			for i, c := range r.URL.Path {
+				if i > 0 && c == '/' {
+					dir := r.URL.Path[:i]
+					err := fs.Mkdir(dir, 0755)
+					if err != nil && !os.IsExist(err) {
+						err = fmt.Errorf("mkdir %q failed: %w", dir, err)
+						http.Error(w, err.Error(), http.StatusInternalServerError)
+						return
+					}
+				}
+			}
+			f, err = fs.OpenFile(r.URL.Path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+		}
+		if err != nil {
+			err = fmt.Errorf("open %q failed: %w", r.URL.Path, err)
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+		// The deferred Close only matters on early-return paths; the
+		// explicit Close below is what reports errors from finishing
+		// the write.
+		defer f.Close()
+		_, err = io.Copy(f, r.Body)
+		if err != nil {
+			err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+			http.Error(w, err.Error(), http.StatusBadGateway)
+			return
+		}
+		err = f.Close()
+		if err != nil {
+			err = fmt.Errorf("write to %q failed: %w", r.URL.Path, err)
+			http.Error(w, err.Error(), http.StatusBadGateway)
+			return
+		}
+		err = fs.Sync()
+		if err != nil {
+			err = fmt.Errorf("sync failed: %w", err)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.WriteHeader(http.StatusOK)
+		return
+	default:
+		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+		return
+	}
+}
+
 func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []string, credentialsOK, attachment bool) {
 	if len(tokens) == 0 {
 		w.Header().Add("WWW-Authenticate", "Basic realm=\"collections\"")
@@ -519,25 +631,13 @@ func (h *handler) serveSiteFS(w http.ResponseWriter, r *http.Request, tokens []s
 		http.Error(w, errReadOnly.Error(), http.StatusMethodNotAllowed)
 		return
 	}
-	arv := h.clientPool.Get()
-	if arv == nil {
+	_, kc, client, release, err := h.getClients(r.Header.Get("X-Request-Id"), tokens[0])
+	if err != nil {
 		http.Error(w, "Pool failed: "+h.clientPool.Err().Error(), http.StatusInternalServerError)
 		return
 	}
-	defer h.clientPool.Put(arv)
-	arv.ApiToken = tokens[0]
+	defer release()
 
-	kc, err := keepclient.MakeKeepClient(arv)
-	if err != nil {
-		http.Error(w, "error setting up keep client: "+err.Error(), http.StatusInternalServerError)
-		return
-	}
-	kc.RequestID = r.Header.Get("X-Request-Id")
-	client := (&arvados.Client{
-		APIHost:   arv.ApiServer,
-		AuthToken: arv.ApiToken,
-		Insecure:  arv.ApiInsecure,
-	}).WithRequestID(r.Header.Get("X-Request-Id"))
 	fs := client.SiteFileSystem(kc)
 	fs.ForwardSlashNameSubstitution(h.Config.cluster.Collections.ForwardSlashNameSubstitution)
 	f, err := fs.Open(r.URL.Path)
diff --git a/services/keep-web/s3_test.go b/services/keep-web/s3_test.go
new file mode 100644
index 000000000..600b83496
--- /dev/null
+++ b/services/keep-web/s3_test.go
@@ -0,0 +1,173 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"bytes"
+	"crypto/rand"
+	"io/ioutil"
+	"os"
+	"time"
+
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+	"git.arvados.org/arvados.git/sdk/go/arvadostest"
+	"git.arvados.org/arvados.git/sdk/go/keepclient"
+	"github.com/AdRoll/goamz/aws"
+	"github.com/AdRoll/goamz/s3"
+	check "gopkg.in/check.v1"
+)
+
+// s3setup creates a collection as the active test user containing
+// "emptyfile", "emptydir" and "sailboat.txt". It returns an API client,
+// the collection record, and an S3 bucket handle that addresses the
+// collection (by UUID) through s.testServer.
+func (s *IntegrationSuite) s3setup(c *check.C) (*arvados.Client, arvados.Collection, *s3.Bucket) {
+	arv := arvados.NewClientFromEnv()
+	arv.AuthToken = arvadostest.ActiveToken
+
+	var coll arvados.Collection
+	params := map[string]interface{}{
+		"collection": map[string]interface{}{
+			"manifest_text": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:emptyfile\n./emptydir d41d8cd98f00b204e9800998ecf8427e+0 0:0:.\n",
+		},
+	}
+	err := arv.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, params)
+	c.Assert(err, check.IsNil)
+
+	// Add sailboat.txt through the collection filesystem, using a
+	// keep client, and save it with Sync().
+	ac, err := arvadosclient.New(arv)
+	c.Assert(err, check.IsNil)
+	kc, err := keepclient.MakeKeepClient(ac)
+	c.Assert(err, check.IsNil)
+	collfs, err := coll.FileSystem(arv, kc)
+	c.Assert(err, check.IsNil)
+	f, err := collfs.OpenFile("sailboat.txt", os.O_CREATE|os.O_WRONLY, 0644)
+	c.Assert(err, check.IsNil)
+	_, err = f.Write([]byte("⛵\n"))
+	c.Assert(err, check.IsNil)
+	c.Assert(f.Close(), check.IsNil)
+	c.Assert(collfs.Sync(), check.IsNil)
+
+	// The active user's v2 token serves as both the access key and
+	// the secret key.
+	auth := aws.NewAuth(arvadostest.ActiveTokenV2, arvadostest.ActiveTokenV2, "", time.Now().Add(time.Hour))
+	region := aws.Region{
+		Name:       s.testServer.Addr,
+		S3Endpoint: "http://" + s.testServer.Addr,
+	}
+	bucket := &s3.Bucket{
+		S3:   s3.New(*auth, region),
+		Name: coll.UUID,
+	}
+	return arv, coll, bucket
+}
+
+// s3teardown deletes the collection created by s3setup.
+func (s *IntegrationSuite) s3teardown(c *check.C, arv *arvados.Client, coll arvados.Collection) {
+	path := "arvados/v1/collections/" + coll.UUID
+	c.Check(arv.RequestAndDecode(&coll, "DELETE", path, nil, nil), check.IsNil)
+}
+
+// TestS3GetObject checks that GET returns the contents of existing
+// files (empty and non-empty) and fails for a missing file.
+func (s *IntegrationSuite) TestS3GetObject(c *check.C) {
+	arv, coll, bucket := s.s3setup(c)
+	defer s.s3teardown(c, arv, coll)
+
+	rdr, err := bucket.GetReader("emptyfile")
+	c.Assert(err, check.IsNil)
+	buf, err := ioutil.ReadAll(rdr)
+	c.Check(err, check.IsNil)
+	c.Check(len(buf), check.Equals, 0)
+	err = rdr.Close()
+	c.Check(err, check.IsNil)
+
+	rdr, err = bucket.GetReader("missingfile")
+	if !c.Check(err, check.NotNil) {
+		// Unexpected success: don't leak the response body.
+		rdr.Close()
+	}
+
+	// Assert rather than Check: with a nil reader, ReadAll below
+	// would panic instead of reporting a test failure.
+	rdr, err = bucket.GetReader("sailboat.txt")
+	c.Assert(err, check.IsNil)
+	buf, err = ioutil.ReadAll(rdr)
+	c.Check(err, check.IsNil)
+	c.Check(buf, check.DeepEquals, []byte("⛵\n"))
+	err = rdr.Close()
+	c.Check(err, check.IsNil)
+}
+
+// TestS3PutObjectSuccess checks that PUT creates new objects (including
+// ones whose parent directories don't exist yet) and that GET then
+// returns exactly the uploaded bytes.
+func (s *IntegrationSuite) TestS3PutObjectSuccess(c *check.C) {
+	arv, coll, bucket := s.s3setup(c)
+	defer s.s3teardown(c, arv, coll)
+
+	for _, trial := range []struct {
+		objname string
+		size    int
+	}{
+		{
+			objname: "newfile",
+			size:    128000000,
+		}, {
+			objname: "newdir/newfile",
+			size:    1 << 26,
+		}, {
+			objname: "newdir1/newdir2/newfile",
+			size:    0,
+		},
+	} {
+		c.Logf("=== %v", trial)
+
+		// The object must not exist before the upload.
+		_, err := bucket.GetReader(trial.objname)
+		c.Assert(err, check.NotNil)
+
+		buf := make([]byte, trial.size)
+		_, err = rand.Read(buf)
+		c.Assert(err, check.IsNil)
+
+		err = bucket.PutReader(trial.objname, bytes.NewReader(buf), int64(len(buf)), "application/octet-stream", s3.Private, s3.Options{})
+		c.Check(err, check.IsNil)
+
+		rdr, err := bucket.GetReader(trial.objname)
+		if !c.Check(err, check.IsNil) {
+			continue
+		}
+		buf2, err := ioutil.ReadAll(rdr)
+		c.Check(err, check.IsNil)
+		// Close the response body so connections aren't leaked
+		// across trials.
+		c.Check(rdr.Close(), check.IsNil)
+		c.Check(buf2, check.HasLen, len(buf))
+		c.Check(buf2, check.DeepEquals, buf)
+	}
+}
+
+// TestS3PutObjectFailure checks that PUT rejects object names that are
+// empty, end in "/", contain "//", or collide with an existing file or
+// directory, and that GET of a rejected name also fails.
+func (s *IntegrationSuite) TestS3PutObjectFailure(c *check.C) {
+	arv, coll, bucket := s.s3setup(c)
+	defer s.s3teardown(c, arv, coll)
+
+	type putCase struct {
+		objname string
+	}
+	cases := []putCase{
+		{objname: "emptyfile/newname"}, // emptyfile exists, see s3setup()
+		{objname: "emptyfile/"},        // emptyfile exists, see s3setup()
+		{objname: "emptydir"},          // dir already exists, see s3setup()
+		{objname: "emptydir/"},
+		{objname: "emptydir//"},
+		{objname: "newdir/"},
+		{objname: "newdir//"},
+		{objname: "/"},
+		{objname: "//"},
+		{objname: "foo//bar"},
+		{objname: ""},
+	}
+	for _, tc := range cases {
+		c.Logf("=== %v", tc)
+
+		payload := make([]byte, 1234)
+		rand.Read(payload)
+
+		err := bucket.PutReader(tc.objname, bytes.NewReader(payload), int64(len(payload)), "application/octet-stream", s3.Private, s3.Options{})
+		if c.Check(err, check.NotNil, check.Commentf("name %q should be rejected", tc.objname)) {
+			_, err = bucket.GetReader(tc.objname)
+			c.Check(err, check.NotNil)
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list