[ARVADOS] created: 960e574bc6c559e67c12fc6ac844dd7aa6880051

git at public.curoverse.com git at public.curoverse.com
Mon Dec 29 09:22:21 EST 2014


        at  960e574bc6c559e67c12fc6ac844dd7aa6880051 (commit)


commit 960e574bc6c559e67c12fc6ac844dd7aa6880051
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 29 09:23:45 2014 -0500

    4869: Keepstore now returns Content-Length headers, and logs the error message
    sent to the client on errors.

diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a954d2b..3f0b560 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -81,6 +81,13 @@ func TestGetHandler(t *testing.T) {
 		"Unauthenticated request, unsigned locator",
 		string(TEST_BLOCK),
 		response)
+
+	received_cl := response.Header().Get("Content-Length")
+	expected_cl := fmt.Sprintf("%d", len(TEST_BLOCK))
+	if received_cl != expected_cl {
+		t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+	}
+
 	received_xbs := response.Header().Get("X-Block-Size")
 	expected_xbs := fmt.Sprintf("%d", len(TEST_BLOCK))
 	if received_xbs != expected_xbs {
@@ -102,12 +109,19 @@ func TestGetHandler(t *testing.T) {
 		"Authenticated request, signed locator", http.StatusOK, response)
 	ExpectBody(t,
 		"Authenticated request, signed locator", string(TEST_BLOCK), response)
+
 	received_xbs = response.Header().Get("X-Block-Size")
 	expected_xbs = fmt.Sprintf("%d", len(TEST_BLOCK))
 	if received_xbs != expected_xbs {
 		t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs)
 	}
 
+	received_cl = response.Header().Get("Content-Length")
+	expected_cl = fmt.Sprintf("%d", len(TEST_BLOCK))
+	if received_cl != expected_cl {
+		t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+	}
+
 	// Authenticated request, unsigned locator
 	// => PermissionError
 	response = IssueRequest(&RequestTester{
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index bd1ca67..9d49d80 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -175,6 +175,10 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
+	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
+
+	// If/when we support HTTP Range header (#3734), then Content-Length
+	// could be smaller than Block size
 	resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
 
 	_, err = resp.Write(block)
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index fd4e234..0598d4c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -7,12 +7,14 @@ import (
 	"github.com/gorilla/mux"
 	"log"
 	"net/http"
+	"strings"
 )
 
 type LoggingResponseWriter struct {
 	Status int
 	Length int
 	http.ResponseWriter
+	Response string
 }
 
 func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
@@ -22,6 +24,9 @@ func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
 
 func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
 	loggingWriter.Length += len(data)
+	if loggingWriter.Status >= 400 {
+		loggingWriter.Response += string(data)
+	}
 	return loggingWriter.ResponseWriter.Write(data)
 }
 
@@ -35,7 +40,11 @@ func MakeLoggingRESTRouter() *LoggingRESTRouter {
 }
 
 func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-	loggingWriter := LoggingResponseWriter{200, 0, resp}
+	loggingWriter := LoggingResponseWriter{200, 0, resp, ""}
 	loggingRouter.router.ServeHTTP(&loggingWriter, req)
-	log.Printf("[%s] %s %s %d %d", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length)
+	if loggingWriter.Status >= 400 {
+		log.Printf("[%s] %s %s %d %d '%s'", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length, strings.TrimSpace(loggingWriter.Response))
+	} else {
+		log.Printf("[%s] %s %s %d %d", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length)
+	}
 }

commit 51bae325be042ecfc2469eb9838671e1663e6a99
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 29 09:09:13 2014 -0500

    4869: KeepClient now has a default timeout per block request (10 minutes).  In
    keepproxy, the timeout is set to 20 seconds per block.  Also rearranged some
    keepclient and keepproxy logging to provide better information.

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 326c2a0..4733bb7 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -2,11 +2,11 @@
 package keepclient
 
 import (
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"crypto/md5"
 	"errors"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	"io"
 	"io/ioutil"
 	"log"
@@ -15,6 +15,7 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 	"unsafe"
 )
 
@@ -47,7 +48,7 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error)
 		Arvados:       arv,
 		Want_replicas: 2,
 		Using_proxy:   false,
-		Client:        &http.Client{Transport: &http.Transport{}}}
+		Client:        &http.Client{Transport: &http.Transport{}, Timeout: 10 * time.Minute}}
 
 	err = (&kc).DiscoverKeepServers()
 
@@ -131,6 +132,10 @@ func (this KeepClient) AuthorizedGet(hash string,
 	timestamp string) (reader io.ReadCloser,
 	contentLength int64, url string, err error) {
 
+	// Take the hash of locator and timestamp in order to identify this
+	// specific transaction in log statements.
+	tag := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+
 	// Calculate the ordering for asking servers
 	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
@@ -150,12 +155,19 @@ func (this KeepClient) AuthorizedGet(hash string,
 
 		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
 
+		log.Printf("[%v] Begin download %s", tag, url)
+
 		var resp *http.Response
-		if resp, err = this.Client.Do(req); err != nil {
+		if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
+			respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+			response := strings.TrimSpace(string(respbody))
+			log.Printf("[%v] Download %v status code: %v error: '%v' response: '%v'",
+				tag, url, resp.StatusCode, err, response)
 			continue
 		}
 
 		if resp.StatusCode == http.StatusOK {
+			log.Printf("[%v] Download %v status code: %v", tag, url, resp.StatusCode)
 			return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
 		}
 	}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 26338ca..dc521da 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -87,12 +87,11 @@ type uploadStatus struct {
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
 	upload_status chan<- uploadStatus, expectedLength int64, tag string) {
 
-	log.Printf("[%v] Begin upload %s to %s", tag, hash, host)
-
 	var req *http.Request
 	var err error
 	var url = fmt.Sprintf("%s/%s", host, hash)
 	if req, err = http.NewRequest("PUT", url, nil); err != nil {
+		log.Printf("[%v] Error creating request PUT %v error: %v", tag, url, err.Error())
 		upload_status <- uploadStatus{err, url, 0, 0, ""}
 		body.Close()
 		return
@@ -113,8 +112,8 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
 	var resp *http.Response
 	if resp, err = this.Client.Do(req); err != nil {
+		log.Printf("[%v] Upload failed %v error: %v", tag, url, err.Error())
 		upload_status <- uploadStatus{err, url, 0, 0, ""}
-		body.Close()
 		return
 	}
 
@@ -127,17 +126,16 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	defer io.Copy(ioutil.Discard, resp.Body)
 
 	respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+	response := strings.TrimSpace(string(respbody))
 	if err2 != nil && err2 != io.EOF {
-		upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
-		return
-	}
-
-	locator := strings.TrimSpace(string(respbody))
-
-	if resp.StatusCode == http.StatusOK {
-		upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
+		log.Printf("[%v] Upload %v error: %v response: %v", tag, url, err2.Error(), response)
+		upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+	} else if resp.StatusCode == http.StatusOK {
+		log.Printf("[%v] Upload %v success", tag, url)
+		upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
 	} else {
-		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, string(respbody)}
+		log.Printf("[%v] Upload %v error: %v response: %v", tag, url, resp.StatusCode, response)
+		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
 	}
 }
 
@@ -170,6 +168,7 @@ func (this KeepClient) putReplicas(
 		for active < remaining_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
+				log.Printf("[%v] Begin upload %s to %s", tag, hash, sv[next_server])
 				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, tag)
 				next_server += 1
 				active += 1
@@ -181,22 +180,17 @@ func (this KeepClient) putReplicas(
 				}
 			}
 		}
+		log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+			tag, remaining_replicas, active)
 
 		// Now wait for something to happen.
 		status := <-upload_status
-		log.Printf("[%v] Upload to %v status code: %v remaining replicas: %v active: %v",
-			tag, status.url, status.statusCode, remaining_replicas, active)
+		active -= 1
 		if status.statusCode == 200 {
 			// good news!
 			remaining_replicas -= status.replicas_stored
 			locator = status.response
-		} else {
-			// writing to keep server failed for some reason
-			log.Printf("[%v] Upload to %v failed with error '%v', response '%v'",
-				tag, status.url, status.statusCode, status.err, status.response)
 		}
-		active -= 1
-
 	}
 
 	return locator, this.Want_replicas, nil
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 888db73..38376c9 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -1,10 +1,10 @@
 package main
 
 import (
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"github.com/gorilla/mux"
 	"io"
 	"io/ioutil"
@@ -30,6 +30,7 @@ func main() {
 		no_get           bool
 		no_put           bool
 		default_replicas int
+		timeout          int
 		pidfile          string
 	)
 
@@ -61,6 +62,12 @@ func main() {
 		2,
 		"Default number of replicas to write if not specified by the client.")
 
+	flagset.IntVar(
+		&timeout,
+		"timeout",
+		20,
+		"Timeout on requests to internal Keep services")
+
 	flagset.StringVar(
 		&pidfile,
 		"pid",
@@ -90,6 +97,7 @@ func main() {
 	}
 
 	kc.Want_replicas = default_replicas
+	kc.Client.Timeout = 20 * time.Second
 
 	listener, err = net.Listen("tcp", listen)
 	if err != nil {
@@ -283,7 +291,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	locator := keepclient.MakeLocator2(hash, hints)
 
-	log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
+	log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
 
 	var pass bool
 	var tok string
@@ -308,32 +316,43 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
 	}
 
-	if blocklen > 0 {
+	if blocklen > -1 {
 		resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
+	} else {
+		log.Printf("%s: %s %s Keep server did not return Content-Length",
+			GetRemoteAddress(req), req.Method, hash)
 	}
 
+	var status = 0
 	switch err {
 	case nil:
+		status = http.StatusOK
 		if reader != nil {
 			n, err2 := io.Copy(resp, reader)
-			if n != blocklen {
-				log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
+			if blocklen > -1 && n != blocklen {
+				log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
+					GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
 			} else if err2 == nil {
-				log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
+				log.Printf("%s: %s %s %v %v",
+					GetRemoteAddress(req), req.Method, hash, status, n)
 			} else {
-				log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
+				log.Printf("%s: %s %s %v %v copy error: %v",
+					GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
 			}
 		} else {
-			log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
+			log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
 		}
 	case keepclient.BlockNotFound:
+		status = http.StatusNotFound
 		http.Error(resp, "Not found", http.StatusNotFound)
 	default:
+		status = http.StatusBadGateway
 		http.Error(resp, err.Error(), http.StatusBadGateway)
 	}
 
 	if err != nil {
-		log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
+		log.Printf("%s: %s %s %v error: %v",
+			GetRemoteAddress(req), req.Method, hash, status, err.Error())
 	}
 }
 

commit 3c92fedddb8ee6f804940a52955fce72311bac92
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Dec 23 09:55:05 2014 -0500

    4869: Improve logging

diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index e122144..26338ca 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -3,15 +3,16 @@ package keepclient
 
 import (
 	"crypto/md5"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	"errors"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	"io"
 	"io/ioutil"
 	"log"
 	"net/http"
 	"os"
 	"strings"
+	"time"
 )
 
 type keepDisk struct {
@@ -22,13 +23,13 @@ type keepDisk struct {
 	SvcType  string `json:"service_type"`
 }
 
-func Md5String(s string) (string) {
+func Md5String(s string) string {
 	return fmt.Sprintf("%x", md5.Sum([]byte(s)))
 }
 
 func (this *KeepClient) DiscoverKeepServers() error {
 	if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
-		sr := map[string]string{"proxy":prx}
+		sr := map[string]string{"proxy": prx}
 		this.SetServiceRoots(sr)
 		this.Using_proxy = true
 		return nil
@@ -84,9 +85,9 @@ type uploadStatus struct {
 }
 
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
-	upload_status chan<- uploadStatus, expectedLength int64) {
+	upload_status chan<- uploadStatus, expectedLength int64, tag string) {
 
-	log.Printf("Uploading %s to %s", hash, host)
+	log.Printf("[%v] Begin upload %s to %s", tag, hash, host)
 
 	var req *http.Request
 	var err error
@@ -136,7 +137,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	if resp.StatusCode == http.StatusOK {
 		upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
 	} else {
-		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
+		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, string(respbody)}
 	}
 }
 
@@ -145,6 +146,10 @@ func (this KeepClient) putReplicas(
 	tr *streamer.AsyncStream,
 	expectedLength int64) (locator string, replicas int, err error) {
 
+	// Take the hash of locator and timestamp in order to identify this
+	// specific transaction in log statements.
+	tag := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+
 	// Calculate the ordering for uploading to servers
 	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
 
@@ -159,14 +164,13 @@ func (this KeepClient) putReplicas(
 	defer close(upload_status)
 
 	// Desired number of replicas
-
 	remaining_replicas := this.Want_replicas
 
 	for remaining_replicas > 0 {
 		for active < remaining_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
-				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
+				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, tag)
 				next_server += 1
 				active += 1
 			} else {
@@ -180,17 +184,19 @@ func (this KeepClient) putReplicas(
 
 		// Now wait for something to happen.
 		status := <-upload_status
+		log.Printf("[%v] Upload to %v status code: %v remaining replicas: %v active: %v",
+			tag, status.url, status.statusCode, remaining_replicas, active)
 		if status.statusCode == 200 {
 			// good news!
 			remaining_replicas -= status.replicas_stored
 			locator = status.response
 		} else {
 			// writing to keep server failed for some reason
-			log.Printf("Keep server put to %v failed with '%v'",
-				status.url, status.err)
+			log.Printf("[%v] Upload to %v failed with error '%v', response '%v'",
+				tag, status.url, status.statusCode, status.err, status.response)
 		}
 		active -= 1
-		log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
+
 	}
 
 	return locator, this.Want_replicas, nil

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list