[ARVADOS] updated: 16413f6ebe1bba3a070cdff3d7436ad508db8514
git at public.curoverse.com
git at public.curoverse.com
Tue May 27 16:50:12 EDT 2014
Summary of changes:
sdk/go/src/arvados.org/keepclient/keepclient.go | 66 +++++++++++++++---
.../src/arvados.org/keepclient/keepclient_test.go | 17 +++--
sdk/go/src/arvados.org/keepclient/support.go | 26 +++++--
sdk/python/arvados/api.py | 5 +-
sdk/python/arvados/errors.py | 2 +
sdk/python/arvados/keep.py | 74 +++++++++++++++-----
sdk/python/run_test_server.py | 47 +++++++++++--
sdk/python/test_keep_client.py | 81 ++++++++++++++++++++++
.../keep/src/arvados.org/keepproxy/keepproxy.go | 66 ++++++++++++++----
.../src/arvados.org/keepproxy/keepproxy_test.go | 19 ++---
services/keep/src/keep/keep.go | 22 ++++++
11 files changed, 363 insertions(+), 62 deletions(-)
via 16413f6ebe1bba3a070cdff3d7436ad508db8514 (commit)
via 4d50cea3609918775bf2a07981f1ef719067502b (commit)
via fd7ac9bf21002cc8a3cdb9a5e16c588ff734dfab (commit)
via c61379a97dd46273da01792a83f4cae9736d6adf (commit)
via 86680eab88b02d7f8bfe2f76308668957c825c36 (commit)
via c0c0d769cd812e87efa392649380dc5ba8a25cd4 (commit)
via ed101805b04c70bcbf1b070dceba2aee72170377 (commit)
via 27ffca811e6f43225fc82d582d1962eebbd1ab6e (commit)
via b646cec74484bf07a54f4be2de712f50dc387aa0 (commit)
via 0d35501f448a8e7ca44152429d3d38edaa9bb30c (commit)
via f3dd3608f2c32fd4dcd9551bfa1b871d816954d5 (commit)
via 12f8c88325daf4c6af8cbf091ea64cc5d64566c0 (commit)
via 575457ac8645c61ca71e94ce074291ec002b4c24 (commit)
via be92cf19aa7b61576ddcee0fd360103ca43ef5ea (commit)
via 86dffe03ec74387d14da9ceb17934bbdab1239b9 (commit)
via fff404a0e2ebe2582f616526c486ad0dbecce3c8 (commit)
via 66f1be77149feb033e3adc17c37a09a391eec6e9 (commit)
via 77504f6d369bd7bd323748d5347d6d99ed9c75ca (commit)
via f8743ac8fa1ed9dc8c7c4f7a23803ffe8721cfa6 (commit)
via 77d3ed33f525bd6859580954c6d7b9b36b394b98 (commit)
via e5b58426fb9064da76b207cd6d8aaaff05331d64 (commit)
via b0d3b048f6f16dd80e119623e0d4cb558a6537f7 (commit)
via 7d2ce5fa717f587963f64e466a9c3dd33faef27b (commit)
via 548f183c23606b9febcadbbfd658aa921c3baaf5 (commit)
from 7199f034c15930a3a733e526e12fa93df7f9d4fb (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 16413f6ebe1bba3a070cdff3d7436ad508db8514
Merge: 4d50cea 7199f03
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 27 16:50:09 2014 -0400
Merge branch 'master' of git.curoverse.com:arvados #2751
commit 4d50cea3609918775bf2a07981f1ef719067502b
Merge: fd7ac9b 7ca1a38
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 27 15:50:04 2014 -0400
Merge branch 'master' of git.curoverse.com:arvados refs #2751
commit fd7ac9bf21002cc8a3cdb9a5e16c588ff734dfab
Merge: 037ec42 c61379a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue May 27 15:49:37 2014 -0400
Merge branch '2751-python-sdk-keep-proxy-support' closes #2751
diff --cc sdk/go/src/arvados.org/keepclient/keepclient.go
index 8d26b32,8d26b32..ee91d6f
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@@ -9,9 -9,9 +9,12 @@@ import
"fmt"
"io"
"io/ioutil"
++ "log"
"net/http"
"os"
++ "regexp"
"sort"
++ "strings"
"sync"
"sync/atomic"
"unsafe"
@@@ -77,13 -77,13 +80,13 @@@ func MakeKeepClient() (kc KeepClient, e
// KeepClient.Want_replicas. Returns the number of replicas that were written
// and if there was an error. Note this will return InsufficientReplias
// whenever 0 <= replicas < this.Wants_replicas.
--func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
++func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
// Buffer for reads from 'r'
var bufsize int
if expectedLength > 0 {
if expectedLength > BLOCKSIZE {
-- return 0, OversizeBlockError
++ return "", 0, OversizeBlockError
}
bufsize = int(expectedLength)
} else {
@@@ -100,7 -100,7 +103,7 @@@
// replicas is given in KeepClient.Want_replicas. Returns the number of
// replicas that were written and if there was an error. Note this will return
// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
--func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
++func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
t := streamer.AsyncStreamFromSlice(buf)
defer t.Close()
@@@ -111,19 -111,19 +114,18 @@@
// of replicas is given in KeepClient.Want_replicas. Returns the number of
// replicas that were written and if there was an error. Note this will return
// InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
--func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
-- hash = fmt.Sprintf("%x", md5.Sum(buffer))
-- replicas, err = this.PutHB(hash, buffer)
-- return hash, replicas, err
++func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
++ hash := fmt.Sprintf("%x", md5.Sum(buffer))
++ return this.PutHB(hash, buffer)
}
// Put a block, given a Reader. This will read the entire reader into a buffer
--// to computed the hash. The desired number of replicas is given in
++// to compute the hash. The desired number of replicas is given in
// KeepClient.Want_replicas. Returns the number of replicas that were written
// and if there was an error. Note this will return InsufficientReplias
// whenever 0 <= replicas < this.Wants_replicas. Also nhote that if the block
// hash and data size are available, PutHR() is more efficient.
--func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
++func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
if buffer, err := ioutil.ReadAll(r); err != nil {
return "", 0, err
} else {
@@@ -243,3 -243,3 +245,49 @@@ func (this *KeepClient) SetServiceRoots
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
unsafe.Pointer(&roots))
}
++
++type Locator struct {
++ Hash string
++ Size int
++ Signature string
++ Timestamp string
++}
++
++func MakeLocator2(hash string, hints string) (locator Locator) {
++ locator.Hash = hash
++ if hints != "" {
++ signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
++ for _, hint := range strings.Split(hints, "+") {
++ if hint != "" {
++ if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
++ fmt.Sscanf(hint, "%d", &locator.Size)
++ } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
++ locator.Signature = m[1]
++ locator.Timestamp = m[2]
++ } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
++ // Any unknown hint that starts with an uppercase letter is
++ // presumed to be valid and ignored, to permit forward compatibility.
++ } else {
++ // Unknown format; not a valid locator.
++ return Locator{"", 0, "", ""}
++ }
++ }
++ }
++ }
++ return locator
++}
++
++func MakeLocator(path string) Locator {
++ pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
++ if err != nil {
++ log.Print("Don't like regexp", err)
++ }
++
++ sm := pathpattern.FindStringSubmatch(path)
++ if sm == nil {
++ log.Print("Failed match ", path)
++ return Locator{"", 0, "", ""}
++ }
++
++ return MakeLocator2(sm[1], sm[2])
++}
diff --cc sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 291d8f8,291d8f8..8eedadd
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@@ -163,7 -163,7 +163,7 @@@ func (s *StandaloneSuite) TestUploadToS
<-st.handled
status := <-upload_status
-- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
++ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
})
log.Printf("TestUploadToStubKeepServer done")
@@@ -196,7 -196,7 +196,7 @@@ func (s *StandaloneSuite) TestUploadToS
<-st.handled
status := <-upload_status
-- c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1})
++ c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
})
log.Printf("TestUploadToStubKeepServerBufferReader done")
@@@ -386,7 -386,7 +386,7 @@@ func (s *StandaloneSuite) TestPutWithFa
<-fh.handled
c.Check(err, Equals, nil)
-- c.Check(phash, Equals, hash)
++ c.Check(phash, Equals, "")
c.Check(replicas, Equals, 2)
c.Check(<-st.handled, Equals, shuff[1])
c.Check(<-st.handled, Equals, shuff[2])
@@@ -598,7 -598,7 +598,7 @@@ func (s *ServerRequiredSuite) TestPutGe
}
{
hash2, replicas, err := kc.PutB([]byte("foo"))
-- c.Check(hash2, Equals, hash)
++ c.Check(hash2, Equals, fmt.Sprintf("%s+%v", hash, 3))
c.Check(replicas, Equals, 2)
c.Check(err, Equals, nil)
}
@@@ -687,3 -687,3 +687,12 @@@ func (s *StandaloneSuite) TestPutProxyI
log.Printf("TestPutProxy done")
}
++
++func (s *StandaloneSuite) TestMakeLocator(c *C) {
++ l := MakeLocator("91f372a266fe2bf2823cb8ec7fda31ce+3+Aabcde at 12345678")
++
++ c.Check(l.Hash, Equals, "91f372a266fe2bf2823cb8ec7fda31ce")
++ c.Check(l.Size, Equals, 3)
++ c.Check(l.Signature, Equals, "abcde")
++ c.Check(l.Timestamp, Equals, "12345678")
++}
diff --cc sdk/go/src/arvados.org/keepclient/support.go
index 913f7c7,913f7c7..b1d59f0
--- a/sdk/go/src/arvados.org/keepclient/support.go
+++ b/sdk/go/src/arvados.org/keepclient/support.go
@@@ -7,10 -7,10 +7,12 @@@ import
"errors"
"fmt"
"io"
++ "io/ioutil"
"log"
"net/http"
"os"
"strconv"
++ "strings"
)
type keepDisk struct {
@@@ -154,6 -154,6 +156,7 @@@ type uploadStatus struct
url string
statusCode int
replicas_stored int
++ response string
}
func (this KeepClient) uploadToKeepServer(host string, hash string, body io.ReadCloser,
@@@ -165,7 -165,7 +168,7 @@@
var err error
var url = fmt.Sprintf("%s/%s", host, hash)
if req, err = http.NewRequest("PUT", url, nil); err != nil {
-- upload_status <- uploadStatus{err, url, 0, 0}
++ upload_status <- uploadStatus{err, url, 0, 0, ""}
body.Close()
return
}
@@@ -185,7 -185,7 +188,7 @@@
var resp *http.Response
if resp, err = this.Client.Do(req); err != nil {
-- upload_status <- uploadStatus{err, url, 0, 0}
++ upload_status <- uploadStatus{err, url, 0, 0, ""}
body.Close()
return
}
@@@ -195,17 -195,17 +198,25 @@@
fmt.Sscanf(xr, "%d", &rep)
}
++ respbody, err2 := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
++ if err2 != nil && err2 != io.EOF {
++ upload_status <- uploadStatus{err2, url, resp.StatusCode, rep, string(respbody)}
++ return
++ }
++
++ locator := strings.TrimSpace(string(respbody))
++
if resp.StatusCode == http.StatusOK {
-- upload_status <- uploadStatus{nil, url, resp.StatusCode, rep}
++ upload_status <- uploadStatus{nil, url, resp.StatusCode, rep, locator}
} else {
-- upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep}
++ upload_status <- uploadStatus{errors.New(resp.Status), url, resp.StatusCode, rep, locator}
}
}
func (this KeepClient) putReplicas(
hash string,
tr *streamer.AsyncStream,
-- expectedLength int64) (replicas int, err error) {
++ expectedLength int64) (locator string, replicas int, err error) {
// Calculate the ordering for uploading to servers
sv := this.shuffledServiceRoots(hash)
@@@ -233,7 -233,7 +244,7 @@@
active += 1
} else {
if active == 0 {
-- return (this.Want_replicas - remaining_replicas), InsufficientReplicasError
++ return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
} else {
break
}
@@@ -245,6 -245,6 +256,7 @@@
if status.statusCode == 200 {
// good news!
remaining_replicas -= status.replicas_stored
++ locator = status.response
} else {
// writing to keep server failed for some reason
log.Printf("Keep server put to %v failed with '%v'",
@@@ -254,5 -254,5 +266,5 @@@
log.Printf("Upload to %v status code: %v remaining replicas: %v active: %v", status.url, status.statusCode, remaining_replicas, active)
}
-- return this.Want_replicas, nil
++ return locator, this.Want_replicas, nil
}
diff --cc sdk/python/arvados/keep.py
index 686b940,f5014a4..a93c602
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@@ -74,23 -73,12 +74,23 @@@ class KeepClient(object)
with self._done_lock:
return (self._done < self._todo)
- def save_response(self, response_body):
- def increment_done(self):
++ def save_response(self, response_body, replicas_stored):
"""
- Report that the current thread was successful.
+ Records a response body (a locator, possibly signed) returned by
+ the Keep server. It is not necessary to save more than
+ one response, since we presume that any locator returned
+ in response to a successful request is valid.
"""
with self._done_lock:
-- self._done += 1
++ self._done += replicas_stored
+ self._response = response_body
+
+ def response(self):
+ """
+ Returns the body from the response to a PUT request.
+ """
+ with self._done_lock:
+ return self._response
def done(self):
"""
@@@ -141,7 -136,19 +148,18 @@@
(str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root']))
- return limiter.save_response(content.strip())
-
++ replicas_stored = 1
+ if 'x-keep-replicas-stored' in resp:
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
- replicas = int(resp['x-keep-replicas-stored'])
- while replicas > 0:
- limiter.increment_done()
- replicas -= 1
- else:
- limiter.increment_done()
- return
++ try:
++ replicas_stored = int(resp['x-keep-replicas-stored'])
++ except ValueError:
++ pass
++ return limiter.save_response(content.strip(), replicas_stored)
++
logging.warning("Request fail: PUT %s => %s %s" %
(url, resp['status'], content))
except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
@@@ -159,17 -167,37 +178,38 @@@
def shuffled_service_roots(self, hash):
if self.service_roots == None:
self.lock.acquire()
- try:
- keep_disks = arvados.api().keep_disks().list().execute()['items']
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_disks)
- self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
- finally:
- self.lock.release()
+
+ # Override normal keep disk lookup with an explict proxy
+ # configuration.
+ keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
+ if keep_proxy_env != None and len(keep_proxy_env) > 0:
++
+ if keep_proxy_env[-1:] != '/':
+ keep_proxy_env += "/"
+ self.service_roots = [keep_proxy_env]
+ self.using_proxy = True
+ else:
+ try:
+ try:
+ keep_services = arvados.api().keep_services().accessible().execute()['items']
+ except Exception:
+ keep_services = arvados.api().keep_disks().list().execute()['items']
+
+ if len(keep_services) == 0:
+ raise arvados.errors.NoKeepServersError()
+
+ if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
+ self.using_proxy = True
+
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_services)
+ self.service_roots = sorted(set(roots))
+ logging.debug(str(self.service_roots))
+ finally:
+ self.lock.release()
# Build an ordering with which to query the Keep servers based on the
# contents of the hash.
@@@ -358,9 -390,8 +400,9 @@@
for t in threads:
t.join()
have_copies = thread_limiter.done()
+ # If we're done, return the response from Keep
- if have_copies == want_copies:
+ if have_copies >= want_copies:
- return (data_hash + '+' + str(len(data)))
+ return thread_limiter.response()
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, want_copies, have_copies))
diff --cc sdk/python/run_test_server.py
index 4b65d72,7bf432a..bdfdea9
--- a/sdk/python/run_test_server.py
+++ b/sdk/python/run_test_server.py
@@@ -123,18 -123,9 +123,20 @@@ def stop()
os.chdir(cwd)
-def _start_keep(n):
+def _start_keep(n, keep_args):
keep0 = tempfile.mkdtemp()
- kp0 = subprocess.Popen(["bin/keep", "-volumes={}".format(keep0), "-listen=:{}".format(25107+n), "-pid={}".format("tmp/keep{}.pid".format(n))])
+ keep_cmd = ["bin/keep",
+ "-volumes={}".format(keep0),
- "-listen=:{}".format(25107+n)]
++ "-listen=:{}".format(25107+n),
++ "-pid={}".format("tmp/keep{}.pid".format(n))]
+
+ for arg, val in keep_args.iteritems():
+ keep_cmd.append("{}={}".format(arg, val))
+
+ kp0 = subprocess.Popen(keep_cmd)
+ with open("tmp/keep{}.pid".format(n), 'w') as f:
+ f.write(str(kp0.pid))
++
with open("tmp/keep{}.volume".format(n), 'w') as f:
f.write(keep0)
@@@ -153,18 -144,9 +155,17 @@@ def run_keep(blob_signing_key=None, enf
if not os.path.exists("tmp"):
os.mkdir("tmp")
- _start_keep(0)
- _start_keep(1)
+ keep_args = {}
+ if blob_signing_key:
+ with open("tmp/keep.blob_signing_key", "w") as f:
+ f.write(blob_signing_key)
+ keep_args['--permission-key-file'] = 'tmp/keep.blob_signing_key'
+ if enforce_permissions:
+ keep_args['--enforce-permissions'] = 'true'
+
+ _start_keep(0, keep_args)
+ _start_keep(1, keep_args)
-
os.environ["ARVADOS_API_HOST"] = "127.0.0.1:3001"
os.environ["ARVADOS_API_HOST_INSECURE"] = "true"
@@@ -187,8 -169,7 +188,9 @@@ def _stop_keep(n)
if os.path.exists("tmp/keep{}.volume".format(n)):
with open("tmp/keep{}.volume".format(n), 'r') as r:
shutil.rmtree(r.read(), True)
+ os.unlink("tmp/keep{}.volume".format(n))
+ if os.path.exists("tmp/keep.blob_signing_key"):
+ os.remove("tmp/keep.blob_signing_key")
def stop_keep():
cwd = os.getcwd()
diff --cc sdk/python/test_keep_client.py
index f863ad3,29600e2..6d0470a
--- a/sdk/python/test_keep_client.py
+++ b/sdk/python/test_keep_client.py
@@@ -66,123 -77,67 +77,193 @@@ class KeepTestCase(unittest.TestCase)
blob_str,
'wrong content from Keep.get(md5(<binarydata>))')
+class KeepPermissionTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ try:
+ del os.environ['KEEP_LOCAL_STORE']
+ except KeyError:
+ pass
++
+ run_test_server.run()
+ run_test_server.run_keep(blob_signing_key='abcdefghijk0123456789',
+ enforce_permissions=True)
+
+ @classmethod
+ def tearDownClass(cls):
+ run_test_server.stop()
+ run_test_server.stop_keep()
+
+ def test_KeepBasicRWTest(self):
+ run_test_server.authorize_with('active')
+ foo_locator = arvados.Keep.put('foo')
+ self.assertRegexpMatches(
+ foo_locator,
+ r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
+ 'invalid locator from Keep.put("foo"): ' + foo_locator)
+ self.assertEqual(arvados.Keep.get(foo_locator),
+ 'foo',
+ 'wrong content from Keep.get(md5("foo"))')
+
+ # With Keep permissions enabled, a GET request without a signature will fail.
+ bar_locator = arvados.Keep.put('bar')
+ self.assertRegexpMatches(
+ bar_locator,
+ r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
+ 'invalid locator from Keep.put("bar"): ' + bar_locator)
+ self.assertRaises(arvados.errors.NotFoundError,
+ arvados.Keep.get,
+ "37b51d194a7513e45b56f6524f2d51f2")
+
+ # A request without an API token will also fail.
+ del arvados.config.settings()["ARVADOS_API_TOKEN"]
+ self.assertRaises(arvados.errors.NotFoundError,
+ arvados.Keep.get,
+ bar_locator)
+
+# KeepOptionalPermission: starts Keep with --permission-key-file
+# but not --enforce-permissions (i.e. generate signatures on PUT
+# requests, but do not require them for GET requests)
+#
+# All of these requests should succeed when permissions are optional:
+# * authenticated request, signed locator
+# * authenticated request, unsigned locator
+# * unauthenticated request, signed locator
+# * unauthenticated request, unsigned locator
+
+class KeepOptionalPermission(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ try:
+ del os.environ['KEEP_LOCAL_STORE']
+ except KeyError:
+ pass
+ run_test_server.run()
+ run_test_server.run_keep(blob_signing_key='abcdefghijk0123456789',
+ enforce_permissions=False)
+
+ @classmethod
+ def tearDownClass(cls):
+ run_test_server.stop()
+ run_test_server.stop_keep()
+
+ def test_KeepAuthenticatedSignedTest(self):
+ run_test_server.authorize_with('active')
+ signed_locator = arvados.Keep.put('foo')
+ self.assertRegexpMatches(
+ signed_locator,
+ r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
+ 'invalid locator from Keep.put("foo"): ' + signed_locator)
+ self.assertEqual(arvados.Keep.get(signed_locator),
+ 'foo',
+ 'wrong content from Keep.get(md5("foo"))')
+
+ def test_KeepAuthenticatedUnsignedTest(self):
+ run_test_server.authorize_with('active')
+ signed_locator = arvados.Keep.put('foo')
+ self.assertRegexpMatches(
+ signed_locator,
+ r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
+ 'invalid locator from Keep.put("foo"): ' + signed_locator)
+ self.assertEqual(arvados.Keep.get("acbd18db4cc2f85cedef654fccc4a4d8"),
+ 'foo',
+ 'wrong content from Keep.get(md5("foo"))')
+
+ def test_KeepUnauthenticatedSignedTest(self):
+ # Since --enforce-permissions is not in effect, GET requests
+ # need not be authenticated.
+ run_test_server.authorize_with('active')
+ signed_locator = arvados.Keep.put('foo')
+ self.assertRegexpMatches(
+ signed_locator,
+ r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
+ 'invalid locator from Keep.put("foo"): ' + signed_locator)
+
+ del arvados.config.settings()["ARVADOS_API_TOKEN"]
+ self.assertEqual(arvados.Keep.get(signed_locator),
+ 'foo',
+ 'wrong content from Keep.get(md5("foo"))')
+
+ def test_KeepUnauthenticatedUnsignedTest(self):
+ # Since --enforce-permissions is not in effect, GET requests
+ # need not be authenticated.
+ run_test_server.authorize_with('active')
+ signed_locator = arvados.Keep.put('foo')
+ self.assertRegexpMatches(
+ signed_locator,
+ r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
+ 'invalid locator from Keep.put("foo"): ' + signed_locator)
+
+ del arvados.config.settings()["ARVADOS_API_TOKEN"]
+ self.assertEqual(arvados.Keep.get("acbd18db4cc2f85cedef654fccc4a4d8"),
+ 'foo',
+ 'wrong content from Keep.get(md5("foo"))')
++
+
+ class KeepProxyTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ super(KeepProxyTestCase, cls).setUpClass()
++
+ try:
+ del os.environ['KEEP_LOCAL_STORE']
+ except KeyError:
+ pass
+
+ os.environ["ARVADOS_KEEP_PROXY"] = ""
+ os.environ["ARVADOS_EXTERNAL_CLIENT"] = ""
+
+ run_test_server.run()
+ run_test_server.run_keep()
+ arvados.keep.global_client_object = None
+ arvados.config._settings = None
+ run_test_server.run_keep_proxy("admin")
- cls.arvados_keep_proxy = os.environ["ARVADOS_KEEP_PROXY"]
++ KeepProxyTestCase.arvados_keep_proxy = arvados.config.get("ARVADOS_KEEP_PROXY")
+
+ @classmethod
+ def tearDownClass(cls):
+ super(KeepProxyTestCase, cls).tearDownClass()
+ run_test_server.stop()
+ run_test_server.stop_keep()
+ run_test_server.stop_keep_proxy()
+
+ def test_KeepProxyTest1(self):
+ # Will use ARVADOS_KEEP_PROXY environment variable that is set by
+ # run_keep_proxy() in setUpClass()
++
+ os.environ["ARVADOS_KEEP_PROXY"] = KeepProxyTestCase.arvados_keep_proxy
+ os.environ["ARVADOS_EXTERNAL_CLIENT"] = ""
++ arvados.keep.global_client_object = None
+ arvados.config._settings = None
+
+ baz_locator = arvados.Keep.put('baz')
+ self.assertEqual(baz_locator,
+ '73feffa4b7f6bb68e44cf984c85f6e88+3',
+ 'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
+ self.assertEqual(arvados.Keep.get(baz_locator),
+ 'baz',
+ 'wrong content from Keep.get(md5("baz"))')
+
+ self.assertEqual(True, arvados.Keep.global_client_object().using_proxy)
+
+ def test_KeepProxyTest2(self):
+ # We don't want to use ARVADOS_KEEP_PROXY from run_keep_proxy() in
+ # setUpClass(), so clear it and set ARVADOS_EXTERNAL_CLIENT which will
+ # contact the API server.
+ os.environ["ARVADOS_KEEP_PROXY"] = ""
+ os.environ["ARVADOS_EXTERNAL_CLIENT"] = "true"
++ arvados.keep.global_client_object = None
+ arvados.config._settings = None
+
+ # Will send X-External-Client to server and get back the proxy from
+ # keep_services/accessible
+
+ baz_locator = arvados.Keep.put('baz2')
+ self.assertEqual(baz_locator,
+ '91f372a266fe2bf2823cb8ec7fda31ce+4',
+ 'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
+ self.assertEqual(arvados.Keep.get(baz_locator),
+ 'baz2',
+ 'wrong content from Keep.get(md5("baz2"))')
+
+ self.assertEqual(True, arvados.Keep.global_client_object().using_proxy)
diff --cc services/keep/src/arvados.org/keepproxy/keepproxy.go
index 38e14fd,ae76c26..414835c
--- a/services/keep/src/arvados.org/keepproxy/keepproxy.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@@ -231,19 -257,19 +259,16 @@@ func MakeRESTRouter
t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
rest := mux.NewRouter()
- gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
- ghsig := rest.Handle(
- `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
- GetBlockHandler{kc, t})
- ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
if enable_get {
- gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
- ghsig := rest.Handle(
- `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
- GetBlockHandler{kc, t})
-
-- gh.Methods("GET", "HEAD")
-- ghsig.Methods("GET", "HEAD")
++ rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`,
++ GetBlockHandler{kc, t}).Methods("GET", "HEAD")
++ rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t}).Methods("GET", "HEAD")
}
if enable_put {
- ph.Methods("PUT")
++ rest.Handle(`/{hash:[0-9a-f]{32}}+{hints}`, PutBlockHandler{kc, t}).Methods("PUT")
+ rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t}).Methods("PUT")
}
rest.NotFoundHandler = InvalidPathHandler{}
@@@ -261,8 -287,8 +286,9 @@@ func (this GetBlockHandler) ServeHTTP(r
kc := *this.KeepClient
hash := mux.Vars(req)["hash"]
-- signature := mux.Vars(req)["signature"]
-- timestamp := mux.Vars(req)["timestamp"]
++ hints := mux.Vars(req)["hints"]
++
++ locator := keepclient.MakeLocator2(hash, hints)
log.Printf("%s: %s %s", GetRemoteAddress(req), req.Method, hash)
@@@ -276,10 -302,10 +302,10 @@@
var blocklen int64
if req.Method == "GET" {
-- reader, blocklen, _, err = kc.AuthorizedGet(hash, signature, timestamp)
++ reader, blocklen, _, err = kc.AuthorizedGet(hash, locator.Signature, locator.Timestamp)
defer reader.Close()
} else if req.Method == "HEAD" {
-- blocklen, _, err = kc.AuthorizedAsk(hash, signature, timestamp)
++ blocklen, _, err = kc.AuthorizedAsk(hash, locator.Signature, locator.Timestamp)
}
resp.Header().Set("Content-Length", fmt.Sprint(blocklen))
@@@ -314,6 -340,6 +340,9 @@@ func (this PutBlockHandler) ServeHTTP(r
kc := *this.KeepClient
hash := mux.Vars(req)["hash"]
++ hints := mux.Vars(req)["hints"]
++
++ locator := keepclient.MakeLocator2(hash, hints)
var contentLength int64 = -1
if req.Header.Get("Content-Length") != "" {
@@@ -331,6 -357,6 +360,11 @@@
return
}
++ if locator.Size > 0 && int64(locator.Size) != contentLength {
++ http.Error(resp, "Locator size hint does not match Content-Length header", http.StatusBadRequest)
++ return
++ }
++
if !CheckAuthorizationHeader(kc, this.ApiTokenCache, req) {
http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
return
@@@ -346,7 -372,7 +380,7 @@@
}
// Now try to put the block through
-- replicas, err := kc.PutHR(hash, req.Body, contentLength)
++ hash, replicas, err := kc.PutHR(hash, req.Body, contentLength)
// Tell the client how many successful PUTs we accomplished
resp.Header().Set(keepclient.X_Keep_Replicas_Stored, fmt.Sprintf("%d", replicas))
@@@ -355,6 -381,6 +389,10 @@@
case nil:
// Default will return http.StatusOK
log.Printf("%s: %s %s finished, stored %v replicas (desired %v)", GetRemoteAddress(req), req.Method, hash, replicas, kc.Want_replicas)
++ n, err2 := io.WriteString(resp, hash)
++ if err2 != nil {
++ log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
++ }
case keepclient.OversizeBlockError:
// Too much data
@@@ -366,6 -392,6 +404,10 @@@
// client can decide if getting less than the number of
// replications it asked for is a fatal error.
// Default will return http.StatusOK
++ n, err2 := io.WriteString(resp, hash)
++ if err2 != nil {
++ log.Printf("%s: wrote %v bytes to response body and got error %v", n, err2.Error())
++ }
} else {
http.Error(resp, "", http.StatusServiceUnavailable)
}
diff --cc services/keep/src/arvados.org/keepproxy/keepproxy_test.go
index cedc61f,f03c94f..9e78223
--- a/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy_test.go
@@@ -136,11 -118,10 +136,12 @@@ func (s *ServerRequiredSuite) TestPutAs
c.Check(kc.ServiceRoots()[0], Equals, "http://localhost:29950")
c.Check(err, Equals, nil)
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
-
log.Print("keepclient created")
+ defer listener.Close()
+
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
++ var hash2 string
{
_, _, err := kc.Ask(hash)
@@@ -149,22 -130,22 +150,24 @@@
}
{
-- hash2, rep, err := kc.PutB([]byte("foo"))
-- c.Check(hash2, Equals, hash)
++ var rep int
++ var err error
++ hash2, rep, err = kc.PutB([]byte("foo"))
++ c.Check(hash2, Equals, fmt.Sprintf("%s+3", hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
log.Print("PutB")
}
{
-- blocklen, _, err := kc.Ask(hash)
++ blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
log.Print("Ask 2")
}
{
-- reader, blocklen, _, err := kc.Get(hash)
++ reader, blocklen, _, err := kc.Get(hash2)
c.Assert(err, Equals, nil)
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
@@@ -192,8 -180,8 +195,8 @@@ func (s *ServerRequiredSuite) TestPutAs
}
{
- hash2, rep, err := kc.PutB([]byte("foo"))
- c.Check(hash2, Equals, hash)
+ hash2, rep, err := kc.PutB([]byte("bar"))
- c.Check(hash2, Equals, hash)
++ c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, Equals, keepclient.InsufficientReplicasError)
log.Print("PutB")
@@@ -215,61 -203,3 +218,59 @@@
log.Print("TestPutAndGetForbidden done")
}
+
+func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
+ log.Print("TestGetDisabled start")
+
+ kc := runProxy(c, []string{"keepproxy", "-no-get"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29952)
+ defer listener.Close()
+
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("baz")))
+
+ {
+ _, _, err := kc.Ask(hash)
+ c.Check(err, Equals, keepclient.BlockNotFound)
+ log.Print("Ask 1")
+ }
+
+ {
+ hash2, rep, err := kc.PutB([]byte("baz"))
- c.Check(hash2, Equals, hash)
++ c.Check(hash2, Equals, fmt.Sprintf("%s+3", hash))
+ c.Check(rep, Equals, 2)
+ c.Check(err, Equals, nil)
+ log.Print("PutB")
+ }
+
+ {
+ blocklen, _, err := kc.Ask(hash)
+ c.Assert(err, Equals, keepclient.BlockNotFound)
+ c.Check(blocklen, Equals, int64(0))
+ log.Print("Ask 2")
+ }
+
+ {
+ _, blocklen, _, err := kc.Get(hash)
+ c.Assert(err, Equals, keepclient.BlockNotFound)
+ c.Check(blocklen, Equals, int64(0))
+ log.Print("Get")
+ }
+
+ log.Print("TestGetDisabled done")
+}
+
+func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
+ log.Print("TestPutDisabled start")
+
+ kc := runProxy(c, []string{"keepproxy", "-no-put"}, "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h", 29953)
+ defer listener.Close()
+
- hash := fmt.Sprintf("%x", md5.Sum([]byte("quux")))
-
+ {
+ hash2, rep, err := kc.PutB([]byte("quux"))
- c.Check(hash2, Equals, hash)
++ c.Check(hash2, Equals, "")
+ c.Check(rep, Equals, 0)
+ c.Check(err, Equals, keepclient.InsufficientReplicasError)
+ log.Print("PutB")
+ }
+
+ log.Print("TestPutDisabled done")
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list