[ARVADOS] updated: f7c0c2137ea1035fe46d77e604bb936ba407ddb7

Git user git at public.curoverse.com
Thu Apr 27 14:29:36 EDT 2017


Summary of changes:
 sdk/go/arvadosclient/arvadosclient_test.go         |   7 +-
 sdk/python/tests/run_test_server.py                |   5 +-
 sdk/ruby/lib/arvados.rb                            |  23 +-
 sdk/ruby/lib/arvados/keep.rb                       |   6 +-
 services/api/.gitignore                            |   3 +-
 services/api/Gemfile                               |  51 +-
 services/api/Gemfile.lock                          | 276 ++++----
 services/api/Rakefile                              |   6 -
 .../api/app/controllers/application_controller.rb  |  14 +-
 .../app/controllers/arvados/v1/nodes_controller.rb |   6 +-
 .../controllers/arvados/v1/schema_controller.rb    |   7 +-
 .../app/controllers/arvados/v1/users_controller.rb |   2 +-
 .../api/app/controllers/database_controller.rb     |   4 +-
 .../app/controllers/user_sessions_controller.rb    |   3 +-
 .../api/app/models/api_client_authorization.rb     |   8 +-
 services/api/app/models/arvados_model.rb           |  99 ++-
 services/api/app/models/collection.rb              |   2 +-
 services/api/app/models/link.rb                    |   5 -
 services/api/app/models/log.rb                     |   2 +-
 services/api/app/models/user.rb                    |   8 +-
 services/api/app/models/virtual_machine.rb         |   6 +-
 services/api/config/application.default.yml        |  36 +-
 services/api/config/application.rb                 |  29 +-
 .../api/config/environments/development.rb.example |   3 -
 .../api/config/environments/production.rb.example  |   2 +-
 services/api/config/environments/test.rb.example   |   5 +-
 services/api/config/initializers/eventbus.rb       |  38 +-
 services/api/config/initializers/load_config.rb    |   1 +
 .../api/config/initializers/noop_deep_munge.rb     |   1 +
 .../config/initializers/permit_all_parameters.rb   |   1 +
 services/api/config/initializers/time_format.rb    |   2 +
 services/api/config/routes.rb                      |  28 +-
 ...0170319063406_serialized_columns_accept_null.rb |   5 +
 services/api/db/structure.sql                      |   9 +-
 services/api/lib/can_be_an_owner.rb                |   2 +-
 services/api/lib/create_superuser_token.rb         |   4 +-
 services/api/lib/eventbus.rb                       | 358 ----------
 services/api/lib/has_uuid.rb                       |  14 +-
 services/api/lib/load_param.rb                     |   2 +-
 services/api/lib/serializers.rb                    |   6 +
 .../api/lib/tasks/delete_old_container_logs.rake   |   2 +-
 services/api/lib/tasks/delete_old_job_logs.rake    |   2 +-
 services/api/lib/whitelist_update.rb               |  15 +-
 services/api/log/.gitkeep                          |   0
 .../arvados/v1/collections_controller_test.rb      |   6 +-
 .../functional/arvados/v1/users_controller_test.rb |   1 +
 .../arvados/v1/virtual_machines_controller_test.rb |   4 +-
 .../test/functional/database_controller_test.rb    |   2 +-
 .../api_client_authorizations_scopes_test.rb       |   6 +-
 .../api/test/integration/crunch_dispatch_test.rb   |   2 +-
 .../api/test/integration/database_reset_test.rb    |   2 -
 services/api/test/integration/errors_test.rb       |   2 +-
 services/api/test/integration/pipeline_test.rb     |   2 +-
 .../api/test/integration/reader_tokens_test.rb     |   2 +-
 services/api/test/integration/websocket_test.rb    | 742 ---------------------
 services/api/test/test_helper.rb                   |  26 +-
 services/api/test/unit/arvados_model_test.rb       |  76 ++-
 .../api/test/unit/create_superuser_token_test.rb   |   2 +-
 services/api/test/unit/job_test.rb                 |  23 +-
 services/api/test/unit/user_test.rb                |  12 +-
 services/api/test/unit/workflow_test.rb            |   8 +-
 services/keepproxy/keepproxy.go                    |   2 +
 services/keepproxy/keepproxy_test.go               |  29 +-
 63 files changed, 564 insertions(+), 1493 deletions(-)
 create mode 100644 services/api/config/initializers/permit_all_parameters.rb
 create mode 100644 services/api/db/migrate/20170319063406_serialized_columns_accept_null.rb
 delete mode 100644 services/api/lib/eventbus.rb
 delete mode 100644 services/api/log/.gitkeep
 delete mode 100644 services/api/test/integration/websocket_test.rb

  discards  fb2fd7c8f1f46ee80f658cdc832081bbbd72d1ab (commit)
       via  f7c0c2137ea1035fe46d77e604bb936ba407ddb7 (commit)
       via  e6201e511f5fa8a7e7a3b8049bb6298692f4d800 (commit)
       via  318c49002aea966128a9d37ab29e601a104d79bb (commit)
       via  137ebf94ff14837c9df773533ea86e821469bda9 (commit)
       via  3ef580c47029ff0fbf959b044f29c183f41cb609 (commit)
       via  de083a9fec0ca08afda5a9369c6cd32dbdcd0965 (commit)
       via  f5d09a4904e609b5df20edd0194a9f1ade40c28a (commit)
       via  0d5be4fa36006459e1579f087b695904e4f32ee3 (commit)
       via  77d9c05d89dabc9e9e9a15f46cd12c8ad61ed64e (commit)
       via  e1e05845b74ce70712e414830f992ce57d7a8453 (commit)
       via  78ff2a600b29f05f522f8e8818967dac88394fd6 (commit)
       via  7fd4aa96f997a133d31b3df88a8d2f4820c5b881 (commit)
       via  fb7bb4c8f17a49abab40e42b7a0101cac7478d60 (commit)
       via  8d9b12f2a87ffd7183d3a36ca32ee1c7e701a0e2 (commit)
       via  d42ec212e992a83f8e7fc48b59fb3daf58a62787 (commit)
       via  c066a2e6d064a270638baea8f8b0d106f5903e0f (commit)
       via  c36272a5f83ba70980160c4cb205bdfb8a1c660b (commit)
       via  045bace65c2395b6efe9f3d8c93bec74196f58e1 (commit)
       via  7c6852e1b3675b3c1de7e9792373333cb752d40d (commit)
       via  dfd8c4bbd6f126b90d436e9d242cd30e15e70d2e (commit)
       via  de4df7f80c531ab16e59ea36671a8efa9e6ff33d (commit)
       via  126dd750f48654cb3b1a4e53c5b7d337003e112f (commit)
       via  30dbddd3b311653652fa731dbff950aba0712301 (commit)
       via  ab314b9ea3618c71556a6a5f6dd7c769beaf2737 (commit)
       via  390af6a13f7c8974329aecc2f23fbfa81f8e298b (commit)
       via  9090c60b28de593b8bb2ce606a9ab35b62b57608 (commit)
       via  84ad215752fde4291070143411a945fa7a94241c (commit)
       via  c9a361f7fd3b1cf7f4959e9b0292d0f495d82771 (commit)
       via  05d453ec38b10a022ea6db77867957e7115b9b35 (commit)
       via  09dcf71e59907c2eaf4b94918c63da07193481a4 (commit)
       via  099a8c62fcb0905855ddf243a3deddc7398c3c10 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not an ancestor of the new revision.  This
situation occurs when you force-push a change (git push --force) and
generate a repository containing something like this:

 * -- * -- B -- O -- O -- O (fb2fd7c8f1f46ee80f658cdc832081bbbd72d1ab)
            \
             N -- N -- N (f7c0c2137ea1035fe46d77e604bb936ba407ddb7)

When this happens we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the N
branch, starting from the common base, B.

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit f7c0c2137ea1035fe46d77e604bb936ba407ddb7
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Apr 27 14:29:10 2017 -0400

    11537: Add Via header to get/head/post/put responses.

diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 7a673ae..f8ec95f 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -320,6 +320,7 @@ func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 	SetCorsHeaders(resp)
+	resp.Header().Set("Via", req.Proto+" "+viaAlias)
 
 	locator := mux.Vars(req)["locator"]
 	var err error
@@ -404,6 +405,7 @@ func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 	SetCorsHeaders(resp)
+	resp.Header().Set("Via", req.Proto+" "+viaAlias)
 
 	kc := *h.KeepClient
 	kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index e4ba22a..4e85626 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -113,6 +113,31 @@ func runProxy(c *C, args []string, bogusClientToken bool) *keepclient.KeepClient
 	return kc
 }
 
+func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
+	runProxy(c, nil, false)
+	defer closeListener()
+
+	req, err := http.NewRequest("POST",
+		"http://"+listener.Addr().String()+"/",
+		strings.NewReader("TestViaHeader"))
+	req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+	resp, err := (&http.Client{}).Do(req)
+	c.Assert(err, Equals, nil)
+	c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+	locator, err := ioutil.ReadAll(resp.Body)
+	c.Assert(err, Equals, nil)
+	resp.Body.Close()
+
+	req, err = http.NewRequest("GET",
+		"http://"+listener.Addr().String()+"/"+string(locator),
+		nil)
+	c.Assert(err, Equals, nil)
+	resp, err = (&http.Client{}).Do(req)
+	c.Assert(err, Equals, nil)
+	c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+	resp.Body.Close()
+}
+
 func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
 	kc := runProxy(c, nil, false)
 	defer closeListener()
@@ -178,7 +203,7 @@ func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
 			bytes.NewReader(content))
 		c.Assert(err, IsNil)
 		req.Header.Set("Content-Length", t.sendLength)
-		req.Header.Set("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+		req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
 		req.Header.Set("Content-Type", "application/octet-stream")
 
 		resp := httptest.NewRecorder()
@@ -392,7 +417,7 @@ func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
 		req, err := http.NewRequest("POST",
 			"http://"+listener.Addr().String()+"/",
 			strings.NewReader("qux"))
-		req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+		req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
 		req.Header.Add("Content-Type", "application/octet-stream")
 		resp, err := client.Do(req)
 		c.Check(err, Equals, nil)

commit e6201e511f5fa8a7e7a3b8049bb6298692f4d800
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu Apr 27 10:51:49 2017 -0400

    11537: Add Via header to proxied keepstore requests.
    
    Propagate keepclient PUT error messages to caller.

diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go
index 2892031..f3e3960 100644
--- a/sdk/go/keepclient/discover.go
+++ b/sdk/go/keepclient/discover.go
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"net/http"
 	"os"
 	"os/signal"
 	"reflect"
@@ -22,7 +23,9 @@ func (this *KeepClient) DiscoverKeepServers() error {
 	if this.Arvados.KeepServiceURIs != nil {
 		this.foundNonDiskSvc = true
 		this.replicasPerService = 0
-		this.setClientSettingsNonDisk()
+		if c, ok := this.Client.(*http.Client); ok {
+			this.setClientSettingsNonDisk(c)
+		}
 		roots := make(map[string]string)
 		for i, uri := range this.Arvados.KeepServiceURIs {
 			roots[fmt.Sprintf("00000-bi6l4-%015d", i)] = uri
@@ -134,10 +137,12 @@ func (this *KeepClient) loadKeepServers(list svcList) error {
 		gatewayRoots[service.Uuid] = url
 	}
 
-	if this.foundNonDiskSvc {
-		this.setClientSettingsNonDisk()
-	} else {
-		this.setClientSettingsDisk()
+	if client, ok := this.Client.(*http.Client); ok {
+		if this.foundNonDiskSvc {
+			this.setClientSettingsNonDisk(client)
+		} else {
+			this.setClientSettingsDisk(client)
+		}
 	}
 
 	this.SetServiceRoots(localRoots, writableLocalRoots, gatewayRoots)
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 4f84afc..b56cc7f 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -6,8 +6,6 @@ import (
 	"crypto/md5"
 	"errors"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	"io"
 	"io/ioutil"
 	"net/http"
@@ -15,6 +13,9 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // A Keep "block" is 64MB.
@@ -47,8 +48,11 @@ type ErrNotFound struct {
 	multipleResponseError
 }
 
-var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
+type InsufficientReplicasError error
+
+type OversizeBlockError error
+
+var ErrOversizeBlock = OversizeBlockError(errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")"))
 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
 var InvalidLocatorError = errors.New("Invalid locator")
@@ -62,6 +66,10 @@ var ErrIncompleteIndex = errors.New("Got incomplete index")
 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
 
+type HTTPClient interface {
+	Do(*http.Request) (*http.Response, error)
+}
+
 // Information about Arvados and Keep servers.
 type KeepClient struct {
 	Arvados            *arvadosclient.ArvadosClient
@@ -70,7 +78,7 @@ type KeepClient struct {
 	writableLocalRoots *map[string]string
 	gatewayRoots       *map[string]string
 	lock               sync.RWMutex
-	Client             *http.Client
+	Client             HTTPClient
 	Retries            int
 	BlockCache         *BlockCache
 
@@ -115,14 +123,14 @@ func New(arv *arvadosclient.ArvadosClient) *KeepClient {
 // Returns the locator for the written block, the number of replicas
 // written, and an error.
 //
-// Returns an InsufficientReplicas error if 0 <= replicas <
+// Returns an InsufficientReplicasError if 0 <= replicas <
 // kc.Wants_replicas.
 func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
 	// Buffer for reads from 'r'
 	var bufsize int
 	if dataBytes > 0 {
 		if dataBytes > BLOCKSIZE {
-			return "", 0, OversizeBlockError
+			return "", 0, ErrOversizeBlock
 		}
 		bufsize = int(dataBytes)
 	} else {
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index f0da600..fcae413 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -2,11 +2,8 @@ package keepclient
 
 import (
 	"crypto/md5"
+	"errors"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
-	. "gopkg.in/check.v1"
 	"io"
 	"io/ioutil"
 	"log"
@@ -16,6 +13,11 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
+	. "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -435,7 +437,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 
-	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
 	c.Check(replicas, Equals, 1)
 	c.Check(<-st.handled, Equals, ks1[0].url)
 }
@@ -921,7 +923,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
 
-	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
 	c.Check(replicas, Equals, 2)
 }
 
@@ -996,7 +998,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 
-	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
 	c.Check(replicas, Equals, 1)
 
 	c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
@@ -1031,7 +1033,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 
-	c.Check(err, Equals, InsufficientReplicasError)
+	c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
 	c.Check(replicas, Equals, 0)
 }
 
@@ -1263,5 +1265,5 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
 
 	c.Assert(kc.replicasPerService, Equals, 0)
 	c.Assert(kc.foundNonDiskSvc, Equals, true)
-	c.Assert(kc.Client.Timeout, Equals, 300*time.Second)
+	c.Assert(kc.Client.(*http.Client).Timeout, Equals, 300*time.Second)
 }
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 9adbb48..33ba872 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -4,7 +4,6 @@ import (
 	"crypto/md5"
 	"errors"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/streamer"
 	"io"
 	"io/ioutil"
 	"log"
@@ -15,6 +14,8 @@ import (
 	"regexp"
 	"strings"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/streamer"
 )
 
 // Function used to emit debug messages. The easiest way to enable
@@ -45,49 +46,45 @@ func Md5String(s string) string {
 
 // Set timeouts applicable when connecting to non-disk services
 // (assumed to be over the Internet).
-func (this *KeepClient) setClientSettingsNonDisk() {
-	if this.Client.Timeout == 0 {
-		// Maximum time to wait for a complete response
-		this.Client.Timeout = 300 * time.Second
-
-		// TCP and TLS connection settings
-		this.Client.Transport = &http.Transport{
-			Dial: (&net.Dialer{
-				// The maximum time to wait to set up
-				// the initial TCP connection.
-				Timeout: 30 * time.Second,
-
-				// The TCP keep alive heartbeat
-				// interval.
-				KeepAlive: 120 * time.Second,
-			}).Dial,
-
-			TLSHandshakeTimeout: 10 * time.Second,
-		}
+func (*KeepClient) setClientSettingsNonDisk(client *http.Client) {
+	// Maximum time to wait for a complete response
+	client.Timeout = 300 * time.Second
+
+	// TCP and TLS connection settings
+	client.Transport = &http.Transport{
+		Dial: (&net.Dialer{
+			// The maximum time to wait to set up
+			// the initial TCP connection.
+			Timeout: 30 * time.Second,
+
+			// The TCP keep alive heartbeat
+			// interval.
+			KeepAlive: 120 * time.Second,
+		}).Dial,
+
+		TLSHandshakeTimeout: 10 * time.Second,
 	}
 }
 
 // Set timeouts applicable when connecting to keepstore services directly
 // (assumed to be on the local network).
-func (this *KeepClient) setClientSettingsDisk() {
-	if this.Client.Timeout == 0 {
-		// Maximum time to wait for a complete response
-		this.Client.Timeout = 20 * time.Second
-
-		// TCP and TLS connection timeouts
-		this.Client.Transport = &http.Transport{
-			Dial: (&net.Dialer{
-				// The maximum time to wait to set up
-				// the initial TCP connection.
-				Timeout: 2 * time.Second,
-
-				// The TCP keep alive heartbeat
-				// interval.
-				KeepAlive: 180 * time.Second,
-			}).Dial,
-
-			TLSHandshakeTimeout: 4 * time.Second,
-		}
+func (*KeepClient) setClientSettingsDisk(client *http.Client) {
+	// Maximum time to wait for a complete response
+	client.Timeout = 20 * time.Second
+
+	// TCP and TLS connection timeouts
+	client.Transport = &http.Transport{
+		Dial: (&net.Dialer{
+			// The maximum time to wait to set up
+			// the initial TCP connection.
+			Timeout: 2 * time.Second,
+
+			// The TCP keep alive heartbeat
+			// interval.
+			KeepAlive: 180 * time.Second,
+		}).Dial,
+
+		TLSHandshakeTimeout: 4 * time.Second,
 	}
 }
 
@@ -157,6 +154,9 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
 		DebugPrintf("DEBUG: [%08x] Upload %v success", requestID, url)
 		upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, response}
 	} else {
+		if resp.StatusCode >= 300 && response == "" {
+			response = resp.Status
+		}
 		DebugPrintf("DEBUG: [%08x] Upload %v error: %v response: %v", requestID, url, resp.StatusCode, response)
 		upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, response}
 	}
@@ -206,6 +206,8 @@ func (this *KeepClient) putReplicas(
 	retriesRemaining := 1 + this.Retries
 	var retryServers []string
 
+	lastError := make(map[string]string)
+
 	for retriesRemaining > 0 {
 		retriesRemaining -= 1
 		next_server = 0
@@ -220,7 +222,12 @@ func (this *KeepClient) putReplicas(
 					active += 1
 				} else {
 					if active == 0 && retriesRemaining == 0 {
-						return locator, replicasDone, InsufficientReplicasError
+						msg := "Could not write sufficient replicas: "
+						for _, resp := range lastError {
+							msg += resp + "; "
+						}
+						msg = msg[:len(msg)-2]
+						return locator, replicasDone, InsufficientReplicasError(errors.New(msg))
 					} else {
 						break
 					}
@@ -239,7 +246,16 @@ func (this *KeepClient) putReplicas(
 					replicasDone += status.replicas_stored
 					replicasTodo -= status.replicas_stored
 					locator = status.response
-				} else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+					delete(lastError, status.url)
+				} else {
+					msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
+					if len(msg) > 100 {
+						msg = msg[:100]
+					}
+					lastError[status.url] = msg
+				}
+
+				if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
 					(status.statusCode >= 500 && status.statusCode != 503) {
 					// Timeout, too many requests, or other server side failure
 					// Do not retry when status code is 503, which means the keep server is full
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index 620ed9c..5e3e4af 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -335,9 +335,9 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 		statusCode, statusText = http.StatusInternalServerError, err.Error()
 		return
 	}
-	if kc.Client != nil && kc.Client.Transport != nil {
+	if client, ok := kc.Client.(*http.Client); ok && client.Transport != nil {
 		// Workaround for https://dev.arvados.org/issues/9005
-		if t, ok := kc.Client.Transport.(*http.Transport); ok {
+		if t, ok := client.Transport.(*http.Transport); ok {
 			defer t.CloseIdleConnections()
 		}
 	}
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 76a8a15..7a673ae 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -12,6 +12,7 @@ import (
 	"os"
 	"os/signal"
 	"regexp"
+	"strings"
 	"sync"
 	"syscall"
 	"time"
@@ -43,7 +44,10 @@ func DefaultConfig() *Config {
 	}
 }
 
-var listener net.Listener
+var (
+	listener net.Listener
+	router   http.Handler
+)
 
 func main() {
 	cfg := DefaultConfig()
@@ -129,7 +133,7 @@ func main() {
 	if cfg.DefaultReplicas > 0 {
 		kc.Want_replicas = cfg.DefaultReplicas
 	}
-	kc.Client.Timeout = time.Duration(cfg.Timeout)
+	kc.Client.(*http.Client).Timeout = time.Duration(cfg.Timeout)
 	go kc.RefreshServices(5*time.Minute, 3*time.Second)
 
 	listener, err = net.Listen("tcp", cfg.Listen)
@@ -153,7 +157,8 @@ func main() {
 	signal.Notify(term, syscall.SIGINT)
 
 	// Start serving requests.
-	http.Serve(listener, MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc))
+	router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc)
+	http.Serve(listener, router)
 
 	log.Println("shutting down")
 }
@@ -232,61 +237,57 @@ func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *ApiTokenCache, r
 	return true, tok
 }
 
-type GetBlockHandler struct {
+type proxyHandler struct {
+	http.Handler
 	*keepclient.KeepClient
 	*ApiTokenCache
 }
 
-type PutBlockHandler struct {
-	*keepclient.KeepClient
-	*ApiTokenCache
-}
-
-type IndexHandler struct {
-	*keepclient.KeepClient
-	*ApiTokenCache
-}
-
-type InvalidPathHandler struct{}
-
-type OptionsHandler struct{}
-
-// MakeRESTRouter
-//     Returns a mux.Router that passes GET and PUT requests to the
-//     appropriate handlers.
-//
-func MakeRESTRouter(
-	enable_get bool,
-	enable_put bool,
-	kc *keepclient.KeepClient) *mux.Router {
-
-	t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
-
+// MakeRESTRouter returns an http.Handler that passes GET and PUT
+// requests to the appropriate handlers.
+func MakeRESTRouter(enable_get bool, enable_put bool, kc *keepclient.KeepClient) http.Handler {
 	rest := mux.NewRouter()
+	h := &proxyHandler{
+		Handler:    rest,
+		KeepClient: kc,
+		ApiTokenCache: &ApiTokenCache{
+			tokens:     make(map[string]int64),
+			expireTime: 300,
+		},
+	}
 
 	if enable_get {
-		rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
-			GetBlockHandler{kc, t}).Methods("GET", "HEAD")
-		rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+		rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Get).Methods("GET", "HEAD")
+		rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Get).Methods("GET", "HEAD")
 
 		// List all blocks
-		rest.Handle(`/index`, IndexHandler{kc, t}).Methods("GET")
+		rest.HandleFunc(`/index`, h.Index).Methods("GET")
 
 		// List blocks whose hash has the given prefix
-		rest.Handle(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler{kc, t}).Methods("GET")
+		rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, h.Index).Methods("GET")
 	}
 
 	if enable_put {
-		rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
-		rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
-		rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
-		rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
-		rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
+		rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Put).Methods("PUT")
+		rest.HandleFunc(`/{locator:[0-9a-f]{32}}`, h.Put).Methods("PUT")
+		rest.HandleFunc(`/`, h.Put).Methods("POST")
+		rest.HandleFunc(`/{any}`, h.Options).Methods("OPTIONS")
+		rest.HandleFunc(`/`, h.Options).Methods("OPTIONS")
 	}
 
 	rest.NotFoundHandler = InvalidPathHandler{}
+	return h
+}
 
-	return rest
+var errLoopDetected = errors.New("loop detected")
+
+func (*proxyHandler) checkLoop(resp http.ResponseWriter, req *http.Request) error {
+	if via := req.Header.Get("Via"); strings.Index(via, " "+viaAlias) >= 0 {
+		log.Printf("proxy loop detected (request has Via: %q): perhaps keepproxy is misidentified by gateway config as an external client, or its keep_services record does not have service_type=proxy?", via)
+		http.Error(resp, errLoopDetected.Error(), http.StatusInternalServerError)
+		return errLoopDetected
+	}
+	return nil
 }
 
 func SetCorsHeaders(resp http.ResponseWriter) {
@@ -296,12 +297,14 @@ func SetCorsHeaders(resp http.ResponseWriter) {
 	resp.Header().Set("Access-Control-Max-Age", "86486400")
 }
 
-func (this InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+type InvalidPathHandler struct{}
+
+func (InvalidPathHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	log.Printf("%s: %s %s unroutable", GetRemoteAddress(req), req.Method, req.URL.Path)
 	http.Error(resp, "Bad request", http.StatusBadRequest)
 }
 
-func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Options(resp http.ResponseWriter, req *http.Request) {
 	log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, req.URL.Path)
 	SetCorsHeaders(resp)
 }
@@ -312,7 +315,10 @@ var MethodNotSupported = errors.New("Method not supported")
 
 var removeHint, _ = regexp.Compile("\\+K@[a-z0-9]{5}(\\+|$)")
 
-func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Get(resp http.ResponseWriter, req *http.Request) {
+	if err := h.checkLoop(resp, req); err != nil {
+		return
+	}
 	SetCorsHeaders(resp)
 
 	locator := mux.Vars(req)["locator"]
@@ -328,11 +334,12 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		}
 	}()
 
-	kc := *this.KeepClient
+	kc := *h.KeepClient
+	kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
 
 	var pass bool
 	var tok string
-	if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
+	if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
 		status, err = http.StatusForbidden, BadAuthorizationHeader
 		return
 	}
@@ -392,10 +399,15 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
 var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
 
-func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Put(resp http.ResponseWriter, req *http.Request) {
+	if err := h.checkLoop(resp, req); err != nil {
+		return
+	}
 	SetCorsHeaders(resp)
 
-	kc := *this.KeepClient
+	kc := *h.KeepClient
+	kc.Client = &proxyClient{client: kc.Client, proto: req.Proto}
+
 	var err error
 	var expectLength int64
 	var status = http.StatusInternalServerError
@@ -432,7 +444,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	var pass bool
 	var tok string
-	if pass, tok = CheckAuthorizationHeader(&kc, this.ApiTokenCache, req); !pass {
+	if pass, tok = CheckAuthorizationHeader(&kc, h.ApiTokenCache, req); !pass {
 		err = BadAuthorizationHeader
 		status = http.StatusForbidden
 		return
@@ -468,7 +480,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 	// Tell the client how many successful PUTs we accomplished
 	resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
 
-	switch err {
+	switch err.(type) {
 	case nil:
 		status = http.StatusOK
 		_, err = io.WriteString(resp, locatorOut)
@@ -500,7 +512,7 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 //   Expects "complete" response (terminating with blank new line)
 //   Aborts on any errors
 // Concatenates responses from all those keep servers and returns
-func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h *proxyHandler) Index(resp http.ResponseWriter, req *http.Request) {
 	SetCorsHeaders(resp)
 
 	prefix := mux.Vars(req)["prefix"]
@@ -513,9 +525,9 @@ func (handler IndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		}
 	}()
 
-	kc := *handler.KeepClient
+	kc := *h.KeepClient
 
-	ok, token := CheckAuthorizationHeader(&kc, handler.ApiTokenCache, req)
+	ok, token := CheckAuthorizationHeader(&kc, h.ApiTokenCache, req)
 	if !ok {
 		status, err = http.StatusForbidden, BadAuthorizationHeader
 		return
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index 6a349da..e4ba22a 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -3,10 +3,8 @@ package main
 import (
 	"bytes"
 	"crypto/md5"
+	"errors"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
-	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"io/ioutil"
 	"log"
 	"net/http"
@@ -16,6 +14,10 @@ import (
 	"testing"
 	"time"
 
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+
 	. "gopkg.in/check.v1"
 )
 
@@ -111,6 +113,24 @@ func runProxy(c *C, args []string, bogusClientToken bool) *keepclient.KeepClient
 	return kc
 }
 
+func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
+	kc := runProxy(c, nil, false)
+	defer closeListener()
+
+	sr := map[string]string{
+		TestProxyUUID: "http://" + listener.Addr().String(),
+	}
+	router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+	content := []byte("TestLoopDetection")
+	_, _, err := kc.PutB(content)
+	c.Check(err, ErrorMatches, `.*loop detected.*`)
+
+	hash := fmt.Sprintf("%x", md5.Sum(content))
+	_, _, _, err = kc.Get(hash)
+	c.Check(err, ErrorMatches, `.*loop detected.*`)
+}
+
 func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
 	kc := runProxy(c, nil, false)
 	defer closeListener()
@@ -260,7 +280,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
 		hash2, rep, err := kc.PutB([]byte("bar"))
 		c.Check(hash2, Equals, "")
 		c.Check(rep, Equals, 0)
-		c.Check(err, Equals, keepclient.InsufficientReplicasError)
+		c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
 		log.Print("PutB")
 	}
 
@@ -331,7 +351,7 @@ func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
 	hash2, rep, err := kc.PutB([]byte("quux"))
 	c.Check(hash2, Equals, "")
 	c.Check(rep, Equals, 0)
-	c.Check(err, Equals, keepclient.InsufficientReplicasError)
+	c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
 }
 
 func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 6d791bf..706664c 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -21,6 +21,7 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"net/http"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -52,7 +53,7 @@ func main() {
 		log.Fatal(err)
 	}
 	kc.Want_replicas = *Replicas
-	kc.Client.Timeout = 10 * time.Minute
+	kc.Client.(*http.Client).Timeout = 10 * time.Minute
 
 	overrideServices(kc)
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list