[ARVADOS] updated: b4d9dfe1e7acb1f45c2cc699020bf9299a0db5c9

Git user git at public.curoverse.com
Fri Dec 30 14:41:23 EST 2016


Summary of changes:
 services/keepstore/pipe_adapters.go | 43 ++++++++++---------------------------
 services/keepstore/volume_unix.go   | 20 ++++-------------
 2 files changed, 15 insertions(+), 48 deletions(-)

       via  b4d9dfe1e7acb1f45c2cc699020bf9299a0db5c9 (commit)
       via  b3e5ea60bdecb41fbf954b67ab859dc4542d0c1a (commit)
      from  568c7abf660b7a68f70b6ea47ae2e7352233f053 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b4d9dfe1e7acb1f45c2cc699020bf9299a0db5c9
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Dec 30 14:41:12 2016 -0500

    10467: Remove debug printfs.

diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 56a4679..52fdad3 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -275,7 +275,6 @@ func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.Reader) error {
 	bpath := v.blockPath(loc)
 
 	if err := v.lock(ctx); err != nil {
-		log.Println("lock err:", err)
 		return err
 	}
 	defer v.unlock()
@@ -572,16 +571,12 @@ func (v *UnixVolume) lock(ctx context.Context) error {
 	}()
 	select {
 	case <-ctx.Done():
-		log.Print("ctx Done")
 		go func() {
-			log.Print("waiting <-locked")
 			<-locked
-			log.Print("unlocking")
 			v.locker.Unlock()
 		}()
 		return ctx.Err()
 	case <-locked:
-		log.Print("got lock")
 		return nil
 	}
 }

commit b3e5ea60bdecb41fbf954b67ab859dc4542d0c1a
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Dec 30 14:39:52 2016 -0500

    10467: Remove unneeded errorReadCloser, make getter/putter interfaces more like WriterTo/ReaderFrom.

diff --git a/services/keepstore/pipe_adapters.go b/services/keepstore/pipe_adapters.go
index 91aa270..0b3999c 100644
--- a/services/keepstore/pipe_adapters.go
+++ b/services/keepstore/pipe_adapters.go
@@ -5,15 +5,16 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
-	"sync"
 )
 
 // getWithPipe invokes getter and copies the resulting data into
 // buf. If ctx is done before all data is copied, getWithPipe closes
 // the pipe with an error, and returns early with an error.
-func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, *io.PipeWriter)) (int, error) {
+func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, io.Writer) error) (int, error) {
 	piper, pipew := io.Pipe()
-	go getter(ctx, loc, pipew)
+	go func() {
+		pipew.CloseWithError(getter(ctx, loc, pipew))
+	}()
 	done := make(chan struct{})
 	var size int
 	var err error
@@ -34,30 +35,11 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(contex
 	}
 }
 
-type errorReadCloser struct {
-	*io.PipeReader
-	err error
-	mtx sync.Mutex
-}
-
-func (erc *errorReadCloser) Close() error {
-	erc.mtx.Lock()
-	defer erc.mtx.Unlock()
-	erc.PipeReader.Close()
-	return erc.err
-}
-
-func (erc *errorReadCloser) SetError(err error) {
-	erc.mtx.Lock()
-	defer erc.mtx.Unlock()
-	erc.err = err
-}
-
 // putWithPipe invokes putter with a new pipe, and copies data
 // from buf into the pipe. If ctx is done before all data is copied,
 // putWithPipe closes the pipe with an error, and returns early with
 // an error.
-func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.ReadCloser) error) error {
+func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, io.Reader) error) error {
 	piper, pipew := io.Pipe()
 	copyErr := make(chan error)
 	go func() {
@@ -66,13 +48,9 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex
 		close(copyErr)
 	}()
 
-	erc := errorReadCloser{
-		PipeReader: piper,
-		err:        nil,
-	}
 	putErr := make(chan error, 1)
 	go func() {
-		putErr <- putter(ctx, loc, &erc)
+		putErr <- putter(ctx, loc, piper)
 		close(putErr)
 	}()
 
@@ -86,10 +64,11 @@ func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(contex
 
 	// Ensure io.Copy goroutine isn't blocked writing to pipew
 	// (otherwise, io.Copy is still using buf so it isn't safe to
-	// return). This can cause pipew to receive corrupt data, so
-	// we first ensure putter() will get an error when calling
-	// erc.Close().
-	erc.SetError(err)
+	// return). This can cause pipew to receive corrupt data if
+	// err came from copyErr or ctx.Done() before the copy
+	// finished. That's OK, though: in that case err != nil, and
+	// CloseWithError(err) ensures putter() will get an error from
+	// piper.Read() before seeing EOF.
 	go pipew.CloseWithError(err)
 	go io.Copy(ioutil.Discard, piper)
 	<-copyErr
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 459e73a..56a4679 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -216,21 +216,19 @@ func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, erro
 	return getWithPipe(ctx, loc, buf, v.get)
 }
 
-func (v *UnixVolume) get(ctx context.Context, loc string, w *io.PipeWriter) {
+func (v *UnixVolume) get(ctx context.Context, loc string, w io.Writer) error {
 	path := v.blockPath(loc)
 	stat, err := v.stat(path)
 	if err != nil {
-		w.CloseWithError(v.translateError(err))
-		return
+		return v.translateError(err)
 	}
-	err = v.getFunc(ctx, path, func(rdr io.Reader) error {
+	return v.getFunc(ctx, path, func(rdr io.Reader) error {
 		n, err := io.Copy(w, rdr)
 		if err == nil && n != stat.Size() {
 			err = io.ErrUnexpectedEOF
 		}
 		return err
 	})
-	w.CloseWithError(err)
 }
 
 // Compare returns nil if Get(loc) would return the same content as
@@ -254,7 +252,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
 	return putWithPipe(ctx, loc, block, v.put)
 }
 
-func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.ReadCloser) error {
+func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.Reader) error {
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
@@ -287,11 +285,6 @@ func (v *UnixVolume) put(ctx context.Context, loc string, rdr io.ReadCloser) err
 		os.Remove(tmpfile.Name())
 		return err
 	}
-	if err := rdr.Close(); err != nil {
-		tmpfile.Close()
-		os.Remove(tmpfile.Name())
-		return err
-	}
 	if err := tmpfile.Close(); err != nil {
 		log.Printf("closing %s: %s\n", tmpfile.Name(), err)
 		os.Remove(tmpfile.Name())

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list