[ARVADOS] updated: 4d0e87db2bef1f30634d28338e89227b985e6485
git at public.curoverse.com
git at public.curoverse.com
Tue Jun 16 22:30:42 EDT 2015
Summary of changes:
services/datamanager/keep/keep.go | 75 ++++++++++++++++++++++++--------------
services/keepstore/handler_test.go | 4 +-
services/keepstore/handlers.go | 3 ++
3 files changed, 52 insertions(+), 30 deletions(-)
via 4d0e87db2bef1f30634d28338e89227b985e6485 (commit)
via 6f652a6a75b6c17e55cf62a3e0c047595e3aa035 (commit)
from a4d63932d669acd5011a7fa5afcbeec513acfe2c (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 4d0e87db2bef1f30634d28338e89227b985e6485
Merge: a4d6393 6f652a6
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jun 16 22:30:03 2015 -0400
Merge branch '6272-index-eof' closes #6272
commit 6f652a6a75b6c17e55cf62a3e0c047595e3aa035
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Jun 16 16:14:58 2015 -0400
6272: Add blank line to indicate index response EOF. Error out of data manager if not received.
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 93246bc..912fcf5 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -12,6 +12,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+ "io"
"io/ioutil"
"log"
"net/http"
@@ -328,11 +329,35 @@ func ReadServerResponse(arvLogger *logger.Logger,
response.Address = keepServer
response.Contents.BlockDigestToInfo =
make(map[blockdigest.BlockDigest]BlockInfo)
- scanner := bufio.NewScanner(resp.Body)
+ reader := bufio.NewReader(resp.Body)
numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
- for scanner.Scan() {
+ for {
numLines++
- blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text())
+ line, err := reader.ReadString('\n')
+ if err == io.EOF {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s truncated at line %d",
+ keepServer.String(), numLines))
+ } else if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Error reading index response from %s at line %d: %v",
+ keepServer.String(), numLines, err))
+ }
+ if line == "\n" {
+ if _, err := reader.Peek(1); err == nil {
+ extra, _ := reader.ReadString('\n')
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
+ keepServer.String(), numLines+1, extra))
+ } else if err != io.EOF {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
+ keepServer.String(), numLines, err))
+ }
+ numLines--
+ break
+ }
+ blockInfo, err := parseBlockInfoFromIndexLine(line)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf("Error parsing BlockInfo from index line "+
@@ -377,31 +402,25 @@ func ReadServerResponse(arvLogger *logger.Logger,
response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
}
}
- if err := scanner.Err(); err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Received error scanning index response from %s: %v",
- keepServer.String(),
- err))
- } else {
- log.Printf("%s index contained %d lines with %d duplicates with "+
- "%d size disagreements",
- keepServer.String(),
- numLines,
- numDuplicates,
- numSizeDisagreements)
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
-
- serverInfo["processing_finished_at"] = now
- serverInfo["lines_received"] = numLines
- serverInfo["duplicates_seen"] = numDuplicates
- serverInfo["size_disagreements_seen"] = numSizeDisagreements
- })
- }
+
+ log.Printf("%s index contained %d lines with %d duplicates with "+
+ "%d size disagreements",
+ keepServer.String(),
+ numLines,
+ numDuplicates,
+ numSizeDisagreements)
+
+ if arvLogger != nil {
+ now := time.Now()
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ keepInfo := p["keep_info"].(map[string]interface{})
+ serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+
+ serverInfo["processing_finished_at"] = now
+ serverInfo["lines_received"] = numLines
+ serverInfo["duplicates_seen"] = numDuplicates
+ serverInfo["size_disagreements_seen"] = numSizeDisagreements
+ })
}
resp.Body.Close()
return
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 074e499..c181982 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -377,7 +377,7 @@ func TestIndexHandler(t *testing.T) {
response)
expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
- TEST_HASH_2 + `\+\d+ \d+\n$`
+ TEST_HASH_2 + `\+\d+ \d+\n\n$`
match, _ := regexp.MatchString(expected, response.Body.String())
if !match {
t.Errorf(
@@ -393,7 +393,7 @@ func TestIndexHandler(t *testing.T) {
http.StatusOK,
response)
- expected = `^` + TEST_HASH + `\+\d+ \d+\n$`
+ expected = `^` + TEST_HASH + `\+\d+ \d+\n\n$`
match, _ = regexp.MatchString(expected, response.Body.String())
if !match {
t.Errorf(
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 2b437e7..3898b55 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -161,6 +161,9 @@ func IndexHandler(resp http.ResponseWriter, req *http.Request) {
return
}
}
+ // An empty line at EOF is the only way the client can be
+ // assured the entire index was received.
+ resp.Write([]byte{'\n'})
}
// StatusHandler
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list