[ARVADOS] updated: f63e86a3270836ab5e5f1de31e28b686ff809739

git at public.curoverse.com git at public.curoverse.com
Thu Jul 16 16:12:06 EDT 2015


Summary of changes:
 services/datamanager/datamanager.go             | 46 ++++++++-----
 services/datamanager/keep/keep.go               | 20 +++---
 services/datamanager/keep/keep_test.go          | 92 +++++++++++++++++++++++++
 services/datamanager/summary/pull_list_test.go  | 12 ++--
 services/datamanager/summary/trash_list.go      | 30 ++++----
 services/datamanager/summary/trash_list_test.go |  8 +--
 services/keepstore/trash_worker.go              |  2 +-
 7 files changed, 160 insertions(+), 50 deletions(-)
 create mode 100644 services/datamanager/keep/keep_test.go

       via  f63e86a3270836ab5e5f1de31e28b686ff809739 (commit)
      from  7da85b85af2dd9b22186bfd48190de7b68f75837 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit f63e86a3270836ab5e5f1de31e28b686ff809739
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jul 16 16:12:47 2015 -0400

    6221: Add SendTrashLists() test.  Propagate errors from BuildTrashLists.  Style fixes.

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index d7ac0d2..70a9ae7 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -41,19 +41,25 @@ func init() {
 func main() {
 	flag.Parse()
 	if minutesBetweenRuns == 0 {
-		singlerun()
+		err := singlerun()
+		if err != nil {
+			log.Fatalf("Got an error: %v", err)
+		}
 	} else {
 		waitTime := time.Minute * time.Duration(minutesBetweenRuns)
 		for {
 			log.Println("Beginning Run")
-			singlerun()
+			err := singlerun()
+			if err != nil {
+				log.Printf("Got an error: %v", err)
+			}
 			log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
 			time.Sleep(waitTime)
 		}
 	}
 }
 
-func singlerun() {
+func singlerun() error {
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -119,31 +125,39 @@ func singlerun() {
 			fmt.Sprintf("Error setting up keep client %s", err.Error()))
 	}
 
+	// Log that we're finished. We force the recording, since go will
+	// not wait for the write timer before exiting.
+	if arvLogger != nil {
+		defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
+			summaryInfo := logger.GetOrCreateMap(p, "summary_info")
+			summaryInfo["block_replication_counts"] = bucketCounts
+			summaryInfo["replication_summary"] = replicationCounts
+			p["summary_info"] = summaryInfo
+
+			p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
+		})
+	}
+
 	pullServers := summary.ComputePullServers(kc,
 		&keepServerInfo,
 		readCollections.BlockToDesiredReplication,
 		replicationSummary.UnderReplicatedBlocks)
 
 	pullLists := summary.BuildPullLists(pullServers)
-	trashLists := summary.BuildTrashLists(kc,
+
+	trashLists, trashErr := summary.BuildTrashLists(kc,
 		&keepServerInfo,
 		replicationSummary.KeepBlocksNotInCollections)
 
 	summary.WritePullLists(arvLogger, pullLists)
-	keep.SendTrashLists(arvLogger, kc, trashLists)
 
-	// Log that we're finished. We force the recording, since go will
-	// not wait for the write timer before exiting.
-	if arvLogger != nil {
-		arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
-			summaryInfo := logger.GetOrCreateMap(p, "summary_info")
-			summaryInfo["block_replication_counts"] = bucketCounts
-			summaryInfo["replication_summary"] = replicationCounts
-			p["summary_info"] = summaryInfo
-
-			p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
-		})
+	if trashErr != nil {
+		return err
+	} else {
+		keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists)
 	}
+
+	return nil
 }
 
 // Returns a data fetcher that fetches data from remote servers.
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 871acc8..08e54e6 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -98,7 +98,7 @@ func (s ServerAddress) URL() string {
 	}
 }
 
-func getDataManagerToken(arvLogger *logger.Logger) string {
+func GetDataManagerToken(arvLogger *logger.Logger) string {
 	readDataManagerToken := func() {
 		if dataManagerTokenFile == "" {
 			flag.Usage()
@@ -308,7 +308,7 @@ func CreateIndexRequest(arvLogger *logger.Logger,
 	}
 
 	req.Header.Add("Authorization",
-		fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+		fmt.Sprintf("OAuth2 %s", GetDataManagerToken(arvLogger)))
 	return
 }
 
@@ -461,9 +461,11 @@ type TrashRequest struct {
 
 type TrashList []TrashRequest
 
-func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList) {
+func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[string]TrashList) {
 	count := 0
-	rendezvous := make(chan bool)
+	barrier := make(chan bool)
+
+	client := kc.Client
 
 	for url, v := range spl {
 		count += 1
@@ -471,7 +473,7 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
 
 		go (func(url string, v TrashList) {
 			defer (func() {
-				rendezvous <- true
+				barrier <- true
 			})()
 
 			pipeReader, pipeWriter := io.Pipe()
@@ -488,16 +490,16 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
 			}
 
 			// Add api token header
-			req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+			req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", dataManagerToken))
 
 			// Make the request
 			var resp *http.Response
-			if resp, err = kc.Client.Do(req); err != nil {
+			if resp, err = client.Do(req); err != nil {
 				log.Printf("Error sending trash list to %v error: %v", url, err.Error())
 				return
 			}
 
-			log.Printf("Sent trash list to %v: response was HTTP %d", url, resp.Status)
+			log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
 
 			io.Copy(ioutil.Discard, resp.Body)
 			resp.Body.Close()
@@ -506,6 +508,6 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map
 	}
 
 	for i := 0; i < count; i += 1 {
-		<-rendezvous
+		<-barrier
 	}
 }
diff --git a/services/datamanager/keep/keep_test.go b/services/datamanager/keep/keep_test.go
new file mode 100644
index 0000000..85f704c
--- /dev/null
+++ b/services/datamanager/keep/keep_test.go
@@ -0,0 +1,92 @@
+package keep
+
+import (
+	"encoding/json"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	. "gopkg.in/check.v1"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+type KeepSuite struct{}
+
+var _ = Suite(&KeepSuite{})
+
+type TestHandler struct {
+	request TrashList
+}
+
+func (this *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+	r := json.NewDecoder(req.Body)
+	r.Decode(&this.request)
+}
+
+func (s *KeepSuite) TestSendTrashLists(c *C) {
+	th := TestHandler{}
+	server := httptest.NewServer(&th)
+
+	tl := map[string]TrashList{
+		server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+	kc := keepclient.KeepClient{Client: &http.Client{}}
+	kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+		map[string]string{"xxxx": server.URL},
+		map[string]string{})
+
+	SendTrashLists("", &kc, tl)
+	server.Close()
+
+	c.Check(th.request,
+		DeepEquals,
+		tl[server.URL])
+
+}
+
+type TestHandlerError struct {
+}
+
+func (this *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
+	http.Error(writer, "I'm a teapot", 405)
+}
+
+func (s *KeepSuite) TestSendTrashListError(c *C) {
+	// Server responds with an error
+
+	th := TestHandlerError{}
+	server := httptest.NewServer(&th)
+
+	tl := map[string]TrashList{
+		server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+	kc := keepclient.KeepClient{Client: &http.Client{}}
+	kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+		map[string]string{"xxxx": server.URL},
+		map[string]string{})
+
+	SendTrashLists("", &kc, tl)
+	server.Close()
+}
+
+func (s *KeepSuite) TestSendTrashListError2(c *C) {
+	// Server is not reachable
+
+	th := TestHandler{}
+	server := httptest.NewServer(&th)
+	server.Close()
+
+	tl := map[string]TrashList{
+		server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+
+	kc := keepclient.KeepClient{Client: &http.Client{}}
+	kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
+		map[string]string{"xxxx": server.URL},
+		map[string]string{})
+
+	SendTrashLists("", &kc, tl)
+}
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
index f22d47d..e2050c2 100644
--- a/services/datamanager/summary/pull_list_test.go
+++ b/services/datamanager/summary/pull_list_test.go
@@ -9,13 +9,13 @@ import (
 )
 
 // Gocheck boilerplate
-func Test(t *testing.T) {
+func TestPullLists(t *testing.T) {
 	TestingT(t)
 }
 
-type MySuite struct{}
+type PullSuite struct{}
 
-var _ = Suite(&MySuite{})
+var _ = Suite(&PullSuite{})
 
 // Helper method to declare string sets more succinctly
 // Could be placed somewhere more general.
@@ -27,7 +27,7 @@ func stringSet(slice ...string) (m map[string]struct{}) {
 	return
 }
 
-func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
+func (s *PullSuite) TestPullListPrintsJSONCorrectly(c *C) {
 	pl := PullList{PullRequest{
 		Locator: Locator(blockdigest.MakeTestDigestSpecifySize(0xBadBeef, 56789)),
 		Servers: []string{"keep0.qr1hi.arvadosapi.com:25107",
@@ -41,7 +41,7 @@ func (s *MySuite) TestPullListPrintsJSONCorrectly(c *C) {
 	c.Check(string(b), Equals, expectedOutput)
 }
 
-func (s *MySuite) TestCreatePullServers(c *C) {
+func (s *PullSuite) TestCreatePullServers(c *C) {
 	var cs CanonicalString
 	c.Check(
 		CreatePullServers(cs,
@@ -155,7 +155,7 @@ var PullListMapEquals Checker = &pullListMapEqualsChecker{&CheckerInfo{
 	Params: []string{"obtained", "expected"},
 }}
 
-func (s *MySuite) TestBuildPullLists(c *C) {
+func (s *PullSuite) TestBuildPullLists(c *C) {
 	c.Check(
 		BuildPullLists(map[Locator]PullServers{}),
 		PullListMapEquals,
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
index 330cd4e..0bedc9c 100644
--- a/services/datamanager/summary/trash_list.go
+++ b/services/datamanager/summary/trash_list.go
@@ -2,15 +2,16 @@
 package summary
 
 import (
+	"errors"
+	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/services/datamanager/keep"
-	"log"
 	"time"
 )
 
 func BuildTrashLists(kc *keepclient.KeepClient,
 	keepServerInfo *keep.ReadServers,
-	keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
+	keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
 
 	// Servers that are writeable
 	writableServers := map[string]struct{}{}
@@ -20,8 +21,7 @@ func BuildTrashLists(kc *keepclient.KeepClient,
 
 	_ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
 	if err != nil {
-		log.Printf("Failed to get blobSignatureTtl: %v", err)
-		return map[string]keep.TrashList{}
+		return nil, errors.New(fmt.Sprintf("Failed to get blobSignatureTtl, can't build trash lists: %v", err))
 	}
 
 	ttl := int64(_ttl.(float64))
@@ -29,28 +29,30 @@ func BuildTrashLists(kc *keepclient.KeepClient,
 	// expire unreferenced blocks more than "ttl" seconds old.
 	expiry := time.Now().UTC().Unix() - ttl
 
-	return BuildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections)
+	return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
 }
 
-func BuildTrashListsInternal(writableServers map[string]struct{},
+func buildTrashListsInternal(writableServers map[string]struct{},
 	keepServerInfo *keep.ReadServers,
 	expiry int64,
 	keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
 
 	m = make(map[string]keep.TrashList)
 
-	for block, _ := range keepBlocksNotInCollections {
+	for block := range keepBlocksNotInCollections {
 		for _, block_on_server := range keepServerInfo.BlockToServers[block] {
-			if block_on_server.Mtime < expiry {
-				// block is older than expire cutoff
-				srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
+			if block_on_server.Mtime >= expiry {
+				continue
+			}
 
-				_, writable := writableServers[srv]
+			// block is older than expire cutoff
+			srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
 
-				if writable {
-					m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
-				}
+			if _, writable := writableServers[srv]; !writable {
+				continue
 			}
+
+			m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
 		}
 	}
 	return
diff --git a/services/datamanager/summary/trash_list_test.go b/services/datamanager/summary/trash_list_test.go
index b6f9582..7620631 100644
--- a/services/datamanager/summary/trash_list_test.go
+++ b/services/datamanager/summary/trash_list_test.go
@@ -8,7 +8,7 @@ import (
 )
 
 // Gocheck boilerplate
-func TrashTest(t *testing.T) {
+func TestTrash(t *testing.T) {
 	TestingT(t)
 }
 
@@ -38,7 +38,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
 	bs[block0] = struct{}{}
 
 	// Test trash list where only sv0 is on writable list.
-	c.Check(BuildTrashListsInternal(
+	c.Check(buildTrashListsInternal(
 		map[string]struct{}{
 			sv0.URL(): struct{}{}},
 		&keepServerInfo,
@@ -49,7 +49,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
 			"http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
 
 	// Test trash list where both sv0 and sv1 are on writable list.
-	c.Check(BuildTrashListsInternal(
+	c.Check(buildTrashListsInternal(
 		map[string]struct{}{
 			sv0.URL(): struct{}{},
 			sv1.URL(): struct{}{}},
@@ -62,7 +62,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
 			"http://keep1.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
 
 	// Test trash list where only block on sv0 is expired
-	c.Check(BuildTrashListsInternal(
+	c.Check(buildTrashListsInternal(
 		map[string]struct{}{
 			sv0.URL(): struct{}{},
 			sv1.URL(): struct{}{}},
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 69fbf74..4fbe4bb 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -41,7 +41,7 @@ func TrashItem(trashRequest TrashRequest) {
 			continue
 		}
 		if trashRequest.BlockMtime != mtime.Unix() {
-			log.Printf("%v Delete(%v): mtime does not match", volume, trashRequest.Locator)
+			log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
 			continue
 		}
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list