[ARVADOS] updated: f63e86a3270836ab5e5f1de31e28b686ff809739
git at public.curoverse.com
git at public.curoverse.com
Thu Jul 16 16:12:06 EDT 2015
Summary of changes:
services/datamanager/datamanager.go | 46 ++++++++-----
services/datamanager/keep/keep.go | 20 +++---
services/datamanager/keep/keep_test.go | 92 +++++++++++++++++++++++++
services/datamanager/summary/pull_list_test.go | 12 ++--
services/datamanager/summary/trash_list.go | 30 ++++----
services/datamanager/summary/trash_list_test.go | 8 +--
services/keepstore/trash_worker.go | 2 +-
7 files changed, 160 insertions(+), 50 deletions(-)
create mode 100644 services/datamanager/keep/keep_test.go
via f63e86a3270836ab5e5f1de31e28b686ff809739 (commit)
from 7da85b85af2dd9b22186bfd48190de7b68f75837 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit f63e86a3270836ab5e5f1de31e28b686ff809739
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jul 16 16:12:47 2015 -0400
6221: Add SendTrashLists() test. Propagate errors from BuildTrashList. Style fixes.
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index d7ac0d2..70a9ae7 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -41,19 +41,25 @@ func init() {
func main() {
flag.Parse()
if minutesBetweenRuns == 0 {
- singlerun()
+ err := singlerun()
+ if err != nil {
+ log.Fatalf("Got an error: %v", err)
+ }
} else {
waitTime := time.Minute * time.Duration(minutesBetweenRuns)
for {
log.Println("Beginning Run")
- singlerun()
+ err := singlerun()
+ if err != nil {
+ log.Printf("Got an error: %v", err)
+ }
log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
time.Sleep(waitTime)
}
}
}
-func singlerun() {
+func singlerun() error {
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -119,31 +125,39 @@ func singlerun() {
fmt.Sprintf("Error setting up keep client %s", err.Error()))
}
+ // Log that we're finished. We force the recording, since go will
+ // not wait for the write timer before exiting.
+ if arvLogger != nil {
+ defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+ summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+ summaryInfo["block_replication_counts"] = bucketCounts
+ summaryInfo["replication_summary"] = replicationCounts
+ p["summary_info"] = summaryInfo
+
+ p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
+ })
+ }
+
pullServers := summary.ComputePullServers(kc,
&keepServerInfo,
readCollections.BlockToDesiredReplication,
replicationSummary.UnderReplicatedBlocks)
pullLists := summary.BuildPullLists(pullServers)
- trashLists := summary.BuildTrashLists(kc,
+
+ trashLists, trashErr := summary.BuildTrashLists(kc,
&keepServerInfo,
replicationSummary.KeepBlocksNotInCollections)
summary.WritePullLists(arvLogger, pullLists)
- keep.SendTrashLists(arvLogger, kc, trashLists)
- // Log that we're finished. We force the recording, since go will
- // not wait for the write timer before exiting.
- if arvLogger != nil {
- arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
- summaryInfo := logger.GetOrCreateMap(p, "summary_info")
- summaryInfo["block_replication_counts"] = bucketCounts
- summaryInfo["replication_summary"] = replicationCounts
- p["summary_info"] = summaryInfo
-
- p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
- })
+ if trashErr != nil {
+ return err
+ } else {
+ keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
}
+
+ return nil
}
// Returns a data fetcher that fetches data from remote servers.
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 871acc8..08e54e6 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -98,7 +98,7 @@ func (s ServerAddress) URL() string {
}
}
-func getDataManagerToken(arvLogger *logger.Logger) string {
+func GetDataManagerToken(arvLogger *logger.Logger) string {
readDataManagerToken := func() {
if dataManagerTokenFile == "" {
flag.Usage()
@@ -308,7 +308,7 @@ func CreateIndexRequest(arvLogger *logger.Logger,
}
req.Header.Add("Authorization",
- fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+ fmt.Sprintf("OAuth2 %s", GetDataManagerToken(arvLogger)))
return
}
@@ -461,9 +461,11 @@ type TrashRequest struct {
type TrashList []TrashRequest
-func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList) {
+func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[string]TrashList) {
count := 0
- rendezvous := make(chan bool)
+ barrier := make(chan bool)
+
+ client := kc.Client
for url, v := range spl {
count += 1
@@ -471,7 +473,7 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
go (func(url string, v TrashList) {
defer (func() {
- rendezvous <- true
+ barrier <- true
})()
pipeReader, pipeWriter := io.Pipe()
@@ -488,16 +490,16 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
}
// Add api token header
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", dataManagerToken))
// Make the request
var resp *http.Response
- if resp, err = kc.Client.Do(req); err != nil {
+ if resp, err = client.Do(req); err != nil {
log.Printf("Error sending trash list to %v error: %v", url, err.Error())
return
}
- log.Printf("Sent trash list to %v: response was HTTP %d", url, resp.Status)
+ log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
@@ -506,6 +508,6 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
}
for i := 0; i < count; i += 1 {
- <-rendezvous
+ <-barrier
}
}
diff --git a/services/datamanager/keep/keep_test.go b/services/datamanager/keep/keep_test.go
new file mode 100644
index 0000000..85f704c
--- /dev/null
+++ b/services/datamanager/keep/keep_test.go
@@ -0,0 +1,92 @@
+package keep
+
+import (
+ "encoding/json"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ . "gopkg.in/check.v1"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+type KeepSuite struct{}
+
+var _ = Suite(&KeepSuite{})
+
+type TestHandler struct {
+ request TrashList
+}
+
+func (this *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+ r := json.NewDecoder(req.Body)
+ r.Decode(&this.request)
+}
+
+func (s *KeepSuite) TestSendTrashLists(c *C) {
+ th := TestHandler{}
+ server := httptest.NewServer(&th)
+
+ tl := map[string]TrashList{
+ server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+ kc := keepclient.KeepClient{Client: &http.Client{}}
+ kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+ map[string]string{"xxxx": server.URL},
+ map[string]string{})
+
+ SendTrashLists("", &kc, tl)
+ server.Close()
+
+ c.Check(th.request,
+ DeepEquals,
+ tl[server.URL])
+
+}
+
+type TestHandlerError struct {
+}
+
+func (this *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+ http.Error(writer, "I'm a teapot", 405)
+}
+
+func (s *KeepSuite) TestSendTrashListError(c *C) {
+ // Server responds with an error
+
+ th := TestHandlerError{}
+ server := httptest.NewServer(&th)
+
+ tl := map[string]TrashList{
+ server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+ kc := keepclient.KeepClient{Client: &http.Client{}}
+ kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+ map[string]string{"xxxx": server.URL},
+ map[string]string{})
+
+ SendTrashLists("", &kc, tl)
+ server.Close()
+}
+
+func (s *KeepSuite) TestSendTrashListError2(c *C) {
+ // Server is not reachable
+
+ th := TestHandler{}
+ server := httptest.NewServer(&th)
+ server.Close()
+
+ tl := map[string]TrashList{
+ server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+ kc := keepclient.KeepClient{Client: &http.Client{}}
+ kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+ map[string]string{"xxxx": server.URL},
+ map[string]string{})
+
+ SendTrashLists("", &kc, tl)
+}
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
index f22d47d..e2050c2 100644
--- a/services/datamanager/summary/pull_list_test.go
+++ b/services/datamanager/summary/pull_list_test.go
@@ -9,13 +9,13 @@ import (
)
// Gocheck boilerplate
-func Test(t *testing.T) {
+func TestPullLists(t *testing.T) {
TestingT(t)
}
-type MySuite struct{}
+type PullSuite struct{}
-var _ = Suite(&MySuite{})
+var _ = Suite(&PullSuite{})
// Helper method to declare string sets more succinctly
// Could be placed somewhere more general.
@@ -27,7 +27,7 @@ func stringSet(slice ...string) (m map[string]struct{}) {
return
}
-func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
+func (s *PullSuite) TestPullListPrintsJSONCorrectly(c *C) {
pl := PullList{PullRequest{
Locator: Locator(blockdigest.MakeTestDigestSpecifySize(0xBadBeef, 56789)),
Servers: []string{"keep0.qr1hi.arvadosapi.com:25107",
@@ -41,7 +41,7 @@ func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
c.Check(string(b), Equals, expectedOutput)
}
-func (s *MySuite) TestCreatePullServers(c *C) {
+func (s *PullSuite) TestCreatePullServers(c *C) {
var cs CanonicalString
c.Check(
CreatePullServers(cs,
@@ -155,7 +155,7 @@ var PullListMapEquals Checker = &pullListMapEqualsChecker{&CheckerInfo{
Params: []string{"obtained", "expected"},
}}
-func (s *MySuite) TestBuildPullLists(c *C) {
+func (s *PullSuite) TestBuildPullLists(c *C) {
c.Check(
BuildPullLists(map[Locator]PullServers{}),
PullListMapEquals,
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
index 330cd4e..0bedc9c 100644
--- a/services/datamanager/summary/trash_list.go
+++ b/services/datamanager/summary/trash_list.go
@@ -2,15 +2,16 @@
package summary
import (
+ "errors"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/services/datamanager/keep"
- "log"
"time"
)
func BuildTrashLists(kc *keepclient.KeepClient,
keepServerInfo *keep.ReadServers,
- keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
+ keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
// Servers that are writeable
writableServers := map[string]struct{}{}
@@ -20,8 +21,7 @@ func BuildTrashLists(kc *keepclient.KeepClient,
_ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
if err != nil {
- log.Printf("Failed to get blobSignatureTtl: %v", err)
- return map[string]keep.TrashList{}
+ return nil, errors.New(fmt.Sprintf("Failed to get blobSignatureTtl, can't build trash lists: %v", err))
}
ttl := int64(_ttl.(float64))
@@ -29,28 +29,30 @@ func BuildTrashLists(kc *keepclient.KeepClient,
// expire unreferenced blocks more than "ttl" seconds old.
expiry := time.Now().UTC().Unix() - ttl
- return BuildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections)
+ return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
}
-func BuildTrashListsInternal(writableServers map[string]struct{},
+func buildTrashListsInternal(writableServers map[string]struct{},
keepServerInfo *keep.ReadServers,
expiry int64,
keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
m = make(map[string]keep.TrashList)
- for block, _ := range keepBlocksNotInCollections {
+ for block := range keepBlocksNotInCollections {
for _, block_on_server := range keepServerInfo.BlockToServers[block] {
- if block_on_server.Mtime < expiry {
- // block is older than expire cutoff
- srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
+ if block_on_server.Mtime >= expiry {
+ continue
+ }
- _, writable := writableServers[srv]
+ // block is older than expire cutoff
+ srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
- if writable {
- m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
- }
+ if _, writable := writableServers[srv]; !writable {
+ continue
}
+
+ m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
}
}
return
diff --git a/services/datamanager/summary/trash_list_test.go b/services/datamanager/summary/trash_list_test.go
index b6f9582..7620631 100644
--- a/services/datamanager/summary/trash_list_test.go
+++ b/services/datamanager/summary/trash_list_test.go
@@ -8,7 +8,7 @@ import (
)
// Gocheck boilerplate
-func TrashTest(t *testing.T) {
+func TestTrash(t *testing.T) {
TestingT(t)
}
@@ -38,7 +38,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
bs[block0] = struct{}{}
// Test trash list where only sv0 is on writable list.
- c.Check(BuildTrashListsInternal(
+ c.Check(buildTrashListsInternal(
map[string]struct{}{
sv0.URL(): struct{}{}},
&keepServerInfo,
@@ -49,7 +49,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
"http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
// Test trash list where both sv0 and sv1 are on writable list.
- c.Check(BuildTrashListsInternal(
+ c.Check(buildTrashListsInternal(
map[string]struct{}{
sv0.URL(): struct{}{},
sv1.URL(): struct{}{}},
@@ -62,7 +62,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
"http://keep1.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
// Test trash list where only block on sv0 is expired
- c.Check(BuildTrashListsInternal(
+ c.Check(buildTrashListsInternal(
map[string]struct{}{
sv0.URL(): struct{}{},
sv1.URL(): struct{}{}},
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 69fbf74..4fbe4bb 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -41,7 +41,7 @@ func TrashItem(trashRequest TrashRequest) {
continue
}
if trashRequest.BlockMtime != mtime.Unix() {
- log.Printf("%v Delete(%v): mtime does not match", volume, trashRequest.Locator)
+ log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
continue
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list