[ARVADOS] created: 56d363875a42c1abd485c67e030f625299548aa0

git at public.curoverse.com git at public.curoverse.com
Thu Aug 21 14:09:25 EDT 2014


        at  56d363875a42c1abd485c67e030f625299548aa0 (commit)


commit 56d363875a42c1abd485c67e030f625299548aa0
Author: Tim Pierce <twp at curoverse.com>
Date:   Thu Aug 21 13:50:18 2014 -0400

    3448: check block timestamp before DELETE
    
    volume.Delete locks the target file and checks the timestamp before
    proceeding. If the file is newer than permission_ttl specifies, return a
    PermissionError. This way, a block that has been marked for recycling by
    Data Manager but subsequently was re-added by a user will not be
    prematurely deleted.
    
    PutBlock now updates the timestamp on the target block if it already
    exists on disk, to prevent DELETE from recycling old blocks that have
    just been refreshed.
    
    To update block timestamps, the blob server uses the new volume.Touch
    method.  MockVolume and UnixVolume have been updated appropriately.
    
    Refs #3448.

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 9838694..468eec4 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -397,6 +397,17 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 }
 
+// GetBlock, PutBlock and TouchBlock implement lower-level code for
+// handling blocks by rooting through volumes connected to the local
+// machine.  Once the handler has determined that system policy permits
+// the request, it calls these methods to perform the actual operation.
+//
+// TODO(twp): this code would probably be better located in the
+// VolumeManager interface. As an abstraction, the VolumeManager
+// should be the only part of the code that cares about which volume a
+// block is stored on, so it should be responsible for figuring out
+// which volume to check for fetching blocks, storing blocks, etc.
+
 func GetBlock(hash string) ([]byte, error) {
 	// Attempt to read the requested hash from a keep volume.
 	error_to_caller := NotFoundError
@@ -484,7 +495,16 @@ func PutBlock(block []byte, hash string) error {
 	// so there is nothing special to do if err != nil.
 	if oldblock, err := GetBlock(hash); err == nil {
 		if bytes.Compare(block, oldblock) == 0 {
-			return nil
+			// The block already exists; update the timestamp and
+			// return.
+			// Note that TouchBlock will fail (and therefore
+			// so will PutBlock) if the block exists but is found on a
+			// read-only volume. That is intentional: if the user has
+			// requested N replicas of a block, we want to be sure that
+			// there are at least N *writable* replicas, so a block
+			// that cannot be written to should not count toward the
+			// replication total.
+			return TouchBlock(block)
 		} else {
 			return CollisionError
 		}
@@ -520,6 +540,22 @@ func PutBlock(block []byte, hash string) error {
 	}
 }
 
+// TouchBlock finds the block identified by hash and updates its
+// filesystem modification time with the current time.
+func TouchBlock(hash string) error {
+	for _, vol := range KeepVM.Volumes() {
+		err := vol.Touch(hash)
+		if os.IsNotExist(err) {
+			continue
+		}
+		// either err is nil (meaning success) or some error other
+		// than "file does not exist" (which we should return upward).
+		return err
+	}
+	// If we got here, the block was not found on any volume.
+	return os.ErrNotExist
+}
+
 // IsValidLocator
 //     Return true if the specified string is a valid Keep locator.
 //     When Keep is extended to support hash types other than MD5,
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index b15ef23..7d791f2 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -9,11 +9,13 @@ import (
 	"fmt"
 	"os"
 	"strings"
+	"time"
 )
 
 type Volume interface {
 	Get(loc string) ([]byte, error)
 	Put(loc string, block []byte) error
+	Touch(loc string) error
 	Index(prefix string) string
 	Delete(loc string) error
 	Status() *VolumeStatus
@@ -26,12 +28,17 @@ type Volume interface {
 // on all writes and puts.
 //
 type MockVolume struct {
-	Store map[string][]byte
-	Bad   bool
+	Store      map[string][]byte
+	Timestamps map[string]time.Time
+	Bad        bool
 }
 
 func CreateMockVolume() *MockVolume {
-	return &MockVolume{make(map[string][]byte), false}
+	return &MockVolume{
+		make(map[string][]byte),
+		make(map[string]time.Time),
+		false,
+	}
 }
 
 func (v *MockVolume) Get(loc string) ([]byte, error) {
@@ -48,6 +55,14 @@ func (v *MockVolume) Put(loc string, block []byte) error {
 		return errors.New("Bad volume")
 	}
 	v.Store[loc] = block
+	return Touch(loc)
+}
+
+func (v *MockVolume) Touch(loc string) error {
+	if v.Bad {
+		return errors.New("Bad volume")
+	}
+	v.Timestamps[loc] = time.Now()
 	return nil
 }
 
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index ddddd5a..e137045 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -97,6 +97,19 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 	return response.err
 }
 
+func (v *UnixVolume) Touch(loc string) error {
+	p := v.blockPath(loc)
+	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+	if err != nil {
+		return err
+	}
+	lockfile(f)
+	defer unlockfile(f)
+	now := time.Now().Unix()
+	utime := syscall.Utimbuf{now, now}
+	return syscall.Utime(p, &utime)
+}
+
 // Read retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
@@ -230,7 +243,27 @@ func (v *UnixVolume) Index(prefix string) (output string) {
 }
 
 func (v *UnixVolume) Delete(loc string) error {
-	return os.Remove(v.blockPath(loc))
+	p := v.blockPath(loc)
+	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
+	if err != nil {
+		return err
+	}
+	lockfile(f)
+	defer unlockfile(f)
+
+	// Return PermissionError if the block has been PUT more recently
+	// than -permission_ttl.  This guards against a race condition
+	// where a block is old enough that Data Manager has added it to
+	// the trash list, but the user submitted a PUT for the block
+	// since then.
+	if fi, err := os.Stat(p); err != nil {
+		return err
+	} else {
+		if time.Since(fi.ModTime()) < permission_ttl {
+			return PermissionError
+		}
+	}
+	return os.Remove(p)
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -293,3 +326,12 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 func (v *UnixVolume) String() string {
 	return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
+
+// lockfile and unlockfile use flock(2) to manage kernel file locks.
+func lockfile(f os.File) error {
+	return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+}
+
+func unlockfile(f os.File) error {
+	return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list