[ARVADOS] updated: 4e98b0adbda5d6525825e72f67d9542c47147d0a

git at public.curoverse.com git at public.curoverse.com
Fri Oct 23 11:39:39 EDT 2015


Summary of changes:
 sdk/go/crunchrunner/crunchrunner.go | 32 +++++++++++----
 sdk/go/crunchrunner/upload.go       | 79 +++++++++++++++++--------------------
 2 files changed, 61 insertions(+), 50 deletions(-)

       via  4e98b0adbda5d6525825e72f67d9542c47147d0a (commit)
      from  5d3de212473f9fbf0e797c1fc03ec1dbf8f532ef (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 4e98b0adbda5d6525825e72f67d9542c47147d0a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 23 11:16:27 2015 -0400

    7582: Adjust signal catching to eliminate races.  Tighten up code based on comments.

diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index ea29be8..02f2be4 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -128,10 +128,6 @@ func setupSignals(cmd *exec.Cmd) chan os.Signal {
 	// Set up signal handlers
 	// Forward SIGINT, SIGTERM and SIGQUIT to inner process
 	sigChan := make(chan os.Signal, 1)
-	go func(sig <-chan os.Signal) {
-		catch := <-sig
-		cmd.Process.Signal(catch)
-	}(sigChan)
 	signal.Notify(sigChan, syscall.SIGTERM)
 	signal.Notify(sigChan, syscall.SIGINT)
 	signal.Notify(sigChan, syscall.SIGQUIT)
@@ -237,11 +233,30 @@ func runner(api IArvadosClient,
 	}
 	log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
 
-	err = cmd.Start()
+	var caughtSignal os.Signal
+	{
+		sigChan := setupSignals(cmd)
+		defer signal.Stop(sigChan)
 
-	signals := setupSignals(cmd)
-	err = cmd.Wait()
-	signal.Stop(signals)
+		err = cmd.Start()
+		if err != nil {
+			return TempFail{err}
+		}
+
+		go func(sig <-chan os.Signal) {
+			for sig := range sig {
+				caughtSignal = sig
+				cmd.Process.Signal(caughtSignal)
+			}
+		}(sigChan)
+
+		err = cmd.Wait()
+	}
+
+	if caughtSignal != nil {
+		log.Printf("Caught signal %v", caughtSignal)
+		return PermFail{}
+	}
 
 	if err != nil {
 		// Run() returns ExitError on non-zero exit code, but we handle
@@ -321,6 +336,7 @@ func main() {
 		log.Fatal(err)
 	}
 
+	syscall.Umask(0022)
 	err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
 
 	if err == nil {
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index ac3f065..4ced0ce 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -33,10 +33,9 @@ type IKeepClient interface {
 	PutHB(hash string, buf []byte) (string, int, error)
 }
 
-func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
-	// Needed to conform to Writer interface, but not implemented
-	// because io.Copy will actually use ReadFrom instead.
-	return 0, nil
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+	n, err := m.ReadFrom(bytes.NewReader(p))
+	return int(n), err
 }
 
 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
@@ -50,37 +49,34 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
 		count, err = r.Read(m.Block.data[m.Block.offset:])
 		total += int64(count)
 		m.Block.offset += int64(count)
-		if count > 0 {
-			if m.Block.offset == keepclient.BLOCKSIZE {
-				m.uploader <- m.Block
-				m.Block = nil
-			}
+		if m.Block.offset == keepclient.BLOCKSIZE {
+			m.uploader <- m.Block
+			m.Block = nil
 		}
 	}
 
-	return total, err
+	if err == io.EOF {
+		return total, nil
+	} else {
+		return total, err
+	}
+
 }
 
 func (m *ManifestStreamWriter) goUpload() {
 	var errors []error
 	uploader := m.uploader
 	finish := m.finish
-	for true {
-		select {
-		case block, valid := <-uploader:
-			if !valid {
-				finish <- errors
-				return
-			}
-			hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-			signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
-			if err != nil {
-				errors = append(errors, err)
-			} else {
-				m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
-			}
+	for block := range uploader {
+		hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+		signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+		if err != nil {
+			errors = append(errors, err)
+		} else {
+			m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
 		}
 	}
+	finish <- errors
 }
 
 type ManifestWriter struct {
@@ -128,7 +124,7 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
 
 	var count int64
 	count, err = io.Copy(stream, file)
-	if err != nil && err != io.EOF {
+	if err != nil {
 		return err
 	}
 
@@ -142,23 +138,22 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
 
 func (m *ManifestWriter) Finish() error {
 	var errstring string
-	for _, v := range m.Streams {
-		if v.uploader != nil {
-			if v.Block != nil {
-				v.uploader <- v.Block
-			}
-			close(v.uploader)
-			v.uploader = nil
-
-			errors := <-v.finish
-			close(v.finish)
-			v.finish = nil
-
-			if errors != nil {
-				for _, r := range errors {
-					errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
-				}
-			}
+	for _, stream := range m.Streams {
+		if stream.uploader == nil {
+			continue
+		}
+		if stream.Block != nil {
+			stream.uploader <- stream.Block
+		}
+		close(stream.uploader)
+		stream.uploader = nil
+
+		errors := <-stream.finish
+		close(stream.finish)
+		stream.finish = nil
+
+		for _, r := range errors {
+			errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
 		}
 	}
 	if errstring != "" {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list