[ARVADOS] created: e86baeeafa1d66f4e59589995abec85c9401e39a

Git user git at public.curoverse.com
Mon Feb 27 13:26:26 EST 2017


        at  e86baeeafa1d66f4e59589995abec85c9401e39a (commit)


commit e86baeeafa1d66f4e59589995abec85c9401e39a
Author: radhika <radhika at curoverse.com>
Date:   Mon Feb 27 13:25:50 2017 -0500

    11015: use multiple writers to increase throughput of goUpload.

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 0b59f7d..561f423 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -649,7 +649,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
 	_, err = os.Stat(collectionMetafile)
 	if err != nil {
 		// Regular directory
-		cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
+		cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
 		manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
 		if err != nil {
 			return fmt.Errorf("While uploading output files: %v", err)
@@ -1002,7 +1002,7 @@ func NewContainerRunner(api IArvadosClient,
 	cr.NewLogWriter = cr.NewArvLogWriter
 	cr.RunArvMount = cr.ArvMountCmd
 	cr.MkTempDir = ioutil.TempDir
-	cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+	cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
 	cr.Container.UUID = containerUUID
 	cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
 	cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index a068a2a..7802fed 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -87,27 +87,50 @@ func (m *CollectionFileWriter) NewFile(fn string) {
 	m.fn = fn
 }
 
-func (m *CollectionFileWriter) goUpload() {
+func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
+	var mtx sync.Mutex
+	var wg sync.WaitGroup
+
 	var errors []error
 	uploader := m.uploader
 	finish := m.finish
 	for block := range uploader {
-		hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-		signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
-		if err != nil {
-			errors = append(errors, err)
-		} else {
-			m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
-		}
+		mtx.Lock()
+		m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
+		blockIndex := len(m.ManifestStream.Blocks) - 1
+		mtx.Unlock()
+
+		workers <- struct{}{} // wait for an available worker slot
+		wg.Add(1)
+
+		go func(block *Block, blockIndex int) {
+			hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+			signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+			<-workers
+
+			mtx.Lock()
+			if err != nil {
+				errors = append(errors, err)
+			} else {
+				m.ManifestStream.Blocks[blockIndex] = signedHash
+			}
+			mtx.Unlock()
+
+			wg.Done()
+		}(block, blockIndex)
 	}
+	wg.Wait()
+
 	finish <- errors
 }
 
 // CollectionWriter implements creating new Keep collections by opening files
 // and writing to them.
 type CollectionWriter struct {
+	MaxWriters int
 	IKeepClient
 	Streams []*CollectionFileWriter
+	workers chan struct{}
 	mtx     sync.Mutex
 }
 
@@ -134,10 +157,18 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
 		make(chan *Block),
 		make(chan []error),
 		fn}
-	go fw.goUpload()
 
 	m.mtx.Lock()
 	defer m.mtx.Unlock()
+	if m.workers == nil {
+		if m.MaxWriters < 1 {
+			m.MaxWriters = 2
+		}
+		m.workers = make(chan struct{}, m.MaxWriters)
+	}
+
+	go fw.goUpload(m.workers)
+
 	m.Streams = append(m.Streams, fw)
 
 	return fw
@@ -218,10 +249,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 }
 
 type WalkUpload struct {
+	MaxWriters  int
 	kc          IKeepClient
 	stripPrefix string
 	streamMap   map[string]*CollectionFileWriter
 	status      *log.Logger
+	workers     chan struct{}
+	mtx         sync.Mutex
 }
 
 // WalkFunc walks a directory tree, uploads each file found and adds it to the
@@ -252,7 +286,17 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
 			make(chan *Block),
 			make(chan []error),
 			""}
-		go m.streamMap[dir].goUpload()
+
+		m.mtx.Lock()
+		if m.workers == nil {
+			if m.MaxWriters < 1 {
+				m.MaxWriters = 2
+			}
+			m.workers = make(chan struct{}, m.MaxWriters)
+		}
+		m.mtx.Unlock()
+
+		go m.streamMap[dir].goUpload(m.workers)
 	}
 
 	fileWriter := m.streamMap[dir]
@@ -281,7 +325,7 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
 
 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
 	streamMap := make(map[string]*CollectionFileWriter)
-	wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
+	wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
 	err = filepath.Walk(root, wu.WalkFunc)
 
 	if err != nil {
diff --git a/services/crunch-run/upload_test.go b/services/crunch-run/upload_test.go
index b4b1efd..8d4ea07 100644
--- a/services/crunch-run/upload_test.go
+++ b/services/crunch-run/upload_test.go
@@ -21,7 +21,7 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
 
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
-	cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
 	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 	c.Check(err, IsNil)
 	c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
@@ -36,7 +36,7 @@ func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 	ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
 
-	cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
 	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
@@ -54,7 +54,7 @@ func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 	ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
 
-	cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
 	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
@@ -88,7 +88,7 @@ func (s *TestSuite) TestSimpleUploadLarge(c *C) {
 
 	ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
 
-	cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
 	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
@@ -105,7 +105,7 @@ func (s *TestSuite) TestUploadEmptySubdir(c *C) {
 
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
-	cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
 	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
@@ -121,7 +121,7 @@ func (s *TestSuite) TestUploadEmptyFile(c *C) {
 
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
 
-	cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+	cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
 	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, IsNil)
@@ -137,7 +137,7 @@ func (s *TestSuite) TestUploadError(c *C) {
 
 	ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
-	cw := CollectionWriter{&KeepErrorTestClient{}, nil, sync.Mutex{}}
+	cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
 	str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
 	c.Check(err, NotNil)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list