[ARVADOS] created: bc5680ed6869c6e967ee9a1ba564b7b8e2e0fa3b

git at public.curoverse.com git at public.curoverse.com
Thu Jun 11 17:42:40 EDT 2015


        at  bc5680ed6869c6e967ee9a1ba564b7b8e2e0fa3b (commit)


commit bc5680ed6869c6e967ee9a1ba564b7b8e2e0fa3b
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Jun 9 18:28:57 2015 -0400

    6272: Add EOF marker at end of index response. Error out of data manager if not received.

diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 93246bc..8d0e4ea 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -12,6 +12,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/logger"
 	"git.curoverse.com/arvados.git/sdk/go/manifest"
 	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+	"io"
 	"io/ioutil"
 	"log"
 	"net/http"
@@ -328,11 +329,35 @@ func ReadServerResponse(arvLogger *logger.Logger,
 	response.Address = keepServer
 	response.Contents.BlockDigestToInfo =
 		make(map[blockdigest.BlockDigest]BlockInfo)
-	scanner := bufio.NewScanner(resp.Body)
+	reader := bufio.NewReader(resp.Body)
 	numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
-	for scanner.Scan() {
+	for {
 		numLines++
-		blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text())
+		line, err := reader.ReadString('\n')
+		if err == io.EOF {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Index from %s truncated at line %d",
+					keepServer.String(), numLines))
+		} else if err != nil {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Error reading index response from %s at line %d: %v",
+					keepServer.String(), numLines, err))
+		}
+		if line == ".\n" {
+			if _, err := reader.Peek(1); err == nil {
+				extra, _ := reader.ReadString('\n')
+				loggerutil.FatalWithMessage(arvLogger,
+					fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
+						keepServer.String(), numLines+1, extra))
+			} else if err != io.EOF {
+				loggerutil.FatalWithMessage(arvLogger,
+					fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
+						keepServer.String(), numLines, err))
+			}
+			numLines--
+			break
+		}
+		blockInfo, err := parseBlockInfoFromIndexLine(line)
 		if err != nil {
 			loggerutil.FatalWithMessage(arvLogger,
 				fmt.Sprintf("Error parsing BlockInfo from index line "+
@@ -377,31 +402,25 @@ func ReadServerResponse(arvLogger *logger.Logger,
 			response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
 		}
 	}
-	if err := scanner.Err(); err != nil {
-		loggerutil.FatalWithMessage(arvLogger,
-			fmt.Sprintf("Received error scanning index response from %s: %v",
-				keepServer.String(),
-				err))
-	} else {
-		log.Printf("%s index contained %d lines with %d duplicates with "+
-			"%d size disagreements",
-			keepServer.String(),
-			numLines,
-			numDuplicates,
-			numSizeDisagreements)
-
-		if arvLogger != nil {
-			now := time.Now()
-			arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-				keepInfo := p["keep_info"].(map[string]interface{})
-				serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
-
-				serverInfo["processing_finished_at"] = now
-				serverInfo["lines_received"] = numLines
-				serverInfo["duplicates_seen"] = numDuplicates
-				serverInfo["size_disagreements_seen"] = numSizeDisagreements
-			})
-		}
+
+	log.Printf("%s index contained %d lines with %d duplicates with "+
+		"%d size disagreements",
+		keepServer.String(),
+		numLines,
+		numDuplicates,
+		numSizeDisagreements)
+
+	if arvLogger != nil {
+		now := time.Now()
+		arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+			keepInfo := p["keep_info"].(map[string]interface{})
+			serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+
+			serverInfo["processing_finished_at"] = now
+			serverInfo["lines_received"] = numLines
+			serverInfo["duplicates_seen"] = numDuplicates
+			serverInfo["size_disagreements_seen"] = numSizeDisagreements
+		})
 	}
 	resp.Body.Close()
 	return
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 5bd14be..37150cf 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -377,7 +377,7 @@ func TestIndexHandler(t *testing.T) {
 		response)
 
 	expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
-		TEST_HASH_2 + `\+\d+ \d+\n$`
+		TEST_HASH_2 + `\+\d+ \d+\n\.\n$`
 	match, _ := regexp.MatchString(expected, response.Body.String())
 	if !match {
 		t.Errorf(
@@ -393,7 +393,7 @@ func TestIndexHandler(t *testing.T) {
 		http.StatusOK,
 		response)
 
-	expected = `^` + TEST_HASH + `\+\d+ \d+\n$`
+	expected = `^` + TEST_HASH + `\+\d+ \d+\n\.\n$`
 	match, _ = regexp.MatchString(expected, response.Body.String())
 	if !match {
 		t.Errorf(
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 8930b79..2c4e8f3 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -198,6 +198,9 @@ func IndexHandler(resp http.ResponseWriter, req *http.Request) {
 			return
 		}
 	}
+	// Presence of this EOF marker is the only way the client can
+	// be assured the entire index was received.
+	resp.Write([]byte(".\n"))
 }
 
 // StatusHandler

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list