[ARVADOS] updated: c79792f08d8faf712f70c67ed1216feabc5d9408
git at public.curoverse.com
git at public.curoverse.com
Mon Nov 9 10:38:39 EST 2015
Summary of changes:
.../install-crunch-dispatch.html.textile.liquid | 2 +-
doc/install/install-keep-web.html.textile.liquid | 13 +--
doc/install/install-keepproxy.html.textile.liquid | 2 +-
sdk/go/arvadostest/fixtures.go | 2 +
sdk/python/tests/run_test_server.py | 2 +-
.../test/fixtures/api_client_authorizations.yml | 12 +++
services/datamanager/collection/collection.go | 83 ++++++++-------
services/datamanager/datamanager.go | 16 ++-
services/datamanager/datamanager_test.go | 27 +++--
services/datamanager/keep/keep.go | 112 ++++++++++++---------
services/datamanager/loggerutil/loggerutil.go | 9 ++
services/datamanager/summary/pull_list.go | 10 +-
services/keep-web/anonymous.go | 23 +++--
services/keep-web/doc.go | 18 ++--
services/keep-web/main.go | 4 +-
services/keep-web/server.go | 2 +-
services/keepproxy/keepproxy_test.go | 2 +
tools/keep-rsync/keep-rsync_test.go | 10 +-
18 files changed, 213 insertions(+), 136 deletions(-)
via c79792f08d8faf712f70c67ed1216feabc5d9408 (commit)
via af6d31cba8346ac86bc0027eb0f675144fb43056 (commit)
via 4ec9919ea6cce4ada252f1e0a0dc521fe27e508c (commit)
via 49fccd5b00230418a0918a33d4fb9154af63220a (commit)
via 8e1e7e6bf355eb0f1defc7278f1434c393a75a75 (commit)
via 05a0e777cdef513f5365d5319fea7af01ae9e7f7 (commit)
via b85cb1ba40385444f9494bbb88248ab65d700c85 (commit)
via 28425a5cf55a26113bdbea53381f86b779d84bf9 (commit)
via 6a8d940762f1b9be1a3a273e13b070d1f75ef8f1 (commit)
via fa96bcea706312be52f1488da3dad34452f5ccf4 (commit)
from 668cf29c1a7f8270ed5ef0bcfbe693618a8155da (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit c79792f08d8faf712f70c67ed1216feabc5d9408
Author: radhika <radhika at curoverse.com>
Date: Mon Nov 9 10:37:18 2015 -0500
7490: Convert several fatalf statements into returning errors. No new tests are added yet, but all the existing tests are passing.
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index ca03627..55b0b37 100644
--- a/services/datamanager/collection/collection.go
+++ b/services/datamanager/collection/collection.go
@@ -79,26 +79,29 @@ func init() {
// to call multiple times in a single run.
// Otherwise we would see cumulative numbers as explained here:
// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() {
+func WriteHeapProfile() error {
if heapProfileFilename != "" {
-
heapProfile, err := os.Create(heapProfileFilename)
if err != nil {
- log.Fatal(err)
+ return err
}
defer heapProfile.Close()
err = pprof.WriteHeapProfile(heapProfile)
- if err != nil {
- log.Fatal(err)
- }
+ return err
}
+
+ return nil
}
// GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
- results = GetCollections(params)
+func GetCollectionsAndSummarize(arvLogger *logger.Logger, params GetCollectionsParams) (results ReadCollections) {
+ results, err := GetCollections(params)
+ if err != nil {
+ loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetCollections with params %v: %v", params, err))
+ }
+
results.Summarize(params.Logger)
log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
@@ -116,9 +119,9 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec
}
// GetCollections gets collections from api
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
if &params.Client == nil {
- log.Fatalf("params.Client passed to GetCollections() should " +
+ err = fmt.Errorf("params.Client passed to GetCollections() should " +
"contain a valid ArvadosClient, but instead it is nil.")
}
@@ -139,26 +142,25 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
var defaultReplicationLevel int
{
- value, err := params.Client.Discovery("defaultCollectionReplication")
+ var value interface{}
+ value, err = params.Client.Discovery("defaultCollectionReplication")
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying default collection replication: %v", err))
+ return
}
defaultReplicationLevel = int(value.(float64))
if defaultReplicationLevel <= 0 {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Default collection replication returned by arvados SDK "+
- "should be a positive integer but instead it was %d.",
- defaultReplicationLevel))
+ err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+ "should be a positive integer but instead it was %d.",
+ defaultReplicationLevel)
+ return
}
}
initialNumberOfCollectionsAvailable, err :=
util.NumberItemsAvailable(params.Client, "collections")
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying collection count: %v", err))
+ return
}
// Include a 1% margin for collections added while we're reading so
// that we don't have to grow the map in most cases.
@@ -183,22 +185,28 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
// We're still finding new collections
// Write the heap profile for examining memory usage
- WriteHeapProfile()
+ err = WriteHeapProfile()
+ if err != nil {
+ return
+ }
// Get next batch of collections.
var collections SdkCollectionList
- err := params.Client.List("collections", sdkParams, &collections)
+ err = params.Client.List("collections", sdkParams, &collections)
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying collections: %v", err))
+ return
}
// Process collection and update our date filter.
- sdkParams["filters"].([][]string)[0][2] =
- ProcessCollections(params.Logger,
- collections.Items,
- defaultReplicationLevel,
- results.UUIDToCollection).Format(time.RFC3339)
+ var latestModificationDate time.Time
+ latestModificationDate, err = ProcessCollections(params.Logger,
+ collections.Items,
+ defaultReplicationLevel,
+ results.UUIDToCollection)
+ if err != nil {
+ return
+ }
+ sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
// update counts
previousTotalCollections = totalCollections
@@ -240,7 +248,7 @@ func StrCopy(s string) string {
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
defaultReplicationLevel int,
- UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
+ UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
for _, sdkCollection := range receivedCollections {
collection := Collection{UUID: StrCopy(sdkCollection.UUID),
OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
@@ -248,13 +256,12 @@ func ProcessCollections(arvLogger *logger.Logger,
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf(
- "Arvados SDK collection returned with unexpected zero "+
- "modification date. This probably means that either we failed to "+
- "parse the modification date or the API server has changed how "+
- "it returns modification dates: %+v",
- collection))
+ return latestModificationDate, fmt.Errorf(
+ "Arvados SDK collection returned with unexpected zero "+
+ "modification date. This probably means that either we failed to "+
+ "parse the modification date or the API server has changed how "+
+ "it returns modification dates: %+v",
+ collection)
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
@@ -278,13 +285,13 @@ func ProcessCollections(arvLogger *logger.Logger,
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
- message := fmt.Sprintf(
+ err = fmt.Errorf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.UUID,
storedSize,
block.Size,
block.Digest)
- loggerutil.FatalWithMessage(arvLogger, message)
+ return
}
collection.BlockDigestToSize[block.Digest] = block.Size
}
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index b81cf7e..a19d01f 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -130,8 +130,7 @@ func singlerun(arv arvadosclient.ArvadosClient) error {
kc, err := keepclient.MakeKeepClient(&arv)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error setting up keep client %s", err.Error()))
+ return fmt.Errorf("Error setting up keep client %v", err.Error())
}
// Log that we're finished. We force the recording, since go will
@@ -158,7 +157,10 @@ func singlerun(arv arvadosclient.ArvadosClient) error {
&keepServerInfo,
replicationSummary.KeepBlocksNotInCollections)
- summary.WritePullLists(arvLogger, pullLists)
+ err = summary.WritePullLists(arvLogger, pullLists)
+ if err != nil {
+ return err
+ }
if trashErr != nil {
return err
@@ -177,18 +179,24 @@ func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
go func() {
collectionChannel <- collection.GetCollectionsAndSummarize(
+ arvLogger,
collection.GetCollectionsParams{
Client: arv,
Logger: arvLogger,
BatchSize: 50})
}()
- *keepServerInfo = keep.GetKeepServersAndSummarize(
+ var err error
+ *keepServerInfo, err = keep.GetKeepServersAndSummarize(
keep.GetKeepServersParams{
Client: arv,
Logger: arvLogger,
Limit: 1000})
+ if err != nil {
+ return
+ }
+
*readCollections = <-collectionChannel
}
}
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 3a9c21a..20244fa 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -91,8 +91,8 @@ func (s ServerAddress) URL() string {
}
// GetKeepServersAndSummarize gets keep servers from api
-func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
- results = GetKeepServers(params)
+func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) {
+ results, err = GetKeepServers(params)
log.Printf("Returned %d keep disks", len(results.ServerToContents))
results.Summarize(params.Logger)
@@ -103,7 +103,7 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer
}
// GetKeepServers from api server
-func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
+func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
sdkParams := arvadosclient.Dict{
"filters": [][]string{[]string{"service_type", "!=", "proxy"}},
}
@@ -112,18 +112,16 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
}
var sdkResponse ServiceList
- err := params.Client.List("keep_services", sdkParams, &sdkResponse)
+ err = params.Client.List("keep_services", sdkParams, &sdkResponse)
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error requesting keep disks from API server: %v", err))
+ return
}
// Currently, only "disk" types are supported. Stop if any other service types are found.
for _, server := range sdkResponse.KeepServers {
if server.ServiceType != "disk" {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Unsupported service type %q found for: %v", server.ServiceType, server))
+ return results, fmt.Errorf("Unsupported service type %q found for: %v", server.ServiceType, server)
}
}
@@ -139,8 +137,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
log.Printf("Received keep services list: %+v", sdkResponse)
if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Did not receive all available keep servers: %+v", sdkResponse))
+ return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse)
}
results.KeepServerIndexToAddress = sdkResponse.KeepServers
@@ -174,6 +171,12 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
for i := range sdkResponse.KeepServers {
_ = i // Here to prevent go from complaining.
response := <-responseChan
+
+ // There might have been an error during GetServerContents; so check if the response is empty
+ if response.Address.Host == "" {
+ return results, fmt.Errorf("Error during GetServerContents; no host info found")
+ }
+
log.Printf("Received channel response from %v containing %d files",
response.Address,
len(response.Contents.BlockDigestToInfo))
@@ -194,25 +197,42 @@ func GetServerContents(arvLogger *logger.Logger,
keepServer ServerAddress,
arv arvadosclient.ArvadosClient) (response ServerResponse) {
- GetServerStatus(arvLogger, keepServer, arv)
+ err := GetServerStatus(arvLogger, keepServer, arv)
+ if err != nil {
+ loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetServerStatus: %v", err))
+ return ServerResponse{}
+ }
+
+ req, err := CreateIndexRequest(arvLogger, keepServer, arv)
+ if err != nil {
+ loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error building CreateIndexRequest: %v", err))
+ return ServerResponse{}
+ }
- req := CreateIndexRequest(arvLogger, keepServer, arv)
resp, err := arv.Client.Do(req)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
+ loggerutil.LogErrorMessage(arvLogger,
fmt.Sprintf("Error fetching %s: %v. Response was %+v",
req.URL.String(),
err,
resp))
+ return ServerResponse{}
+ }
+
+ response, err = ReadServerResponse(arvLogger, keepServer, resp)
+ if err != nil {
+ loggerutil.LogErrorMessage(arvLogger,
+ fmt.Sprintf("Error during ReadServerResponse %v", err))
+ return ServerResponse{}
}
- return ReadServerResponse(arvLogger, keepServer, resp)
+ return
}
// GetServerStatus get keep server status by invoking /status.json
func GetServerStatus(arvLogger *logger.Logger,
keepServer ServerAddress,
- arv arvadosclient.ArvadosClient) {
+ arv arvadosclient.ArvadosClient) error {
url := fmt.Sprintf("http://%s:%d/status.json",
keepServer.Host,
keepServer.Port)
@@ -232,13 +252,11 @@ func GetServerStatus(arvLogger *logger.Logger,
resp, err := arv.Client.Get(url)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error getting keep status from %s: %v", url, err))
+ return fmt.Errorf("Error getting keep status from %s: %v", url, err)
} else if resp.StatusCode != 200 {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Received error code %d in response to request "+
- "for %s status: %s",
- resp.StatusCode, url, resp.Status))
+ return fmt.Errorf("Received error code %d in response to request "+
+ "for %s status: %s",
+ resp.StatusCode, url, resp.Status)
}
var keepStatus map[string]interface{}
@@ -246,8 +264,7 @@ func GetServerStatus(arvLogger *logger.Logger,
decoder.UseNumber()
err = decoder.Decode(&keepStatus)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
+ return fmt.Errorf("Error decoding keep status from %s: %v", url, err)
}
if arvLogger != nil {
@@ -259,12 +276,14 @@ func GetServerStatus(arvLogger *logger.Logger,
serverInfo["status"] = keepStatus
})
}
+
+ return nil
}
// CreateIndexRequest to the keep server
func CreateIndexRequest(arvLogger *logger.Logger,
keepServer ServerAddress,
- arv arvadosclient.ArvadosClient) (req *http.Request) {
+ arv arvadosclient.ArvadosClient) (req *http.Request, err error) {
url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
log.Println("About to fetch keep server contents from " + url)
@@ -277,26 +296,24 @@ func CreateIndexRequest(arvLogger *logger.Logger,
})
}
- req, err := http.NewRequest("GET", url, nil)
+ req, err = http.NewRequest("GET", url, nil)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error building http request for %s: %v", url, err))
+ return req, fmt.Errorf("Error building http request for %s: %v", url, err)
}
req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
- return
+ return req, err
}
// ReadServerResponse reads reasponse from keep server
func ReadServerResponse(arvLogger *logger.Logger,
keepServer ServerAddress,
- resp *http.Response) (response ServerResponse) {
+ resp *http.Response) (response ServerResponse, err error) {
if resp.StatusCode != 200 {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Received error code %d in response to request "+
- "for %s index: %s",
- resp.StatusCode, keepServer.String(), resp.Status))
+ return response, fmt.Errorf("Received error code %d in response to request "+
+ "for %s index: %s",
+ resp.StatusCode, keepServer.String(), resp.Status)
}
if arvLogger != nil {
@@ -317,35 +334,30 @@ func ReadServerResponse(arvLogger *logger.Logger,
numLines++
line, err := reader.ReadString('\n')
if err == io.EOF {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Index from %s truncated at line %d",
- keepServer.String(), numLines))
+ return response, fmt.Errorf("Index from %s truncated at line %d",
+ keepServer.String(), numLines)
} else if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error reading index response from %s at line %d: %v",
- keepServer.String(), numLines, err))
+ return response, fmt.Errorf("Error reading index response from %s at line %d: %v",
+ keepServer.String(), numLines, err)
}
if line == "\n" {
if _, err := reader.Peek(1); err == nil {
extra, _ := reader.ReadString('\n')
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
- keepServer.String(), numLines+1, extra))
+ return response, fmt.Errorf("Index from %s had trailing data at line %d after EOF marker: %s",
+ keepServer.String(), numLines+1, extra)
} else if err != io.EOF {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
- keepServer.String(), numLines, err))
+ return response, fmt.Errorf("Index from %s had read error after EOF marker at line %d: %v",
+ keepServer.String(), numLines, err)
}
numLines--
break
}
blockInfo, err := parseBlockInfoFromIndexLine(line)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error parsing BlockInfo from index line "+
- "received from %s: %v",
- keepServer.String(),
- err))
+ return response, fmt.Errorf("Error parsing BlockInfo from index line "+
+ "received from %s: %v",
+ keepServer.String(),
+ err)
}
if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
diff --git a/services/datamanager/loggerutil/loggerutil.go b/services/datamanager/loggerutil/loggerutil.go
index 8c655cd..453f7e1 100644
--- a/services/datamanager/loggerutil/loggerutil.go
+++ b/services/datamanager/loggerutil/loggerutil.go
@@ -50,3 +50,12 @@ func FatalWithMessage(arvLogger *logger.Logger, message string) {
log.Fatalf(message)
}
+
+func LogErrorMessage(arvLogger *logger.Logger, message string) {
+ if arvLogger != nil {
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ runInfo := logger.GetOrCreateMap(p, "run_info")
+ runInfo["ERROR"] = message
+ })
+ }
+}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
index cc01249..107abf6 100644
--- a/services/datamanager/summary/pull_list.go
+++ b/services/datamanager/summary/pull_list.go
@@ -9,7 +9,6 @@ import (
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/keep"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"log"
"os"
"strings"
@@ -176,23 +175,22 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
// This is just a hack for prototyping, it is not expected to be used
// in production.
func WritePullLists(arvLogger *logger.Logger,
- pullLists map[string]PullList) {
+ pullLists map[string]PullList) error {
r := strings.NewReplacer(":", ".")
for host, list := range pullLists {
filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
pullListFile, err := os.Create(filename)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to open %s: %v", filename, err))
+ return err
}
defer pullListFile.Close()
enc := json.NewEncoder(pullListFile)
err = enc.Encode(list)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
+ return err
}
log.Printf("Wrote pull list to %s.", filename)
}
+ return nil
}
commit af6d31cba8346ac86bc0027eb0f675144fb43056
Merge: 668cf29 4ec9919
Author: radhika <radhika at curoverse.com>
Date: Fri Nov 6 10:35:54 2015 -0500
Merge branch 'master' into 7490-datamanager-dont-die-return-error
Conflicts:
services/datamanager/datamanager_test.go
diff --cc services/datamanager/datamanager_test.go
index 6ceb1e6,685f94c..26a2fbf
--- a/services/datamanager/datamanager_test.go
+++ b/services/datamanager/datamanager_test.go
@@@ -33,11 -28,8 +28,12 @@@ func SetupDataManagerTest(t *testing.T
arvadostest.StartAPI()
arvadostest.StartKeep(2, false)
- arv = makeArvadosClient()
+ var err error
+ arv, err = makeArvadosClient()
+ if err != nil {
+ t.Fatalf("Error making arvados client: %s", err)
+ }
+ arv.ApiToken = arvadostest.DataManagerToken
// keep client
keepClient = &keepclient.KeepClient{
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list