[ARVADOS] updated: 7a6a9670682aa6532457b9e0bba7e21a3e4cfb3d
git at public.curoverse.com
git at public.curoverse.com
Thu Oct 8 15:07:23 EDT 2015
Summary of changes:
.../nodemanager/arvnodeman/computenode/__init__.py | 13 ++++
.../arvnodeman/computenode/dispatch/__init__.py | 21 ++++---
.../arvnodeman/computenode/dispatch/slurm.py | 8 ++-
.../arvnodeman/computenode/driver/__init__.py | 4 ++
.../arvnodeman/computenode/driver/azure.py | 7 +++
services/nodemanager/arvnodeman/daemon.py | 20 +++++-
services/nodemanager/setup.py | 4 +-
.../nodemanager/tests/test_computenode_dispatch.py | 51 ++++++++++++----
.../tests/test_computenode_dispatch_slurm.py | 2 +-
services/nodemanager/tests/test_daemon.py | 71 +++++++++++++++++++++-
10 files changed, 174 insertions(+), 27 deletions(-)
discards 05512d3b4fab8ea17bd66cb92fffefeabefa5d86 (commit)
discards eb2e48747b0444c1e8019168db319a6e63615e2c (commit)
discards 30c6d7b342e9eed6757b51c0784238b957e3cfa5 (commit)
discards aebae0a86c0b911820c43a59265d408603442b3c (commit)
via 7a6a9670682aa6532457b9e0bba7e21a3e4cfb3d (commit)
via ee5d4be791ff85a5eb446d3e0dbaaae87f69e868 (commit)
via 3a7dea8c7a0f0cd98d258a3815b0f012a9ca08a0 (commit)
via 71db992331f357fdb3a4fdbca42a9952b7e9ae2c (commit)
via 053de78cd0599647dfc40bff3252d2f17d959217 (commit)
via 05b52b297b30d075ef2409a123f7d096c1156cf8 (commit)
via 1f8d81d0eeda07c3cedcaad3e942ec8dedd461cb (commit)
via d5f5f869d46f9096c7c680d608c1cc654d1d7fa0 (commit)
via 11df73b96ae395fca11b4006253475046e3b74cc (commit)
via 2e919859109fe27d552b81b13d47aed61e80eca6 (commit)
via be81c03a3c26f365eba35b91e4f0827244a02ef7 (commit)
via c0f33379c7fd062fc097ecef92808334e821cb6b (commit)
via f6aa7c0c8c84b85b550d73117c6fdbd663a38c4c (commit)
via 560c318fdc49835b03f96af35774fbbfa7984fe7 (commit)
via 72e3566f2cdacd44f095183ebf88f7aab8b0d8dc (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (05512d3b4fab8ea17bd66cb92fffefeabefa5d86)
\
N -- N -- N (7a6a9670682aa6532457b9e0bba7e21a3e4cfb3d)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 7a6a9670682aa6532457b9e0bba7e21a3e4cfb3d
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Oct 6 13:48:05 2015 -0400
7410: Ensure status channel stays open until all upload workers finish.
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 63800b1..51e3e08 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -228,7 +228,17 @@ func (this KeepClient) putReplicas(
// Used to communicate status from the upload goroutines
upload_status := make(chan uploadStatus)
- defer close(upload_status)
+ defer func() {
+ // Wait for any abandoned uploads (e.g., we started
+ // two uploads and the first replied with replicas=2)
+ // to finish before closing the status channel.
+ go func() {
+ for active > 0 {
+ <-upload_status
+ }
+ close(upload_status)
+ }()
+ }()
// Desired number of replicas
remaining_replicas := this.Want_replicas
commit ee5d4be791ff85a5eb446d3e0dbaaae87f69e868
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Oct 6 13:39:12 2015 -0400
7410: Fix error handling/reporting in keepclient/GET
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 53dfb2b..8b7cf41 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -140,21 +140,19 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
url := host + "/" + locator
req, err := http.NewRequest("GET", url, nil)
if err != nil {
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
continue
}
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
resp, err := kc.Client.Do(req)
- if err != nil || resp.StatusCode != http.StatusOK {
- if resp != nil {
- var respbody []byte
- if resp.Body != nil {
- respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
- }
- errs = append(errs, fmt.Sprintf("%s: %d %s",
- url, resp.StatusCode, strings.TrimSpace(string(respbody))))
- } else {
- errs = append(errs, fmt.Sprintf("%s: %v", url, err))
- }
+ if err != nil {
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
+ continue
+ } else if resp.StatusCode != http.StatusOK {
+ respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ resp.Body.Close()
+ errs = append(errs, fmt.Sprintf("%s: HTTP %d %q",
+ url, resp.StatusCode, bytes.TrimSpace(respbody)))
continue
}
return HashCheckingReader{
commit 3a7dea8c7a0f0cd98d258a3815b0f012a9ca08a0
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Oct 2 19:54:36 2015 -0400
7410: Add keepexercise
diff --git a/tools/keepexercise/.gitignore b/tools/keepexercise/.gitignore
new file mode 100644
index 0000000..f8f29eb
--- /dev/null
+++ b/tools/keepexercise/.gitignore
@@ -0,0 +1 @@
+keepexercise
diff --git a/tools/keepexercise/keepexercise.go b/tools/keepexercise/keepexercise.go
new file mode 100644
index 0000000..fda0d5f
--- /dev/null
+++ b/tools/keepexercise/keepexercise.go
@@ -0,0 +1,154 @@
+// Testing tool for Keep services.
+//
+// keepexercise helps measure throughput and test reliability under
+// various usage patterns.
+//
+// By default, it reads and writes blocks containing 2^26 NUL
+// bytes. This generates network traffic without consuming much disk
+// space.
+//
+// For a more realistic test, enable -vary-request. Warning: this will
+// fill your storage volumes with random data if you leave it running,
+// which can cost you money or leave you with too little room for
+// useful data.
+//
+package main
+
+import (
+ "crypto/rand"
+ "encoding/binary"
+ "flag"
+ "io"
+ "io/ioutil"
+ "log"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+var BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
+var ReadThreads = flag.Int("rthreads", 1, "number of concurrent readers")
+var WriteThreads = flag.Int("wthreads", 1, "number of concurrent writers")
+var VaryRequest = flag.Bool("vary-request", false, "vary the data for each request: consumes disk space, exercises write behavior")
+var VaryThread = flag.Bool("vary-thread", false, "use -wthreads different data blocks")
+var Replicas = flag.Int("replicas", 1, "replication level for writing")
+var StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
+
+func main() {
+ flag.Parse()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatal(err)
+ }
+ kc, err := keepclient.MakeKeepClient(&arv)
+ if err != nil {
+ log.Fatal(err)
+ }
+ kc.Want_replicas = *Replicas
+ kc.Client.Timeout = 10 * time.Minute
+
+ nextBuf := make(chan []byte, *WriteThreads)
+ nextLocator := make(chan string, *ReadThreads+*WriteThreads)
+
+ go countBeans(nextLocator)
+ for i := 0; i < *WriteThreads; i++ {
+ go makeBufs(nextBuf, i)
+ go doWrites(kc, nextBuf, nextLocator)
+ }
+ for i := 0; i < *ReadThreads; i++ {
+ go doReads(kc, nextLocator)
+ }
+ <-make(chan struct{})
+}
+
+// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+var bytesInChan = make(chan uint64)
+var bytesOutChan = make(chan uint64)
+
+// Send struct{}{} to errorsChan when an error happens.
+var errorsChan = make(chan struct{})
+
+func countBeans(nextLocator chan string) {
+ t0 := time.Now()
+ var tickChan <-chan time.Time
+ if *StatsInterval > 0 {
+ tickChan = time.NewTicker(*StatsInterval).C
+ }
+ var bytesIn uint64
+ var bytesOut uint64
+ var errors uint64
+ for {
+ select {
+ case <-tickChan:
+ elapsed := time.Since(t0)
+ log.Printf("%v elapsed: read %v bytes (%.1f MiB/s), wrote %v bytes (%.1f MiB/s), errors %d",
+ elapsed,
+ bytesIn, (float64(bytesIn) / elapsed.Seconds() / 1048576),
+ bytesOut, (float64(bytesOut) / elapsed.Seconds() / 1048576),
+ errors,
+ )
+ case i := <-bytesInChan:
+ bytesIn += i
+ case o := <-bytesOutChan:
+ bytesOut += o
+ case <-errorsChan:
+ errors++
+ }
+ }
+}
+
+func makeBufs(nextBuf chan []byte, threadID int) {
+ buf := make([]byte, *BlockSize)
+ if *VaryThread {
+ binary.PutVarint(buf, int64(threadID))
+ }
+ for {
+ if *VaryRequest {
+ if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+ log.Fatal(err)
+ }
+ }
+ nextBuf <- buf
+ }
+}
+
+func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+ for buf := range nextBuf {
+ locator, _, err := kc.PutB(buf)
+ if err != nil {
+ log.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ bytesOutChan <- uint64(len(buf))
+ for cap(nextLocator) > len(nextLocator)+*WriteThreads {
+ // Give the readers something to do, unless
+ // they have lots queued up already.
+ nextLocator <- locator
+ }
+ }
+}
+
+func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+ for locator := range nextLocator {
+ rdr, size, url, err := kc.Get(locator)
+ if err != nil {
+ log.Print(err)
+ errorsChan <- struct{}{}
+ continue
+ }
+ n, err := io.Copy(ioutil.Discard, rdr)
+ rdr.Close()
+ if n != size || err != nil {
+ log.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+ errorsChan <- struct{}{}
+ continue
+ // Note we don't count the bytes received in
+ // partial/corrupt responses: we are measuring
+ // throughput, not resource consumption.
+ }
+ bytesInChan <- uint64(n)
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list