[ARVADOS] updated: 1.3.0-2770-gd7b9e50e1

Git user git at public.arvados.org
Tue Jul 7 21:37:43 UTC 2020


Summary of changes:
 tools/keep-exercise/keep-exercise.go | 37 +++++++++++++-----------------------
 1 file changed, 13 insertions(+), 24 deletions(-)

  discards  47021b72b86fcf0a29305c20549f6169eb1ec734 (commit)
       via  d7b9e50e1ac6c3432f9e5c87bffd872b5f6fd572 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (47021b72b86fcf0a29305c20549f6169eb1ec734)
            \
             N -- N -- N (d7b9e50e1ac6c3432f9e5c87bffd872b5f6fd572)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

The revisions listed above that are new to this repository have
not appeared in any other notification email, so we list them
in full below.


commit d7b9e50e1ac6c3432f9e5c87bffd872b5f6fd572
Author: Ward Vandewege <ward at curii.com>
Date:   Tue Jul 7 16:56:21 2020 -0400

    16585: simplify the code: use a context instead of a channel for the
           timer and for catching signals.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>
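
For readers skimming the diff below, the gist of the change is replacing the
stop-channel and WaitGroup plumbing with a cancellable context: SIGINT/SIGTERM
cancels the root context, the per-run time limit becomes a deadline derived
from it, and worker loops poll ctx.Err(). A minimal, self-contained sketch of
that pattern follows; the names, the hard-coded 3-second deadline (standing in
for the -run-time flag), and the print statements are illustrative only, not
the keep-exercise code itself.

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Root context: cancelled explicitly or when SIGINT/SIGTERM arrives.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigChan
		fmt.Print("\r") // suppress the ^C echo
		cancel()
	}()

	// Per-run deadline derived from the same context (illustrative value).
	runCtx, runCancel := context.WithDeadline(ctx, time.Now().Add(3*time.Second))
	defer runCancel()

	// Workers poll ctx.Err() instead of selecting on a dedicated stop channel.
	for runCtx.Err() == nil {
		fmt.Println("working...")
		time.Sleep(500 * time.Millisecond)
	}
	fmt.Println("stopped:", runCtx.Err())
}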

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 7641465aa..19d46efbd 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -19,6 +19,7 @@
 package main
 
 import (
+	"context"
 	"crypto/rand"
 	"encoding/binary"
 	"flag"
@@ -29,7 +30,6 @@ import (
 	"net/http"
 	"os"
 	"os/signal"
-	"sync"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -56,8 +56,12 @@ var (
 	Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
 )
 
-var summary string
-var csvHeader string
+// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+var bytesInChan = make(chan uint64)
+var bytesOutChan = make(chan uint64)
+
+// Send struct{}{} to errorsChan when an error happens.
+var errorsChan = make(chan struct{})
 
 func main() {
 	flag.Parse()
@@ -100,91 +104,104 @@ func main() {
 		},
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+	go func() {
+		<-sigChan
+		fmt.Print("\r") // Suppress the ^C print
+		cancel()
+	}()
+
 	overrideServices(kc, stderr)
-	csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+	csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+	var summary string
 
 	for i := 0; i < *Repeat; i++ {
-		runExperiment(kc, stderr)
-		stderr.Printf("*************************** experiment %d complete ******************************\n", i)
-		summary += fmt.Sprintf(",%d\n", i)
+		if ctx.Err() == nil {
+			summary = runExperiment(ctx, kc, summary, csvHeader, stderr)
+			stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+			summary += fmt.Sprintf(",%d\n", i)
+		}
 	}
 	stderr.Println("Summary:")
 	stderr.Println()
+	fmt.Println()
 	fmt.Println(csvHeader + ",Experiment")
 	fmt.Println(summary)
 }
 
-func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
-	var wg sync.WaitGroup
+func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary string, csvHeader string, stderr *log.Logger) (newSummary string) {
+	newSummary = summary
 	var nextLocator atomic.Value
 
-	wg.Add(1)
-	stopCh := make(chan struct{})
+	// Start warmup
+	ready := make(chan struct{})
+	var warmup bool
 	if *ReadThreads > 0 {
+		warmup = true
 		stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
 	}
-	for i := 0; i < *WriteThreads; i++ {
-		nextBuf := make(chan []byte, 1)
-		wg.Add(1)
-		go makeBufs(&wg, nextBuf, i, stopCh, stderr)
-		wg.Add(1)
-		go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
-	}
-	if *ReadThreads > 0 {
-		for nextLocator.Load() == nil {
-			select {
-			case _ = <-bytesOutChan:
+	nextBuf := make(chan []byte, 1)
+	go makeBufs(nextBuf, 0, stderr)
+	if warmup {
+		go func() {
+			locator, _, err := kc.PutB(<-nextBuf)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
 			}
-		}
-		stderr.Printf("Warmup complete")
+			nextLocator.Store(locator)
+			stderr.Println("Warmup complete!")
+			close(ready)
+		}()
+	} else {
+		close(ready)
 	}
-	go countBeans(&wg, stopCh, stderr)
-	for i := 0; i < *ReadThreads; i++ {
-		wg.Add(1)
-		go doReads(&wg, kc, &nextLocator, stopCh, stderr)
+	select {
+	case <-ctx.Done():
+		return
+	case <-ready:
 	}
-	wg.Wait()
-}
 
-// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
-var bytesInChan = make(chan uint64)
-var bytesOutChan = make(chan uint64)
+	// Warmup complete
+	ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*RunTime))
+	defer cancel()
 
-// Send struct{}{} to errorsChan when an error happens.
-var errorsChan = make(chan struct{})
+	for i := 0; i < *WriteThreads; i++ {
+		if i > 0 {
+			// the makeBufs goroutine with index 0 was already started for the warmup phase, above
+			nextBuf := make(chan []byte, 1)
+			go makeBufs(nextBuf, i, stderr)
+		}
+		go doWrites(ctx, kc, nextBuf, &nextLocator, stderr)
+	}
+	for i := 0; i < *ReadThreads; i++ {
+		go doReads(ctx, kc, &nextLocator, stderr)
+	}
 
-func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
 	t0 := time.Now()
 	var tickChan <-chan time.Time
-	var endChan <-chan time.Time
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
 	}
-	if *RunTime > 0 {
-		endChan = time.NewTicker(*RunTime).C
-	}
 	var bytesIn uint64
 	var bytesOut uint64
 	var errors uint64
 	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
-	var exit, abort, printCsv bool
+	var exit, printCsv bool
 	csv := log.New(os.Stdout, "", 0)
+	csv.Println()
 	csv.Println(csvHeader)
 	for {
 		select {
-		case <-tickChan:
-			printCsv = true
-		case <-endChan:
+		case <-ctx.Done():
 			printCsv = true
 			exit = true
-		case <-c:
+		case <-tickChan:
 			printCsv = true
-			abort = true
-			fmt.Print("\r") // Suppress the ^C print
 		case i := <-bytesInChan:
 			bytesIn += i
 		case o := <-bytesOutChan:
@@ -203,7 +220,7 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
 				maxRateOut = rateOut
 			}
 			line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
-				time.Now().Format("2006-01-02 15:04:05"),
+				time.Now().Format("2006/01/02 15:04:05"),
 				elapsed,
 				bytesIn, rateIn, maxRateIn,
 				bytesOut, rateOut, maxRateOut,
@@ -222,22 +239,16 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
 			)
 			csv.Println(line)
 			if exit {
-				summary += line
+				newSummary += line
+				return
 			}
 			printCsv = false
 		}
-		if abort {
-			os.Exit(0)
-		}
-		if exit {
-			close(stopCh)
-			break
-		}
 	}
+	return
 }
 
-func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
+func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -254,44 +265,27 @@ func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
-		select {
-		case <-stopCh:
-			close(nextBuf)
-			return
-		case nextBuf <- buf:
-		}
+		nextBuf <- buf
 	}
 }
 
-func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
-
-	for {
-		select {
-		case <-stopCh:
-			return
-		case buf := <-nextBuf:
-			locator, _, err := kc.PutB(buf)
-			if err != nil {
-				stderr.Print(err)
-				errorsChan <- struct{}{}
-				continue
-			}
-			select {
-			case <-stopCh:
-				return
-			case bytesOutChan <- uint64(len(buf)):
-			}
-			nextLocator.Store(locator)
+func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stderr *log.Logger) {
+	for ctx.Err() == nil {
+		buf := <-nextBuf
+		locator, _, err := kc.PutB(buf)
+		if err != nil {
+			stderr.Print(err)
+			errorsChan <- struct{}{}
+			continue
 		}
+		bytesOutChan <- uint64(len(buf))
+		nextLocator.Store(locator)
 	}
 }
 
-func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-	defer wg.Done()
-
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, stderr *log.Logger) {
 	var locator string
-	for {
+	for ctx.Err() == nil {
 		locator = nextLocator.Load().(string)
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
@@ -309,11 +303,7 @@ func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.
 			// partial/corrupt responses: we are measuring
 			// throughput, not resource consumption.
 		}
-		select {
-		case <-stopCh:
-			return
-		case bytesInChan <- uint64(n):
-		}
+		bytesInChan <- uint64(n)
 	}
 }
 

-----------------------------------------------------------------------


hooks/post-receive
--