[ARVADOS] created: 90c7763b1c959da79ac5701247b2fa4824090b28

git at public.curoverse.com git at public.curoverse.com
Sat Mar 28 03:21:33 EDT 2015


        at  90c7763b1c959da79ac5701247b2fa4824090b28 (commit)


commit 90c7763b1c959da79ac5701247b2fa4824090b28
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Mar 27 04:17:48 2015 -0400

    5414: Add client support for Keep service hints.

diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 5d79194..9cf84d9 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -14,11 +14,9 @@ import (
 	"net/http"
 	"os"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
-	"time"
-	"unsafe"
 )
 
 // A Keep "block" is 64MB.
@@ -26,7 +24,7 @@ const BLOCKSIZE = 64 * 1024 * 1024
 
 var BlockNotFound = errors.New("Block not found")
 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-var OversizeBlockError = errors.New("Block too big")
+var OversizeBlockError = errors.New("Exceeded maximum block size ("+strconv.Itoa(BLOCKSIZE)+")")
 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
 
@@ -38,34 +36,33 @@ type KeepClient struct {
 	Arvados       *arvadosclient.ArvadosClient
 	Want_replicas int
 	Using_proxy   bool
-	service_roots *map[string]string
-	lock          sync.Mutex
+	localRoots    *map[string]string
+	gatewayRoots  *map[string]string
+	lock          sync.RWMutex
 	Client        *http.Client
 }
 
 // Create a new KeepClient.  This will contact the API server to discover Keep
 // servers.
-func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
+func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
 	var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
 	insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
-	kc = KeepClient{
+	kc := &KeepClient{
 		Arvados:       arv,
 		Want_replicas: 2,
 		Using_proxy:   false,
 		Client: &http.Client{Transport: &http.Transport{
 			TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
 	}
-	_, err = (&kc).DiscoverKeepServers()
-
-	return kc, err
+	return kc, kc.DiscoverKeepServers()
 }
 
 // Put a block given the block hash, a reader with the block data, and the
 // expected length of that data.  The desired number of replicas is given in
 // KeepClient.Want_replicas.  Returns the number of replicas that were written
 // and if there was an error.  Note this will return InsufficientReplias
-// whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
+// whenever 0 <= replicas < kc.Wants_replicas.
+func (kc *KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
 
 	// Buffer for reads from 'r'
 	var bufsize int
@@ -81,171 +78,170 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (lo
 	t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
 	defer t.Close()
 
-	return this.putReplicas(hash, t, expectedLength)
+	return kc.putReplicas(hash, t, expectedLength)
 }
 
 // Put a block given the block hash and a byte buffer.  The desired number of
 // replicas is given in KeepClient.Want_replicas.  Returns the number of
 // replicas that were written and if there was an error.  Note this will return
-// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
+// InsufficientReplias whenever 0 <= replicas < kc.Wants_replicas.
+func (kc *KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
 	t := streamer.AsyncStreamFromSlice(buf)
 	defer t.Close()
 
-	return this.putReplicas(hash, t, int64(len(buf)))
+	return kc.putReplicas(hash, t, int64(len(buf)))
 }
 
 // Put a block given a buffer.  The hash will be computed.  The desired number
 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
 // replicas that were written and if there was an error.  Note this will return
 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
+func (kc *KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
 	hash := fmt.Sprintf("%x", md5.Sum(buffer))
-	return this.PutHB(hash, buffer)
+	return kc.PutHB(hash, buffer)
 }
 
 // Put a block, given a Reader.  This will read the entire reader into a buffer
 // to compute the hash.  The desired number of replicas is given in
 // KeepClient.Want_replicas.  Returns the number of replicas that were written
 // and if there was an error.  Note this will return InsufficientReplias
-// whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
+// whenever 0 <= replicas < kc.Wants_replicas.  Also nhote that if the block
 // hash and data size are available, PutHR() is more efficient.
-func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
+func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
 	if buffer, err := ioutil.ReadAll(r); err != nil {
 		return "", 0, err
 	} else {
-		return this.PutB(buffer)
+		return kc.PutB(buffer)
 	}
 }
 
-// Get a block given a hash.  Return a reader, the expected data length, the
-// URL the block was fetched from, and if there was an error.  If the block
-// checksum does not match, the final Read() on the reader returned by this
-// method will return a BadChecksum error instead of EOF.
-func (this KeepClient) Get(hash string) (reader io.ReadCloser,
-	contentLength int64, url string, err error) {
-	return this.AuthorizedGet(hash, "", "")
-}
-
-// Get a block given a hash, with additional authorization provided by
-// signature and timestamp.  Return a reader, the expected data length, the URL
-// the block was fetched from, and if there was an error.  If the block
-// checksum does not match, the final Read() on the reader returned by this
-// method will return a BadChecksum error instead of EOF.
-func (this KeepClient) AuthorizedGet(hash string,
-	signature string,
-	timestamp string) (reader io.ReadCloser,
-	contentLength int64, url string, err error) {
-
-	// Take the hash of locator and timestamp in order to identify this
-	// specific transaction in log statements.
-	requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
-
-	// Calculate the ordering for asking servers
-	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
-
-	for _, host := range sv {
-		var req *http.Request
-		var err error
-		var url string
-		if signature != "" {
-			url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
-				signature, timestamp)
-		} else {
-			url = fmt.Sprintf("%s/%s", host, hash)
-		}
-		if req, err = http.NewRequest("GET", url, nil); err != nil {
+// Get a block given a locator.  Return a reader, the expected data length, the
+// URL the block is being fetched from, and an error.
+//
+// If the block checksum does not match, the final Read() on the
+// reader returned by this method will return a BadChecksum error
+// instead of EOF.
+func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
+
+	var errs []string
+	for _, host := range kc.getSortedRoots(locator) {
+		url := host+"/"+locator
+		req, err := http.NewRequest("GET", url, nil)
+		if err != nil {
 			continue
 		}
-
-		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
-
-		log.Printf("[%v] Begin download %s", requestId, url)
-
-		var resp *http.Response
-		if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
-			statusCode := -1
-			var respbody []byte
+		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+		resp, err := kc.Client.Do(req)
+		if err != nil || resp.StatusCode != http.StatusOK {
 			if resp != nil {
-				statusCode = resp.StatusCode
+				var respbody []byte
 				if resp.Body != nil {
 					respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
 				}
+				errs = append(errs, fmt.Sprintf("%s: %d %s",
+					url, resp.StatusCode, strings.TrimSpace(string(respbody))))
+			} else {
+				errs = append(errs, fmt.Sprintf("%s: %v", url, err))
 			}
-			response := strings.TrimSpace(string(respbody))
-			log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
-				requestId, url, statusCode, err, response)
 			continue
 		}
-
-		if resp.StatusCode == http.StatusOK {
-			log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
-			return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
-		}
+		return HashCheckingReader{
+			Reader: resp.Body,
+			Hash: md5.New(),
+			Check: locator[0:32],
+		}, resp.ContentLength, url, nil
 	}
-
+	log.Printf("DEBUG: GET %s failed: %v", locator, errs)
 	return nil, 0, "", BlockNotFound
 }
 
-// Determine if a block with the given hash is available and readable, but does
-// not return the block contents.
-func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
-	return this.AuthorizedAsk(hash, "", "")
-}
-
-// Determine if a block with the given hash is available and readable with the
-// given signature and timestamp, but does not return the block contents.
-func (this KeepClient) AuthorizedAsk(hash string, signature string,
-	timestamp string) (contentLength int64, url string, err error) {
-	// Calculate the ordering for asking servers
-	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
-
-	for _, host := range sv {
-		var req *http.Request
-		var err error
-		if signature != "" {
-			url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
-				signature, timestamp)
-		} else {
-			url = fmt.Sprintf("%s/%s", host, hash)
-		}
-
-		if req, err = http.NewRequest("HEAD", url, nil); err != nil {
+// Ask() verifies that a block with the given hash is available and
+// readable, according to at least one Keep service. Unlike Get(), it
+// does not verify, retrieve, or return the data.
+//
+// Returns the data size (content length) reported by the Keep service
+// and the URI reporting the data size.
+func (kc *KeepClient) Ask(locator string) (int64, string, error) {
+	for _, host := range kc.getSortedRoots(locator) {
+		url := host+"/"+locator
+		req, err := http.NewRequest("HEAD", url, nil)
+		if err != nil {
 			continue
 		}
-
-		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
-
-		var resp *http.Response
-		if resp, err = this.Client.Do(req); err != nil {
-			continue
-		}
-
-		if resp.StatusCode == http.StatusOK {
+		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+		if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK {
 			return resp.ContentLength, url, nil
 		}
 	}
-
 	return 0, "", BlockNotFound
+}
 
+// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
+// services: uuid -> baseURI.
+func (kc *KeepClient) LocalRoots() map[string]string {
+	kc.lock.RLock()
+	defer kc.lock.RUnlock()
+	return *kc.localRoots
 }
 
-// Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() map[string]string {
-	r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
-	return *r
+// GatewayRoots() returns the map of Keep remote gateway services:
+// uuid -> baseURI.
+func (kc *KeepClient) GatewayRoots() map[string]string {
+	kc.lock.RLock()
+	defer kc.lock.RUnlock()
+	return *kc.gatewayRoots
 }
 
-// Atomically update the service_roots field.  Enables you to update
-// service_roots without disrupting any GET or PUT operations that might
-// already be in progress.
-func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
-	roots := make(map[string]string)
-	for uuid, root := range new_roots {
-		roots[uuid] = root
+// SetServiceRoots updates the localRoots and gatewayRoots maps,
+// without risk of disrupting operations that are already in progress.
+//
+// The KeepClient makes its own copy of the supplied maps, so the
+// caller can reuse/modify them after SetServiceRoots returns, but
+// they should not be modified by any other goroutine while
+// SetServiceRoots is running.
+func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string) {
+	locals := make(map[string]string)
+	for uuid, root := range newLocals {
+		locals[uuid] = root
+	}
+	gateways := make(map[string]string)
+	for uuid, root := range newGateways {
+		gateways[uuid] = root
 	}
-	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
-		unsafe.Pointer(&roots))
+	kc.lock.Lock()
+	defer kc.lock.Unlock()
+	kc.localRoots = &locals
+	kc.gatewayRoots = &gateways
+}
+
+// getSortedRoots returns a list of base URIs of Keep services, in the
+// order they should be attempted in order to retrieve content for the
+// given locator.
+func (kc *KeepClient) getSortedRoots(locator string) []string {
+	var found []string
+	for _, hint := range strings.Split(locator, "+") {
+		if len(hint) < 7 || hint[0:2] != "K@" {
+			// Not a service hint.
+			continue
+		}
+		if len(hint) == 7 {
+			// +K@abcde means fetch from proxy at
+			// keep.abcde.arvadosapi.com
+			found = append(found, "https://keep."+hint[2:]+".arvadosapi.com")
+		} else if len(hint) == 29 {
+			// +K@abcde-abcde-abcdeabcdeabcde means fetch
+			// from gateway with given uuid
+			if gwURI, ok := kc.GatewayRoots()[hint[2:]]; ok {
+				found = append(found, gwURI)
+			}
+			// else this hint is no use to us; carry on.
+		}
+	}
+	if len(found) == 0 {
+		// No usable service hints were found. Use local roots.
+		found = NewRootSorter(kc.LocalRoots(), locator[0:32]).GetSortedRoots()
+	}
+	return found
 }
 
 type Locator struct {
@@ -279,12 +275,9 @@ func MakeLocator2(hash string, hints string) (locator Locator) {
 	return locator
 }
 
-func MakeLocator(path string) Locator {
-	pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
-	if err != nil {
-		log.Print("Don't like regexp", err)
-	}
+var pathpattern = regexp.MustCompile("^([0-9a-f]{32})([+].*)?$")
 
+func MakeLocator(path string) Locator {
 	sm := pathpattern.FindStringSubmatch(path)
 	if sm == nil {
 		log.Print("Failed match ", path)
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index cbd27d7..db0ab24 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -63,8 +63,8 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
 	kc, err := MakeKeepClient(&arv)
 
 	c.Assert(err, Equals, nil)
-	c.Check(len(kc.ServiceRoots()), Equals, 2)
-	for _, root := range kc.ServiceRoots() {
+	c.Check(len(kc.LocalRoots()), Equals, 2)
+	for _, root := range kc.LocalRoots() {
 		c.Check(root, Matches, "http://localhost:\\d+")
 	}
 }
@@ -98,7 +98,7 @@ func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
 	return
 }
 
-func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
+func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
 	io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
 
 	ks := RunFakeKeepServer(st)
@@ -126,7 +126,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
 		make(chan string)}
 
 	UploadToStubHelper(c, st,
-		func(kc KeepClient, url string, reader io.ReadCloser,
+		func(kc *KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan uploadStatus) {
 
 			go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
@@ -153,7 +153,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
 		make(chan string)}
 
 	UploadToStubHelper(c, st,
-		func(kc KeepClient, url string, reader io.ReadCloser,
+		func(kc *KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan uploadStatus) {
 
 			tr := streamer.AsyncStreamFromReader(512, reader)
@@ -193,7 +193,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 	hash := "acbd18db4cc2f85cedef654fccc4a4d8"
 
 	UploadToStubHelper(c, st,
-		func(kc KeepClient, url string, reader io.ReadCloser,
+		func(kc *KeepClient, url string, reader io.ReadCloser,
 			writer io.WriteCloser, upload_status chan uploadStatus) {
 
 			go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
@@ -242,21 +242,21 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
 
 	ks := RunSomeFakeKeepServers(st, 5)
 
 	for i, k := range ks {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(service_roots)
+	kc.SetServiceRoots(localRoots, nil)
 
 	kc.PutB([]byte("foo"))
 
 	shuff := NewRootSorter(
-		kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
+		kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
 
 	s1 := <-st.handled
 	s2 := <-st.handled
@@ -285,16 +285,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
 
 	ks := RunSomeFakeKeepServers(st, 5)
 
 	for i, k := range ks {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(service_roots)
+	kc.SetServiceRoots(localRoots, nil)
 
 	reader, writer := io.Pipe()
 
@@ -305,7 +305,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 
 	kc.PutHR(hash, reader, 3)
 
-	shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
+	shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
 	log.Print(shuff)
 
 	s1 := <-st.handled
@@ -339,24 +339,24 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 4)
 	ks2 := RunSomeFakeKeepServers(fh, 1)
 
 	for i, k := range ks1 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(service_roots)
+	kc.SetServiceRoots(localRoots, nil)
 
 	shuff := NewRootSorter(
-		kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
+		kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
 
 	phash, replicas, err := kc.PutB([]byte("foo"))
 
@@ -395,21 +395,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
 	kc.Want_replicas = 2
 	arv.ApiToken = "abc123"
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 	ks2 := RunSomeFakeKeepServers(fh, 4)
 
 	for i, k := range ks1 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(service_roots)
+	kc.SetServiceRoots(localRoots, nil)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 
@@ -451,7 +451,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x": ks.url})
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
 
 	r, n, url2, err := kc.Get(hash)
 	defer r.Close()
@@ -477,7 +477,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x": ks.url})
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
 
 	r, n, url2, err := kc.Get(hash)
 	c.Check(err, Equals, BlockNotFound)
@@ -486,6 +486,43 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 	c.Check(r, Equals, nil)
 }
 
+func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
+	uuid := "zzzzz-bi6l4-123451234512345"
+	hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+	// This one shouldn't be used:
+	ks0 := RunFakeKeepServer(StubGetHandler{
+		c,
+		"error if used",
+		"abc123",
+		[]byte("foo")})
+	defer ks0.listener.Close()
+	// This one should be used:
+	ks := RunFakeKeepServer(StubGetHandler{
+		c,
+		hash+"+K@"+uuid,
+		"abc123",
+		[]byte("foo")})
+	defer ks.listener.Close()
+
+	arv, err := arvadosclient.MakeArvadosClient()
+	kc, _ := MakeKeepClient(&arv)
+	arv.ApiToken = "abc123"
+	kc.SetServiceRoots(
+		map[string]string{"x": ks0.url},
+		map[string]string{uuid: ks.url})
+
+	r, n, uri, err := kc.Get(hash+"+K@"+uuid)
+	defer r.Close()
+	c.Check(err, Equals, nil)
+	c.Check(n, Equals, int64(3))
+	c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
+
+	content, err := ioutil.ReadAll(r)
+	c.Check(err, Equals, nil)
+	c.Check(content, DeepEquals, []byte("foo"))
+}
+
 type BarHandler struct {
 	handled chan string
 }
@@ -507,7 +544,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	kc.SetServiceRoots(map[string]string{"x": ks.url})
+	kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
 
 	r, n, _, err := kc.Get(barhash)
 	_, err = ioutil.ReadAll(r)
@@ -540,21 +577,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	kc, _ := MakeKeepClient(&arv)
 	arv.ApiToken = "abc123"
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 	ks2 := RunSomeFakeKeepServers(fh, 4)
 
 	for i, k := range ks1 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 	for i, k := range ks2 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(service_roots)
+	kc.SetServiceRoots(localRoots, nil)
 
 	// This test works only if one of the failing services is
 	// attempted before the succeeding service. Otherwise,
@@ -562,7 +599,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 	// the choice of block content "waz" and the UUIDs of the fake
 	// servers, so we just tried different strings until we found
 	// an example that passes this Assert.)
-	c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
+	c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
 
 	r, n, url2, err := kc.Get(hash)
 
@@ -634,16 +671,16 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
 	kc.Want_replicas = 2
 	kc.Using_proxy = true
 	arv.ApiToken = "abc123"
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 
 	for i, k := range ks1 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
 
-	kc.SetServiceRoots(service_roots)
+	kc.SetServiceRoots(localRoots, nil)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
@@ -665,15 +702,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
 	kc.Want_replicas = 3
 	kc.Using_proxy = true
 	arv.ApiToken = "abc123"
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
 
 	ks1 := RunSomeFakeKeepServers(st, 1)
 
 	for i, k := range ks1 {
-		service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+		localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
 		defer k.listener.Close()
 	}
-	kc.SetServiceRoots(service_roots)
+	kc.SetServiceRoots(localRoots, nil)
 
 	_, replicas, err := kc.PutB([]byte("foo"))
 	<-st.handled
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 940a110..8732b19 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -76,7 +76,7 @@ func (this *KeepClient) setClientSettingsStore() {
 	}
 }
 
-func (this *KeepClient) DiscoverKeepServers() (map[string]string, error) {
+func (this *KeepClient) DiscoverKeepServers() error {
 	type svcList struct {
 		Items []keepDisk `json:"items"`
 	}
@@ -86,29 +86,34 @@ func (this *KeepClient) DiscoverKeepServers() (map[string]string, error) {
 
 	if err != nil {
 		if err := this.Arvados.List("keep_disks", nil, &m); err != nil {
-			return nil, err
+			return err
 		}
 	}
 
 	listed := make(map[string]bool)
-	service_roots := make(map[string]string)
+	localRoots := make(map[string]string)
+	gatewayRoots := make(map[string]string)
 
-	for _, element := range m.Items {
-		n := ""
-
-		if element.SSL {
-			n = "s"
+	for _, service := range m.Items {
+		scheme := "http"
+		if service.SSL {
+			scheme = "https"
 		}
-
-		// Construct server URL
-		url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
+		url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
 
 		// Skip duplicates
-		if !listed[url] {
-			listed[url] = true
-			service_roots[element.Uuid] = url
+		if listed[url] {
+			continue
 		}
-		if element.SvcType == "proxy" {
+		listed[url] = true
+
+		switch service.SvcType {
+		case "gateway":
+			gatewayRoots[service.Uuid] = url
+		case "disk":
+			localRoots[service.Uuid] = url
+		case "proxy":
+			localRoots[service.Uuid] = url
 			this.Using_proxy = true
 		}
 	}
@@ -119,9 +124,8 @@ func (this *KeepClient) DiscoverKeepServers() (map[string]string, error) {
 		this.setClientSettingsStore()
 	}
 
-	this.SetServiceRoots(service_roots)
-
-	return service_roots, nil
+	this.SetServiceRoots(localRoots, gatewayRoots)
+	return nil
 }
 
 type uploadStatus struct {
@@ -204,7 +208,7 @@ func (this KeepClient) putReplicas(
 	requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
 
 	// Calculate the ordering for uploading to servers
-	sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
+	sv := NewRootSorter(this.LocalRoots(), hash).GetSortedRoots()
 
 	// The next server to try contacting
 	next_server := 0
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 6196b50..e702005 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -518,6 +518,7 @@ class KeepClient(object):
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
+                self._gateway_services = None
                 self._keep_services = [{
                     'uuid': 'proxy',
                     '_service_root': proxy,
@@ -531,6 +532,7 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
+                self._gateway_services = None
                 self._keep_services = None
                 self.using_proxy = None
                 self._static_services_list = False
@@ -560,21 +562,30 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            self._keep_services = keep_services.execute().get('items')
-            if not self._keep_services:
+            accessible = keep_services.execute().get('items')
+            if not accessible:
                 raise arvados.errors.NoKeepServersError()
 
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
-
             # Precompute the base URI for each service.
-            for r in self._keep_services:
+            for r in accessible:
                 r['_service_root'] = "{}://[{}]:{:d}/".format(
                     'https' if r['service_ssl_flag'] else 'http',
                     r['service_host'],
                     r['service_port'])
+
+            self._gateway_services = {
+                ks.get('uuid'): ks
+                for ks in accessible if ks.get('service_type') == 'gateway'}
+            _logger.debug(str(self._gateway_services))
+
+            self._keep_services = [
+                ks for ks in accessible
+                if ks.get('service_type') in ['disk', 'proxy']]
             _logger.debug(str(self._keep_services))
 
+            self.using_proxy = any(ks.get('service_type') == 'proxy'
+                                   for ks in self._keep_services)
+
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
         block with a known hash.
@@ -670,15 +681,26 @@ class KeepClient(object):
             v = slot.get()
             return v
 
+        # If the locator has hints specifying a prefix (indicating a
+        # remote keepproxy) or the UUID of a local gateway service,
+        # read data from the indicated service(s) instead of the usual
+        # list of local disk services.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                           for hint in locator.hints if (
+                                   hint.startswith('K@') and
+                                   len(hint) == 29 and
+                                   self._gateway_services.get(hint[2:])
+                                   )])
+        # Map root URLs to their KeepService objects.
+        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+
         # See #3147 for a discussion of the loop implementation.  Highlights:
         # * Refresh the list of Keep services after each failure, in case
         #   it's being updated.
         # * Retry until we succeed, we're out of retries, or every available
         #   service has returned permanent failure.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@')]
-        # Map root URLs their KeepService objects.
-        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 581f7f4..1ee6dab 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"errors"
 	"flag"
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -105,7 +106,7 @@ func main() {
 		log.Fatalf("Could not listen on %v", listen)
 	}
 
-	go RefreshServicesList(&kc)
+	go RefreshServicesList(kc)
 
 	// Shut down the server gracefully (by closing the listener)
 	// if SIGTERM is received.
@@ -118,10 +119,10 @@ func main() {
 	signal.Notify(term, syscall.SIGTERM)
 	signal.Notify(term, syscall.SIGINT)
 
-	log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
+	log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
 
 	// Start listening for requests.
-	http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
+	http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
 
 	log.Println("shutting down")
 }
@@ -134,27 +135,24 @@ type ApiTokenCache struct {
 
 // Refresh the keep service list every five minutes.
 func RefreshServicesList(kc *keepclient.KeepClient) {
-	var sleeptime time.Duration
+	previousRoots := ""
 	for {
-		oldservices := kc.ServiceRoots()
-		newservices, err := kc.DiscoverKeepServers()
-		if err == nil && len(newservices) > 0 {
-			s1 := fmt.Sprint(oldservices)
-			s2 := fmt.Sprint(newservices)
-			if s1 != s2 {
-				log.Printf("Updated server list to %v", s2)
-			}
-			sleeptime = 300 * time.Second
+		if err := kc.DiscoverKeepServers(); err != nil {
+			log.Println("Error retrieving services list:", err)
+			time.Sleep(3*time.Second)
+			previousRoots = ""
+		} else if len(kc.LocalRoots()) == 0 {
+			log.Println("Received empty services list")
+			time.Sleep(3*time.Second)
+			previousRoots = ""
 		} else {
-			// There was an error, or the list is empty, so wait 3 seconds and try again.
-			if err != nil {
-				log.Printf("Error retrieving server list: %v", err)
-			} else {
-				log.Printf("Retrieved an empty server list")
+			newRoots := fmt.Sprint("Locals ", kc.LocalRoots(), ", gateways ", kc.GatewayRoots())
+			if newRoots != previousRoots {
+				log.Println("Updated services list:", newRoots)
+				previousRoots = newRoots
 			}
-			sleeptime = 3 * time.Second
+			time.Sleep(300*time.Second)
 		}
-		time.Sleep(sleeptime)
 	}
 }
 
@@ -258,14 +256,14 @@ func MakeRESTRouter(
 	rest := mux.NewRouter()
 
 	if enable_get {
-		rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
+		rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
 			GetBlockHandler{kc, t}).Methods("GET", "HEAD")
-		rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+		rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
 	}
 
 	if enable_put {
-		rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
-		rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
+		rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
+		rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
 		rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
 		rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
 		rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
@@ -293,22 +291,29 @@ func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request
 	SetCorsHeaders(resp)
 }
 
+var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
+var ContentLengthMismatch = errors.New("Actual length != expected content length")
+
 func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	SetCorsHeaders(resp)
 
-	kc := *this.KeepClient
-
-	hash := mux.Vars(req)["hash"]
-	hints := mux.Vars(req)["hints"]
+	locator := mux.Vars(req)["locator"]
+	var err error
+	var status int
+	var expectLength, responseLength int64
+	var proxiedURI = "-"
 
-	locator := keepclient.MakeLocator2(hash, hints)
+	defer func() {
+		log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
+	}()
 
-	log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
+	kc := *this.KeepClient
 
 	var pass bool
 	var tok string
 	if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
-		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+		err = BadAuthorizationHeader
+		http.Error(resp, err.Error(), http.StatusForbidden)
 		return
 	}
 
@@ -318,42 +323,36 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 	kc.Arvados = &arvclient
 
 	var reader io.ReadCloser
-	var err error
-	var blocklen int64
 
-	if req.Method == "GET" {
-		reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
+	switch req.Method {
+	case "HEAD":
+		expectLength, proxiedURI, err = kc.Ask(locator)
+	case "GET":
+		reader, expectLength, proxiedURI, err = kc.Get(locator)
 		if reader != nil {
 			defer reader.Close()
 		}
-	} else if req.Method == "HEAD" {
-		blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
+	default:
+		err = errors.New("Unsupported method")
+		return
 	}
 
-	if blocklen == -1 {
-		log.Printf("%s: %s %s Keep server did not return Content-Length",
-			GetRemoteAddress(req), req.Method, hash)
+	if expectLength == -1 {
+		log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
 	}
 
-	var status = 0
 	switch err {
 	case nil:
 		status = http.StatusOK
-		resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
-		if reader != nil {
-			n, err2 := io.Copy(resp, reader)
-			if blocklen > -1 && n != blocklen {
-				log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
-					GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
-			} else if err2 == nil {
-				log.Printf("%s: %s %s %v %v",
-					GetRemoteAddress(req), req.Method, hash, status, n)
-			} else {
-				log.Printf("%s: %s %s %v %v copy error: %v",
-					GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
+		resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
+		switch req.Method {
+		case "HEAD":
+			responseLength = 0
+		case "GET":
+			responseLength, err = io.Copy(resp, reader)
+			if err == nil && expectLength > -1 && responseLength != expectLength {
+				err = ContentLengthMismatch
 			}
-		} else {
-			log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
 		}
 	case keepclient.BlockNotFound:
 		status = http.StatusNotFound
@@ -362,48 +361,56 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 		status = http.StatusBadGateway
 		http.Error(resp, err.Error(), http.StatusBadGateway)
 	}
-
-	if err != nil {
-		log.Printf("%s: %s %s %v error: %v",
-			GetRemoteAddress(req), req.Method, hash, status, err.Error())
-	}
 }
 
+var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
+var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
+
 func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 	SetCorsHeaders(resp)
 
-	kc := *this.KeepClient
-
-	hash := mux.Vars(req)["hash"]
-	hints := mux.Vars(req)["hints"]
+	var err error
+	var expectLength int64 = -1
+	var status = http.StatusInternalServerError
+	var wantReplicas, wroteReplicas int
+	var locatorOut string = "-"
+
+	defer func() {
+		if status != http.StatusOK {
+			http.Error(resp, err.Error(), status)
+		}
+		log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, wantReplicas, wroteReplicas, locatorOut, err)
+	}()
 
-	locator := keepclient.MakeLocator2(hash, hints)
+	locatorIn := mux.Vars(req)["locator"]
+	kc := *this.KeepClient
+	wantReplicas = kc.Want_replicas
 
-	var contentLength int64 = -1
 	if req.Header.Get("Content-Length") != "" {
-		_, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
+		_, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
 		if err != nil {
-			resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
+			resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength))
 		}
 
 	}
 
-	log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
-
-	if contentLength < 0 {
-		http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
+	if expectLength < 0 {
+		err = LengthRequiredError
+		status = http.StatusLengthRequired
 		return
 	}
 
-	if locator.Size > 0 && int64(locator.Size) != contentLength {
-		http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
+	if loc := keepclient.MakeLocator(locatorIn); loc.Size > 0 && int64(loc.Size) != expectLength {
+		err = LengthMismatchError
+		status = http.StatusBadRequest
 		return
 	}
 
 	var pass bool
 	var tok string
 	if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
-		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+		err = BadAuthorizationHeader
+		status = http.StatusForbidden
 		return
 	}
 
@@ -414,65 +421,54 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
 
 	// Check if the client specified the number of replicas
 	if req.Header.Get("X-Keep-Desired-Replicas") != "" {
-		var r int
-		_, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
+		_, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &wantReplicas)
 		if err != nil {
-			kc.Want_replicas = r
+			defer func(w int) {
+				// Restore kc's previous Want_replicas
+				// back to its default for next time.
+				kc.Want_replicas = w
+			}(kc.Want_replicas)
+			kc.Want_replicas = wantReplicas
 		}
 	}
 
 	// Now try to put the block through
-	var replicas int
-	var put_err error
-	if hash == "" {
+	if locatorIn == "" {
 		if bytes, err := ioutil.ReadAll(req.Body); err != nil {
-			msg := fmt.Sprintf("Error reading request body: %s", err)
-			log.Printf(msg)
-			http.Error(resp, msg, http.StatusInternalServerError)
+			err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
+			status = http.StatusInternalServerError
 			return
 		} else {
-			hash, replicas, put_err = kc.PutB(bytes)
+			locatorOut, wroteReplicas, err = kc.PutB(bytes)
 		}
 	} else {
-		hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
+		locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
 	}
 
 	// Tell the client how many successful PUTs we accomplished
-	resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
+	resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
 
-	switch put_err {
+	switch err {
 	case nil:
-		// Default will return http.StatusOK
-		log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
-		n, err2 := io.WriteString(resp, hash)
-		if err2 != nil {
-			log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
-		}
+		status = http.StatusOK
+		_, err = io.WriteString(resp, locatorOut)
 
 	case keepclient.OversizeBlockError:
 		// Too much data
-		http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
+		status = http.StatusRequestEntityTooLarge
 
 	case keepclient.InsufficientReplicasError:
-		if replicas > 0 {
+		if wroteReplicas > 0 {
 			// At least one write is considered success.  The
 			// client can decide if getting less than the number of
 			// replications it asked for is a fatal error.
-			// Default will return http.StatusOK
-			n, err2 := io.WriteString(resp, hash)
-			if err2 != nil {
-				log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
-			}
+			status = http.StatusOK
+			_, err = io.WriteString(resp, locatorOut)
 		} else {
-			http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
+			status = http.StatusServiceUnavailable
 		}
 
 	default:
-		http.Error(resp, put_err.Error(), http.StatusBadGateway)
-	}
-
-	if put_err != nil {
-		log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())
+		status = http.StatusBadGateway
 	}
-
 }
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index e3b4e36..5f6e2b9 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -117,10 +117,10 @@ func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.K
 	}
 	kc.SetServiceRoots(map[string]string{
 		"proxy": fmt.Sprintf("http://localhost:%v", port),
-	})
+	}, nil)
 	c.Check(kc.Using_proxy, Equals, true)
-	c.Check(len(kc.ServiceRoots()), Equals, 1)
-	for _, root := range kc.ServiceRoots() {
+	c.Check(len(kc.LocalRoots()), Equals, 1)
+	for _, root := range kc.LocalRoots() {
 		c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
 	}
 	log.Print("keepclient created")
@@ -154,8 +154,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
 	c.Assert(err, Equals, nil)
 	c.Check(kc.Arvados.External, Equals, true)
 	c.Check(kc.Using_proxy, Equals, true)
-	c.Check(len(kc.ServiceRoots()), Equals, 1)
-	for _, root := range kc.ServiceRoots() {
+	c.Check(len(kc.LocalRoots()), Equals, 1)
+	for _, root := range kc.LocalRoots() {
 		c.Check(root, Equals, "http://localhost:29950")
 	}
 	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index a363bac..c1371dc 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -276,7 +276,7 @@ func main() {
 	}
 
 	// Initialize Pull queue and worker
-	keepClient := keepclient.KeepClient{
+	keepClient := &keepclient.KeepClient{
 		Arvados:       nil,
 		Want_replicas: 1,
 		Using_proxy:   true,
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index fac4bb1..d85458a 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -19,7 +19,7 @@ import (
 			Skip the rest of the servers if no errors
 		Repeat
 */
-func RunPullWorker(pullq *WorkQueue, keepClient keepclient.KeepClient) {
+func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
 	nextItem := pullq.NextItem
 	for item := range nextItem {
 		pullRequest := item.(PullRequest)
@@ -39,14 +39,14 @@ func RunPullWorker(pullq *WorkQueue, keepClient keepclient.KeepClient) {
 		Using this token & signature, retrieve the given block.
 		Write to storage
 */
-func PullItemAndProcess(pullRequest PullRequest, token string, keepClient keepclient.KeepClient) (err error) {
+func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
 	keepClient.Arvados.ApiToken = token
 
 	service_roots := make(map[string]string)
 	for _, addr := range pullRequest.Servers {
 		service_roots[addr] = addr
 	}
-	keepClient.SetServiceRoots(service_roots)
+	keepClient.SetServiceRoots(service_roots, nil)
 
 	// Generate signature with a random token
 	expires_at := time.Now().Add(60 * time.Second)
@@ -75,7 +75,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient keepcl
 }
 
 // Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
+var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
 	reader io.ReadCloser, contentLength int64, url string, err error) {
 	reader, blocklen, url, err := keepClient.Get(signedLocator)
 	return reader, blocklen, url, err
diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go
index b293cf9..7a930d5 100644
--- a/services/keepstore/pull_worker_integration_test.go
+++ b/services/keepstore/pull_worker_integration_test.go
@@ -10,7 +10,7 @@ import (
 	"testing"
 )
 
-var keepClient keepclient.KeepClient
+var keepClient *keepclient.KeepClient
 
 type PullWorkIntegrationTestData struct {
 	Name     string
@@ -33,7 +33,7 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe
 	}
 
 	// keep client
-	keepClient = keepclient.KeepClient{
+	keepClient = &keepclient.KeepClient{
 		Arvados:       &arv,
 		Want_replicas: 1,
 		Using_proxy:   true,
@@ -42,17 +42,15 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe
 
 	// discover keep services
 	var servers []string
-	service_roots, err := keepClient.DiscoverKeepServers()
-	if err != nil {
+	if err := keepClient.DiscoverKeepServers(); err != nil {
 		t.Error("Error discovering keep services")
 	}
-	for _, host := range service_roots {
+	for _, host := range keepClient.LocalRoots() {
 		servers = append(servers, host)
 	}
 
 	// Put content if the test needs it
 	if wantData {
-		keepClient.SetServiceRoots(service_roots)
 		locator, _, err := keepClient.PutB([]byte(testData.Content))
 		if err != nil {
 			t.Errorf("Error putting test data in setup for %s %s %v", testData.Content, locator, err)
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index f0e9e65..124c9b8 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -244,7 +244,7 @@ func performTest(testData PullWorkerTestData, c *C) {
 	testPullLists[testData.name] = testData.response_body
 
 	// Override GetContent to mock keepclient Get functionality
-	GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
+	GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
 		reader io.ReadCloser, contentLength int64, url string, err error) {
 
 		processedPullLists[testData.name] = testData.response_body

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list