[arvados] updated: 2.7.0-5676-gf6e6f83268

git repository hosting git at public.arvados.org
Fri Jan 12 16:21:30 UTC 2024


Summary of changes:
 sdk/go/arvados/keep_cache.go | 78 +++++++++++++++++++++++++++-----------------
 1 file changed, 48 insertions(+), 30 deletions(-)

       via  f6e6f83268a4665a050d10b4a790906598dc1018 (commit)
      from  cce2c381181ff560bc134845eaea91939e1f8888 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit f6e6f83268a4665a050d10b4a790906598dc1018
Author: Tom Clegg <tom at curii.com>
Date:   Fri Jan 12 11:06:19 2024 -0500

    20318: Avoid error when cache file is deleted while being written.
    
    The previous code had one goroutine writing the cache file, and
    one or more other goroutines waiting for it to finish, then opening
    the file for reading. If the cache file was deleted before being
    opened for reading, ReadAt would return an error.
    
    The new code shares the writer's file handle with all of the
    goroutines that are waiting for it to finish writing. This ensures
    ReadAt can always read from the newly written file, even if the
    file is deleted mid-write.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index a544465272..f6265d5d0a 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -89,10 +89,12 @@ type sharedCache struct {
 }
 
 type writeprogress struct {
-	cond *sync.Cond // broadcast whenever size or done changes
-	done bool       // size and err have their final values
-	size int        // bytes copied into cache file so far
-	err  error      // error encountered while copying from backend to cache
+	cond    *sync.Cond     // broadcast whenever size or done changes
+	done    bool           // size and err have their final values
+	size    int            // bytes copied into cache file so far
+	err     error          // error encountered while copying from backend to cache
+	sharedf *os.File       // readable filehandle, usable if done && err==nil
+	readers sync.WaitGroup // goroutines that haven't finished reading from f yet
 }
 
 type openFileEnt struct {
@@ -277,31 +279,21 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 	cache.setupOnce.Do(cache.setup)
 	cachefilename := cache.cacheFile(locator)
 	if n, err := cache.quickReadAt(cachefilename, dst, offset); err == nil {
-		return n, err
-	}
-	readf, err := cache.openFile(cachefilename, os.O_CREATE|os.O_RDONLY)
-	if err != nil {
-		return 0, fmt.Errorf("ReadAt: %w", err)
-	}
-	defer readf.Close()
-
-	err = syscall.Flock(int(readf.Fd()), syscall.LOCK_SH)
-	if err != nil {
-		return 0, fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
+		return n, nil
 	}
 
 	cache.writingLock.Lock()
 	progress := cache.writing[cachefilename]
-	if progress != nil {
-		cache.writingLock.Unlock()
-	} else {
+	if progress == nil {
+		// Nobody else is fetching from backend, so we'll add
+		// a new entry to cache.writing, fetch in a separate
+		// goroutine.
 		progress = &writeprogress{}
 		progress.cond = sync.NewCond(&sync.Mutex{})
 		if cache.writing == nil {
 			cache.writing = map[string]*writeprogress{}
 		}
 		cache.writing[cachefilename] = progress
-		cache.writingLock.Unlock()
 
 		// Start a goroutine to copy from backend to f. As
 		// data arrives, wake up any waiting loops (see below)
@@ -309,14 +301,10 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 		// soon as the relevant bytes have been copied.
 		go func() {
 			var size int
-			var writef *os.File
 			var err error
 			defer func() {
-				if writef != nil {
-					closeErr := writef.Close()
-					if err == nil {
-						err = closeErr
-					}
+				if err == nil && progress.sharedf != nil {
+					err = progress.sharedf.Sync()
 				}
 				progress.cond.L.Lock()
 				progress.err = err
@@ -327,13 +315,27 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 				cache.writingLock.Lock()
 				delete(cache.writing, cachefilename)
 				cache.writingLock.Unlock()
+
+				// Wait for other goroutines to wake
+				// up, notice we're done, and use our
+				// sharedf to read their data, before
+				// we close sharedf.
+				//
+				// Nobody can join the WaitGroup after
+				// the progress entry is deleted from
+				// cache.writing above. Therefore,
+				// this Wait ensures nobody else is
+				// accessing progress, and we don't
+				// need to lock anything.
+				progress.readers.Wait()
+				progress.sharedf.Close()
 			}()
-			writef, err = cache.openFile(cachefilename, os.O_WRONLY)
+			progress.sharedf, err = cache.openFile(cachefilename, os.O_CREATE|os.O_RDWR)
 			if err != nil {
 				err = fmt.Errorf("ReadAt: %w", err)
 				return
 			}
-			err = syscall.Flock(int(writef.Fd()), syscall.LOCK_SH)
+			err = syscall.Flock(int(progress.sharedf.Fd()), syscall.LOCK_SH)
 			if err != nil {
 				err = fmt.Errorf("flock(%s, lock_sh) failed: %w", cachefilename, err)
 				return
@@ -341,7 +343,7 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 			size, err = cache.KeepGateway.BlockRead(context.Background(), BlockReadOptions{
 				Locator: locator,
 				WriteTo: funcwriter(func(p []byte) (int, error) {
-					n, err := writef.Write(p)
+					n, err := progress.sharedf.Write(p)
 					if n > 0 {
 						progress.cond.L.Lock()
 						progress.size += n
@@ -354,11 +356,19 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 			cache.gotidy()
 		}()
 	}
+	// We add ourselves to the readers WaitGroup so the
+	// fetch-from-backend goroutine doesn't close the shared
+	// filehandle before we read the data we need from it.
+	progress.readers.Add(1)
+	defer progress.readers.Done()
+	cache.writingLock.Unlock()
+
 	progress.cond.L.Lock()
 	for !progress.done && progress.size < len(dst)+offset {
 		progress.cond.Wait()
 	}
-	err = progress.err
+	sharedf := progress.sharedf
+	err := progress.err
 	progress.cond.L.Unlock()
 
 	if err != nil {
@@ -366,12 +376,20 @@ func (cache *DiskCache) ReadAt(locator string, dst []byte, offset int) (int, err
 		// error before copying enough bytes to satisfy our
 		// request, we return that error.
 		return 0, err
+	} else if len(dst) == 0 {
+		// It's possible that sharedf==nil here (the writer
+		// goroutine might not have done anything at all yet)
+		// and we don't need it anyway because no bytes are
+		// being read. Reading zero bytes seems pointless, but
+		// if someone does it, we might as well return
+		// suitable values, and not crash.
+		return 0, nil
 	} else {
 		// Regardless of whether the copy-from-backend
 		// goroutine succeeded, or failed after copying the
 		// bytes we need, the only errors we need to report
 		// are errors reading from the cache file.
-		return readf.ReadAt(dst, int64(offset))
+		return sharedf.ReadAt(dst, int64(offset))
 	}
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list