[ARVADOS] created: bc5680ed6869c6e967ee9a1ba564b7b8e2e0fa3b
git at public.curoverse.com
git at public.curoverse.com
Thu Jun 11 17:42:40 EDT 2015
at bc5680ed6869c6e967ee9a1ba564b7b8e2e0fa3b (commit)
commit bc5680ed6869c6e967ee9a1ba564b7b8e2e0fa3b
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jun 9 18:28:57 2015 -0400
6272: Add EOF marker at end of index response. Data manager errors out if the marker is not received.
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 93246bc..8d0e4ea 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -12,6 +12,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+ "io"
"io/ioutil"
"log"
"net/http"
@@ -328,11 +329,35 @@ func ReadServerResponse(arvLogger *logger.Logger,
response.Address = keepServer
response.Contents.BlockDigestToInfo =
make(map[blockdigest.BlockDigest]BlockInfo)
- scanner := bufio.NewScanner(resp.Body)
+ reader := bufio.NewReader(resp.Body)
numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
- for scanner.Scan() {
+ for {
numLines++
- blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text())
+ line, err := reader.ReadString('\n')
+ if err == io.EOF {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s truncated at line %d",
+ keepServer.String(), numLines))
+ } else if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Error reading index response from %s at line %d: %v",
+ keepServer.String(), numLines, err))
+ }
+ if line == ".\n" {
+ if _, err := reader.Peek(1); err == nil {
+ extra, _ := reader.ReadString('\n')
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
+ keepServer.String(), numLines+1, extra))
+ } else if err != io.EOF {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
+ keepServer.String(), numLines, err))
+ }
+ numLines--
+ break
+ }
+ blockInfo, err := parseBlockInfoFromIndexLine(line)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf("Error parsing BlockInfo from index line "+
@@ -377,31 +402,25 @@ func ReadServerResponse(arvLogger *logger.Logger,
response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
}
}
- if err := scanner.Err(); err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Received error scanning index response from %s: %v",
- keepServer.String(),
- err))
- } else {
- log.Printf("%s index contained %d lines with %d duplicates with "+
- "%d size disagreements",
- keepServer.String(),
- numLines,
- numDuplicates,
- numSizeDisagreements)
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
-
- serverInfo["processing_finished_at"] = now
- serverInfo["lines_received"] = numLines
- serverInfo["duplicates_seen"] = numDuplicates
- serverInfo["size_disagreements_seen"] = numSizeDisagreements
- })
- }
+
+ log.Printf("%s index contained %d lines with %d duplicates with "+
+ "%d size disagreements",
+ keepServer.String(),
+ numLines,
+ numDuplicates,
+ numSizeDisagreements)
+
+ if arvLogger != nil {
+ now := time.Now()
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ keepInfo := p["keep_info"].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+
+ serverInfo["processing_finished_at"] = now
+ serverInfo["lines_received"] = numLines
+ serverInfo["duplicates_seen"] = numDuplicates
+ serverInfo["size_disagreements_seen"] = numSizeDisagreements
+ })
}
resp.Body.Close()
return
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5bd14be..37150cf 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -377,7 +377,7 @@ func TestIndexHandler(t *testing.T) {
response)
expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
- TEST_HASH_2 + `\+\d+ \d+\n$`
+ TEST_HASH_2 + `\+\d+ \d+\n\.\n$`
match, _ := regexp.MatchString(expected, response.Body.String())
if !match {
t.Errorf(
@@ -393,7 +393,7 @@ func TestIndexHandler(t *testing.T) {
http.StatusOK,
response)
- expected = `^` + TEST_HASH + `\+\d+ \d+\n$`
+ expected = `^` + TEST_HASH + `\+\d+ \d+\n\.\n$`
match, _ = regexp.MatchString(expected, response.Body.String())
if !match {
t.Errorf(
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 8930b79..2c4e8f3 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -198,6 +198,9 @@ func IndexHandler(resp http.ResponseWriter, req *http.Request) {
return
}
}
+ // Presence of this EOF marker is the only way the client can
+ // be assured the entire index was received.
+ resp.Write([]byte(".\n"))
}
// StatusHandler
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list