[ARVADOS] updated: eb2e48747b0444c1e8019168db319a6e63615e2c

git at public.curoverse.com git at public.curoverse.com
Tue Oct 6 13:43:06 EDT 2015


Summary of changes:
 sdk/go/keepclient/keepclient.go    | 20 +++++++++-----------
 sdk/go/keepclient/support.go       | 12 +++++++++++-
 tools/keepexercise/.gitignore      |  1 +
 tools/keepexercise/keepexercise.go | 27 ++++++++++++++++++---------
 4 files changed, 39 insertions(+), 21 deletions(-)
 create mode 100644 tools/keepexercise/.gitignore

  discards  862e1f6e58c5efd0e25e6e1877929f2e43d1dddc (commit)
       via  eb2e48747b0444c1e8019168db319a6e63615e2c (commit)
       via  30c6d7b342e9eed6757b51c0784238b957e3cfa5 (commit)
       via  aebae0a86c0b911820c43a59265d408603442b3c (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (862e1f6e58c5efd0e25e6e1877929f2e43d1dddc)
            \
             N -- N -- N (eb2e48747b0444c1e8019168db319a6e63615e2c)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit eb2e48747b0444c1e8019168db319a6e63615e2c
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Oct 6 13:48:05 2015 -0400

    7410: Ensure status channel stays open until all upload workers finish.

diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 63800b1..51e3e08 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -228,7 +228,17 @@ func (this KeepClient) putReplicas(
 
 	// Used to communicate status from the upload goroutines
 	upload_status := make(chan uploadStatus)
-	defer close(upload_status)
+	defer func() {
+		// Wait for any abandoned uploads (e.g., we started
+		// two uploads and the first replied with replicas=2)
+		// to finish before closing the status channel.
+		go func() {
+			for active > 0 {
+				<-upload_status
+			}
+			close(upload_status)
+		}()
+	}()
 
 	// Desired number of replicas
 	remaining_replicas := this.Want_replicas

commit 30c6d7b342e9eed6757b51c0784238b957e3cfa5
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Oct 6 13:39:12 2015 -0400

    7410: Fix error handling/reporting in keepclient/GET

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 53dfb2b..8b7cf41 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -140,21 +140,19 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
 		url := host + "/" + locator
 		req, err := http.NewRequest("GET", url, nil)
 		if err != nil {
+			errs = append(errs, fmt.Sprintf("%s: %v", url, err))
 			continue
 		}
 		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
 		resp, err := kc.Client.Do(req)
-		if err != nil || resp.StatusCode != http.StatusOK {
-			if resp != nil {
-				var respbody []byte
-				if resp.Body != nil {
-					respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
-				}
-				errs = append(errs, fmt.Sprintf("%s: %d %s",
-					url, resp.StatusCode, strings.TrimSpace(string(respbody))))
-			} else {
-				errs = append(errs, fmt.Sprintf("%s: %v", url, err))
-			}
+		if err != nil {
+			errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+			continue
+		} else if resp.StatusCode != http.StatusOK {
+			respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+			resp.Body.Close()
+			errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
+				url, resp.StatusCode, bytes.TrimSpace(respbody)))
 			continue
 		}
 		return HashCheckingReader{

commit aebae0a86c0b911820c43a59265d408603442b3c
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Oct 2 19:54:36 2015 -0400

    7410: Add keepexercise

diff --git a/tools/keepexercise/.gitignore b/tools/keepexercise/.gitignore
new file mode 100644
index 0000000..f8f29eb
--- /dev/null
+++ b/tools/keepexercise/.gitignore
@@ -0,0 +1 @@
+keepexercise
diff --git a/tools/keepexercise/keepexercise.go b/tools/keepexercise/keepexercise.go
new file mode 100644
index 0000000..48627e3
--- /dev/null
+++ b/tools/keepexercise/keepexercise.go
@@ -0,0 +1,153 @@
+// Testing tool for Keep services.
+//
+// keepexercise helps measure throughput and test reliability under
+// various usage patterns.
+//
+// By default, it reads and writes blocks containing 2^26 NUL
+// bytes. This generates network traffic without consuming much disk
+// space.
+//
+// For a more realistic test, enable -vary-request. Warning: this will
+// fill your storage volumes with random data if you leave it running,
+// which can cost you money or leave you with too little room for
+// useful data.
+//
+package main
+
+import (
+	"crypto/rand"
+	"encoding/binary"
+	"flag"
+	"io"
+	"io/ioutil"
+	"log"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+var BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
+var ReadThreads = flag.Int("rthreads", 1, "number of concurrent readers")
+var WriteThreads = flag.Int("wthreads", 1, "number of concurrent writers")
+var VaryRequest = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
+var VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
+var Replicas = flag.Int("replicas", 1, "replication level for writing")
+var StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
+
+func main() {
+	flag.Parse()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatal(err)
+	}
+	kc, err := keepclient.MakeKeepClient(&arv)
+	if err != nil {
+		log.Fatal(err)
+	}
+	kc.Want_replicas = *Replicas
+
+	nextBuf := make(chan []byte, *WriteThreads)
+	nextLocator := make(chan string, *ReadThreads + *WriteThreads)
+
+	go countBeans(nextLocator)
+	for i := 0; i < *WriteThreads; i++ {
+		go makeBufs(nextBuf, i)
+		go doWrites(kc, nextBuf, nextLocator)
+	}
+	for i := 0; i < *ReadThreads; i++ {
+		go doReads(kc, nextLocator)
+	}
+	<-make(chan struct{})
+}
+
+// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+var bytesInChan = make(chan uint64)
+var bytesOutChan = make(chan uint64)
+
+// Send struct{}{} to errorsChan when an error happens.
+var errorsChan = make(chan struct{})
+
+func countBeans(nextLocator chan string) {
+	t0 := time.Now()
+	var tickChan <-chan time.Time
+	if *StatsInterval > 0 {
+		tickChan = time.NewTicker(*StatsInterval).C
+	}
+	var bytesIn uint64
+	var bytesOut uint64
+	var errors uint64
+	for {
+		select {
+		case <-tickChan:
+			elapsed := time.Since(t0)
+			log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d",
+				elapsed,
+				bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576),
+				bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576),
+				errors,
+			)
+		case i := <-bytesInChan:
+			bytesIn += i
+		case o := <-bytesOutChan:
+			bytesOut += o
+		case <-errorsChan:
+			errors++
+		}
+	}
+}
+
+func makeBufs(nextBuf chan []byte, threadID int) {
+	buf := make([]byte, *BlockSize)
+	if *VaryThread {
+		binary.PutVarint(buf, int64(threadID))
+	}
+	for {
+		if *VaryRequest {
+			if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+				log.Fatal(err)
+			}
+		}
+		nextBuf <- buf
+	}
+}
+
+func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+	for buf := range nextBuf {
+		locator, _, err := kc.PutB(buf)
+		if err != nil {
+			log.Print(err)
+			errorsChan <- struct{}{}
+			continue
+		}
+		bytesOutChan <- uint64(len(buf))
+		for cap(nextLocator) > len(nextLocator) + *WriteThreads {
+			// Give the readers something to do, unless
+			// they have lots queued up already.
+			nextLocator <- locator
+		}
+	}
+}
+
+func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+	for locator := range nextLocator {
+		rdr, size, url, err := kc.Get(locator)
+		if err != nil {
+			log.Print(err)
+			errorsChan <- struct{}{}
+			continue
+		}
+		n, err := io.Copy(ioutil.Discard, rdr)
+		rdr.Close()
+		if n != size || err != nil {
+			log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+			errorsChan <- struct{}{}
+			continue
+			// Note we don't count the bytes received in
+			// partial/corrupt responses: we are measuring
+			// throughput, not resource consumption.
+		}
+		bytesInChan <- uint64(n)
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list