[ARVADOS] updated: 1.3.0-2769-g90b3f51e0

Git user git at public.arvados.org
Mon Jul 6 20:52:05 UTC 2020


Summary of changes:
 build/run-build-packages-one-target.sh             |   2 +
 build/run-build-packages.sh                        |  10 +-
 build/run-library.sh                               |  59 +++++--
 doc/install/install-webshell.html.textile.liquid   |   9 +-
 lib/pam/.gitignore                                 |   2 +
 lib/pam/README                                     |  18 ++
 lib/pam/docker_test.go                             | 173 +++++++++++++++++++
 .../arv-mount => lib/pam/fpm-info.sh               |   2 +-
 lib/pam/pam-configs-arvados                        |  19 +++
 lib/pam/pam_arvados.go                             | 185 +++++++++++++++++++++
 lib/pam/pam_c.go                                   |  24 +++
 lib/pam/testclient.go                              |  83 +++++++++
 sdk/go/arvados/link.go                             |  17 +-
 sdk/go/arvados/virtual_machine.go                  |  25 +++
 14 files changed, 596 insertions(+), 32 deletions(-)
 create mode 100644 lib/pam/.gitignore
 create mode 100644 lib/pam/README
 create mode 100644 lib/pam/docker_test.go
 copy sdk/cli/test/binstub_arv-mount/arv-mount => lib/pam/fpm-info.sh (76%)
 mode change 100755 => 100644
 create mode 100644 lib/pam/pam-configs-arvados
 create mode 100644 lib/pam/pam_arvados.go
 create mode 100644 lib/pam/pam_c.go
 create mode 100644 lib/pam/testclient.go
 create mode 100644 sdk/go/arvados/virtual_machine.go

  discards  e66b9230ffb00603765db8b45a685e9999026933 (commit)
  discards  b272e9772244a684fe3356d92cdf8109392d1b09 (commit)
  discards  f56c55a3ce637fefd0d63a4331e78f0777c074ed (commit)
  discards  8ae218a2edbe5f579007cf07882a56d62125e48f (commit)
  discards  1d361408863a5fab20067098f8a818f38b1ce8fa (commit)
  discards  979d449cdca429e9a9edacb0ac1906af19afe3b5 (commit)
       via  90b3f51e0462234c54322f1381a8e7bc230938a4 (commit)
       via  50b26890bd4d4e786b28d6072a11fcbe6d8bee0d (commit)
       via  1c3aa927b986a08f164461e08b71a222803a8d7f (commit)
       via  4fa1cccccf752fd58dd2b10b1df4275a6de16ea2 (commit)
       via  d5b5f3d7c9a218c26fcb40c5e1d79136b2d75749 (commit)
       via  9117766cd617a23ad5aa4ecd0aacfb2924db4b50 (commit)
       via  183f8c6feeb8d3adbf36a1a4adf122607fb55617 (commit)
       via  61d58bb6d4687c0794137700df1ba6aca418a191 (commit)
       via  e66f567ca467f2dfa576983c503deb98fd35028e (commit)
       via  741b677dc5e85f60bc03ef130873e49ac0b75766 (commit)
       via  85709a9266b0b70038c8c41a1e109670e1c47cd4 (commit)
       via  0999dbe4a8bb91e316dd09ff25a00fcf20309ee4 (commit)
       via  0dfee4fdb4671b398c63a7861bf9af1cd1b4794a (commit)
       via  c2ad835ffbc40e7e061526df7f9b9f6e0c1a83d3 (commit)
       via  dddfa30b07b2584353df378528f84945faa3ad7f (commit)
       via  ff22ba71afe839832943099cc1fe273197c45ec7 (commit)
       via  34316951a4e4f8439940a7eda5f1b044565f072b (commit)
       via  2c5417221843491727e4e5505012fc115e3bc7b0 (commit)
       via  ce3903121eb9645e99f6f6846de421b9af1bb23f (commit)
       via  876f66bbe9f30e48a45208a627dfd50e2a68ec22 (commit)
       via  b694b711c69fdfd46b6732a6d6c67e37951dab39 (commit)
       via  7c9a94f70a895d5293ebdb5953518e417d173191 (commit)
       via  b578113d9c7f0d20d8d6f091b0dfc37e9b788ae8 (commit)
       via  a35ec27b40ce3ca0797cdcd8e0a79b2b8896af47 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (e66b9230ffb00603765db8b45a685e9999026933)
            \
             N -- N -- N (90b3f51e0462234c54322f1381a8e7bc230938a4)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 90b3f51e0462234c54322f1381a8e7bc230938a4
Author: Ward Vandewege <ward at curii.com>
Date:   Mon Jul 6 15:53:35 2020 -0400

    16585: add warmup phase to ensure there are always sufficient blocks
           available for all read threads.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 0dd94c5f3..05903f0f4 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -111,7 +111,10 @@ func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
 
 	wg.Add(1)
 	stopCh := make(chan struct{})
-	go countBeans(&wg, nextLocator, stopCh, stderr)
+	// Start warmup phase, where we write *ReadThreads blocks
+	if *ReadThreads > 0 {
+		stderr.Printf("Start warmup phase, waiting for %d available blocks before reading starts\n", *ReadThreads)
+	}
 	for i := 0; i < *WriteThreads; i++ {
 		nextBuf := make(chan []byte, 1)
 		wg.Add(1)
@@ -119,6 +122,15 @@ func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
 		wg.Add(1)
 		go doWrites(&wg, kc, nextBuf, nextLocator, stopCh, stderr)
 	}
+	if *ReadThreads > 0 {
+		for len(nextLocator) < *ReadThreads {
+			select {
+			case _ = <-bytesOutChan:
+			}
+		}
+		stderr.Printf("Warmup complete")
+	}
+	go countBeans(&wg, nextLocator, stopCh, stderr)
 	for i := 0; i < *ReadThreads; i++ {
 		wg.Add(1)
 		go doReads(&wg, kc, nextLocator, stopCh, stderr)

commit 50b26890bd4d4e786b28d6072a11fcbe6d8bee0d
Author: Ward Vandewege <ward at curii.com>
Date:   Mon Jul 6 15:15:51 2020 -0400

    16585: avoid read thread starvation, re-read the last locator if a new
           one is not available yet. This doesn't help with the startup
           phase, when doReads has not seen any locators to read yet.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 1567fcd6e..0dd94c5f3 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -278,35 +278,40 @@ func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []by
 func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator <-chan string, stopCh <-chan struct{}, stderr *log.Logger) {
 	defer wg.Done()
 
+	var locator string
 	for {
 		select {
 		case <-stopCh:
 			return
-		case locator := <-nextLocator:
-			rdr, size, url, err := kc.Get(locator)
-			if err != nil {
-				stderr.Print(err)
-				errorsChan <- struct{}{}
-				continue
-			}
-			n, err := io.Copy(ioutil.Discard, rdr)
-			rdr.Close()
-			if n != size || err != nil {
-				stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
-				errorsChan <- struct{}{}
-				continue
-				// Note we don't count the bytes received in
-				// partial/corrupt responses: we are measuring
-				// throughput, not resource consumption.
-			}
-			select {
-			case <-stopCh:
-				return
-			case bytesInChan <- uint64(n):
-			}
+		case locator = <-nextLocator:
 		default:
-			stderr.Printf("STARVED FOR BLOCKS TO READ - SLEEPING 100ms!")
+			// We don't have a new locator available, just re-use the previous one
+		}
+		if locator == "" {
+			// No locator yet (not enough doWrite loops completed), sleep for 100ms and try again
 			time.Sleep(100 * time.Millisecond)
+			continue
+		}
+		rdr, size, url, err := kc.Get(locator)
+		if err != nil {
+			stderr.Print(err)
+			errorsChan <- struct{}{}
+			continue
+		}
+		n, err := io.Copy(ioutil.Discard, rdr)
+		rdr.Close()
+		if n != size || err != nil {
+			stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+			errorsChan <- struct{}{}
+			continue
+			// Note we don't count the bytes received in
+			// partial/corrupt responses: we are measuring
+			// throughput, not resource consumption.
+		}
+		select {
+		case <-stopCh:
+			return
+		case bytesInChan <- uint64(n):
 		}
 	}
 }

commit 1c3aa927b986a08f164461e08b71a222803a8d7f
Author: Ward Vandewege <ward at curii.com>
Date:   Mon Jul 6 09:37:32 2020 -0400

    16585: Add --repeat argument to keep-exercise, which automatically repeats
    an experiment N times. Add loud warning when the read thread(s) are starved
    for blocks to read.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 163291c23..1567fcd6e 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -29,6 +29,7 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
+	"sync"
 	"syscall"
 	"time"
 
@@ -51,8 +52,12 @@ var (
 	ServiceUUID   = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
 	getVersion    = flag.Bool("version", false, "Print version information and exit.")
 	RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
+	Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
 )
 
+var summary string
+var csvHeader string
+
 func main() {
 	flag.Parse()
 
@@ -74,27 +79,51 @@ func main() {
 	}
 	kc.Want_replicas = *Replicas
 
-	transport := *(http.DefaultTransport.(*http.Transport))
-	transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
 	kc.HTTPClient = &http.Client{
-		Timeout:   10 * time.Minute,
-		Transport: &transport,
+		Timeout: 10 * time.Minute,
+		// It's not safe to copy *http.DefaultTransport
+		// because it has a mutex (which might be locked)
+		// protecting a private map (which might not be nil).
+		// So we build our own, using the Go 1.12 default
+		// values.
+		Transport: &http.Transport{
+			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+		},
 	}
 
 	overrideServices(kc, stderr)
+	csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+
+	for i := 0; i < *Repeat; i++ {
+		runExperiment(kc, stderr)
+		stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+		summary += fmt.Sprintf(",%d\n", i)
+	}
+	stderr.Println("Summary:")
+	stderr.Println()
+	fmt.Println(csvHeader + ",Experiment")
+	fmt.Println(summary)
+}
 
+func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+	var wg sync.WaitGroup
 	nextLocator := make(chan string, *ReadThreads+*WriteThreads)
 
-	go countBeans(nextLocator, stderr)
+	wg.Add(1)
+	stopCh := make(chan struct{})
+	go countBeans(&wg, nextLocator, stopCh, stderr)
 	for i := 0; i < *WriteThreads; i++ {
 		nextBuf := make(chan []byte, 1)
-		go makeBufs(nextBuf, i, stderr)
-		go doWrites(kc, nextBuf, nextLocator, stderr)
+		wg.Add(1)
+		go makeBufs(&wg, nextBuf, i, stopCh, stderr)
+		wg.Add(1)
+		go doWrites(&wg, kc, nextBuf, nextLocator, stopCh, stderr)
 	}
 	for i := 0; i < *ReadThreads; i++ {
-		go doReads(kc, nextLocator, stderr)
+		wg.Add(1)
+		go doReads(&wg, kc, nextLocator, stopCh, stderr)
 	}
-	<-make(chan struct{})
+	wg.Wait()
 }
 
 // Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
@@ -104,11 +133,12 @@ var bytesOutChan = make(chan uint64)
 // Send struct{}{} to errorsChan when an error happens.
 var errorsChan = make(chan struct{})
 
-func countBeans(nextLocator chan string, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, nextLocator chan string, stopCh chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	t0 := time.Now()
 	var tickChan <-chan time.Time
 	var endChan <-chan time.Time
-	c := make(chan os.Signal)
+	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
@@ -121,16 +151,16 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 	var errors uint64
 	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
-	var abort, printCsv bool
+	var exit, abort, printCsv bool
 	csv := log.New(os.Stdout, "", 0)
-	csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
+	csv.Println(csvHeader)
 	for {
 		select {
 		case <-tickChan:
 			printCsv = true
 		case <-endChan:
 			printCsv = true
-			abort = true
+			exit = true
 		case <-c:
 			printCsv = true
 			abort = true
@@ -152,7 +182,7 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 			if rateOut > maxRateOut {
 				maxRateOut = rateOut
 			}
-			csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+			line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
 				time.Now().Format("2006-01-02 15:04:05"),
 				elapsed,
 				bytesIn, rateIn, maxRateIn,
@@ -168,16 +198,26 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 				*ServiceURL,
 				*ServiceUUID,
 				*RunTime,
+				*Repeat,
 			)
+			csv.Println(line)
+			if exit {
+				summary += line
+			}
 			printCsv = false
 		}
 		if abort {
 			os.Exit(0)
 		}
+		if exit {
+			close(stopCh)
+			break
+		}
 	}
 }
 
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -194,46 +234,80 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
-		nextBuf <- buf
+		select {
+		case <-stopCh:
+			close(nextBuf)
+			return
+		case nextBuf <- buf:
+		}
 	}
 }
 
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
-	for buf := range nextBuf {
-		locator, _, err := kc.PutB(buf)
-		if err != nil {
-			stderr.Print(err)
-			errorsChan <- struct{}{}
-			continue
-		}
-		bytesOutChan <- uint64(len(buf))
-		for cap(nextLocator) > len(nextLocator)+*WriteThreads {
-			// Give the readers something to do, unless
-			// they have lots queued up already.
-			nextLocator <- locator
+func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	for {
+		select {
+		case <-stopCh:
+			return
+		case buf := <-nextBuf:
+			locator, _, err := kc.PutB(buf)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			select {
+			case <-stopCh:
+				return
+			case bytesOutChan <- uint64(len(buf)):
+			}
+			for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+				select {
+				case <-stopCh:
+					return
+				case nextLocator <- locator:
+					// Give the readers something to do, unless
+					// they have lots queued up already.
+				}
+			}
 		}
 	}
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
-	for locator := range nextLocator {
-		rdr, size, url, err := kc.Get(locator)
-		if err != nil {
-			stderr.Print(err)
-			errorsChan <- struct{}{}
-			continue
-		}
-		n, err := io.Copy(ioutil.Discard, rdr)
-		rdr.Close()
-		if n != size || err != nil {
-			stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
-			errorsChan <- struct{}{}
-			continue
-			// Note we don't count the bytes received in
-			// partial/corrupt responses: we are measuring
-			// throughput, not resource consumption.
+func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator <-chan string, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	for {
+		select {
+		case <-stopCh:
+			return
+		case locator := <-nextLocator:
+			rdr, size, url, err := kc.Get(locator)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			n, err := io.Copy(ioutil.Discard, rdr)
+			rdr.Close()
+			if n != size || err != nil {
+				stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+				errorsChan <- struct{}{}
+				continue
+				// Note we don't count the bytes received in
+				// partial/corrupt responses: we are measuring
+				// throughput, not resource consumption.
+			}
+			select {
+			case <-stopCh:
+				return
+			case bytesInChan <- uint64(n):
+			}
+		default:
+			stderr.Printf("STARVED FOR BLOCKS TO READ - SLEEPING 100ms!")
+			time.Sleep(100 * time.Millisecond)
 		}
-		bytesInChan <- uint64(n)
 	}
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list