[ARVADOS] updated: 1.3.0-2655-gcba1b4145

Git user git at public.arvados.org
Wed Jun 17 16:36:18 UTC 2020


Summary of changes:
 tools/keep-exercise/keep-exercise.go | 96 +++++++++++++++++++++---------------
 1 file changed, 55 insertions(+), 41 deletions(-)

       via  cba1b4145e8fcc57a851839f77fd020e5aaff722 (commit)
      from  e710f1b2da3095d6152ac7f6ed1ffab8bfc2c0c7 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit cba1b4145e8fcc57a851839f77fd020e5aaff722
Author: Ward Vandewege <ward at curii.com>
Date:   Wed Jun 17 12:31:39 2020 -0400

    16513: switch to CSV output only, send CSV to stdout and logs to stderr,
           abort on ctrl-c, print final CSV line when ending.
    
    Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward at curii.com>

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index d06b1eb18..163291c23 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -28,6 +28,8 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"os/signal"
+	"syscall"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -60,15 +62,15 @@ func main() {
 		os.Exit(0)
 	}
 
-	log.Printf("keep-exercise %s started", version)
+	stderr := log.New(os.Stderr, "", log.LstdFlags)
 
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
-		log.Fatal(err)
+		stderr.Fatal(err)
 	}
 	kc, err := keepclient.MakeKeepClient(arv)
 	if err != nil {
-		log.Fatal(err)
+		stderr.Fatal(err)
 	}
 	kc.Want_replicas = *Replicas
 
@@ -79,18 +81,18 @@ func main() {
 		Transport: &transport,
 	}
 
-	overrideServices(kc)
+	overrideServices(kc, stderr)
 
 	nextLocator := make(chan string, *ReadThreads+*WriteThreads)
 
-	go countBeans(nextLocator)
+	go countBeans(nextLocator, stderr)
 	for i := 0; i < *WriteThreads; i++ {
 		nextBuf := make(chan []byte, 1)
-		go makeBufs(nextBuf, i)
-		go doWrites(kc, nextBuf, nextLocator)
+		go makeBufs(nextBuf, i, stderr)
+		go doWrites(kc, nextBuf, nextLocator, stderr)
 	}
 	for i := 0; i < *ReadThreads; i++ {
-		go doReads(kc, nextLocator)
+		go doReads(kc, nextLocator, stderr)
 	}
 	<-make(chan struct{})
 }
@@ -102,10 +104,12 @@ var bytesOutChan = make(chan uint64)
 // Send struct{}{} to errorsChan when an error happens.
 var errorsChan = make(chan struct{})
 
-func countBeans(nextLocator chan string) {
+func countBeans(nextLocator chan string, stderr *log.Logger) {
 	t0 := time.Now()
 	var tickChan <-chan time.Time
 	var endChan <-chan time.Time
+	c := make(chan os.Signal)
+	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 	if *StatsInterval > 0 {
 		tickChan = time.NewTicker(*StatsInterval).C
 	}
@@ -115,31 +119,44 @@ func countBeans(nextLocator chan string) {
 	var bytesIn uint64
 	var bytesOut uint64
 	var errors uint64
+	var rateIn, rateOut float64
 	var maxRateIn, maxRateOut float64
+	var abort, printCsv bool
+	csv := log.New(os.Stdout, "", 0)
+	csv.Println("Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime")
 	for {
 		select {
 		case <-tickChan:
+			printCsv = true
+		case <-endChan:
+			printCsv = true
+			abort = true
+		case <-c:
+			printCsv = true
+			abort = true
+			fmt.Print("\r") // Suppress the ^C print
+		case i := <-bytesInChan:
+			bytesIn += i
+		case o := <-bytesOutChan:
+			bytesOut += o
+		case <-errorsChan:
+			errors++
+		}
+		if printCsv {
 			elapsed := time.Since(t0)
-			if float64(bytesIn)/elapsed.Seconds()/1048576 > maxRateIn {
-				maxRateIn = float64(bytesIn) / elapsed.Seconds() / 1048576
+			rateIn = float64(bytesIn) / elapsed.Seconds() / 1048576
+			if rateIn > maxRateIn {
+				maxRateIn = rateIn
 			}
-			if float64(bytesOut)/elapsed.Seconds()/1048576 > maxRateOut {
-				maxRateOut = float64(bytesOut) / elapsed.Seconds() / 1048576
+			rateOut = float64(bytesOut) / elapsed.Seconds() / 1048576
+			if rateOut > maxRateOut {
+				maxRateOut = rateOut
 			}
-			log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d",
-				elapsed,
-				bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576),
-				bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576),
-				errors,
-			)
-		case <-endChan:
-			elapsed := time.Since(t0)
-			log.Println("\nSummary:")
-			log.Println("Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime\n")
-			log.Printf("%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+			csv.Printf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s",
+				time.Now().Format("2006-01-02 15:04:05"),
 				elapsed,
-				bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576), maxRateIn,
-				bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576), maxRateOut,
+				bytesIn, rateIn, maxRateIn,
+				bytesOut, rateOut, maxRateOut,
 				errors,
 				*ReadThreads,
 				*WriteThreads,
@@ -152,18 +169,15 @@ func countBeans(nextLocator chan string) {
 				*ServiceUUID,
 				*RunTime,
 			)
+			printCsv = false
+		}
+		if abort {
 			os.Exit(0)
-		case i := <-bytesInChan:
-			bytesIn += i
-		case o := <-bytesOutChan:
-			bytesOut += o
-		case <-errorsChan:
-			errors++
 		}
 	}
 }
 
-func makeBufs(nextBuf chan<- []byte, threadID int) {
+func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -176,7 +190,7 @@ func makeBufs(nextBuf chan<- []byte, threadID int) {
 		if *VaryRequest {
 			rnd := make([]byte, randSize)
 			if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
-				log.Fatal(err)
+				stderr.Fatal(err)
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
@@ -184,11 +198,11 @@ func makeBufs(nextBuf chan<- []byte, threadID int) {
 	}
 }
 
-func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) {
+func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string, stderr *log.Logger) {
 	for buf := range nextBuf {
 		locator, _, err := kc.PutB(buf)
 		if err != nil {
-			log.Print(err)
+			stderr.Print(err)
 			errorsChan <- struct{}{}
 			continue
 		}
@@ -201,18 +215,18 @@ func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan
 	}
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) {
+func doReads(kc *keepclient.KeepClient, nextLocator <-chan string, stderr *log.Logger) {
 	for locator := range nextLocator {
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
-			log.Print(err)
+			stderr.Print(err)
 			errorsChan <- struct{}{}
 			continue
 		}
 		n, err := io.Copy(ioutil.Discard, rdr)
 		rdr.Close()
 		if n != size || err != nil {
-			log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+			stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
 			errorsChan <- struct{}{}
 			continue
 			// Note we don't count the bytes received in
@@ -223,7 +237,7 @@ func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) {
 	}
 }
 
-func overrideServices(kc *keepclient.KeepClient) {
+func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
 	roots := make(map[string]string)
 	if *ServiceURL != "" {
 		roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
@@ -235,7 +249,7 @@ func overrideServices(kc *keepclient.KeepClient) {
 			}
 		}
 		if len(roots) == 0 {
-			log.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
+			stderr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
 		}
 	} else {
 		return

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list