[ARVADOS] created: 1.3.0-2750-g44f7b64ea

Git user git at public.arvados.org
Mon Jul 6 13:45:53 UTC 2020


        at  44f7b64ea522898dcef0e4b096b1f5b98140c322 (commit)


commit 44f7b64ea522898dcef0e4b096b1f5b98140c322
Author: Ward Vandewege <ward at curii.com>
Date:   Mon Jul 6 09:37:32 2020 -0400

    16585: Add --repeat argument to keep-exercise, which automatically repeats
    an experiment N times. Add loud warning when the read thread(s) are starved
    for blocks to read.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 163291c23..8d0d86653 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -29,6 +29,7 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
+	"sync"
 	"syscall"
 	"time"
 
@@ -51,8 +52,12 @@ var (
 	ServiceUUID   = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
 	getVersion    = flag.Bool("version", false, "Print version information and exit.")
 	RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
+	Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
 )
 
+var summary string
+var csvHeader string
+
 func main() {
 	flag.Parse()
 
@@ -74,27 +79,51 @@ func main() {
 	}
 	kc.Want_replicas = *Replicas
 
-	transport := *(http.DefaultTransport.(*http.Transport))
-	transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
 	kc.HTTPClient = &http.Client{
-		Timeout:   10 * time.Minute,
-		Transport: &transport,
+		Timeout: 10 * time.Minute,
+		// It's not safe to copy *http.DefaultTransport
+		// because it has a mutex (which might be locked)
+		// protecting a private map (which might not be nil).
+		// So we build our own, using the Go 1.12 default
+		// values.
+		Transport: &http.Transport{
+			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+		},
 	}
 
 	overrideServices(kc, stderr)
+	csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+
+	for i := 0; i < *Repeat; i++ {
+		runExperiment(kc, stderr)
+		stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+		summary += fmt.Sprintf(",%d\n", i)
+	}
+	stderr.Println("Summary:")
+	stderr.Println()
+	fmt.Println(csvHeader + ",Experiment")
+	fmt.Println(summary)
+}
 
+func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+	var wg sync.WaitGroup
 	nextLocator := make(chan string, *ReadThreads+*WriteThreads)
 
-	go countBeans(nextLocator, stderr)
+	wg.Add(1)
+	stopCh := make(chan struct{})
+	go countBeans(&wg, nextLocator, stopCh, stderr)
 	for i := 0; i < *WriteThreads; i++ {
 		nextBuf := make(chan []byte, 1)
-		go makeBufs(nextBuf, i, stderr)
-		go doWrites(kc, nextBuf, nextLocator, stderr)
+		wg.Add(1)
+		go makeBufs(&wg, nextBuf, i, stopCh, stderr)
+		wg.Add(1)
+		go doWrites(&wg, kc, nextBuf, nextLocator, stopCh, stderr)
 	}
 	for i := 0; i < *ReadThreads; i++ {
-		go doReads(kc, nextLocator, stderr)
+		wg.Add(1)
+		go doReads(&wg, kc, nextLocator, stopCh, stderr)
 	}
-	<-make(chan struct{})
+	wg.Wait()
 }
 
 // Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
@@ -104,11 +133,12 @@ var bytesOutChan = make(chan uint64)
 // Send struct{}{} to errorsChan when an error happens.
 var errorsChan = make(chan struct{})
 
-func countBeans(nextLocator chan string, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, nextLocator chan string, stopCh chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	t0 := time.Now()
 	var tickChan <-chan time.Time
 	var endChan <-chan time.Time
-	c := make(chan os.Signal)
+	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
@@ -121,16 +151,16 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 	var errors uint64
 	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
-	var abort, printCsv bool
+	var exit, abort, printCsv bool
 	csv := log.New(os.Stdout, "", 0)
-	csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
+	csv.Println(csvHeader)
 	for {
 		select {
 		case <-tickChan:
 			printCsv = true
 		case <-endChan:
 			printCsv = true
-			abort = true
+			exit = true
 		case <-c:
 			printCsv = true
 			abort = true
@@ -152,7 +182,7 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 			if rateOut > maxRateOut {
 				maxRateOut = rateOut
 			}
-			csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+			line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
 				time.Now().Format("2006-01-02 15:04:05"),
 				elapsed,
 				bytesIn, rateIn, maxRateIn,
@@ -168,16 +198,26 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
 				*ServiceURL,
 				*ServiceUUID,
 				*RunTime,
+				*Repeat,
 			)
+			csv.Println(line)
+			if exit {
+				summary += line
+			}
 			printCsv = false
 		}
 		if abort {
 			os.Exit(0)
 		}
+		if exit {
+			close(stopCh)
+			break
+		}
 	}
 }
 
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -194,46 +234,82 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
-		nextBuf <- buf
+		select {
+		case <-stopCh:
+			close(nextBuf)
+			goto done
+		case nextBuf <- buf:
+		}
 	}
+done:
 }
 
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
-	for buf := range nextBuf {
-		locator, _, err := kc.PutB(buf)
-		if err != nil {
-			stderr.Print(err)
-			errorsChan <- struct{}{}
-			continue
-		}
-		bytesOutChan <- uint64(len(buf))
-		for cap(nextLocator) > len(nextLocator)+*WriteThreads {
-			// Give the readers something to do, unless
-			// they have lots queued up already.
-			nextLocator <- locator
+func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	for {
+		select {
+		case <-stopCh:
+			goto done
+		case buf := <-nextBuf:
+			locator, _, err := kc.PutB(buf)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			select {
+			case <-stopCh:
+				goto done
+			case bytesOutChan <- uint64(len(buf)):
+			}
+			for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+				select {
+				case <-stopCh:
+					goto done
+				case nextLocator <- locator:
+					// Give the readers something to do, unless
+					// they have lots queued up already.
+				}
+			}
 		}
 	}
+done:
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
-	for locator := range nextLocator {
-		rdr, size, url, err := kc.Get(locator)
-		if err != nil {
-			stderr.Print(err)
-			errorsChan <- struct{}{}
-			continue
-		}
-		n, err := io.Copy(ioutil.Discard, rdr)
-		rdr.Close()
-		if n != size || err != nil {
-			stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
-			errorsChan <- struct{}{}
-			continue
-			// Note we don't count the bytes received in
-			// partial/corrupt responses: we are measuring
-			// throughput, not resource consumption.
+func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator <-chan string, stopCh <-chan struct{}, stderr *log.Logger) {
+	defer wg.Done()
+
+	for {
+		select {
+		case <-stopCh:
+			return
+		case locator := <-nextLocator:
+			rdr, size, url, err := kc.Get(locator)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			n, err := io.Copy(ioutil.Discard, rdr)
+			rdr.Close()
+			if n != size || err != nil {
+				stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+				errorsChan <- struct{}{}
+				continue
+				// Note we don't count the bytes received in
+				// partial/corrupt responses: we are measuring
+				// throughput, not resource consumption.
+			}
+			select {
+			case <-stopCh:
+				return
+			case bytesInChan <- uint64(n):
+			}
+		default:
+			stderr.Printf("STARVED FOR BLOCKS TO READ - SLEEPING 100ms!")
+			time.Sleep(100 * time.Millisecond)
 		}
-		bytesInChan <- uint64(n)
 	}
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list