[ARVADOS] created: 960e574bc6c559e67c12fc6ac844dd7aa6880051
git at public.curoverse.com
git at public.curoverse.com
Mon Dec 29 09:22:21 EST 2014
at 960e574bc6c559e67c12fc6ac844dd7aa6880051 (commit)
commit 960e574bc6c559e67c12fc6ac844dd7aa6880051
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 29 09:23:45 2014 -0500
4869: Keepstore now returns Content-Length headers, and logs the error message
sent to the client on errors.
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index a954d2b..3f0b560 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -81,6 +81,13 @@ func TestGetHandler(t *testing.T) {
"Unauthenticated request, unsigned locator",
string(TEST_BLOCK),
response)
+
+ received_cl := response.Header().Get("Content-Length")
+ expected_cl := fmt.Sprintf("%d", len(TEST_BLOCK))
+ if received_cl != expected_cl {
+ t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+ }
+
received_xbs := response.Header().Get("X-Block-Size")
expected_xbs := fmt.Sprintf("%d", len(TEST_BLOCK))
if received_xbs != expected_xbs {
@@ -102,12 +109,19 @@ func TestGetHandler(t *testing.T) {
"Authenticated request, signed locator", http.StatusOK, response)
ExpectBody(t,
"Authenticated request, signed locator", string(TEST_BLOCK), response)
+
received_xbs = response.Header().Get("X-Block-Size")
expected_xbs = fmt.Sprintf("%d", len(TEST_BLOCK))
if received_xbs != expected_xbs {
t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs)
}
+ received_cl = response.Header().Get("Content-Length")
+ expected_cl = fmt.Sprintf("%d", len(TEST_BLOCK))
+ if received_cl != expected_cl {
+ t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
+ }
+
// Authenticated request, unsigned locator
// => PermissionError
response = IssueRequest(&RequestTester{
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index bd1ca67..9d49d80 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -175,6 +175,10 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
return
}
+ resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
+
+ // If/when we support the HTTP Range header (#3734), Content-Length
+ // could be smaller than the block size.
resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
_, err = resp.Write(block)
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index fd4e234..0598d4c 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -7,12 +7,14 @@ import (
"github.com/gorilla/mux"
"log"
"net/http"
+ "strings"
)
type LoggingResponseWriter struct {
Status int
Length int
http.ResponseWriter
+ Response string
}
func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
@@ -22,6 +24,9 @@ func (loggingWriter *LoggingResponseWriter) WriteHeader(code int) {
func (loggingWriter *LoggingResponseWriter) Write(data []byte) (int, error) {
loggingWriter.Length += len(data)
+ if loggingWriter.Status >= 400 {
+ loggingWriter.Response += string(data)
+ }
return loggingWriter.ResponseWriter.Write(data)
}
@@ -35,7 +40,11 @@ func MakeLoggingRESTRouter() *LoggingRESTRouter {
}
func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- loggingWriter := LoggingResponseWriter{200, 0, resp}
+ loggingWriter := LoggingResponseWriter{200, 0, resp, ""}
loggingRouter.router.ServeHTTP(&loggingWriter, req)
- log.Printf("[%s] %s %s %d %d", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length)
+ if loggingWriter.Status >= 400 {
+ log.Printf("[%s] %s %s %d %d '%s'", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length, strings.TrimSpace(loggingWriter.Response))
+ } else {
+ log.Printf("[%s] %s %s %d %d", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length)
+ }
}
commit 51bae325be042ecfc2469eb9838671e1663e6a99
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Dec 29 09:09:13 2014 -0500
4869: KeepClient now has a default timeout per block request (10 minutes). In
keepproxy, the timeout is set to 20 seconds per block. Also rearranged some
keepclient and keepproxy logging to provide better information.
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 326c2a0..4733bb7 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -2,11 +2,11 @@
package keepclient
import (
- "git.curoverse.com/arvados.git/sdk/go/streamer"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"crypto/md5"
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
"log"
@@ -15,6 +15,7 @@ import (
"strings"
"sync"
"sync/atomic"
+ "time"
"unsafe"
)
@@ -47,7 +48,7 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error)
Arvados: arv,
Want_replicas: 2,
Using_proxy: false,
- Client: &http.Client{Transport: &http.Transport{}}}
+ Client: &http.Client{Transport: &http.Transport{}, Timeout: 10 * time.Minute}}
err = (&kc).DiscoverKeepServers()
@@ -131,6 +132,10 @@ func (this KeepClient) AuthorizedGet(hash string,
timestamp string) (reader io.ReadCloser,
contentLength int64, url string, err error) {
+ // Hash the block hash together with the current time (not the timestamp
+ // argument) to get a short tag identifying this transaction in log statements.
+ tag := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+
// Calculate the ordering for asking servers
sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
@@ -150,12 +155,19 @@ func (this KeepClient) AuthorizedGet(hash string,
req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
+ log.Printf("[%v] Begin download %s", tag, url)
+
var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil {
+ if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
+ respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ response := strings.TrimSpace(string(respbody))
+ log.Printf("[%v] Download %v status code: %v error: '%v' response: '%v'",
+ tag, url, resp.StatusCode, err, response)
continue
}
if resp.StatusCode == http.StatusOK {
+ log.Printf("[%v] Download %v status code: %v", tag, url, resp.StatusCode)
return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
}
}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 26338ca..dc521da 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -87,12 +87,11 @@ type uploadStatus struct {
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
upload_status chan<- uploadStatus, expectedLength int64, tag string) {
- log.Printf("[%v] Begin upload %s to %s", tag, hash, host)
-
var req *http.Request
var err error
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
+ log.Printf("[%v] Error creating request PUT %v error: %v", tag, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
body.Close()
return
@@ -113,8 +112,8 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
var resp *http.Response
if resp, err = this.Client.Do(req); err != nil {
+ log.Printf("[%v] Upload failed %v error: %v", tag, url, err.Error())
upload_status <- uploadStatus{err, url, 0, 0, ""}
- body.Close()
return
}
@@ -127,17 +126,16 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
defer io.Copy(ioutil.Discard, resp.Body)
respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
+ response := strings.TrimSpace(string(respbody))
if err2 != nil && err2 != io.EOF {
- upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
- return
- }
-
- locator := strings.TrimSpace(string(respbody))
-
- if resp.StatusCode == http.StatusOK {
- upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
+ log.Printf("[%v] Upload %v error: %v response: %v", tag, url, err2.Error(), response)
+ upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response}
+ } else if resp.StatusCode == http.StatusOK {
+ log.Printf("[%v] Upload %v success", tag, url)
+ upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
} else {
- upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, string(respbody)}
+ log.Printf("[%v] Upload %v error: %v response: %v", tag, url, resp.StatusCode, response)
+ upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
}
}
@@ -170,6 +168,7 @@ func (this KeepClient) putReplicas(
for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
+ log.Printf("[%v] Begin upload %s to %s", tag, hash, sv[next_server])
go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, tag)
next_server += 1
active += 1
@@ -181,22 +180,17 @@ func (this KeepClient) putReplicas(
}
}
}
+ log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+ tag, remaining_replicas, active)
// Now wait for something to happen.
status := <-upload_status
- log.Printf("[%v] Upload to %v status code: %v remaining replicas: %v active: %v",
- tag, status.url, status.statusCode, remaining_replicas, active)
+ active -= 1
if status.statusCode == 200 {
// good news!
remaining_replicas -= status.replicas_stored
locator = status.response
- } else {
- // writing to keep server failed for some reason
- log.Printf("[%v] Upload to %v failed with error '%v', response '%v'",
- tag, status.url, status.statusCode, status.err, status.response)
}
- active -= 1
-
}
return locator, this.Want_replicas, nil
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 888db73..38376c9 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -1,10 +1,10 @@
package main
import (
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/gorilla/mux"
"io"
"io/ioutil"
@@ -30,6 +30,7 @@ func main() {
no_get bool
no_put bool
default_replicas int
+ timeout int
pidfile string
)
@@ -61,6 +62,12 @@ func main() {
2,
"Default number of replicas to write if not specified by the client.")
+ flagset.IntVar(
+ &timeout,
+ "timeout",
+ 20,
+ "Timeout on requests to internal Keep services")
+
flagset.StringVar(
&pidfile,
"pid",
@@ -90,6 +97,7 @@ func main() {
}
kc.Want_replicas = default_replicas
+ kc.Client.Timeout = 20 * time.Second
listener, err = net.Listen("tcp", listen)
if err != nil {
@@ -283,7 +291,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
locator := keepclient.MakeLocator2(hash, hints)
- log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
+ log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
var pass bool
var tok string
@@ -308,32 +316,43 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
}
- if blocklen > 0 {
+ if blocklen > -1 {
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
+ } else {
+ log.Printf("%s: %s %s Keep server did not return Content-Length",
+ GetRemoteAddress(req), req.Method, hash)
}
+ var status = 0
switch err {
case nil:
+ status = http.StatusOK
if reader != nil {
n, err2 := io.Copy(resp, reader)
- if n != blocklen {
- log.Printf("%s: %s %s mismatched return %v with Content-Length %v error %v", GetRemoteAddress(req), req.Method, hash, n, blocklen, err2)
+ if blocklen > -1 && n != blocklen {
+ log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
+ GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
} else if err2 == nil {
- log.Printf("%s: %s %s success returned %v bytes", GetRemoteAddress(req), req.Method, hash, n)
+ log.Printf("%s: %s %s %v %v",
+ GetRemoteAddress(req), req.Method, hash, status, n)
} else {
- log.Printf("%s: %s %s returned %v bytes error %v", GetRemoteAddress(req), req.Method, hash, n, err.Error())
+ log.Printf("%s: %s %s %v %v copy error: %v",
+ GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
}
} else {
- log.Printf("%s: %s %s success", GetRemoteAddress(req), req.Method, hash)
+ log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
}
case keepclient.BlockNotFound:
+ status = http.StatusNotFound
http.Error(resp, "Not found", http.StatusNotFound)
default:
+ status = http.StatusBadGateway
http.Error(resp, err.Error(), http.StatusBadGateway)
}
if err != nil {
- log.Printf("%s: %s %s error %s", GetRemoteAddress(req), req.Method, hash, err.Error())
+ log.Printf("%s: %s %s %v error: %v",
+ GetRemoteAddress(req), req.Method, hash, status, err.Error())
}
}
commit 3c92fedddb8ee6f804940a52955fce72311bac92
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Dec 23 09:55:05 2014 -0500
4869: Improve logging
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index e122144..26338ca 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -3,15 +3,16 @@ package keepclient
import (
"crypto/md5"
- "git.curoverse.com/arvados.git/sdk/go/streamer"
"errors"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
+ "time"
)
type keepDisk struct {
@@ -22,13 +23,13 @@ type keepDisk struct {
SvcType string `json:"service_type"`
}
-func Md5String(s string) (string) {
+func Md5String(s string) string {
return fmt.Sprintf("%x", md5.Sum([]byte(s)))
}
func (this *KeepClient) DiscoverKeepServers() error {
if prx := os.Getenv("ARVADOS_KEEP_PROXY"); prx != "" {
- sr := map[string]string{"proxy":prx}
+ sr := map[string]string{"proxy": prx}
this.SetServiceRoots(sr)
this.Using_proxy = true
return nil
@@ -84,9 +85,9 @@ type uploadStatus struct {
}
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
- upload_status chan<- uploadStatus, expectedLength int64) {
+ upload_status chan<- uploadStatus, expectedLength int64, tag string) {
- log.Printf("Uploading %s to %s", hash, host)
+ log.Printf("[%v] Begin upload %s to %s", tag, hash, host)
var req *http.Request
var err error
@@ -136,7 +137,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
if resp.StatusCode == http.StatusOK {
upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
} else {
- upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
+ upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, string(respbody)}
}
}
@@ -145,6 +146,10 @@ func (this KeepClient) putReplicas(
tr *streamer.AsyncStream,
expectedLength int64) (locator string, replicas int, err error) {
+ // Tag this transaction for log statements using locator plus the current time.
+ // NOTE(review): locator (named result) looks still empty here; hash was likely intended.
+ tag := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+
// Calculate the ordering for uploading to servers
sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
@@ -159,14 +164,13 @@ func (this KeepClient) putReplicas(
defer close(upload_status)
// Desired number of replicas
-
remaining_replicas := this.Want_replicas
for remaining_replicas > 0 {
for active < remaining_replicas {
// Start some upload requests
if next_server < len(sv) {
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength)
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, tag)
next_server += 1
active += 1
} else {
@@ -180,17 +184,19 @@ func (this KeepClient) putReplicas(
// Now wait for something to happen.
status := <-upload_status
+ log.Printf("[%v] Upload to %v status code: %v remaining replicas: %v active: %v",
+ tag, status.url, status.statusCode, remaining_replicas, active)
if status.statusCode == 200 {
// good news!
remaining_replicas -= status.replicas_stored
locator = status.response
} else {
// writing to keep server failed for some reason
- log.Printf("Keep server put to %v failed with '%v'",
- status.url, status.err)
+ log.Printf("[%v] Upload to %v failed with error '%v', response '%v'",
+ tag, status.url, status.statusCode, status.err, status.response)
}
active -= 1
- log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
+
}
return locator, this.Want_replicas, nil
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list