[ARVADOS] updated: 4e98b0adbda5d6525825e72f67d9542c47147d0a
git at public.curoverse.com
git at public.curoverse.com
Fri Oct 23 11:39:39 EDT 2015
Summary of changes:
sdk/go/crunchrunner/crunchrunner.go | 32 +++++++++++----
sdk/go/crunchrunner/upload.go | 79 +++++++++++++++++--------------------
2 files changed, 61 insertions(+), 50 deletions(-)
via 4e98b0adbda5d6525825e72f67d9542c47147d0a (commit)
from 5d3de212473f9fbf0e797c1fc03ec1dbf8f532ef (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 4e98b0adbda5d6525825e72f67d9542c47147d0a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 23 11:16:27 2015 -0400
7582: Adjust signal catching to eliminate races. Tighten up code based on comments.
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
index ea29be8..02f2be4 100644
--- a/sdk/go/crunchrunner/crunchrunner.go
+++ b/sdk/go/crunchrunner/crunchrunner.go
@@ -128,10 +128,6 @@ func setupSignals(cmd *exec.Cmd) chan os.Signal {
// Set up signal handlers
// Forward SIGINT, SIGTERM and SIGQUIT to inner process
sigChan := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- catch := <-sig
- cmd.Process.Signal(catch)
- }(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
signal.Notify(sigChan, syscall.SIGQUIT)
@@ -237,11 +233,30 @@ func runner(api IArvadosClient,
}
log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
- err = cmd.Start()
+ var caughtSignal os.Signal
+ {
+ sigChan := setupSignals(cmd)
+ defer signal.Stop(sigChan)
- signals := setupSignals(cmd)
- err = cmd.Wait()
- signal.Stop(signals)
+ err = cmd.Start()
+ if err != nil {
+ return TempFail{err}
+ }
+
+ go func(sig <-chan os.Signal) {
+ for sig := range sig {
+ caughtSignal = sig
+ cmd.Process.Signal(caughtSignal)
+ }
+ }(sigChan)
+
+ err = cmd.Wait()
+ }
+
+ if caughtSignal != nil {
+ log.Printf("Caught signal %v", caughtSignal)
+ return PermFail{}
+ }
if err != nil {
// Run() returns ExitError on non-zero exit code, but we handle
@@ -321,6 +336,7 @@ func main() {
log.Fatal(err)
}
+ syscall.Umask(0022)
err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
if err == nil {
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
index ac3f065..4ced0ce 100644
--- a/sdk/go/crunchrunner/upload.go
+++ b/sdk/go/crunchrunner/upload.go
@@ -33,10 +33,9 @@ type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
}
-func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
- // Needed to conform to Writer interface, but not implemented
- // because io.Copy will actually use ReadFrom instead.
- return 0, nil
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+ n, err := m.ReadFrom(bytes.NewReader(p))
+ return int(n), err
}
func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
@@ -50,37 +49,34 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
count, err = r.Read(m.Block.data[m.Block.offset:])
total += int64(count)
m.Block.offset += int64(count)
- if count > 0 {
- if m.Block.offset == keepclient.BLOCKSIZE {
- m.uploader <- m.Block
- m.Block = nil
- }
+ if m.Block.offset == keepclient.BLOCKSIZE {
+ m.uploader <- m.Block
+ m.Block = nil
}
}
- return total, err
+ if err == io.EOF {
+ return total, nil
+ } else {
+ return total, err
+ }
+
}
func (m *ManifestStreamWriter) goUpload() {
var errors []error
uploader := m.uploader
finish := m.finish
- for true {
- select {
- case block, valid := <-uploader:
- if !valid {
- finish <- errors
- return
- }
- hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
- if err != nil {
- errors = append(errors, err)
- } else {
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
- }
+ for block := range uploader {
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
}
}
+ finish <- errors
}
type ManifestWriter struct {
@@ -128,7 +124,7 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
var count int64
count, err = io.Copy(stream, file)
- if err != nil && err != io.EOF {
+ if err != nil {
return err
}
@@ -142,23 +138,22 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
func (m *ManifestWriter) Finish() error {
var errstring string
- for _, v := range m.Streams {
- if v.uploader != nil {
- if v.Block != nil {
- v.uploader <- v.Block
- }
- close(v.uploader)
- v.uploader = nil
-
- errors := <-v.finish
- close(v.finish)
- v.finish = nil
-
- if errors != nil {
- for _, r := range errors {
- errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
- }
- }
+ for _, stream := range m.Streams {
+ if stream.uploader == nil {
+ continue
+ }
+ if stream.Block != nil {
+ stream.uploader <- stream.Block
+ }
+ close(stream.uploader)
+ stream.uploader = nil
+
+ errors := <-stream.finish
+ close(stream.finish)
+ stream.finish = nil
+
+ for _, r := range errors {
+ errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
}
}
if errstring != "" {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list