[ARVADOS] updated: 1.3.0-2770-g47021b72b

Git user git at public.arvados.org
Tue Jul 7 20:57:49 UTC 2020


Summary of changes:
 tools/keep-exercise/keep-exercise.go | 161 ++++++++++++++++++-----------------
 1 file changed, 81 insertions(+), 80 deletions(-)

       via  47021b72b86fcf0a29305c20549f6169eb1ec734 (commit)
      from  199d58a4df0dba1ee71c6816bc3a9d9d439cfd7e (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 47021b72b86fcf0a29305c20549f6169eb1ec734
Author: Ward Vandewege <ward at curii.com>
Date:   Tue Jul 7 16:56:21 2020 -0400

    16585: simplify the code: use a context instead of a channel for the
           timer and catching signals.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 7641465aa..0f658b4d4 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -19,6 +19,7 @@
 package main
 
 import (
+	"context"
 	"crypto/rand"
 	"encoding/binary"
 	"flag"
@@ -100,49 +101,86 @@ func main() {
 		},
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-sigChan
+		fmt.Print("\r") // Suppress the ^C print
+		cancel()
+	}()
+
 	overrideServices(kc, stderr)
 	csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
 
 	for i := 0; i < *Repeat; i++ {
-		runExperiment(kc, stderr)
-		stderr.Printf("*************************** experiment %d complete ******************************\n", i)
-		summary += fmt.Sprintf(",%d\n", i)
+		if ctx.Err() == nil {
+			runExperiment(ctx, kc, stderr)
+			stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+			summary += fmt.Sprintf(",%d\n", i)
+		}
 	}
 	stderr.Println("Summary:")
 	stderr.Println()
+	fmt.Println()
 	fmt.Println(csvHeader + ",Experiment")
 	fmt.Println(summary)
 }
 
-func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+func runExperiment(ctx context.Context, kc *keepclient.KeepClient, stderr *log.Logger) {
 	var wg sync.WaitGroup
 	var nextLocator atomic.Value
 
-	wg.Add(1)
-	stopCh := make(chan struct{})
+	// Start warmup
+	ready := make(chan struct{})
+	var warmup bool
 	if *ReadThreads > 0 {
+		warmup = true
 		stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
 	}
-	for i := 0; i < *WriteThreads; i++ {
-		nextBuf := make(chan []byte, 1)
-		wg.Add(1)
-		go makeBufs(&wg, nextBuf, i, stopCh, stderr)
-		wg.Add(1)
-		go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
-	}
-	if *ReadThreads > 0 {
-		for nextLocator.Load() == nil {
-			select {
-			case _ = <-bytesOutChan:
+	nextBuf := make(chan []byte, 1)
+	go makeBufs(nextBuf, 0, stderr)
+	if warmup {
+		go func() {
+			locator, _, err := kc.PutB(<-nextBuf)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
 			}
+			nextLocator.Store(locator)
+			stderr.Println("Warmup complete!")
+			close(ready)
+		}()
+	} else {
+		close(ready)
+	}
+	select {
+	case <-ctx.Done():
+		return
+	case <-ready:
+	}
+
+	// Warmup complete
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*RunTime))
+	defer cancel()
+
+	wg.Add(1)
+	go countBeans(&wg, ctx, stderr)
+
+	for i := 0; i < *WriteThreads; i++ {
+		if i > 0 {
+			// the makeBufs goroutine with index 0 was already started for the warmup phase, above
+			nextBuf := make(chan []byte, 1)
+			go makeBufs(nextBuf, i, stderr)
 		}
-		stderr.Printf("Warmup complete")
+		go doWrites(ctx, kc, nextBuf, &nextLocator, stderr)
 	}
-	go countBeans(&wg, stopCh, stderr)
 	for i := 0; i < *ReadThreads; i++ {
-		wg.Add(1)
-		go doReads(&wg, kc, &nextLocator, stopCh, stderr)
+		go doReads(ctx, kc, &nextLocator, stderr)
 	}
+
+	// Wait for countBeans to complete, so we get the complete summary line for this run
 	wg.Wait()
 }
 
@@ -153,38 +191,29 @@ var bytesOutChan = make(chan uint64)
 // Send struct{}{} to errorsChan when an error happens.
 var errorsChan = make(chan struct{})
 
-func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, ctx context.Context, stderr *log.Logger) {
 	defer wg.Done()
 	t0 := time.Now()
 	var tickChan <-chan time.Time
-	var endChan <-chan time.Time
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
 	}
-	if *RunTime > 0 {
-		endChan = time.NewTicker(*RunTime).C
-	}
 	var bytesIn uint64
 	var bytesOut uint64
 	var errors uint64
 	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
-	var exit, abort, printCsv bool
+	var exit, printCsv bool
 	csv := log.New(os.Stdout, "", 0)
+	csv.Println()
 	csv.Println(csvHeader)
 	for {
 		select {
-		case <-tickChan:
-			printCsv = true
-		case <-endChan:
+		case <-ctx.Done():
 			printCsv = true
 			exit = true
-		case <-c:
+		case <-tickChan:
 			printCsv = true
-			abort = true
-			fmt.Print("\r") // Suppress the ^C print
 		case i := <-bytesInChan:
 			bytesIn += i
 		case o := <-bytesOutChan:
@@ -203,7 +232,7 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
 				maxRateOut = rateOut
 			}
 			line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
-				time.Now().Format("2006-01-02 15:04:05"),
+				time.Now().Format("2006/01/02 15:04:05"),
 				elapsed,
 				bytesIn, rateIn, maxRateIn,
 				bytesOut, rateOut, maxRateOut,
@@ -223,21 +252,14 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
 			csv.Println(line)
 			if exit {
 				summary += line
+				return
 			}
 			printCsv = false
 		}
-		if abort {
-			os.Exit(0)
-		}
-		if exit {
-			close(stopCh)
-			break
-		}
 	}
 }
 
-func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
+func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -254,44 +276,27 @@ func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
-		select {
-		case <-stopCh:
-			close(nextBuf)
-			return
-		case nextBuf <- buf:
-		}
+		nextBuf <- buf
 	}
 }
 
-func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
-
-	for {
-		select {
-		case <-stopCh:
-			return
-		case buf := <-nextBuf:
-			locator, _, err := kc.PutB(buf)
-			if err != nil {
-				stderr.Print(err)
-				errorsChan <- struct{}{}
-				continue
-			}
-			select {
-			case <-stopCh:
-				return
-			case bytesOutChan <- uint64(len(buf)):
-			}
-			nextLocator.Store(locator)
+func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stderr *log.Logger) {
+	for ctx.Err() == nil {
+		buf := <-nextBuf
+		locator, _, err := kc.PutB(buf)
+		if err != nil {
+			stderr.Print(err)
+			errorsChan <- struct{}{}
+			continue
 		}
+		bytesOutChan <- uint64(len(buf))
+		nextLocator.Store(locator)
 	}
 }
 
-func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
-
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, stderr *log.Logger) {
 	var locator string
-	for {
+	for ctx.Err() == nil {
 		locator = nextLocator.Load().(string)
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
@@ -309,11 +314,7 @@ func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.
 			// partial/corrupt responses: we are measuring
 			// throughput, not resource consumption.
 		}
-		select {
-		case <-stopCh:
-			return
-		case bytesInChan <- uint64(n):
-		}
+		bytesInChan <- uint64(n)
 	}
 }
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list