[ARVADOS] created: 1.3.0-2750-g44f7b64ea
Git user
git at public.arvados.org
Mon Jul 6 13:45:53 UTC 2020
at 44f7b64ea522898dcef0e4b096b1f5b98140c322 (commit)
commit 44f7b64ea522898dcef0e4b096b1f5b98140c322
Author: Ward Vandewege <ward at curii.com>
Date: Mon Jul 6 09:37:32 2020 -0400
16585: Add --repeat argument to keep-exercise, which automatically repeats
an experiment N times. Add loud warning when the read thread(s) are starved
for blocks to read.
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>
diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 163291c23..8d0d86653 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -29,6 +29,7 @@ import (
"net/http"
"os"
"os/signal"
+ "sync"
"syscall"
"time"
@@ -51,8 +52,12 @@ var (
ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
getVersion = flag.Bool("version", false, "Print version information and exit.")
RunTime = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
+ Repeat = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
)
+var summary string
+var csvHeader string
+
func main() {
flag.Parse()
@@ -74,27 +79,51 @@ func main() {
}
kc.Want_replicas = *Replicas
- transport := *(http.DefaultTransport.(*http.Transport))
- transport.TLSClientConfig = arvadosclient.MakeTLSConfig(arv.ApiInsecure)
kc.HTTPClient = &http.Client{
- Timeout: 10 * time.Minute,
- Transport: &transport,
+ Timeout: 10 * time.Minute,
+ // It's not safe to copy *http.DefaultTransport
+ // because it has a mutex (which might be locked)
+ // protecting a private map (which might not be nil).
+ // So we build our own, using the Go 1.12 default
+ // values.
+ Transport: &http.Transport{
+ TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+ },
}
overrideServices(kc, stderr)
+ csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+
+ for i := 0; i < *Repeat; i++ {
+ runExperiment(kc, stderr)
+ stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+ summary += fmt.Sprintf(",%d\n", i)
+ }
+ stderr.Println("Summary:")
+ stderr.Println()
+ fmt.Println(csvHeader + ",Experiment")
+ fmt.Println(summary)
+}
+func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
+ var wg sync.WaitGroup
nextLocator := make(chan string, *ReadThreads+*WriteThreads)
- go countBeans(nextLocator, stderr)
+ wg.Add(1)
+ stopCh := make(chan struct{})
+ go countBeans(&wg, nextLocator, stopCh, stderr)
for i := 0; i < *WriteThreads; i++ {
nextBuf := make(chan []byte, 1)
- go makeBufs(nextBuf, i, stderr)
- go doWrites(kc, nextBuf, nextLocator, stderr)
+ wg.Add(1)
+ go makeBufs(&wg, nextBuf, i, stopCh, stderr)
+ wg.Add(1)
+ go doWrites(&wg, kc, nextBuf, nextLocator, stopCh, stderr)
}
for i := 0; i < *ReadThreads; i++ {
- go doReads(kc, nextLocator, stderr)
+ wg.Add(1)
+ go doReads(&wg, kc, nextLocator, stopCh, stderr)
}
- <-make(chan struct{})
+ wg.Wait()
}
// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
@@ -104,11 +133,12 @@ var bytesOutChan = make(chan uint64)
// Send struct{}{} to errorsChan when an error happens.
var errorsChan = make(chan struct{})
-func countBeans(nextLocator chan string, stderr *log.Logger) {
+func countBeans(wg *sync.WaitGroup, nextLocator chan string, stopCh chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
t0 := time.Now()
var tickChan <-chan time.Time
var endChan <-chan time.Time
- c := make(chan os.Signal)
+ c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
if *StatsInterval > 0 {
tickChan = time.NewTicker(*StatsInterval).C
@@ -121,16 +151,16 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
var errors uint64
var rateIn, rateOut float64
var maxRateIn, maxRateOut float64
- var abort, printCsv bool
+ var exit, abort, printCsv bool
csv := log.New(os.Stdout, "", 0)
- csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
+ csv.Println(csvHeader)
for {
select {
case <-tickChan:
printCsv = true
case <-endChan:
printCsv = true
- abort = true
+ exit = true
case <-c:
printCsv = true
abort = true
@@ -152,7 +182,7 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
if rateOut > maxRateOut {
maxRateOut = rateOut
}
- csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+ line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
time.Now().Format("2006-01-02 15:04:05"),
elapsed,
bytesIn, rateIn, maxRateIn,
@@ -168,16 +198,26 @@ func countBeans(nextLocator chan string, stderr *log.Logger) {
*ServiceURL,
*ServiceUUID,
*RunTime,
+ *Repeat,
)
+ csv.Println(line)
+ if exit {
+ summary += line
+ }
printCsv = false
}
if abort {
os.Exit(0)
}
+ if exit {
+ close(stopCh)
+ break
+ }
}
}
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
buf := make([]byte, *BlockSize)
if *VaryThread {
binary.PutVarint(buf, int64(threadID))
@@ -194,46 +234,82 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
}
buf = append(rnd, buf[randSize:]...)
}
- nextBuf <- buf
+ select {
+ case <-stopCh:
+ close(nextBuf)
+ goto done
+ case nextBuf <- buf:
+ }
}
+done:
}
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
- for buf := range nextBuf {
- locator, _, err := kc.PutB(buf)
- if err != nil {
- stderr.Print(err)
- errorsChan <- struct{}{}
- continue
- }
- bytesOutChan <- uint64(len(buf))
- for cap(nextLocator) > len(nextLocator)+*WriteThreads {
- // Give the readers something to do, unless
- // they have lots queued up already.
- nextLocator <- locator
+func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stopCh <-chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
+
+ for {
+ select {
+ case <-stopCh:
+ goto done
+ case buf := <-nextBuf:
+ locator, _, err := kc.PutB(buf)
+ if err != nil {
+ stderr.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ select {
+ case <-stopCh:
+ goto done
+ case bytesOutChan <- uint64(len(buf)):
+ }
+ for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+ select {
+ case <-stopCh:
+ goto done
+ case nextLocator <- locator:
+ // Give the readers something to do, unless
+ // they have lots queued up already.
+ }
+ }
}
}
+done:
}
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
- for locator := range nextLocator {
- rdr, size, url, err := kc.Get(locator)
- if err != nil {
- stderr.Print(err)
- errorsChan <- struct{}{}
- continue
- }
- n, err := io.Copy(ioutil.Discard, rdr)
- rdr.Close()
- if n != size || err != nil {
- stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
- errorsChan <- struct{}{}
- continue
- // Note we don't count the bytes received in
- // partial/corrupt responses: we are measuring
- // throughput, not resource consumption.
+func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator <-chan string, stopCh <-chan struct{}, stderr *log.Logger) {
+ defer wg.Done()
+
+ for {
+ select {
+ case <-stopCh:
+ return
+ case locator := <-nextLocator:
+ rdr, size, url, err := kc.Get(locator)
+ if err != nil {
+ stderr.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ n, err := io.Copy(ioutil.Discard, rdr)
+ rdr.Close()
+ if n != size || err != nil {
+ stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+ errorsChan <- struct{}{}
+ continue
+ // Note we don't count the bytes received in
+ // partial/corrupt responses: we are measuring
+ // throughput, not resource consumption.
+ }
+ select {
+ case <-stopCh:
+ return
+ case bytesInChan <- uint64(n):
+ }
+ default:
+ stderr.Printf("STARVED FOR BLOCKS TO READ - SLEEPING 100ms!")
+ time.Sleep(100 * time.Millisecond)
}
- bytesInChan <- uint64(n)
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list