[ARVADOS] created: 90c7763b1c959da79ac5701247b2fa4824090b28
git at public.curoverse.com
git at public.curoverse.com
Sat Mar 28 03:21:33 EDT 2015
at 90c7763b1c959da79ac5701247b2fa4824090b28 (commit)
commit 90c7763b1c959da79ac5701247b2fa4824090b28
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Mar 27 04:17:48 2015 -0400
5414: Add client support for Keep service hints.
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index 5d79194..9cf84d9 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -14,11 +14,9 @@ import (
"net/http"
"os"
"regexp"
+ "strconv"
"strings"
"sync"
- "sync/atomic"
- "time"
- "unsafe"
)
// A Keep "block" is 64MB.
@@ -26,7 +24,7 @@ const BLOCKSIZE = 64 * 1024 * 1024
var BlockNotFound = errors.New("Block not found")
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
-var OversizeBlockError = errors.New("Block too big")
+var OversizeBlockError = errors.New("Exceeded maximum block size ("+strconv.Itoa(BLOCKSIZE)+")")
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
@@ -38,34 +36,33 @@ type KeepClient struct {
Arvados *arvadosclient.ArvadosClient
Want_replicas int
Using_proxy bool
- service_roots *map[string]string
- lock sync.Mutex
+ localRoots *map[string]string
+ gatewayRoots *map[string]string
+ lock sync.RWMutex
Client *http.Client
}
// Create a new KeepClient. This will contact the API server to discover Keep
// servers.
-func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
+func MakeKeepClient(arv *arvadosclient.ArvadosClient) (*KeepClient, error) {
var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
- kc = KeepClient{
+ kc := &KeepClient{
Arvados: arv,
Want_replicas: 2,
Using_proxy: false,
Client: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
}
- _, err = (&kc).DiscoverKeepServers()
-
- return kc, err
+ return kc, kc.DiscoverKeepServers()
}
// Put a block given the block hash, a reader with the block data, and the
// expected length of that data. The desired number of replicas is given in
// KeepClient.Want_replicas. Returns the number of replicas that were written
// and if there was an error. Note this will return InsufficientReplias
-// whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
+// whenever 0 <= replicas < kc.Wants_replicas.
+func (kc *KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
// Buffer for reads from 'r'
var bufsize int
@@ -81,171 +78,170 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (lo
t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
defer t.Close()
- return this.putReplicas(hash, t, expectedLength)
+ return kc.putReplicas(hash, t, expectedLength)
}
// Put a block given the block hash and a byte buffer. The desired number of
// replicas is given in KeepClient.Want_replicas. Returns the number of
// replicas that were written and if there was an error. Note this will return
-// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
+// InsufficientReplias whenever 0 <= replicas < kc.Wants_replicas.
+func (kc *KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
t := streamer.AsyncStreamFromSlice(buf)
defer t.Close()
- return this.putReplicas(hash, t, int64(len(buf)))
+ return kc.putReplicas(hash, t, int64(len(buf)))
}
// Put a block given a buffer. The hash will be computed. The desired number
// of replicas is given in KeepClient.Want_replicas. Returns the number of
// replicas that were written and if there was an error. Note this will return
// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
-func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
+func (kc *KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
hash := fmt.Sprintf("%x", md5.Sum(buffer))
- return this.PutHB(hash, buffer)
+ return kc.PutHB(hash, buffer)
}
// Put a block, given a Reader. This will read the entire reader into a buffer
// to compute the hash. The desired number of replicas is given in
// KeepClient.Want_replicas. Returns the number of replicas that were written
// and if there was an error. Note this will return InsufficientReplias
-// whenever 0 <= replicas < this.Wants_replicas. Also nhote that if the block
+// whenever 0 <= replicas < kc.Wants_replicas. Also nhote that if the block
// hash and data size are available, PutHR() is more efficient.
-func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
+func (kc *KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
if buffer, err := ioutil.ReadAll(r); err != nil {
return "", 0, err
} else {
- return this.PutB(buffer)
+ return kc.PutB(buffer)
}
}
-// Get a block given a hash. Return a reader, the expected data length, the
-// URL the block was fetched from, and if there was an error. If the block
-// checksum does not match, the final Read() on the reader returned by this
-// method will return a BadChecksum error instead of EOF.
-func (this KeepClient) Get(hash string) (reader io.ReadCloser,
- contentLength int64, url string, err error) {
- return this.AuthorizedGet(hash, "", "")
-}
-
-// Get a block given a hash, with additional authorization provided by
-// signature and timestamp. Return a reader, the expected data length, the URL
-// the block was fetched from, and if there was an error. If the block
-// checksum does not match, the final Read() on the reader returned by this
-// method will return a BadChecksum error instead of EOF.
-func (this KeepClient) AuthorizedGet(hash string,
- signature string,
- timestamp string) (reader io.ReadCloser,
- contentLength int64, url string, err error) {
-
- // Take the hash of locator and timestamp in order to identify this
- // specific transaction in log statements.
- requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
-
- // Calculate the ordering for asking servers
- sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
-
- for _, host := range sv {
- var req *http.Request
- var err error
- var url string
- if signature != "" {
- url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
- signature, timestamp)
- } else {
- url = fmt.Sprintf("%s/%s", host, hash)
- }
- if req, err = http.NewRequest("GET", url, nil); err != nil {
+// Get a block given a locator. Return a reader, the expected data length, the
+// URL the block is being fetched from, and an error.
+//
+// If the block checksum does not match, the final Read() on the
+// reader returned by this method will return a BadChecksum error
+// instead of EOF.
+func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
+
+ var errs []string
+ for _, host := range kc.getSortedRoots(locator) {
+ url := host+"/"+locator
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
continue
}
-
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
-
- log.Printf("[%v] Begin download %s", requestId, url)
-
- var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
- statusCode := -1
- var respbody []byte
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ resp, err := kc.Client.Do(req)
+ if err != nil || resp.StatusCode != http.StatusOK {
if resp != nil {
- statusCode = resp.StatusCode
+ var respbody []byte
if resp.Body != nil {
respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
}
+ errs = append(errs, fmt.Sprintf("%s: %d %s",
+ url, resp.StatusCode, strings.TrimSpace(string(respbody))))
+ } else {
+ errs = append(errs, fmt.Sprintf("%s: %v", url, err))
}
- response := strings.TrimSpace(string(respbody))
- log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
- requestId, url, statusCode, err, response)
continue
}
-
- if resp.StatusCode == http.StatusOK {
- log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
- return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
- }
+ return HashCheckingReader{
+ Reader: resp.Body,
+ Hash: md5.New(),
+ Check: locator[0:32],
+ }, resp.ContentLength, url, nil
}
-
+ log.Printf("DEBUG: GET %s failed: %v", locator, errs)
return nil, 0, "", BlockNotFound
}
-// Determine if a block with the given hash is available and readable, but does
-// not return the block contents.
-func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
- return this.AuthorizedAsk(hash, "", "")
-}
-
-// Determine if a block with the given hash is available and readable with the
-// given signature and timestamp, but does not return the block contents.
-func (this KeepClient) AuthorizedAsk(hash string, signature string,
- timestamp string) (contentLength int64, url string, err error) {
- // Calculate the ordering for asking servers
- sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
-
- for _, host := range sv {
- var req *http.Request
- var err error
- if signature != "" {
- url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
- signature, timestamp)
- } else {
- url = fmt.Sprintf("%s/%s", host, hash)
- }
-
- if req, err = http.NewRequest("HEAD", url, nil); err != nil {
+// Ask() verifies that a block with the given hash is available and
+// readable, according to at least one Keep service. Unlike Get(), it
+// does not verify, retrieve, or return the data.
+//
+// Returns the data size (content length) reported by the Keep service
+// and the URI reporting the data size.
+func (kc *KeepClient) Ask(locator string) (int64, string, error) {
+ for _, host := range kc.getSortedRoots(locator) {
+ url := host+"/"+locator
+ req, err := http.NewRequest("HEAD", url, nil)
+ if err != nil {
continue
}
-
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
-
- var resp *http.Response
- if resp, err = this.Client.Do(req); err != nil {
- continue
- }
-
- if resp.StatusCode == http.StatusOK {
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", kc.Arvados.ApiToken))
+ if resp, err := kc.Client.Do(req); err == nil && resp.StatusCode == http.StatusOK {
return resp.ContentLength, url, nil
}
}
-
return 0, "", BlockNotFound
+}
+// LocalRoots() returns the map of local (i.e., disk and proxy) Keep
+// services: uuid -> baseURI.
+func (kc *KeepClient) LocalRoots() map[string]string {
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ return *kc.localRoots
}
-// Atomically read the service_roots field.
-func (this *KeepClient) ServiceRoots() map[string]string {
- r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
- return *r
+// GatewayRoots() returns the map of Keep remote gateway services:
+// uuid -> baseURI.
+func (kc *KeepClient) GatewayRoots() map[string]string {
+ kc.lock.RLock()
+ defer kc.lock.RUnlock()
+ return *kc.gatewayRoots
}
-// Atomically update the service_roots field. Enables you to update
-// service_roots without disrupting any GET or PUT operations that might
-// already be in progress.
-func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
- roots := make(map[string]string)
- for uuid, root := range new_roots {
- roots[uuid] = root
+// SetServiceRoots updates the localRoots and gatewayRoots maps,
+// without risk of disrupting operations that are already in progress.
+//
+// The KeepClient makes its own copy of the supplied maps, so the
+// caller can reuse/modify them after SetServiceRoots returns, but
+// they should not be modified by any other goroutine while
+// SetServiceRoots is running.
+func (kc *KeepClient) SetServiceRoots(newLocals, newGateways map[string]string) {
+ locals := make(map[string]string)
+ for uuid, root := range newLocals {
+ locals[uuid] = root
+ }
+ gateways := make(map[string]string)
+ for uuid, root := range newGateways {
+ gateways[uuid] = root
}
- atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
- unsafe.Pointer(&roots))
+ kc.lock.Lock()
+ defer kc.lock.Unlock()
+ kc.localRoots = &locals
+ kc.gatewayRoots = &gateways
+}
+
+// getSortedRoots returns a list of base URIs of Keep services, in the
+// order they should be attempted in order to retrieve content for the
+// given locator.
+func (kc *KeepClient) getSortedRoots(locator string) []string {
+ var found []string
+ for _, hint := range strings.Split(locator, "+") {
+ if len(hint) < 7 || hint[0:2] != "K@" {
+ // Not a service hint.
+ continue
+ }
+ if len(hint) == 7 {
+ // +K at abcde means fetch from proxy at
+ // keep.abcde.arvadosapi.com
+ found = append(found, "https://keep."+hint[2:]+".arvadosapi.com")
+ } else if len(hint) == 29 {
+ // +K at abcde-abcde-abcdeabcdeabcde means fetch
+ // from gateway with given uuid
+ if gwURI, ok := kc.GatewayRoots()[hint[2:]]; ok {
+ found = append(found, gwURI)
+ }
+ // else this hint is no use to us; carry on.
+ }
+ }
+ if len(found) == 0 {
+ // No usable service hints were found. Use local roots.
+ found = NewRootSorter(kc.LocalRoots(), locator[0:32]).GetSortedRoots()
+ }
+ return found
}
type Locator struct {
@@ -279,12 +275,9 @@ func MakeLocator2(hash string, hints string) (locator Locator) {
return locator
}
-func MakeLocator(path string) Locator {
- pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
- if err != nil {
- log.Print("Don't like regexp", err)
- }
+var pathpattern = regexp.MustCompile("^([0-9a-f]{32})([+].*)?$")
+func MakeLocator(path string) Locator {
sm := pathpattern.FindStringSubmatch(path)
if sm == nil {
log.Print("Failed match ", path)
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index cbd27d7..db0ab24 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -63,8 +63,8 @@ func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
kc, err := MakeKeepClient(&arv)
c.Assert(err, Equals, nil)
- c.Check(len(kc.ServiceRoots()), Equals, 2)
- for _, root := range kc.ServiceRoots() {
+ c.Check(len(kc.LocalRoots()), Equals, 2)
+ for _, root := range kc.LocalRoots() {
c.Check(root, Matches, "http://localhost:\\d+")
}
}
@@ -98,7 +98,7 @@ func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
return
}
-func UploadToStubHelper(c *C, st http.Handler, f func(KeepClient, string,
+func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
io.ReadCloser, io.WriteCloser, chan uploadStatus)) {
ks := RunFakeKeepServer(st)
@@ -126,7 +126,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
make(chan string)}
UploadToStubHelper(c, st,
- func(kc KeepClient, url string, reader io.ReadCloser,
+ func(kc *KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), "TestUploadToStubKeepServer")
@@ -153,7 +153,7 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
make(chan string)}
UploadToStubHelper(c, st,
- func(kc KeepClient, url string, reader io.ReadCloser,
+ func(kc *KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
tr := streamer.AsyncStreamFromReader(512, reader)
@@ -193,7 +193,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
hash := "acbd18db4cc2f85cedef654fccc4a4d8"
UploadToStubHelper(c, st,
- func(kc KeepClient, url string, reader io.ReadCloser,
+ func(kc *KeepClient, url string, reader io.ReadCloser,
writer io.WriteCloser, upload_status chan uploadStatus) {
go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, "TestFailedUploadToStubKeepServer")
@@ -242,21 +242,21 @@ func (s *StandaloneSuite) TestPutB(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5)
for i, k := range ks {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
kc.PutB([]byte("foo"))
shuff := NewRootSorter(
- kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
+ kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
s1 := <-st.handled
s2 := <-st.handled
@@ -285,16 +285,16 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks := RunSomeFakeKeepServers(st, 5)
for i, k := range ks {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
reader, writer := io.Pipe()
@@ -305,7 +305,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
kc.PutHR(hash, reader, 3)
- shuff := NewRootSorter(kc.ServiceRoots(), hash).GetSortedRoots()
+ shuff := NewRootSorter(kc.LocalRoots(), hash).GetSortedRoots()
log.Print(shuff)
s1 := <-st.handled
@@ -339,24 +339,24 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 4)
ks2 := RunSomeFakeKeepServers(fh, 1)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
shuff := NewRootSorter(
- kc.ServiceRoots(), Md5String("foo")).GetSortedRoots()
+ kc.LocalRoots(), Md5String("foo")).GetSortedRoots()
phash, replicas, err := kc.PutB([]byte("foo"))
@@ -395,21 +395,21 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
kc.Want_replicas = 2
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
_, replicas, err := kc.PutB([]byte("foo"))
@@ -451,7 +451,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
r, n, url2, err := kc.Get(hash)
defer r.Close()
@@ -477,7 +477,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
r, n, url2, err := kc.Get(hash)
c.Check(err, Equals, BlockNotFound)
@@ -486,6 +486,43 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
c.Check(r, Equals, nil)
}
+func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
+ uuid := "zzzzz-bi6l4-123451234512345"
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+
+ // This one shouldn't be used:
+ ks0 := RunFakeKeepServer(StubGetHandler{
+ c,
+ "error if used",
+ "abc123",
+ []byte("foo")})
+ defer ks0.listener.Close()
+ // This one should be used:
+ ks := RunFakeKeepServer(StubGetHandler{
+ c,
+ hash+"+K@"+uuid,
+ "abc123",
+ []byte("foo")})
+ defer ks.listener.Close()
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+ arv.ApiToken = "abc123"
+ kc.SetServiceRoots(
+ map[string]string{"x": ks0.url},
+ map[string]string{uuid: ks.url})
+
+ r, n, uri, err := kc.Get(hash+"+K@"+uuid)
+ defer r.Close()
+ c.Check(err, Equals, nil)
+ c.Check(n, Equals, int64(3))
+ c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
+
+ content, err := ioutil.ReadAll(r)
+ c.Check(err, Equals, nil)
+ c.Check(content, DeepEquals, []byte("foo"))
+}
+
type BarHandler struct {
handled chan string
}
@@ -507,7 +544,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- kc.SetServiceRoots(map[string]string{"x": ks.url})
+ kc.SetServiceRoots(map[string]string{"x": ks.url}, nil)
r, n, _, err := kc.Get(barhash)
_, err = ioutil.ReadAll(r)
@@ -540,21 +577,21 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
ks2 := RunSomeFakeKeepServers(fh, 4)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
for i, k := range ks2 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i+len(ks1))] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
// This test works only if one of the failing services is
// attempted before the succeeding service. Otherwise,
@@ -562,7 +599,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
// the choice of block content "waz" and the UUIDs of the fake
// servers, so we just tried different strings until we found
// an example that passes this Assert.)
- c.Assert(NewRootSorter(service_roots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
+ c.Assert(NewRootSorter(localRoots, hash).GetSortedRoots()[0], Not(Equals), ks1[0].url)
r, n, url2, err := kc.Get(hash)
@@ -634,16 +671,16 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
kc.Want_replicas = 2
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
@@ -665,15 +702,15 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
kc.Want_replicas = 3
kc.Using_proxy = true
arv.ApiToken = "abc123"
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
ks1 := RunSomeFakeKeepServers(st, 1)
for i, k := range ks1 {
- service_roots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
defer k.listener.Close()
}
- kc.SetServiceRoots(service_roots)
+ kc.SetServiceRoots(localRoots, nil)
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index 940a110..8732b19 100644
--- a/sdk/go/keepclient/support.go
+++ b/sdk/go/keepclient/support.go
@@ -76,7 +76,7 @@ func (this *KeepClient) setClientSettingsStore() {
}
}
-func (this *KeepClient) DiscoverKeepServers() (map[string]string, error) {
+func (this *KeepClient) DiscoverKeepServers() error {
type svcList struct {
Items []keepDisk `json:"items"`
}
@@ -86,29 +86,34 @@ func (this *KeepClient) DiscoverKeepServers() (map[string]string, error) {
if err != nil {
if err := this.Arvados.List("keep_disks", nil, &m); err != nil {
- return nil, err
+ return err
}
}
listed := make(map[string]bool)
- service_roots := make(map[string]string)
+ localRoots := make(map[string]string)
+ gatewayRoots := make(map[string]string)
- for _, element := range m.Items {
- n := ""
-
- if element.SSL {
- n = "s"
+ for _, service := range m.Items {
+ scheme := "http"
+ if service.SSL {
+ scheme = "https"
}
-
- // Construct server URL
- url := fmt.Sprintf("http%s://%s:%d", n, element.Hostname, element.Port)
+ url := fmt.Sprintf("%s://%s:%d", scheme, service.Hostname, service.Port)
// Skip duplicates
- if !listed[url] {
- listed[url] = true
- service_roots[element.Uuid] = url
+ if listed[url] {
+ continue
}
- if element.SvcType == "proxy" {
+ listed[url] = true
+
+ switch service.SvcType {
+ case "gateway":
+ gatewayRoots[service.Uuid] = url
+ case "disk":
+ localRoots[service.Uuid] = url
+ case "proxy":
+ localRoots[service.Uuid] = url
this.Using_proxy = true
}
}
@@ -119,9 +124,8 @@ func (this *KeepClient) DiscoverKeepServers() (map[string]string, error) {
this.setClientSettingsStore()
}
- this.SetServiceRoots(service_roots)
-
- return service_roots, nil
+ this.SetServiceRoots(localRoots, gatewayRoots)
+ return nil
}
type uploadStatus struct {
@@ -204,7 +208,7 @@ func (this KeepClient) putReplicas(
requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
// Calculate the ordering for uploading to servers
- sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
+ sv := NewRootSorter(this.LocalRoots(), hash).GetSortedRoots()
// The next server to try contacting
next_server := 0
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 6196b50..e702005 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -518,6 +518,7 @@ class KeepClient(object):
if not proxy.endswith('/'):
proxy += '/'
self.api_token = api_token
+ self._gateway_services = None
self._keep_services = [{
'uuid': 'proxy',
'_service_root': proxy,
@@ -531,6 +532,7 @@ class KeepClient(object):
api_client = arvados.api('v1')
self.api_client = api_client
self.api_token = api_client.api_token
+ self._gateway_services = None
self._keep_services = None
self.using_proxy = None
self._static_services_list = False
@@ -560,21 +562,30 @@ class KeepClient(object):
except Exception: # API server predates Keep services.
keep_services = self.api_client.keep_disks().list()
- self._keep_services = keep_services.execute().get('items')
- if not self._keep_services:
+ accessible = keep_services.execute().get('items')
+ if not accessible:
raise arvados.errors.NoKeepServersError()
- self.using_proxy = any(ks.get('service_type') == 'proxy'
- for ks in self._keep_services)
-
# Precompute the base URI for each service.
- for r in self._keep_services:
+ for r in accessible:
r['_service_root'] = "{}://[{}]:{:d}/".format(
'https' if r['service_ssl_flag'] else 'http',
r['service_host'],
r['service_port'])
+
+ self._gateway_services = {
+ ks.get('uuid'): ks
+ for ks in accessible if ks.get('service_type') == 'gateway'}
+ _logger.debug(str(self._gateway_services))
+
+ self._keep_services = [
+ ks for ks in accessible
+ if ks.get('service_type') in ['disk', 'proxy']]
_logger.debug(str(self._keep_services))
+ self.using_proxy = any(ks.get('service_type') == 'proxy'
+ for ks in self._keep_services)
+
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
block with a known hash.
@@ -670,15 +681,26 @@ class KeepClient(object):
v = slot.get()
return v
+ # If the locator has hints specifying a prefix (indicating a
+ # remote keepproxy) or the UUID of a local gateway service,
+ # read data from the indicated service(s) instead of the usual
+ # list of local disk services.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+ hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+ for hint in locator.hints if (
+ hint.startswith('K@') and
+ len(hint) == 29 and
+ self._gateway_services.get(hint[2:])
+ )])
+ # Map root URLs to their KeepService objects.
+ roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
+
# See #3147 for a discussion of the loop implementation. Highlights:
# * Refresh the list of Keep services after each failure, in case
# it's being updated.
# * Retry until we succeed, we're out of retries, or every available
# service has returned permanent failure.
- hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
- for hint in locator.hints if hint.startswith('K@')]
- # Map root URLs their KeepService objects.
- roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
blob = None
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go
index 581f7f4..1ee6dab 100644
--- a/services/keepproxy/keepproxy.go
+++ b/services/keepproxy/keepproxy.go
@@ -1,6 +1,7 @@
package main
import (
+ "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -105,7 +106,7 @@ func main() {
log.Fatalf("Could not listen on %v", listen)
}
- go RefreshServicesList(&kc)
+ go RefreshServicesList(kc)
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
@@ -118,10 +119,10 @@ func main() {
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- log.Printf("Arvados Keep proxy started listening on %v with server list %v", listener.Addr(), kc.ServiceRoots())
+ log.Printf("Arvados Keep proxy started listening on %v", listener.Addr())
// Start listening for requests.
- http.Serve(listener, MakeRESTRouter(!no_get, !no_put, &kc))
+ http.Serve(listener, MakeRESTRouter(!no_get, !no_put, kc))
log.Println("shutting down")
}
@@ -134,27 +135,24 @@ type ApiTokenCache struct {
// Refresh the keep service list every five minutes.
func RefreshServicesList(kc *keepclient.KeepClient) {
- var sleeptime time.Duration
+ previousRoots := ""
for {
- oldservices := kc.ServiceRoots()
- newservices, err := kc.DiscoverKeepServers()
- if err == nil && len(newservices) > 0 {
- s1 := fmt.Sprint(oldservices)
- s2 := fmt.Sprint(newservices)
- if s1 != s2 {
- log.Printf("Updated server list to %v", s2)
- }
- sleeptime = 300 * time.Second
+ if err := kc.DiscoverKeepServers(); err != nil {
+ log.Println("Error retrieving services list:", err)
+ time.Sleep(3*time.Second)
+ previousRoots = ""
+ } else if len(kc.LocalRoots()) == 0 {
+ log.Println("Received empty services list")
+ time.Sleep(3*time.Second)
+ previousRoots = ""
} else {
- // There was an error, or the list is empty, so wait 3 seconds and try again.
- if err != nil {
- log.Printf("Error retrieving server list: %v", err)
- } else {
- log.Printf("Retrieved an empty server list")
+ newRoots := fmt.Sprint("Locals ", kc.LocalRoots(), ", gateways ", kc.GatewayRoots())
+ if newRoots != previousRoots {
+ log.Println("Updated services list:", newRoots)
+ previousRoots = newRoots
}
- sleeptime = 3 * time.Second
+ time.Sleep(300*time.Second)
}
- time.Sleep(sleeptime)
}
}
@@ -258,14 +256,14 @@ func MakeRESTRouter(
rest := mux.NewRouter()
if enable_get {
- rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
+ rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`,
GetBlockHandler{kc, t}).Methods("GET", "HEAD")
- rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
+ rest.Handle(`/{locator:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
}
if enable_put {
- rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
- rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
+ rest.Handle(`/{locator:[0-9a-f]{32}\+.*}`, PutBlockHandler{kc, t}).Methods("PUT")
+ rest.Handle(`/{locator:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
rest.Handle(`/`, PutBlockHandler{kc, t}).Methods("POST")
rest.Handle(`/{any}`, OptionsHandler{}).Methods("OPTIONS")
rest.Handle(`/`, OptionsHandler{}).Methods("OPTIONS")
@@ -293,22 +291,29 @@ func (this OptionsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request
SetCorsHeaders(resp)
}
+var BadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
+var ContentLengthMismatch = errors.New("Actual length != expected content length")
+
func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
- kc := *this.KeepClient
-
- hash := mux.Vars(req)["hash"]
- hints := mux.Vars(req)["hints"]
+ locator := mux.Vars(req)["locator"]
+ var err error
+ var status int
+ var expectLength, responseLength int64
+ var proxiedURI = "-"
- locator := keepclient.MakeLocator2(hash, hints)
+ defer func() {
+ log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, responseLength, proxiedURI, err)
+ }()
- log.Printf("%s: %s %s begin", GetRemoteAddress(req), req.Method, hash)
+ kc := *this.KeepClient
var pass bool
var tok string
if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ err = BadAuthorizationHeader
+ http.Error(resp, err.Error(), http.StatusForbidden)
return
}
@@ -318,42 +323,36 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
kc.Arvados = &arvclient
var reader io.ReadCloser
- var err error
- var blocklen int64
- if req.Method == "GET" {
- reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
+ switch req.Method {
+ case "HEAD":
+ expectLength, proxiedURI, err = kc.Ask(locator)
+ case "GET":
+ reader, expectLength, proxiedURI, err = kc.Get(locator)
if reader != nil {
defer reader.Close()
}
- } else if req.Method == "HEAD" {
- blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
+ default:
+ err = errors.New("Unsupported method")
+ return
}
- if blocklen == -1 {
- log.Printf("%s: %s %s Keep server did not return Content-Length",
- GetRemoteAddress(req), req.Method, hash)
+ if expectLength == -1 {
+ log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
}
- var status = 0
switch err {
case nil:
status = http.StatusOK
- resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
- if reader != nil {
- n, err2 := io.Copy(resp, reader)
- if blocklen > -1 && n != blocklen {
- log.Printf("%s: %s %s %v %v mismatched copy size expected Content-Length: %v",
- GetRemoteAddress(req), req.Method, hash, status, n, blocklen)
- } else if err2 == nil {
- log.Printf("%s: %s %s %v %v",
- GetRemoteAddress(req), req.Method, hash, status, n)
- } else {
- log.Printf("%s: %s %s %v %v copy error: %v",
- GetRemoteAddress(req), req.Method, hash, status, n, err2.Error())
+ resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
+ switch req.Method {
+ case "HEAD":
+ responseLength = 0
+ case "GET":
+ responseLength, err = io.Copy(resp, reader)
+ if err == nil && expectLength > -1 && responseLength != expectLength {
+ err = ContentLengthMismatch
}
- } else {
- log.Printf("%s: %s %s %v 0", GetRemoteAddress(req), req.Method, hash, status)
}
case keepclient.BlockNotFound:
status = http.StatusNotFound
@@ -362,48 +361,56 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
status = http.StatusBadGateway
http.Error(resp, err.Error(), http.StatusBadGateway)
}
-
- if err != nil {
- log.Printf("%s: %s %s %v error: %v",
- GetRemoteAddress(req), req.Method, hash, status, err.Error())
- }
}
+var LengthRequiredError = errors.New(http.StatusText(http.StatusLengthRequired))
+var LengthMismatchError = errors.New("Locator size hint does not match Content-Length header")
+
func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
SetCorsHeaders(resp)
- kc := *this.KeepClient
-
- hash := mux.Vars(req)["hash"]
- hints := mux.Vars(req)["hints"]
+ var err error
+ var expectLength int64 = -1
+ var status = http.StatusInternalServerError
+ var wantReplicas, wroteReplicas int
+ var locatorOut string = "-"
+
+ defer func() {
+ if status != http.StatusOK {
+ http.Error(resp, err.Error(), status)
+ }
+ log.Println(GetRemoteAddress(req), req.Method, req.URL.Path, status, expectLength, wantReplicas, wroteReplicas, locatorOut, err)
+ }()
- locator := keepclient.MakeLocator2(hash, hints)
+ locatorIn := mux.Vars(req)["locator"]
+ kc := *this.KeepClient
+ wantReplicas = kc.Want_replicas
- var contentLength int64 = -1
if req.Header.Get("Content-Length") != "" {
- _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &contentLength)
+ _, err := fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
if err != nil {
- resp.Header().Set("Content-Length", fmt.Sprintf("%d", contentLength))
+ resp.Header().Set("Content-Length", fmt.Sprintf("%d", expectLength))
}
}
- log.Printf("%s: %s %s Content-Length %v", GetRemoteAddress(req), req.Method, hash, contentLength)
-
- if contentLength < 0 {
- http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
+ if expectLength < 0 {
+ err = LengthRequiredError
+ status = http.StatusLengthRequired
return
}
- if locator.Size > 0 && int64(locator.Size) != contentLength {
- http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
+ if loc := keepclient.MakeLocator(locatorIn); loc.Size > 0 && int64(loc.Size) != expectLength {
+ err = LengthMismatchError
+ status = http.StatusBadRequest
return
}
var pass bool
var tok string
if pass, tok = CheckAuthorizationHeader(kc, this.ApiTokenCache, req); !pass {
- http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+ err = BadAuthorizationHeader
+ status = http.StatusForbidden
return
}
@@ -414,65 +421,54 @@ func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
// Check if the client specified the number of replicas
if req.Header.Get("X-Keep-Desired-Replicas") != "" {
- var r int
- _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &r)
+ _, err := fmt.Sscanf(req.Header.Get(keepclient.X_Keep_Desired_Replicas), "%d", &wantReplicas)
if err != nil {
- kc.Want_replicas = r
+ defer func(w int) {
+ // Restore kc's previous Want_replicas
+ // back to its default for next time.
+ kc.Want_replicas = w
+ }(kc.Want_replicas)
+ kc.Want_replicas = wantReplicas
}
}
// Now try to put the block through
- var replicas int
- var put_err error
- if hash == "" {
+ if locatorIn == "" {
if bytes, err := ioutil.ReadAll(req.Body); err != nil {
- msg := fmt.Sprintf("Error reading request body: %s", err)
- log.Printf(msg)
- http.Error(resp, msg, http.StatusInternalServerError)
+ err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
+ status = http.StatusInternalServerError
return
} else {
- hash, replicas, put_err = kc.PutB(bytes)
+ locatorOut, wroteReplicas, err = kc.PutB(bytes)
}
} else {
- hash, replicas, put_err = kc.PutHR(hash, req.Body, contentLength)
+ locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
}
// Tell the client how many successful PUTs we accomplished
- resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
+ resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", wroteReplicas))
- switch put_err {
+ switch err {
case nil:
- // Default will return http.StatusOK
- log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
- n, err2 := io.WriteString(resp, hash)
- if err2 != nil {
- log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
- }
+ status = http.StatusOK
+ _, err = io.WriteString(resp, locatorOut)
case keepclient.OversizeBlockError:
// Too much data
- http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
+ status = http.StatusRequestEntityTooLarge
case keepclient.InsufficientReplicasError:
- if replicas > 0 {
+ if wroteReplicas > 0 {
// At least one write is considered success. The
// client can decide if getting less than the number of
// replications it asked for is a fatal error.
- // Default will return http.StatusOK
- n, err2 := io.WriteString(resp, hash)
- if err2 != nil {
- log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
- }
+ status = http.StatusOK
+ _, err = io.WriteString(resp, locatorOut)
} else {
- http.Error(resp, put_err.Error(), http.StatusServiceUnavailable)
+ status = http.StatusServiceUnavailable
}
default:
- http.Error(resp, put_err.Error(), http.StatusBadGateway)
- }
-
- if put_err != nil {
- log.Printf("%s: %s %s stored %v replicas (desired %v) got error %v", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas, put_err.Error())
+ status = http.StatusBadGateway
}
-
}
diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go
index e3b4e36..5f6e2b9 100644
--- a/services/keepproxy/keepproxy_test.go
+++ b/services/keepproxy/keepproxy_test.go
@@ -117,10 +117,10 @@ func runProxy(c *C, args []string, port int, bogusClientToken bool) keepclient.K
}
kc.SetServiceRoots(map[string]string{
"proxy": fmt.Sprintf("http://localhost:%v", port),
- })
+ }, nil)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.ServiceRoots()), Equals, 1)
- for _, root := range kc.ServiceRoots() {
+ c.Check(len(kc.LocalRoots()), Equals, 1)
+ for _, root := range kc.LocalRoots() {
c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
}
log.Print("keepclient created")
@@ -154,8 +154,8 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
c.Assert(err, Equals, nil)
c.Check(kc.Arvados.External, Equals, true)
c.Check(kc.Using_proxy, Equals, true)
- c.Check(len(kc.ServiceRoots()), Equals, 1)
- for _, root := range kc.ServiceRoots() {
+ c.Check(len(kc.LocalRoots()), Equals, 1)
+ for _, root := range kc.LocalRoots() {
c.Check(root, Equals, "http://localhost:29950")
}
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index a363bac..c1371dc 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -276,7 +276,7 @@ func main() {
}
// Initialize Pull queue and worker
- keepClient := keepclient.KeepClient{
+ keepClient := &keepclient.KeepClient{
Arvados: nil,
Want_replicas: 1,
Using_proxy: true,
diff --git a/services/keepstore/pull_worker.go b/services/keepstore/pull_worker.go
index fac4bb1..d85458a 100644
--- a/services/keepstore/pull_worker.go
+++ b/services/keepstore/pull_worker.go
@@ -19,7 +19,7 @@ import (
Skip the rest of the servers if no errors
Repeat
*/
-func RunPullWorker(pullq *WorkQueue, keepClient keepclient.KeepClient) {
+func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
nextItem := pullq.NextItem
for item := range nextItem {
pullRequest := item.(PullRequest)
@@ -39,14 +39,14 @@ func RunPullWorker(pullq *WorkQueue, keepClient keepclient.KeepClient) {
Using this token & signature, retrieve the given block.
Write to storage
*/
-func PullItemAndProcess(pullRequest PullRequest, token string, keepClient keepclient.KeepClient) (err error) {
+func PullItemAndProcess(pullRequest PullRequest, token string, keepClient *keepclient.KeepClient) (err error) {
keepClient.Arvados.ApiToken = token
service_roots := make(map[string]string)
for _, addr := range pullRequest.Servers {
service_roots[addr] = addr
}
- keepClient.SetServiceRoots(service_roots)
+ keepClient.SetServiceRoots(service_roots, nil)
// Generate signature with a random token
expires_at := time.Now().Add(60 * time.Second)
@@ -75,7 +75,7 @@ func PullItemAndProcess(pullRequest PullRequest, token string, keepClient keepcl
}
// Fetch the content for the given locator using keepclient.
-var GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
+var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
reader io.ReadCloser, contentLength int64, url string, err error) {
reader, blocklen, url, err := keepClient.Get(signedLocator)
return reader, blocklen, url, err
diff --git a/services/keepstore/pull_worker_integration_test.go b/services/keepstore/pull_worker_integration_test.go
index b293cf9..7a930d5 100644
--- a/services/keepstore/pull_worker_integration_test.go
+++ b/services/keepstore/pull_worker_integration_test.go
@@ -10,7 +10,7 @@ import (
"testing"
)
-var keepClient keepclient.KeepClient
+var keepClient *keepclient.KeepClient
type PullWorkIntegrationTestData struct {
Name string
@@ -33,7 +33,7 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe
}
// keep client
- keepClient = keepclient.KeepClient{
+ keepClient = &keepclient.KeepClient{
Arvados: &arv,
Want_replicas: 1,
Using_proxy: true,
@@ -42,17 +42,15 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe
// discover keep services
var servers []string
- service_roots, err := keepClient.DiscoverKeepServers()
- if err != nil {
+ if err := keepClient.DiscoverKeepServers(); err != nil {
t.Error("Error discovering keep services")
}
- for _, host := range service_roots {
+ for _, host := range keepClient.LocalRoots() {
servers = append(servers, host)
}
// Put content if the test needs it
if wantData {
- keepClient.SetServiceRoots(service_roots)
locator, _, err := keepClient.PutB([]byte(testData.Content))
if err != nil {
t.Errorf("Error putting test data in setup for %s %s %v", testData.Content, locator, err)
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index f0e9e65..124c9b8 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -244,7 +244,7 @@ func performTest(testData PullWorkerTestData, c *C) {
testPullLists[testData.name] = testData.response_body
// Override GetContent to mock keepclient Get functionality
- GetContent = func(signedLocator string, keepClient keepclient.KeepClient) (
+ GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
reader io.ReadCloser, contentLength int64, url string, err error) {
processedPullLists[testData.name] = testData.response_body
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list