[ARVADOS] updated: 1.3.0-2769-g199d58a4d

Git user <git at public.arvados.org>
Tue Jul 7 16:06:48 UTC 2020


Summary of changes:
 doc/install/install-api-server.html.textile.liquid |  5 +--
 .../install-arv-git-httpd.html.textile.liquid      |  3 +-
 doc/install/install-keep-web.html.textile.liquid   |  2 +-
 doc/install/install-keepproxy.html.textile.liquid  |  3 +-
 .../install-workbench-app.html.textile.liquid      |  3 +-
 .../install-workbench2-app.html.textile.liquid     |  3 +-
 doc/install/install-ws.html.textile.liquid         |  2 +-
 sdk/R/install_deps.R                               |  6 +++
 tools/keep-exercise/keep-exercise.go               | 51 +++++++++-------------
 9 files changed, 34 insertions(+), 44 deletions(-)

  discards  90b3f51e0462234c54322f1381a8e7bc230938a4 (commit)
  discards  50b26890bd4d4e786b28d6072a11fcbe6d8bee0d (commit)
  discards  1c3aa927b986a08f164461e08b71a222803a8d7f (commit)
       via  199d58a4df0dba1ee71c6816bc3a9d9d439cfd7e (commit)
       via  9d977a7eae2fb2dd8556ae2391b21b22d28a2ff0 (commit)
       via  63bcdd653f111d752ec11ccb3ab58ee5150478d5 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (90b3f51e0462234c54322f1381a8e7bc230938a4)
            \
             N -- N -- N (199d58a4df0dba1ee71c6816bc3a9d9d439cfd7e)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 199d58a4df0dba1ee71c6816bc3a9d9d439cfd7e
Author: Ward Vandewege <ward at curii.com>
Date:   Mon Jul 6 09:37:32 2020 -0400

    16585: Add --repeat CLI argument, which automatically repeats an
    experiment N times. Add a warmup phase to ensure there are always
    sufficient blocks available for all read threads. Refactor nextLocator
    to be an atomic.Value instead of a channel, to avoid read starvation.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

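Before the diff, a minimal, self-contained sketch of the atomic.Value pattern the commit message describes (illustrative only; the names and timings below are not part of the patch). A writer overwrites a single slot with the newest locator, readers always Load the latest value instead of competing for items on a queue, and a warmup-style loop waits until at least one value exists:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var nextLocator atomic.Value // holds the most recently written locator

	// Writer goroutine: each Store replaces the previous value.
	go func() {
		for i := 0; ; i++ {
			nextLocator.Store(fmt.Sprintf("locator-%d", i))
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Warmup: readers wait until at least one block has been written.
	for nextLocator.Load() == nil {
		time.Sleep(time.Millisecond)
	}

	// Any number of readers can now Load concurrently without blocking
	// each other or draining a shared queue.
	for i := 0; i < 3; i++ {
		fmt.Println(nextLocator.Load().(string))
		time.Sleep(20 * time.Millisecond)
	}
}

Unlike a buffered channel, a Load never consumes the value, so any number of readers can proceed at full speed without starving one another.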
diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 163291c23..7641465aa 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -29,6 +29,8 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
+	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -51,8 +53,12 @@ var (
 	ServiceUUID   = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
 	getVersion    = flag.Bool("version", false, "Print version information and exit.")
 	RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
+	Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
 )
 
+var summary string
+var csvHeader string
+
 func main() {
 	flag.Parse()
 
@@ -64,6 +70,14 @@ func main() {
 
 	stderr := log.New(os.Stderr, "", log.LstdFlags)
 
+	if *ReadThreads > 0 && *WriteThreads == 0 {
+		stderr.Fatal("At least one write thread is required if rthreads is non-zero")
+	}
+
+	if *ReadThreads == 0 && *WriteThreads == 0 {
+		stderr.Fatal("Nothing to do!")
+	}
+
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		stderr.Fatal(err)
@@ -74,27 +88,62 @@ func main() {
 	}
 	kc.Want_replicas = *Replicas
 
-	transport := *(http.DefaultTransport.(*http.Transport))
-	transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
 	kc.HTTPClient = &http.Client{
-		Timeout:   10 * time.Minute,
-		Transport: &transport,
+		Timeout: 10 * time.Minute,
+		// It's not safe to copy *http.DefaultTransport
+		// because it has a mutex (which might be locked)
+		// protecting a private map (which might not be nil).
+		// So we build our own, using the Go 1.12 default
+		// values.
+		Transport: &http.Transport{
+			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+		},
 	}
 
 	overrideServices(kc, stderr)
+	csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+
+	for i := 0; i < *Repeat; i++ {
+		runExperiment(kc, stderr)
+		stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+		summary += fmt.Sprintf(",%d\n", i)
+	}
+	stderr.Println("Summary:")
+	stderr.Println()
+	fmt.Println(csvHeader + ",Experiment")
+	fmt.Println(summary)
+}
 
-	nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+	var wg sync.WaitGroup
+	var nextLocator atomic.Value
 
-	go countBeans(nextLocator, stderr)
+	wg.Add(1)
+	stopCh := make(chan struct{})
+	if *ReadThreads > 0 {
+		stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+	}
 	for i := 0; i < *WriteThreads; i++ {
 		nextBuf := make(chan []byte, 1)
-		go makeBufs(nextBuf, i, stderr)
-		go doWrites(kc, nextBuf, nextLocator, stderr)
+		wg.Add(1)
+		go makeBufs(&wg, nextBuf, i, stopCh, stderr)
+		wg.Add(1)
+		go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
+	}
+	if *ReadThreads > 0 {
+		for nextLocator.Load() == nil {
+			select {
+			case _ = <-bytesOutChan:
+			}
+		}
+		stderr.Printf("Warmup complete")
 	}
+	go countBeans(&wg, stopCh, stderr)
 	for i := 0; i < *ReadThreads; i++ {
-		go doReads(kc, nextLocator, stderr)
+		wg.Add(1)
+		go doReads(&wg, kc, &nextLocator, stopCh, stderr)
 	}
-	<-make(chan struct{})
+	wg.Wait()
 }
 
 // Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
@@ -104,11 +153,12 @@ var bytesOutChan = make(chan uint64)
 // Send struct{}{} to errorsChan when an error happens.
 var errorsChan = make(chan struct{})
 
-func countBeans(nextLocator chan string, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	t0 := time.Now()
 	var tickChan <-chan time.Time
 	var endChan <-chan time.Time
-	c := make(chan os.Signal)
+	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
@@ -121,16 +171,16 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 	var errors uint64
 	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
-	var abort, printCsv bool
+	var exit, abort, printCsv bool
 	csv := log.New(os.Stdout, "", 0)
-	csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
+	csv.Println(csvHeader)
 	for {
 		select {
 		case <-tickChan:
 			printCsv = true
 		case <-endChan:
 			printCsv = true
-			abort = true
+			exit = true
 		case <-c:
 			printCsv = true
 			abort = true
@@ -152,7 +202,7 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 			if rateOut > maxRateOut {
 				maxRateOut = rateOut
 			}
-			csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+			line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
 				time.Now().Format("2006-01-02 15:04:05"),
 				elapsed,
 				bytesIn, rateIn, maxRateIn,
@@ -168,16 +218,26 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 				*ServiceURL,
 				*ServiceUUID,
 				*RunTime,
+				*Repeat,
 			)
+			csv.Println(line)
+			if exit {
+				summary += line
+			}
 			printCsv = false
 		}
 		if abort {
 			os.Exit(0)
 		}
+		if exit {
+			close(stopCh)
+			break
+		}
 	}
 }
 
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -194,29 +254,45 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
-		nextBuf <- buf
+		select {
+		case <-stopCh:
+			close(nextBuf)
+			return
+		case nextBuf <- buf:
+		}
 	}
 }
 
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
-	for buf := range nextBuf {
-		locator, _, err := kc.PutB(buf)
-		if err != nil {
-			stderr.Print(err)
-			errorsChan <- struct{}{}
-			continue
-		}
-		bytesOutChan <- uint64(len(buf))
-		for cap(nextLocator) > len(nextLocator)+*WriteThreads {
-			// Give the readers something to do, unless
-			// they have lots queued up already.
-			nextLocator <- locator
+func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	for {
+		select {
+		case <-stopCh:
+			return
+		case buf := <-nextBuf:
+			locator, _, err := kc.PutB(buf)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			select {
+			case <-stopCh:
+				return
+			case bytesOutChan <- uint64(len(buf)):
+			}
+			nextLocator.Store(locator)
 		}
 	}
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
-	for locator := range nextLocator {
+func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	var locator string
+	for {
+		locator = nextLocator.Load().(string)
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
 			stderr.Print(err)
@@ -233,7 +309,11 @@ func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.L
 			// partial/corrupt responses: we are measuring
 			// throughput, not resource consumption.
 		}
-		bytesInChan <- uint64(n)
+		select {
+		case <-stopCh:
+			return
+		case bytesInChan <- uint64(n):
+		}
 	}
 }
 
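As an aside on the transport comment in the hunk above: *http.Transport contains mutexes and private maps for its idle-connection pool, so a shallow copy of *http.DefaultTransport can share or corrupt that internal state. A hedged sketch of the fresh-Transport approach the patch takes (the tls.Config here merely stands in for arvadosclient.MakeTLSConfig, which this sketch does not import):

package main

import (
	"crypto/tls"
	"net/http"
	"time"
)

func newClient(insecure bool) *http.Client {
	return &http.Client{
		Timeout: 10 * time.Minute,
		// Build a fresh Transport rather than copying
		// http.DefaultTransport: no shared mutexes or maps.
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure},
		},
	}
}

func main() {
	client := newClient(false)
	_ = client // ready for use, e.g. kc.HTTPClient = client in the patch
}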

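Finally, a small sketch of the shutdown pattern the refactored runExperiment relies on (names here are illustrative): every worker receives a stop channel and a *sync.WaitGroup, and each blocking send is wrapped in a select against stopCh, so a single close(stopCh) unwinds all goroutines and wg.Wait() replaces the old <-make(chan struct{}) block-forever idiom:

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(wg *sync.WaitGroup, out chan<- int, stopCh <-chan struct{}) {
	defer wg.Done()
	for i := 0; ; i++ {
		select {
		case <-stopCh: // shutdown requested: exit cleanly
			return
		case out <- i: // normal work; never blocks past shutdown
		}
	}
}

func main() {
	var wg sync.WaitGroup
	out := make(chan int)
	stopCh := make(chan struct{})

	wg.Add(1)
	go worker(&wg, out, stopCh)

	// Consumer; exits with the program once main returns.
	go func() {
		for v := range out {
			fmt.Println("got", v)
		}
	}()

	time.Sleep(50 * time.Millisecond)
	close(stopCh) // one close releases every select in every worker
	wg.Wait()     // all workers have returned; safe to start the next run
}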
-----------------------------------------------------------------------


hooks/post-receive
-- 