[arvados] updated: 2.7.0-5665-gdd317a6040
git repository hosting
git at public.arvados.org
Tue Jan 9 22:01:41 UTC 2024
Summary of changes:
sdk/go/arvados/keep_cache.go | 8 +-
sdk/go/keepclient/gateway_shim.go | 16 +++-
sdk/go/keepclient/keepclient.go | 57 ++++++++++--
sdk/go/keepclient/keepclient_test.go | 169 +++++++++++++++++++----------------
4 files changed, 163 insertions(+), 87 deletions(-)
via dd317a6040504457130e892372e4be035fdd9bd1 (commit)
via e349b0118883a3c3845bd9789f2a4d9fb8feeaf3 (commit)
via b767865b71f8f29f53ee97beb9a30530b87af78d (commit)
from 8e24aa37b7a2c788dd706013a36da6ca975fb981 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit dd317a6040504457130e892372e4be035fdd9bd1
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 9 17:01:05 2024 -0500
20318: Route (*KeepClient)Get() through disk cache.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index 0675ed9877..20994dbd69 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -42,10 +42,14 @@ func (kvh *keepViaHTTP) ReadAt(locator string, dst []byte, offset int) (int, err
}
func (kvh *keepViaHTTP) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
- rdr, _, _, _, err := kvh.getOrHead("GET", opts.Locator, nil)
+ rdr, _, url, _, err := kvh.getOrHead("GET", opts.Locator, nil)
if err != nil {
return 0, err
}
+ if stash, ok := ctx.Value(contextKeyStashBlockReadURL).(*string); ok {
+ // see (*KeepClient)Get()
+ *stash = url
+ }
defer rdr.Close()
n, err := io.Copy(opts.WriteTo, rdr)
return int(n), err
@@ -73,3 +77,7 @@ func (kvh *keepViaHTTP) LocalLocator(locator string) (string, error) {
}
return loc, nil
}
+
+type contextKey int
+
+const contextKeyStashBlockReadURL contextKey = iota
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index b03362ee48..046a4adebd 100644
--- a/sdk/go/keepclient/keepclient.go
+++ b/sdk/go/keepclient/keepclient.go
@@ -7,6 +7,7 @@
package keepclient
import (
+ "bufio"
"bytes"
"context"
"crypto/md5"
@@ -400,16 +401,62 @@ func (kc *KeepClient) LocalLocator(locator string) (string, error) {
return kc.upstreamGateway().LocalLocator(locator)
}
-// Get retrieves a block, given a locator. Returns a reader, the
-// expected data length, the URL the block is being fetched from, and
-// an error.
+// Get retrieves the specified block from the local cache or a backend
+// server. Returns a reader, the expected data length, the URL the
+// block is being fetched from, and an error.
+//
+// The URL return value is empty if the returned data is coming from a
+// local cache.
//
// If the block checksum does not match, the final Read() on the
// reader returned by this method will return a BadChecksum error
// instead of EOF.
+//
+// New code should use BlockRead and/or ReadAt instead of Get.
func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) {
- rdr, size, url, _, err := kc.getOrHead("GET", locator, nil)
- return rdr, size, url, err
+ loc, err := MakeLocator(locator)
+ if err != nil {
+ return nil, 0, "", err
+ }
+ pr, pw := io.Pipe()
+ var url string
+ go func() {
+ n, err := kc.BlockRead(context.WithValue(context.Background(), contextKeyStashBlockReadURL, &url), arvados.BlockReadOptions{
+ Locator: locator,
+ WriteTo: pw,
+ })
+ if err != nil {
+ pw.CloseWithError(err)
+ } else if n != loc.Size {
+ pw.CloseWithError(fmt.Errorf("expected block size %d but read %d bytes", loc.Size, n))
+ } else {
+ pw.Close()
+ }
+ }()
+ // Wait for the first byte to arrive, so that (a) if there's
+ // an error before we receive any data, we can return the
+ // error directly, instead of indirectly via a reader that
+ // returns an error; and (b) if successful, keepViaHTTP has a
+ // chance to populate our url var before we return it.
+ bufr := bufio.NewReader(pr)
+ _, err = bufr.Peek(1)
+ if err != nil && err != io.EOF {
+ pr.CloseWithError(err)
+ return nil, 0, url, err
+ }
+ return struct {
+ io.Reader
+ io.Closer
+ }{
+ Reader: bufr,
+ Closer: pr,
+ }, int64(loc.Size), url, err
+}
+
+// BlockRead retrieves a block from the cache if it's present, otherwise
+// from the network.
+func (kc *KeepClient) BlockRead(ctx context.Context, opts arvados.BlockReadOptions) (int, error) {
+ return kc.upstreamGateway().BlockRead(ctx, opts)
}
// ReadAt retrieves a portion of block from the cache if it's
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index ad5d12b505..ab526d7755 100644
--- a/sdk/go/keepclient/keepclient_test.go
+++ b/sdk/go/keepclient/keepclient_test.go
@@ -64,11 +64,11 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) {
func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
kc, err := MakeKeepClient(arv)
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(len(kc.LocalRoots()), Equals, 2)
for _, root := range kc.LocalRoots() {
c.Check(root, Matches, "http://localhost:\\d+")
@@ -129,7 +129,7 @@ func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request
sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
}
body, err := ioutil.ReadAll(req.Body)
- sph.c.Check(err, Equals, nil)
+ sph.c.Check(err, IsNil)
sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
resp.Header().Set("X-Keep-Replicas-Stored", "1")
if sph.returnStorageClasses != "" {
@@ -519,7 +519,7 @@ func (s *StandaloneSuite) TestPutB(c *C) {
}
func (s *StandaloneSuite) TestPutHR(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := &StubPutHandler{
c: c,
@@ -570,7 +570,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
}
func (s *StandaloneSuite) TestPutWithFail(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := &StubPutHandler{
c: c,
@@ -618,7 +618,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
<-fh.handled
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(phash, Equals, "")
c.Check(replicas, Equals, 2)
@@ -632,7 +632,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
}
func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := &StubPutHandler{
c: c,
@@ -697,7 +697,7 @@ func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
}
func (s *StandaloneSuite) TestGet(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetHandler{
c,
@@ -716,18 +716,20 @@ func (s *StandaloneSuite) TestGet(c *C) {
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, n, url2, err := kc.Get(hash)
- defer r.Close()
- c.Check(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(n, Equals, int64(3))
- c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
+ if url2 != "" {
+ c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
+ }
content, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
+ c.Check(err2, IsNil)
c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
}
func (s *StandaloneSuite) TestGet404(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := Error404Handler{make(chan string, 1)}
@@ -744,7 +746,7 @@ func (s *StandaloneSuite) TestGet404(c *C) {
c.Check(err, Equals, BlockNotFound)
c.Check(n, Equals, int64(0))
c.Check(url2, Equals, "")
- c.Check(r, Equals, nil)
+ c.Check(r, IsNil)
}
func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
@@ -767,10 +769,11 @@ func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
buf, err := ioutil.ReadAll(r)
c.Check(err, IsNil)
c.Check(buf, DeepEquals, []byte{})
+ c.Check(r.Close(), IsNil)
}
func (s *StandaloneSuite) TestGetFail(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+4", md5.Sum([]byte("foo")))
st := FailHandler{make(chan string, 1)}
@@ -786,16 +789,17 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
r, n, url2, err := kc.Get(hash)
errNotFound, _ := err.(*ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
- c.Check(errNotFound.Temporary(), Equals, true)
+ if c.Check(errNotFound, NotNil) {
+ c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
+ c.Check(errNotFound.Temporary(), Equals, true)
+ }
c.Check(n, Equals, int64(0))
c.Check(url2, Equals, "")
- c.Check(r, Equals, nil)
+ c.Check(r, IsNil)
}
func (s *StandaloneSuite) TestGetFailRetry(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := &FailThenSucceedHandler{
handled: make(chan string, 1),
@@ -816,14 +820,14 @@ func (s *StandaloneSuite) TestGetFailRetry(c *C) {
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, n, url2, err := kc.Get(hash)
- defer r.Close()
- c.Check(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(n, Equals, int64(3))
c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks.url, hash))
- content, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
+ content, err := ioutil.ReadAll(r)
+ c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
c.Logf("%q", st.reqIDs)
c.Assert(len(st.reqIDs) > 1, Equals, true)
@@ -834,7 +838,7 @@ func (s *StandaloneSuite) TestGetFailRetry(c *C) {
}
func (s *StandaloneSuite) TestGetNetError(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
arv, err := arvadosclient.MakeArvadosClient()
c.Check(err, IsNil)
@@ -844,17 +848,18 @@ func (s *StandaloneSuite) TestGetNetError(c *C) {
r, n, url2, err := kc.Get(hash)
errNotFound, _ := err.(*ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
- c.Check(errNotFound.Temporary(), Equals, true)
+ if c.Check(errNotFound, NotNil) {
+ c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
+ c.Check(errNotFound.Temporary(), Equals, true)
+ }
c.Check(n, Equals, int64(0))
c.Check(url2, Equals, "")
- c.Check(r, Equals, nil)
+ c.Check(r, IsNil)
}
func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
uuid := "zzzzz-bi6l4-123451234512345"
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
// This one shouldn't be used:
ks0 := RunFakeKeepServer(StubGetHandler{
@@ -883,21 +888,21 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
map[string]string{uuid: ks.url})
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
- defer r.Close()
- c.Check(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(n, Equals, int64(3))
c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
content, err := ioutil.ReadAll(r)
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
}
// Use a service hint to fetch from a local disk service, overriding
// rendezvous probe order.
func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
uuid := "zzzzz-bi6l4-zzzzzzzzzzzzzzz"
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
// This one shouldn't be used, although it appears first in
// rendezvous probe order:
@@ -936,19 +941,19 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
)
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
- defer r.Close()
- c.Check(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(n, Equals, int64(3))
c.Check(uri, Equals, fmt.Sprintf("%s/%s", ks.url, hash+"+K@"+uuid))
content, err := ioutil.ReadAll(r)
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
}
func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
uuid := "zzzzz-bi6l4-123451234512345"
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
ksLocal := RunFakeKeepServer(StubGetHandler{
c,
@@ -975,14 +980,14 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
map[string]string{uuid: ksGateway.url})
r, n, uri, err := kc.Get(hash + "+K@" + uuid)
- c.Assert(err, Equals, nil)
- defer r.Close()
+ c.Assert(err, IsNil)
c.Check(n, Equals, int64(3))
c.Check(uri, Equals, fmt.Sprintf("%s/%s", ksLocal.url, hash+"+K@"+uuid))
content, err := ioutil.ReadAll(r)
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte("foo"))
+ c.Check(r.Close(), IsNil)
}
type BarHandler struct {
@@ -995,8 +1000,8 @@ func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
}
func (s *StandaloneSuite) TestChecksum(c *C) {
- foohash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- barhash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
+ foohash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
+ barhash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
st := BarHandler{make(chan string, 1)}
@@ -1013,14 +1018,14 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
c.Check(err, IsNil)
_, err = ioutil.ReadAll(r)
c.Check(n, Equals, int64(3))
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
<-st.handled
r, n, _, err = kc.Get(foohash)
- c.Check(err, IsNil)
- _, err = ioutil.ReadAll(r)
- c.Check(n, Equals, int64(3))
+ if err == nil {
+ _, err = ioutil.ReadAll(r)
+ }
c.Check(err, Equals, BadChecksum)
<-st.handled
@@ -1028,7 +1033,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
func (s *StandaloneSuite) TestGetWithFailures(c *C) {
content := []byte("waz")
- hash := fmt.Sprintf("%x", md5.Sum(content))
+ hash := fmt.Sprintf("%x+3", md5.Sum(content))
fh := Error404Handler{
make(chan string, 4)}
@@ -1075,13 +1080,14 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
r, n, url2, err := kc.Get(hash)
<-fh.handled
- c.Check(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(n, Equals, int64(3))
c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
readContent, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
+ c.Check(err2, IsNil)
c.Check(readContent, DeepEquals, content)
+ c.Check(r.Close(), IsNil)
}
func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
@@ -1090,9 +1096,9 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
c.Check(err, IsNil)
kc, err := MakeKeepClient(arv)
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
- hash := fmt.Sprintf("%x", md5.Sum(content))
+ hash := fmt.Sprintf("%x+%d", md5.Sum(content), len(content))
{
n, _, err := kc.Ask(hash)
@@ -1102,29 +1108,34 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
{
hash2, replicas, err := kc.PutB(content)
c.Check(err, IsNil)
- c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content)))
+ c.Check(hash2, Matches, `\Q`+hash+`\E\b.*`)
c.Check(replicas, Equals, 2)
}
{
r, n, url2, err := kc.Get(hash)
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(n, Equals, int64(len(content)))
- c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
+ if url2 != "" {
+ c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
+ }
if c.Check(r, NotNil) {
- readContent, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
- c.Check(readContent, DeepEquals, content)
+ readContent, err := ioutil.ReadAll(r)
+ c.Check(err, IsNil)
+ if c.Check(len(readContent), Equals, len(content)) {
+ c.Check(readContent, DeepEquals, content)
+ }
+ c.Check(r.Close(), IsNil)
}
}
{
n, url2, err := kc.Ask(hash)
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(n, Equals, int64(len(content)))
- c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
+ c.Check(url2, Matches, "http://localhost:\\d+/\\Q"+hash+"\\E")
}
{
loc, err := kc.LocalLocator(hash)
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Assert(len(loc) >= 32, Equals, true)
c.Check(loc[:32], Equals, hash[:32])
}
@@ -1171,7 +1182,7 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
_, replicas, err := kc.PutB([]byte("foo"))
<-st.handled
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(replicas, Equals, 2)
}
@@ -1205,7 +1216,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
func (s *StandaloneSuite) TestMakeLocator(c *C) {
l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde@12345678")
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
c.Check(l.Size, Equals, 3)
c.Check(l.Hints, DeepEquals, []string{"3", "Aabcde@12345678"})
@@ -1213,7 +1224,7 @@ func (s *StandaloneSuite) TestMakeLocator(c *C) {
func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce")
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
c.Check(l.Size, Equals, -1)
c.Check(l.Hints, DeepEquals, []string{})
@@ -1221,7 +1232,7 @@ func (s *StandaloneSuite) TestMakeLocatorNoHints(c *C) {
func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
l, err := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+Aabcde@12345678")
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
c.Check(l.Size, Equals, -1)
c.Check(l.Hints, DeepEquals, []string{"Aabcde@12345678"})
@@ -1230,7 +1241,7 @@ func (s *StandaloneSuite) TestMakeLocatorNoSizeHint(c *C) {
func (s *StandaloneSuite) TestMakeLocatorPreservesUnrecognizedHints(c *C) {
str := "91f372a266fe2bf2823cb8ec7fda31ce+3+Unknown+Kzzzzz+Afoobar"
l, err := MakeLocator(str)
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
c.Check(l.Size, Equals, 3)
c.Check(l.Hints, DeepEquals, []string{"3", "Unknown", "Kzzzzz", "Afoobar"})
@@ -1336,7 +1347,7 @@ func (h StubGetIndexHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reque
}
func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetIndexHandler{
c,
@@ -1359,12 +1370,12 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
c.Check(err, IsNil)
content, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
+ c.Check(err2, IsNil)
c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}
func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetIndexHandler{
c,
@@ -1383,15 +1394,15 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", hash[0:3])
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
content, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
+ c.Check(err2, IsNil)
c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}
func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetIndexHandler{
c,
@@ -1414,7 +1425,7 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
}
func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
- hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("foo")))
st := StubGetIndexHandler{
c,
@@ -1454,10 +1465,10 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
r, err := kc.GetIndex("x", "abcd")
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
content, err2 := ioutil.ReadAll(r)
- c.Check(err2, Equals, nil)
+ c.Check(err2, IsNil)
c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}
@@ -1495,14 +1506,14 @@ func (s *StandaloneSuite) TestPutBRetry(c *C) {
hash, replicas, err := kc.PutB([]byte("foo"))
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(hash, Equals, "")
c.Check(replicas, Equals, 2)
}
func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
// Add an additional "testblobstore" keepservice
blobKeepService := make(arvadosclient.Dict)
@@ -1512,13 +1523,13 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
"service_port": "21321",
"service_type": "testblobstore"}},
&blobKeepService)
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
RefreshServiceDiscovery()
// Make a keepclient and ensure that the testblobstore is included
kc, err := MakeKeepClient(arv)
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
// verify kc.LocalRoots
c.Check(len(kc.LocalRoots()), Equals, 3)
commit e349b0118883a3c3845bd9789f2a4d9fb8feeaf3
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 9 16:59:21 2024 -0500
20318: Fix bad compare / short read behavior in DiskCache BlockRead.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 5f1535e7ca..6cf2e23d7e 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -547,12 +547,12 @@ func (cache *DiskCache) BlockRead(ctx context.Context, opts BlockReadOptions) (i
if ctx.Err() != nil {
return offset, ctx.Err()
}
- if int(blocksize)-offset > len(buf) {
+ if int(blocksize)-offset < len(buf) {
buf = buf[:int(blocksize)-offset]
}
nr, err := cache.ReadAt(opts.Locator, buf, offset)
if nr > 0 {
- nw, err := opts.WriteTo.Write(buf)
+ nw, err := opts.WriteTo.Write(buf[:nr])
if err != nil {
return offset + nw, err
}
commit b767865b71f8f29f53ee97beb9a30530b87af78d
Author: Tom Clegg <tom at curii.com>
Date: Mon Jan 8 11:37:53 2024 -0500
20318: Add comments about KeepClient/KeepGateway wiring.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/sdk/go/arvados/keep_cache.go b/sdk/go/arvados/keep_cache.go
index 2b578bd1de..5f1535e7ca 100644
--- a/sdk/go/arvados/keep_cache.go
+++ b/sdk/go/arvados/keep_cache.go
@@ -34,6 +34,10 @@ type KeepGateway interface {
}
// DiskCache wraps KeepGateway, adding a disk-based cache layer.
+//
+// A DiskCache is automatically incorporated into the backend stack of
+// each keepclient.KeepClient. Most programs do not need to use
+// DiskCache directly.
type DiskCache struct {
KeepGateway
Dir string
diff --git a/sdk/go/keepclient/gateway_shim.go b/sdk/go/keepclient/gateway_shim.go
index eeb187e107..0675ed9877 100644
--- a/sdk/go/keepclient/gateway_shim.go
+++ b/sdk/go/keepclient/gateway_shim.go
@@ -17,6 +17,12 @@ import (
// keepViaHTTP implements arvados.KeepGateway by using a KeepClient to
// do upstream requests to keepstore and keepproxy.
+//
+// This enables KeepClient to use KeepGateway wrappers (like
+// arvados.DiskCache) to wrap its own HTTP client back-end methods
+// (getOrHead, httpBlockWrite).
+//
+// See (*KeepClient)upstreamGateway() for the relevant glue.
type keepViaHTTP struct {
*KeepClient
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list