[ARVADOS] updated: e764c67d6f676d2b40ebaaeffce1a7e2f2369057

Git user git at public.curoverse.com
Wed Dec 28 15:01:18 EST 2016


Summary of changes:
 .gitignore                                         |   2 +
 .../app/assets/javascripts/work_unit_component.js  |  22 ++-
 build/run-build-docker-images.sh                   |   2 +-
 build/run-build-packages.sh                        |  14 +-
 sdk/cwl/arvados_cwl/__init__.py                    |  27 +++-
 sdk/cwl/arvados_cwl/arvdocker.py                   |  73 +++++-----
 sdk/cwl/arvados_cwl/arvjob.py                      |   3 +-
 sdk/cwl/arvados_cwl/arvworkflow.py                 |  34 ++---
 sdk/cwl/arvados_cwl/crunch_script.py               |   5 +-
 sdk/cwl/arvados_cwl/pathmapper.py                  |  29 ++--
 sdk/cwl/arvados_cwl/runner.py                      |   5 +-
 sdk/cwl/setup.py                                   |   5 +-
 sdk/cwl/tests/test_container.py                    |   9 +-
 sdk/cwl/tests/test_job.py                          |   7 +-
 sdk/cwl/tests/wf/expect_packed.cwl                 |  43 +++---
 sdk/cwl/tests/wf/scatter2_subwf.cwl                |  50 ++++---
 services/api/Gemfile                               |   5 +
 services/api/app/models/container.rb               |   2 +-
 services/api/app/models/pipeline_instance.rb       |  14 ++
 .../20161213172944_full_text_search_indexes.rb     |   1 +
 services/api/lib/create_superuser_token.rb         |  15 ++-
 services/api/test/unit/container_request_test.rb   |   7 +
 .../api/test/unit/create_superuser_token_test.rb   |  21 ++-
 services/api/test/unit/pipeline_instance_test.rb   |   8 ++
 services/crunch-run/crunchrun.go                   |   6 +-
 services/crunchstat/crunchstat.go                  |  36 +++++
 services/crunchstat/crunchstat_test.go             | 150 +++++++++++++++++++++
 services/keep-balance/keep-balance.service         |   2 +-
 services/keep-balance/main.go                      |   7 +-
 services/keep-balance/usage.go                     |   2 +-
 services/keepstore/pipe_adapters.go                |  63 +++++++++
 services/keepstore/volume_unix.go                  |  90 +++++++++----
 tools/arvbox/lib/arvbox/docker/common.sh           |   9 +-
 tools/arvbox/lib/arvbox/docker/createusers.sh      |   4 +-
 34 files changed, 579 insertions(+), 193 deletions(-)
 create mode 100644 services/keepstore/pipe_adapters.go

       via  e764c67d6f676d2b40ebaaeffce1a7e2f2369057 (commit)
       via  997479d1408139e96ecdb42a60b4f727f814f6c9 (commit)
       via  da2a4d9c13b80066400161cb7638425828de18d3 (commit)
       via  1847f3a94d355eed42e0d6ba7b5f929b7af7d143 (commit)
       via  af967503ee1c446ebed6451462f5ba725bcbbdf7 (commit)
       via  1e19bc12a83dacf4a40d1c0d358aae48b6588a58 (commit)
       via  0b3b1e9d91f06966cbeecd3d4a11e1fb7e8d8434 (commit)
       via  5c7c672a4cd0d8c5bd0e6c5218062214eb5751f3 (commit)
       via  e675daba8125b20e45e0c121f1fdcac85fb4b360 (commit)
       via  91b824a77fe19242de50e838f9f8c4fb30907fa3 (commit)
       via  2ecd4749bc9b3f8be8cf41864948108068e187d8 (commit)
       via  8d4ec10fc26d93d282845c789cd61da79e4b2836 (commit)
       via  692127b79071f419ef39fb4594a4193f8e4a7a37 (commit)
       via  b27f1106beb0fb321ee123501afb5ccf2354a363 (commit)
       via  fe060158c380fe92635cae24a12eb43680021169 (commit)
       via  f6e68adfecb95e42e984951794c96f54166b7f13 (commit)
       via  90633a9dd56502566731991dd4084a57e0366064 (commit)
       via  ec600fb1b5ab89d923e6e19529d568964c4f4137 (commit)
       via  9754971aff036b3532f2e4e4ee88b902f387d69a (commit)
       via  924b783e07028b17cd5403205361e4715f7a212f (commit)
       via  603de36ec435145259feea701acbc6a3cc46f115 (commit)
       via  5bdbbbf5137c6ef12d12362715dfc7ee6568dc93 (commit)
       via  8dc951d7f9a21a2429c38cbb3e119bb382618b95 (commit)
       via  b2cf2b9b857507effb31ec147e378558b5cf17bd (commit)
       via  cc94954f69ed2d26451bae6610b38de260d2252f (commit)
       via  96d90c859cede094d83cbaf5409418e9ab0c2c19 (commit)
       via  4f03b29b3efb42a48c3f397942d5b618b89390eb (commit)
       via  2eb7a28fa7900a005bf48dc40dd1af16d0bc455d (commit)
       via  cafa08eae78c6e29898164f8b5b2fc0127d69f48 (commit)
       via  b96bff759bb71ebb80d7ea33a7b7944eb7c0269f (commit)
       via  3beb006fd2c533a192379c9730a3a730c5493b90 (commit)
       via  4115524460e89f36b99f2fb5de1adebd86daaf4a (commit)
       via  680bc629dc473d877218b1ed9351fac4020d4657 (commit)
       via  fc390927833d14b6c439db8ea72d3d52b60a5e6d (commit)
      from  ebb2559b3a09636ff687316bbe512e0e8a86b168 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit e764c67d6f676d2b40ebaaeffce1a7e2f2369057
Author: Tom Clegg <tom at curoverse.com>
Date:   Wed Dec 28 15:00:55 2016 -0500

    10467: local directory driver: skip disk IO if client disconnects before lock is acquired.

diff --git a/services/keepstore/pipe_adapters.go b/services/keepstore/pipe_adapters.go
new file mode 100644
index 0000000..0969e32
--- /dev/null
+++ b/services/keepstore/pipe_adapters.go
@@ -0,0 +1,63 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"io"
+)
+
+// getWithPipe invokes getter and copies the resulting data into
+// buf. If ctx is done before all data is copied, getWithPipe closes
+// the pipe with an error, and returns early with an error.
+func getWithPipe(ctx context.Context, loc string, buf []byte, getter func(context.Context, string, *io.PipeWriter)) (int, error) {
+	piper, pipew := io.Pipe()
+	go getter(ctx, loc, pipew)
+	done := make(chan struct{})
+	var size int
+	var err error
+	go func() {
+		size, err = io.ReadFull(piper, buf)
+		if err == io.EOF || err == io.ErrUnexpectedEOF {
+			err = nil
+		}
+		close(done)
+	}()
+	select {
+	case <-ctx.Done():
+		piper.CloseWithError(ctx.Err())
+		return 0, ctx.Err()
+	case <-done:
+		piper.Close()
+		return size, err
+	}
+}
+
+// putWithPipe invokes putter with a new pipe, and copies data
+// from buf into the pipe. If ctx is done before all data is copied,
+// putWithPipe closes the pipe with an error, and returns early with
+// an error.
+func putWithPipe(ctx context.Context, loc string, buf []byte, putter func(context.Context, string, *io.PipeReader) error) error {
+	piper, pipew := io.Pipe()
+	copyDone := make(chan struct{})
+	go func() {
+		_, err := io.Copy(pipew, bytes.NewReader(buf))
+		pipew.CloseWithError(err)
+		close(copyDone)
+	}()
+
+	var err error
+	done := make(chan struct{})
+	go func() {
+		err = putter(ctx, loc, piper)
+		close(done)
+	}()
+	select {
+	case <-ctx.Done():
+		pipew.CloseWithError(ctx.Err())
+		<-copyDone
+		return ctx.Err()
+	case <-done:
+		pipew.Close()
+		return err
+	}
+}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index fff02aa..739cf8d 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -160,10 +160,10 @@ func (v *UnixVolume) Touch(loc string) error {
 		return err
 	}
 	defer f.Close()
-	if v.locker != nil {
-		v.locker.Lock()
-		defer v.locker.Unlock()
+	if err := v.lock(context.TODO()); err != nil {
+		return err
 	}
+	defer v.unlock()
 	if e := lockfile(f); e != nil {
 		return e
 	}
@@ -185,13 +185,10 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
 func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
-	if v.locker != nil {
-		v.locker.Lock()
-		defer v.locker.Unlock()
-	}
-	if ctx.Err() != nil {
-		return ctx.Err()
+	if err := v.lock(ctx); err != nil {
+		return err
 	}
+	defer v.unlock()
 	f, err := os.Open(path)
 	if err != nil {
 		return err
@@ -216,21 +213,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
 func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+	return getWithPipe(ctx, loc, buf, v.get)
+}
+
+func (v *UnixVolume) get(ctx context.Context, loc string, w *io.PipeWriter) {
 	path := v.blockPath(loc)
 	stat, err := v.stat(path)
 	if err != nil {
-		return 0, v.translateError(err)
-	}
-	if stat.Size() > int64(len(buf)) {
-		return 0, TooLongError
+		w.CloseWithError(v.translateError(err))
+		return
 	}
-	var read int
-	size := int(stat.Size())
 	err = v.getFunc(ctx, path, func(rdr io.Reader) error {
-		read, err = io.ReadFull(rdr, buf[:size])
+		n, err := io.Copy(w, rdr)
+		if err == nil && n != stat.Size() {
+			err = io.ErrUnexpectedEOF
+		}
 		return err
 	})
-	return read, err
+	w.CloseWithError(err)
 }
 
 // Compare returns nil if Get(loc) would return the same content as
@@ -251,6 +251,10 @@ func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) err
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+	return putWithPipe(ctx, loc, block, v.put)
+}
+
+func (v *UnixVolume) put(ctx context.Context, loc string, rdr *io.PipeReader) error {
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
@@ -269,18 +273,14 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
 		log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
 		return tmperr
 	}
+
 	bpath := v.blockPath(loc)
 
-	if v.locker != nil {
-		v.locker.Lock()
-		defer v.locker.Unlock()
-	}
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	default:
+	if err := v.lock(ctx); err != nil {
+		return err
 	}
-	if _, err := tmpfile.Write(block); err != nil {
+	defer v.unlock()
+	if _, err := io.Copy(tmpfile, rdr); err != nil {
 		log.Printf("%s: writing to %s: %s\n", v, bpath, err)
 		tmpfile.Close()
 		os.Remove(tmpfile.Name())
@@ -418,10 +418,10 @@ func (v *UnixVolume) Trash(loc string) error {
 	if v.ReadOnly {
 		return MethodDisabledError
 	}
-	if v.locker != nil {
-		v.locker.Lock()
-		defer v.locker.Unlock()
+	if err := v.lock(context.TODO()); err != nil {
+		return err
 	}
+	defer v.unlock()
 	p := v.blockPath(loc)
 	f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
 	if err != nil {
@@ -559,6 +559,38 @@ func (v *UnixVolume) Replication() int {
 	return v.DirectoryReplication
 }
 
+// lock acquires the serialize lock, if one is in use. If ctx is done
+// before the lock is acquired, lock returns ctx.Err() instead of
+// acquiring the lock.
+func (v *UnixVolume) lock(ctx context.Context) error {
+	if v.locker == nil {
+		return nil
+	}
+	locked := make(chan struct{})
+	go func() {
+		v.locker.Lock()
+		close(locked)
+	}()
+	select {
+	case <-ctx.Done():
+		go func() {
+			<-locked
+			v.locker.Unlock()
+		}()
+		return ctx.Err()
+	case <-locked:
+		return nil
+	}
+}
+
+// unlock releases the serialize lock, if one is in use.
+func (v *UnixVolume) unlock() {
+	if v.locker == nil {
+		return
+	}
+	v.locker.Unlock()
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
 	return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list