[ARVADOS] updated: f69395a08509cc8c664c3256019d4d3cdb67db86

git at public.curoverse.com git at public.curoverse.com
Mon Dec 29 12:31:28 EST 2014


Summary of changes:
 sdk/go/keepclient/keepclient.go      | 12 ++++++------
 sdk/go/keepclient/keepclient_test.go | 16 ++++++++--------
 sdk/go/keepclient/support.go         | 33 ++++++++++++++++++++++-----------
 services/keepproxy/keepproxy.go      | 13 +++++++------
 services/keepproxy/keepproxy_test.go | 29 +++++++++++++++++++++++++----
 services/keepstore/handler_test.go   | 12 ------------
 services/keepstore/handlers.go       |  4 ----
 services/keepstore/logging_router.go |  4 ++--
 8 files changed, 70 insertions(+), 53 deletions(-)

       via  f69395a08509cc8c664c3256019d4d3cdb67db86 (commit)
      from  960e574bc6c559e67c12fc6ac844dd7aa6880051 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit f69395a08509cc8c664c3256019d4d3cdb67db86
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Dec 29 12:32:38 2014 -0500

    4869: Correctly handle zero-length blocks in Keep client/Keep proxy.  Remove
    the X-Block-Size header.  Choose the default request timeout based on whether
    the client is talking to a proxy.  Use double quotes in logging.  Rename "tag"
    to "requestId".

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 4733bb7..df6fee1 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -48,7 +48,7 @@ func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error)
 		Arvados:       arv,
 		Want_replicas: 2,
 		Using_proxy:   false,
-		Client:        &http.Client{Transport: &http.Transport{}, Timeout: 10 * time.Minute}}
+		Client:        &http.Client{Transport: &http.Transport{}}}
 
 	err = (&kc).DiscoverKeepServers()
 
@@ -134,7 +134,7 @@ func (this KeepClient) AuthorizedGet(hash string,
 
 	// Take the hash of locator and timestamp in order to identify this
 	// specific transaction in log statements.
-	tag := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
+	requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
 
 	// Calculate the ordering for asking servers
 	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
@@ -155,19 +155,19 @@ func (this KeepClient) AuthorizedGet(hash string,
 
 		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
 
-		log.Printf("[%v] Begin download %s", tag, url)
+		log.Printf("[%v] Begin download %s", requestId, url)
 
 		var resp *http.Response
 		if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
 			respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
 			response := strings.TrimSpace(string(respbody))
-			log.Printf("[%v] Download %v status code: %v error: '%v' response: '%v'",
-				tag, url, resp.StatusCode, err, response)
+			log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
+				requestId, url, resp.StatusCode, err, response)
 			continue
 		}
 
 		if resp.StatusCode == http.StatusOK {
-			log.Printf("[%v] Download %v status code: %v", tag, url, resp.StatusCode)
+			log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
 			return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
 		}
 	}
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index 5f9915d..4acaf55 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -1,11 +1,11 @@
 package keepclient
 
 import (
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	"crypto/md5"
 	"flag"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	. "gopkg.in/check.v1"
 	"io"
 	"io/ioutil"
@@ -154,7 +154,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 		func(kc KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan uploadStatus) {
 
-			go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")))
+			go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
 
 			writer.Write([]byte("foo"))
 			writer.Close()
@@ -186,7 +186,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 
 			br1 := tr.MakeStreamReader()
 
-			go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3)
+			go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, "TestUploadToStubKeepServerBufferReader")
 
 			writer.Write([]byte("foo"))
 			writer.Close()
@@ -221,7 +221,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 		func(kc KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan uploadStatus) {
 
-			go kc.uploadToKeepServer(url, hash, reader, upload_status, 3)
+			go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
 
 			writer.Write([]byte("foo"))
 			writer.Close()
@@ -477,7 +477,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x":url})
+	kc.SetServiceRoots(map[string]string{"x": url})
 
 	r, n, url2, err := kc.Get(hash)
 	defer r.Close()
@@ -503,7 +503,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x":url})
+	kc.SetServiceRoots(map[string]string{"x": url})
 
 	r, n, url2, err := kc.Get(hash)
 	c.Check(err, Equals, BlockNotFound)
@@ -533,7 +533,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x":url})
+	kc.SetServiceRoots(map[string]string{"x": url})
 
 	r, n, _, err := kc.Get(barhash)
 	_, err = ioutil.ReadAll(r)
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index dc521da..1f2a976 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -32,9 +32,16 @@ func (this *KeepClient) DiscoverKeepServers() error {
 		sr := map[string]string{"proxy": prx}
 		this.SetServiceRoots(sr)
 		this.Using_proxy = true
+		if this.Client.Timeout == 0 {
+			this.Client.Timeout = 10 * time.Minute
+		}
 		return nil
 	}
 
+	if this.Client.Timeout == 0 {
+		this.Client.Timeout = 15 * time.Second
+	}
+
 	type svcList struct {
 		Items []keepDisk `json:"items"`
 	}
@@ -85,21 +92,24 @@ type uploadStatus struct {
 }
 
 func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
-	upload_status chan<- uploadStatus, expectedLength int64, tag string) {
+	upload_status chan<- uploadStatus, expectedLength int64, requestId string) {
 
 	var req *http.Request
 	var err error
 	var url = fmt.Sprintf("%s/%s", host, hash)
 	if req, err = http.NewRequest("PUT", url, nil); err != nil {
-		log.Printf("[%v] Error creating request PUT %v error: %v", tag, url, err.Error())
+		log.Printf("[%v] Error creating request PUT %v error: %v", requestId, url, err.Error())
 		upload_status <- uploadStatus{err, url, 0, 0, ""}
 		body.Close()
 		return
 	}
 
-	if expectedLength > 0 {
+	if expectedLength > -1 {
 		req.ContentLength = expectedLength
 	}
+	if expectedLength == 0 {
+		defer body.Close()
+	}
 
 	req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
 	req.Header.Add("Content-Type", "application/octet-stream")
@@ -112,7 +122,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 
 	var resp *http.Response
 	if resp, err = this.Client.Do(req); err != nil {
-		log.Printf("[%v] Upload failed %v error: %v", tag, url, err.Error())
+		log.Printf("[%v] Upload failed %v error: %v", requestId, url, err.Error())
 		upload_status <- uploadStatus{err, url, 0, 0, ""}
 		return
 	}
@@ -128,13 +138,13 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
 	response := strings.TrimSpace(string(respbody))
 	if err2 != nil && err2 != io.EOF {
-		log.Printf("[%v] Upload %v error: %v response: %v", tag, url, err2.Error(), response)
+		log.Printf("[%v] Upload %v error: %v response: %v", requestId, url, err2.Error(), response)
 		upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, response}
 	} else if resp.StatusCode == http.StatusOK {
-		log.Printf("[%v] Upload %v success", tag, url)
+		log.Printf("[%v] Upload %v success", requestId, url)
 		upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
 	} else {
-		log.Printf("[%v] Upload %v error: %v response: %v", tag, url, resp.StatusCode, response)
+		log.Printf("[%v] Upload %v error: %v response: %v", requestId, url, resp.StatusCode, response)
 		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
 	}
 }
@@ -146,7 +156,7 @@ func (this KeepClient) putReplicas(
 
 	// Take the hash of locator and timestamp in order to identify this
 	// specific transaction in log statements.
-	tag := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+	requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
 
 	// Calculate the ordering for uploading to servers
 	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
@@ -168,8 +178,8 @@ func (this KeepClient) putReplicas(
 		for active < remaining_replicas {
 			// Start some upload requests
 			if next_server < len(sv) {
-				log.Printf("[%v] Begin upload %s to %s", tag, hash, sv[next_server])
-				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, tag)
+				log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+				go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
 				next_server += 1
 				active += 1
 			} else {
@@ -181,11 +191,12 @@ func (this KeepClient) putReplicas(
 			}
 		}
 		log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
-			tag, remaining_replicas, active)
+			requestId, remaining_replicas, active)
 
 		// Now wait for something to happen.
 		status := <-upload_status
 		active -= 1
+
 		if status.statusCode == 200 {
 			// good news!
 			remaining_replicas -= status.replicas_stored
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 38376c9..ea14c6c 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -30,7 +30,7 @@ func main() {
 		no_get           bool
 		no_put           bool
 		default_replicas int
-		timeout          int
+		timeout          int64
 		pidfile          string
 	)
 
@@ -62,11 +62,11 @@ func main() {
 		2,
 		"Default number of replicas to write if not specified by the client.")
 
-	flagset.IntVar(
+	flagset.Int64Var(
 		&timeout,
 		"timeout",
-		20,
-		"Timeout on requests to internal Keep services")
+		15,
+		"Timeout on requests to internal Keep services (default 15 seconds)")
 
 	flagset.StringVar(
 		&pidfile,
@@ -97,7 +97,8 @@ func main() {
 	}
 
 	kc.Want_replicas = default_replicas
-	kc.Client.Timeout = 20 * time.Second
+
+	kc.Client.Timeout = time.Duration(timeout) * time.Second
 
 	listener, err = net.Listen("tcp", listen)
 	if err != nil {
@@ -377,7 +378,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
 
-	if contentLength < 1 {
+	if contentLength < 0 {
 		http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
 		return
 	}
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 11dd206..8acf43a 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -1,11 +1,11 @@
 package main
 
 import (
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"crypto/md5"
 	"crypto/tls"
 	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	. "gopkg.in/check.v1"
 	"io"
 	"io/ioutil"
@@ -39,7 +39,9 @@ func pythonDir() string {
 // avoids a race condition where we hit a "connection refused" error
 // because we start testing the proxy too soon.
 func waitForListener() {
-	const (ms = 5)
+	const (
+		ms = 5
+	)
 	for i := 0; listener == nil && i < 1000; i += ms {
 		time.Sleep(ms * time.Millisecond)
 	}
@@ -150,7 +152,7 @@ func runProxy(c *C, args []string, token string, port int) keepclient.KeepClient
 	c.Assert(err, Equals, nil)
 	c.Check(kc.Using_proxy, Equals, true)
 	c.Check(len(kc.ServiceRoots()), Equals, 1)
-	for _, root := range(kc.ServiceRoots()) {
+	for _, root := range kc.ServiceRoots() {
 		c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
 	}
 	os.Setenv("ARVADOS_KEEP_PROXY", "")
@@ -221,6 +223,25 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
 		log.Print("Get")
 	}
 
+	{
+		var rep int
+		var err error
+		hash2, rep, err = kc.PutB([]byte(""))
+		c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
+		c.Check(rep, Equals, 2)
+		c.Check(err, Equals, nil)
+		log.Print("PutB zero block")
+	}
+
+	{
+		reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
+		c.Assert(err, Equals, nil)
+		all, err := ioutil.ReadAll(reader)
+		c.Check(all, DeepEquals, []byte(""))
+		c.Check(blocklen, Equals, int64(0))
+		log.Print("Get zero block")
+	}
+
 	log.Print("TestPutAndGet done")
 }
 
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 3f0b560..05b410c 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -88,12 +88,6 @@ func TestGetHandler(t *testing.T) {
 		t.Errorf("expected Content-Length %s, got %s", expected_cl, received_cl)
 	}
 
-	received_xbs := response.Header().Get("X-Block-Size")
-	expected_xbs := fmt.Sprintf("%d", len(TEST_BLOCK))
-	if received_xbs != expected_xbs {
-		t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs)
-	}
-
 	// ----------------
 	// Permissions: on.
 	enforce_permissions = true
@@ -110,12 +104,6 @@ func TestGetHandler(t *testing.T) {
 	ExpectBody(t,
 		"Authenticated request, signed locator", string(TEST_BLOCK), response)
 
-	received_xbs = response.Header().Get("X-Block-Size")
-	expected_xbs = fmt.Sprintf("%d", len(TEST_BLOCK))
-	if received_xbs != expected_xbs {
-		t.Errorf("expected X-Block-Size %s, got %s", expected_xbs, received_xbs)
-	}
-
 	received_cl = response.Header().Get("Content-Length")
 	expected_cl = fmt.Sprintf("%d", len(TEST_BLOCK))
 	if received_cl != expected_cl {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 9d49d80..c7559a1 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -177,10 +177,6 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
 
 	resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
 
-	// If/when we support HTTP Range header (#3734), then Content-Length
-	// could be smaller than Block size
-	resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
-
 	_, err = resp.Write(block)
 
 	return
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
index 0598d4c..d9dfc35 100644
--- a/services/keepstore/logging_router.go
+++ b/services/keepstore/logging_router.go
@@ -43,8 +43,8 @@ func (loggingRouter *LoggingRESTRouter) ServeHTTP(resp http.ResponseWriter, req
 	loggingWriter := LoggingResponseWriter{200, 0, resp, ""}
 	loggingRouter.router.ServeHTTP(&loggingWriter, req)
 	if loggingWriter.Status >= 400 {
-		log.Printf("[%s] %s %s %d %d '%s'", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length, strings.TrimSpace(loggingWriter.Response))
+		log.Printf("[%s] %s %s %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length, strings.TrimSpace(loggingWriter.Response))
 	} else {
-		log.Printf("[%s] %s %s %d %d", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length)
+		log.Printf("[%s] %s %s %d %d \"OK\"", req.RemoteAddr, req.Method, req.URL.Path[1:], loggingWriter.Status, loggingWriter.Length)
 	}
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list