[arvados] created: 2.6.0-339-g8772ee6a8

git repository hosting git at public.arvados.org
Thu Jun 29 20:08:57 UTC 2023


        at  8772ee6a857973d1cac026b2185cd1e3d56a292d (commit)


commit 8772ee6a857973d1cac026b2185cd1e3d56a292d
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 29 14:59:16 2023 -0400

    20559: Explain locking strategy in comments.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index 29b7f2b0b..b519cbe84 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -78,15 +78,48 @@ type cachedSession struct {
 	arvadosclient *arvadosclient.ArvadosClient
 	keepclient    *keepclient.KeepClient
 
-	// mtx is RLocked while session is not safe to evict from cache
+	// Each session uses a system of three mutexes (plus the
+	// cache-wide mutex) to enable the following semantics:
+	//
+	// - There are never multiple sessions in use for a given
+	// token.
+	//
+	// - If the cached in-memory filesystems/user records are
+	// older than the configured cache TTL when a request starts,
+	// the request will use new ones.
+	//
+	// - Unused sessions are garbage-collected.
+	//
+	// In particular, when it is necessary to reset a session's
+	// filesystem/user record (to save memory or respect the
+	// configured cache TTL), any operations that are already
+	// using the existing filesystem/user record are allowed to
+	// finish before the new filesystem is constructed.
+	//
+	// The locks must be acquired in the following order:
+	// cache.mtx, session.mtx, session.refresh, session.inuse.
+
+	// mtx is RLocked while session is not safe to evict from
+	// cache -- i.e., a checkout() has decided to use it, and its
+	// caller is not finished with it. When locking or rlocking
+	// this mtx, the cache mtx MUST already be held.
+	//
+	// This mutex enables pruneSessions to detect when it is safe
+	// to completely remove the session entry from the cache.
 	mtx sync.RWMutex
-	// refresh is locked while reading or writing the following fields
-	refresh    sync.Mutex
+	// refresh must be locked in order to read or write the
+	// fs/user/userLoaded fields. This mutex enables GetSession
+	// and pruneSessions to remove/replace fs and user values
+	// safely.
+	refresh sync.Mutex
+	// inuse must be RLocked while the session is in use by a
+	// caller. This mutex enables pruneSessions() to wait for all
+	// existing usage to finish by calling inuse.Lock().
+	inuse sync.RWMutex
+
 	fs         arvados.CustomFileSystem
 	user       arvados.User
 	userLoaded bool
-	// inuse is RLocked while session is in use by a caller
-	inuse sync.RWMutex
 }
 
 func (sess *cachedSession) Release() {
@@ -271,18 +304,23 @@ func (c *cache) pruneSessions() {
 		// next GetSession will reload fs/user, or a
 		// subsequent pruneSessions will remove the session.
 		go func() {
-			// Ensure nobody is in GetSession
+			// Ensure nobody is mid-GetSession() (note we
+			// already know nobody is mid-checkout()
+			// because we have c.mtx locked)
 			sess.refresh.Lock()
-			// Wait for current usage to finish
+			defer sess.refresh.Unlock()
+			// Wait for current usage to finish (i.e.,
+			// anyone who has decided to use the current
+			// values of sess.fs and sess.user, and hasn't
+			// called Release() yet)
 			sess.inuse.Lock()
+			defer sess.inuse.Unlock()
 			// Release memory
 			sess.fs = nil
 			if sess.expire.Before(now) {
 				// Mark user data as stale
 				sess.userLoaded = false
 			}
-			sess.inuse.Unlock()
-			sess.refresh.Unlock()
 			// Next GetSession will make a new fs
 		}()
 	}

commit 5bedc30c5af3a290c220c134edcafc3ce828e1a4
Author: Tom Clegg <tom at curii.com>
Date:   Thu Jun 29 11:57:29 2023 -0400

    20559: Refactor session sharing.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go
index c77a1b4bb..29b7f2b0b 100644
--- a/services/keep-web/cache.go
+++ b/services/keep-web/cache.go
@@ -8,7 +8,6 @@ import (
 	"errors"
 	"net/http"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
@@ -28,6 +27,7 @@ type cache struct {
 	metrics   cacheMetrics
 	sessions  *lru.TwoQueueCache
 	setupOnce sync.Once
+	mtx       sync.Mutex
 
 	chPruneSessions chan struct{}
 }
@@ -72,12 +72,30 @@ func (m *cacheMetrics) setup(reg *prometheus.Registry) {
 }
 
 type cachedSession struct {
+	cache         *cache
 	expire        time.Time
-	fs            atomic.Value
 	client        *arvados.Client
 	arvadosclient *arvadosclient.ArvadosClient
 	keepclient    *keepclient.KeepClient
-	user          atomic.Value
+
+	// mtx is RLocked while session is not safe to evict from cache
+	mtx sync.RWMutex
+	// refresh is locked while reading or writing the following fields
+	refresh    sync.Mutex
+	fs         arvados.CustomFileSystem
+	user       arvados.User
+	userLoaded bool
+	// inuse is RLocked while session is in use by a caller
+	inuse sync.RWMutex
+}
+
+func (sess *cachedSession) Release() {
+	sess.inuse.RUnlock()
+	sess.mtx.RUnlock()
+	select {
+	case sess.cache.chPruneSessions <- struct{}{}:
+	default:
+	}
 }
 
 func (c *cache) setup() {
@@ -114,98 +132,112 @@ var selectPDH = map[string]interface{}{
 	"select": []string{"portable_data_hash"},
 }
 
-// ResetSession unloads any potentially stale state. Should be called
-// after write operations, so subsequent reads don't return stale
-// data.
-func (c *cache) ResetSession(token string) {
-	c.setupOnce.Do(c.setup)
-	c.sessions.Remove(token)
-}
-
-// Get a long-lived CustomFileSystem suitable for doing a read operation
-// with the given token.
-func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
+func (c *cache) checkout(token string) (*cachedSession, error) {
 	c.setupOnce.Do(c.setup)
-	now := time.Now()
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
 	ent, _ := c.sessions.Get(token)
 	sess, _ := ent.(*cachedSession)
-	expired := false
 	if sess == nil {
-		c.metrics.sessionMisses.Inc()
-		sess = &cachedSession{
-			expire: now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration()),
-		}
-		var err error
-		sess.client, err = arvados.NewClientFromConfig(c.cluster)
+		client, err := arvados.NewClientFromConfig(c.cluster)
 		if err != nil {
-			return nil, nil, nil, err
+			return nil, err
 		}
-		sess.client.AuthToken = token
-		sess.client.Timeout = time.Minute
+		client.AuthToken = token
+		client.Timeout = time.Minute
 		// A non-empty origin header tells controller to
 		// prioritize our traffic as interactive, which is
 		// true most of the time.
 		origin := c.cluster.Services.WebDAVDownload.ExternalURL
-		sess.client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
-		sess.arvadosclient, err = arvadosclient.New(sess.client)
+		client.SendHeader = http.Header{"Origin": {origin.Scheme + "://" + origin.Host}}
+		arvadosclient, err := arvadosclient.New(client)
 		if err != nil {
-			return nil, nil, nil, err
+			return nil, err
+		}
+		sess = &cachedSession{
+			cache:         c,
+			client:        client,
+			arvadosclient: arvadosclient,
+			keepclient:    keepclient.New(arvadosclient),
 		}
-		sess.keepclient = keepclient.New(sess.arvadosclient)
 		c.sessions.Add(token, sess)
-	} else if sess.expire.Before(now) {
-		c.metrics.sessionMisses.Inc()
-		expired = true
-	} else {
-		c.metrics.sessionHits.Inc()
-	}
-	select {
-	case c.chPruneSessions <- struct{}{}:
-	default:
 	}
+	sess.mtx.RLock()
+	return sess, nil
+}
 
-	fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
-	if fs == nil || expired {
-		fs = sess.client.SiteFileSystem(sess.keepclient)
-		fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
-		sess.fs.Store(fs)
+// Get a long-lived CustomFileSystem suitable for doing a read or
+// write operation with the given token.
+//
+// If the returned error is nil, the caller must call Release() on the
+// returned session when finished using it.
+func (c *cache) GetSession(token string) (arvados.CustomFileSystem, *cachedSession, *arvados.User, error) {
+	sess, err := c.checkout(token)
+	if err != nil {
+		return nil, nil, nil, err
 	}
+	sess.refresh.Lock()
+	defer sess.refresh.Unlock()
+	now := time.Now()
+	refresh := sess.expire.Before(now)
+	if sess.fs == nil || !sess.userLoaded || refresh {
+		// Wait for all active users to finish (otherwise they
+		// might make changes to an old fs after we start
+		// using the new fs).
+		sess.inuse.Lock()
+		if !sess.userLoaded || refresh {
+			err := sess.client.RequestAndDecode(&sess.user, "GET", "/arvados/v1/users/current", nil, nil)
+			if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
+				// token is OK, but "get user id" api is out
+				// of scope -- use existing/expired info if
+				// any, or leave empty for unknown user
+			} else if err != nil {
+				sess.inuse.Unlock()
+				sess.mtx.RUnlock()
+				return nil, nil, nil, err
+			}
+			sess.userLoaded = true
+		}
 
-	user, _ := sess.user.Load().(*arvados.User)
-	if user == nil || expired {
-		user = new(arvados.User)
-		err := sess.client.RequestAndDecode(user, "GET", "/arvados/v1/users/current", nil, nil)
-		if he := errorWithHTTPStatus(nil); errors.As(err, &he) && he.HTTPStatus() == http.StatusForbidden {
-			// token is OK, but "get user id" api is out
-			// of scope -- return nil, signifying unknown
-			// user
-		} else if err != nil {
-			return nil, nil, nil, err
+		if sess.fs == nil || refresh {
+			sess.fs = sess.client.SiteFileSystem(sess.keepclient)
+			sess.fs.ForwardSlashNameSubstitution(c.cluster.Collections.ForwardSlashNameSubstitution)
+			sess.expire = now.Add(c.cluster.Collections.WebDAVCache.TTL.Duration())
+			c.metrics.sessionMisses.Inc()
+		} else {
+			c.metrics.sessionHits.Inc()
 		}
-		sess.user.Store(user)
+		sess.inuse.Unlock()
+	} else {
+		c.metrics.sessionHits.Inc()
 	}
-
-	return fs, sess, user, nil
+	sess.inuse.RLock()
+	return sess.fs, sess, &sess.user, nil
 }
 
-// Remove all expired session cache entries, then remove more entries
-// until approximate remaining size <= maxsize/2
+// Remove all expired idle session cache entries, and remove in-memory
+// filesystems until approximate remaining size <= maxsize/2
 func (c *cache) pruneSessions() {
 	now := time.Now()
 	keys := c.sessions.Keys()
 	sizes := make([]int64, len(keys))
+	prune := []string(nil)
 	var size int64
 	for i, token := range keys {
+		token := token.(string)
 		ent, ok := c.sessions.Peek(token)
 		if !ok {
 			continue
 		}
-		s := ent.(*cachedSession)
-		if s.expire.Before(now) {
-			c.sessions.Remove(token)
-			continue
+		sess := ent.(*cachedSession)
+		sess.refresh.Lock()
+		expired := sess.expire.Before(now)
+		fs := sess.fs
+		sess.refresh.Unlock()
+		if expired {
+			prune = append(prune, token)
 		}
-		if fs, ok := s.fs.Load().(arvados.CustomFileSystem); ok {
+		if fs != nil {
 			sizes[i] = fs.MemorySize()
 			size += sizes[i]
 		}
@@ -214,10 +246,46 @@ func (c *cache) pruneSessions() {
 	// least frequently used entries (which Keys() returns last).
 	for i := len(keys) - 1; i >= 0 && size > c.cluster.Collections.WebDAVCache.MaxCollectionBytes; i-- {
 		if sizes[i] > 0 {
-			c.sessions.Remove(keys[i])
+			prune = append(prune, keys[i].(string))
 			size -= sizes[i]
 		}
 	}
+
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	for _, token := range prune {
+		ent, ok := c.sessions.Peek(token)
+		if !ok {
+			continue
+		}
+		sess := ent.(*cachedSession)
+		if sess.mtx.TryLock() {
+			c.sessions.Remove(token)
+			continue
+		}
+		// We can't remove a session that's been checked out
+		// -- that would allow another session to be created
+		// for the same token using a different in-memory
+		// filesystem. Instead, we wait for active requests to
+		// finish and then "unload" it. After this, either the
+		// next GetSession will reload fs/user, or a
+		// subsequent pruneSessions will remove the session.
+		go func() {
+			// Ensure nobody is in GetSession
+			sess.refresh.Lock()
+			// Wait for current usage to finish
+			sess.inuse.Lock()
+			// Release memory
+			sess.fs = nil
+			if sess.expire.Before(now) {
+				// Mark user data as stale
+				sess.userLoaded = false
+			}
+			sess.inuse.Unlock()
+			sess.refresh.Unlock()
+			// Next GetSession will make a new fs
+		}()
+	}
 }
 
 // collectionBytes returns the approximate combined memory size of the
@@ -229,7 +297,11 @@ func (c *cache) collectionBytes() uint64 {
 		if !ok {
 			continue
 		}
-		if fs, ok := ent.(*cachedSession).fs.Load().(arvados.CustomFileSystem); ok {
+		sess := ent.(*cachedSession)
+		sess.refresh.Lock()
+		fs := sess.fs
+		sess.refresh.Unlock()
+		if fs != nil {
 			size += uint64(fs.MemorySize())
 		}
 	}
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 4625602fc..3af326a1a 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -411,16 +411,20 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 			// collection id is outside scope of supplied
 			// token
 			tokenScopeProblem = true
+			sess.Release()
 			continue
 		} else if os.IsNotExist(err) {
 			// collection does not exist or is not
 			// readable using this token
+			sess.Release()
 			continue
 		} else if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
+			sess.Release()
 			return
 		}
 		defer f.Close()
+		defer sess.Release()
 
 		collectionDir, sessionFS, session, tokenUser = f, fs, sess, user
 		break
diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index 3b957c5a0..4a7627639 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go
@@ -1627,6 +1627,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestConcurrentWrites(c *check.C) {
+	s.handler.Cluster.Collections.WebDAVCache.TTL = arvados.Duration(time.Second * 2)
 	lockTidyInterval = time.Second
 	client := arvados.NewClientFromEnv()
 	client.AuthToken = arvadostest.ActiveTokenV2

commit 5a76116145b95da344f9fd2c7d3a51be2c4abc68
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jun 14 17:17:18 2023 -0400

    20559: Garbage collect unused per-collection mutexes.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index e9b497a0b..4625602fc 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -37,8 +37,9 @@ type handler struct {
 	Cluster   *arvados.Cluster
 	setupOnce sync.Once
 
-	lockMtx sync.Mutex
-	lock    map[string]*sync.RWMutex
+	lockMtx    sync.Mutex
+	lock       map[string]*sync.RWMutex
+	lockTidied time.Time
 }
 
 var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
@@ -958,6 +959,16 @@ var lockTidyInterval = time.Minute * 10
 func (h *handler) collectionLock(collectionID string, writing bool) sync.Locker {
 	h.lockMtx.Lock()
 	defer h.lockMtx.Unlock()
+	if time.Since(h.lockTidied) > lockTidyInterval {
+		// Periodically delete all locks that aren't in use.
+		h.lockTidied = time.Now()
+		for id, locker := range h.lock {
+			if locker.TryLock() {
+				locker.Unlock()
+				delete(h.lock, id)
+			}
+		}
+	}
 	locker := h.lock[collectionID]
 	if locker == nil {
 		locker = new(sync.RWMutex)
diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index e7f8559c2..3b957c5a0 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go
@@ -1627,6 +1627,7 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
 }
 
 func (s *IntegrationSuite) TestConcurrentWrites(c *check.C) {
+	lockTidyInterval = time.Second
 	client := arvados.NewClientFromEnv()
 	client.AuthToken = arvadostest.ActiveTokenV2
 	// Start small, and increase concurrency (2^2, 4^2, ...)

commit 5890e8edb97b50955dc37d94625649ad30473f42
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jun 14 16:45:58 2023 -0400

    20559: Serialize writes for a given collection ID.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 3cdaf5d2b..e9b497a0b 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -18,6 +18,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"git.arvados.org/arvados.git/lib/cmd"
 	"git.arvados.org/arvados.git/lib/webdavfs"
@@ -35,6 +36,9 @@ type handler struct {
 	Cache     cache
 	Cluster   *arvados.Cluster
 	setupOnce sync.Once
+
+	lockMtx sync.Mutex
+	lock    map[string]*sync.RWMutex
 }
 
 var urlPDHDecoder = strings.NewReplacer(" ", "+", "-", "+")
@@ -530,7 +534,11 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 	}
 	h.logUploadOrDownload(r, session.arvadosclient, sessionFS, fsprefix+strings.Join(targetPath, "/"), nil, tokenUser)
 
-	if writeMethod[r.Method] {
+	writing := writeMethod[r.Method]
+	locker := h.collectionLock(collectionID, writing)
+	defer locker.Unlock()
+
+	if writing {
 		// Save the collection only if/when all
 		// webdav->filesystem operations succeed --
 		// and send a 500 error if the modified
@@ -942,6 +950,31 @@ func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string)
 	return nil, ""
 }
 
+var lockTidyInterval = time.Minute * 10
+
+// Lock the specified collection for reading or writing. Caller must
+// call Unlock() on the returned Locker when the operation is
+// finished.
+func (h *handler) collectionLock(collectionID string, writing bool) sync.Locker {
+	h.lockMtx.Lock()
+	defer h.lockMtx.Unlock()
+	locker := h.lock[collectionID]
+	if locker == nil {
+		locker = new(sync.RWMutex)
+		if h.lock == nil {
+			h.lock = map[string]*sync.RWMutex{}
+		}
+		h.lock[collectionID] = locker
+	}
+	if writing {
+		locker.Lock()
+		return locker
+	} else {
+		locker.RLock()
+		return locker.RLocker()
+	}
+}
+
 func ServeCORSPreflight(w http.ResponseWriter, header http.Header) bool {
 	method := header.Get("Access-Control-Request-Method")
 	if method == "" {
diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index 60fe4acc4..e7f8559c2 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go
@@ -1629,10 +1629,12 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
 func (s *IntegrationSuite) TestConcurrentWrites(c *check.C) {
 	client := arvados.NewClientFromEnv()
 	client.AuthToken = arvadostest.ActiveTokenV2
-	// Start small, and increase concurrency (2^2, 4^2, 8^2, 16^2)
+	// Start small, and increase concurrency (2^2, 4^2, ...)
 	// only until hitting failure. Avoids unnecessarily long
 	// failure reports.
-	for n := 2; n < 32 && !c.Failed(); n++ {
+	for n := 2; n < 16 && !c.Failed(); n = n * 2 {
+		c.Logf("%s: n=%d", c.TestName(), n)
+
 		var coll arvados.Collection
 		err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, nil)
 		c.Assert(err, check.IsNil)
@@ -1679,5 +1681,14 @@ func (s *IntegrationSuite) TestConcurrentWrites(c *check.C) {
 			}()
 		}
 		wg.Wait()
+		for i := 0; i < n; i++ {
+			u := mustParseURL(fmt.Sprintf("http://%s.collections.example.com/i=%d", coll.UUID, i))
+			resp := httptest.NewRecorder()
+			req, err := http.NewRequest("PROPFIND", u.String(), &bytes.Buffer{})
+			c.Assert(err, check.IsNil)
+			req.Header.Set("Authorization", "Bearer "+client.AuthToken)
+			s.handler.ServeHTTP(resp, req)
+			c.Assert(resp.Code, check.Equals, http.StatusMultiStatus)
+		}
 	}
 }

commit d9c447c1a53c03493fb1dd51a8eeb3f0022d07d5
Author: Tom Clegg <tom at curii.com>
Date:   Tue Jun 13 17:47:14 2023 -0400

    20559: Test concurrent writes using same token.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index c9b48f99a..60fe4acc4 100644
--- a/services/keep-web/handler_test.go
+++ b/services/keep-web/handler_test.go
@@ -18,6 +18,7 @@ import (
 	"path/filepath"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	"git.arvados.org/arvados.git/lib/config"
@@ -1624,3 +1625,59 @@ func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
 		}
 	}
 }
+
+func (s *IntegrationSuite) TestConcurrentWrites(c *check.C) {
+	client := arvados.NewClientFromEnv()
+	client.AuthToken = arvadostest.ActiveTokenV2
+	// Start small, and increase concurrency (2^2, 4^2, 8^2, 16^2)
+	// only until hitting failure. Avoids unnecessarily long
+	// failure reports.
+	for n := 2; n < 32 && !c.Failed(); n++ {
+		var coll arvados.Collection
+		err := client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, nil)
+		c.Assert(err, check.IsNil)
+		defer client.RequestAndDecode(&coll, "DELETE", "arvados/v1/collections/"+coll.UUID, nil, nil)
+
+		var wg sync.WaitGroup
+		for i := 0; i < n && !c.Failed(); i++ {
+			i := i
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				u := mustParseURL(fmt.Sprintf("http://%s.collections.example.com/i=%d", coll.UUID, i))
+				resp := httptest.NewRecorder()
+				req, err := http.NewRequest("MKCOL", u.String(), nil)
+				c.Assert(err, check.IsNil)
+				req.Header.Set("Authorization", "Bearer "+client.AuthToken)
+				s.handler.ServeHTTP(resp, req)
+				c.Assert(resp.Code, check.Equals, http.StatusCreated)
+				for j := 0; j < n && !c.Failed(); j++ {
+					j := j
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						content := fmt.Sprintf("i=%d/j=%d", i, j)
+						u := mustParseURL("http://" + coll.UUID + ".collections.example.com/" + content)
+
+						resp := httptest.NewRecorder()
+						req, err := http.NewRequest("PUT", u.String(), strings.NewReader(content))
+						c.Assert(err, check.IsNil)
+						req.Header.Set("Authorization", "Bearer "+client.AuthToken)
+						s.handler.ServeHTTP(resp, req)
+						c.Check(resp.Code, check.Equals, http.StatusCreated)
+
+						time.Sleep(time.Second)
+						resp = httptest.NewRecorder()
+						req, err = http.NewRequest("GET", u.String(), nil)
+						c.Assert(err, check.IsNil)
+						req.Header.Set("Authorization", "Bearer "+client.AuthToken)
+						s.handler.ServeHTTP(resp, req)
+						c.Check(resp.Code, check.Equals, http.StatusOK)
+						c.Check(resp.Body.String(), check.Equals, content)
+					}()
+				}
+			}()
+		}
+		wg.Wait()
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list