[ARVADOS] created: 48ffdd5ac196771381c8dc9ab47cfad5f1929720

git at public.curoverse.com git at public.curoverse.com
Thu May 15 15:15:01 EDT 2014


        at  48ffdd5ac196771381c8dc9ab47cfad5f1929720 (commit)


commit 48ffdd5ac196771381c8dc9ab47cfad5f1929720
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu May 15 15:14:55 2014 -0400

    1885: First commit of mostly complete keep proxy, no tests yet.  Removed build.sh
    in favor of go.sh that wraps 'go' with the desired GOPATH and 'go get'.  Added
    keepclient.Ask() method which does HTTP HEAD.  Added AuthorizedGet() and
    AuthorizedAsk() which provide signature and timestamp.

diff --git a/sdk/go/build.sh b/sdk/go/build.sh
deleted file mode 100755
index ed95228..0000000
--- a/sdk/go/build.sh
+++ /dev/null
@@ -1,37 +0,0 @@
-#! /bin/sh
-
-# This script builds a Keep executable and installs it in
-# ./bin/keep.
-#
-# In idiomatic Go style, a user would install Keep with something
-# like:
-#
-#     go get arvados.org/keep
-#     go install arvados.org/keep
-#
-# which would download both the Keep source and any third-party
-# packages it depends on.
-#
-# Since the Keep source is bundled within the overall Arvados source,
-# "go get" is not the primary tool for delivering Keep source and this
-# process doesn't work.  Instead, this script sets the environment
-# properly and fetches any necessary dependencies by hand.
-
-if [ -z "$GOPATH" ]
-then
-    GOPATH=$(pwd)
-else
-    GOPATH=$(pwd):${GOPATH}
-fi
-
-export GOPATH
-
-set -o errexit   # fail if any command returns an error
-
-mkdir -p pkg
-mkdir -p bin
-go get gopkg.in/check.v1
-go install arvados.org/keepclient
-if ls -l pkg/*/arvados.org/keepclient.a ; then
-    echo "success!"
-fi
diff --git a/sdk/go/go.sh b/sdk/go/go.sh
new file mode 100755
index 0000000..0203a1c
--- /dev/null
+++ b/sdk/go/go.sh
@@ -0,0 +1,12 @@
+#! /bin/sh
+
+rootdir=$(dirname $0)
+GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
+export GOPATH
+
+mkdir -p pkg
+mkdir -p bin
+
+go get gopkg.in/check.v1
+
+go $*
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient.go b/sdk/go/src/arvados.org/keepclient/keepclient.go
index aeb805b..de5066a 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient.go
@@ -15,13 +15,16 @@ import (
 	"strconv"
 )
 
+// A Keep "block" is 64MB.
+const BLOCKSIZE = 64 * 1024 * 1024
+
 type KeepClient struct {
 	ApiServer     string
 	ApiToken      string
 	ApiInsecure   bool
 	Service_roots []string
 	Want_replicas int
-	client        *http.Client
+	Client        *http.Client
 }
 
 type KeepDisk struct {
@@ -30,20 +33,19 @@ type KeepDisk struct {
 	SSL      bool   `json:"service_ssl_flag"`
 }
 
-func MakeKeepClient() (kc *KeepClient, err error) {
-	kc = &KeepClient{
-		ApiServer:     os.Getenv("ARVADOS_API_HOST"),
-		ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
-		ApiInsecure:   (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
-		Want_replicas: 2}
-
+func MakeKeepClient() (kc KeepClient, err error) {
 	tr := &http.Transport{
 		TLSClientConfig: &tls.Config{InsecureSkipVerify: kc.ApiInsecure},
 	}
 
-	kc.client = &http.Client{Transport: tr}
+	kc = KeepClient{
+		ApiServer:     os.Getenv("ARVADOS_API_HOST"),
+		ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
+		ApiInsecure:   (os.Getenv("ARVADOS_API_HOST_INSECURE") != ""),
+		Want_replicas: 2,
+		Client:        &http.Client{Transport: tr}}
 
-	err = kc.DiscoverKeepDisks()
+	err = (&kc).DiscoverKeepDisks()
 
 	return kc, err
 }
@@ -61,7 +63,7 @@ func (this *KeepClient) DiscoverKeepDisks() error {
 
 	// Make the request
 	var resp *http.Response
-	if resp, err = this.client.Do(req); err != nil {
+	if resp, err = this.Client.Do(req); err != nil {
 		return err
 	}
 
@@ -415,7 +417,7 @@ func (this KeepClient) uploadToKeepServer(host string, hash string, body io.Read
 	req.Body = body
 
 	var resp *http.Response
-	if resp, err = this.client.Do(req); err != nil {
+	if resp, err = this.Client.Do(req); err != nil {
 		upload_status <- UploadStatus{err, url, 0}
 		return
 	}
@@ -496,12 +498,12 @@ func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (re
 	// Buffer for reads from 'r'
 	var buffer []byte
 	if expectedLength > 0 {
-		if expectedLength > 64*1024*1024 {
+		if expectedLength > BLOCKSIZE {
 			return 0, OversizeBlockError
 		}
 		buffer = make([]byte, expectedLength)
 	} else {
-		buffer = make([]byte, 64*1024*1024)
+		buffer = make([]byte, BLOCKSIZE)
 	}
 
 	// Read requests on Transfer() buffer
@@ -547,14 +549,27 @@ var BlockNotFound = errors.New("Block not found")
 
 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
 	contentLength int64, url string, err error) {
+	return this.AuthorizedGet(hash, "", "")
+}
 
-	// Calculate the ordering for uploading to servers
+func (this KeepClient) AuthorizedGet(hash string,
+	signature string,
+	timestamp string) (reader io.ReadCloser,
+	contentLength int64, url string, err error) {
+
+	// Calculate the ordering for asking servers
 	sv := this.ShuffledServiceRoots(hash)
 
 	for _, host := range sv {
 		var req *http.Request
 		var err error
-		var url = fmt.Sprintf("%s/%s", host, hash)
+		var url string
+		if signature != "" {
+			url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+				signature, timestamp)
+		} else {
+			url = fmt.Sprintf("%s/%s", host, hash)
+		}
 		if req, err = http.NewRequest("GET", url, nil); err != nil {
 			continue
 		}
@@ -562,7 +577,7 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
 		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
 
 		var resp *http.Response
-		if resp, err = this.client.Do(req); err != nil {
+		if resp, err = this.Client.Do(req); err != nil {
 			continue
 		}
 
@@ -573,3 +588,42 @@ func (this KeepClient) Get(hash string) (reader io.ReadCloser,
 
 	return nil, 0, "", BlockNotFound
 }
+
+func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
+	return this.AuthorizedAsk(hash, "", "")
+}
+
+func (this KeepClient) AuthorizedAsk(hash string, signature string,
+	timestamp string) (contentLength int64, url string, err error) {
+	// Calculate the ordering for asking servers
+	sv := this.ShuffledServiceRoots(hash)
+
+	for _, host := range sv {
+		var req *http.Request
+		var err error
+		if signature != "" {
+			url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
+				signature, timestamp)
+		} else {
+			url = fmt.Sprintf("%s/%s", host, hash)
+		}
+
+		if req, err = http.NewRequest("HEAD", url, nil); err != nil {
+			continue
+		}
+
+		req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
+
+		var resp *http.Response
+		if resp, err = this.Client.Do(req); err != nil {
+			continue
+		}
+
+		if resp.StatusCode == http.StatusOK {
+			return resp.ContentLength, url, nil
+		}
+	}
+
+	return 0, "", BlockNotFound
+
+}
diff --git a/sdk/go/src/arvados.org/keepclient/keepclient_test.go b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
index 00a2063..756868c 100644
--- a/sdk/go/src/arvados.org/keepclient/keepclient_test.go
+++ b/sdk/go/src/arvados.org/keepclient/keepclient_test.go
@@ -13,6 +13,7 @@ import (
 	"os"
 	"os/exec"
 	"sort"
+	"strings"
 	"testing"
 	"time"
 )
@@ -32,18 +33,23 @@ type ServerRequiredSuite struct{}
 // Standalone tests
 type StandaloneSuite struct{}
 
+func pythonDir() string {
+	// The python directory is addressed relative to the first GOPATH entry.
+	return fmt.Sprintf("%s/../python", strings.Split(os.Getenv("GOPATH"), ":")[0])
+}
+
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
 	if *no_server {
 		c.Skip("Skipping tests that require server")
 	} else {
-		os.Chdir(os.ExpandEnv("$GOPATH../python"))
+		os.Chdir(pythonDir())
 		exec.Command("python", "run_test_server.py", "start").Run()
 		exec.Command("python", "run_test_server.py", "start_keep").Run()
 	}
 }
 
 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
-	os.Chdir(os.ExpandEnv("$GOPATH../python"))
+	os.Chdir(pythonDir())
 	exec.Command("python", "run_test_server.py", "stop_keep").Run()
 	exec.Command("python", "run_test_server.py", "stop").Run()
 }
@@ -869,7 +875,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
 	c.Check(content, DeepEquals, []byte("foo"))
 }
 
-func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
+func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
 	os.Setenv("ARVADOS_API_HOST", "localhost:3001")
 	os.Setenv("ARVADOS_API_TOKEN", "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
 	os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
@@ -882,12 +888,21 @@ func (s *ServerRequiredSuite) TestPutAndGet(c *C) {
 	c.Check(replicas, Equals, 2)
 	c.Check(err, Equals, nil)
 
-	r, n, url2, err := kc.Get(hash)
-	c.Check(err, Equals, nil)
-	c.Check(n, Equals, int64(3))
-	c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+	{
+		r, n, url2, err := kc.Get(hash)
+		c.Check(err, Equals, nil)
+		c.Check(n, Equals, int64(3))
+		c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
 
-	content, err2 := ioutil.ReadAll(r)
-	c.Check(err2, Equals, nil)
-	c.Check(content, DeepEquals, []byte("foo"))
+		content, err2 := ioutil.ReadAll(r)
+		c.Check(err2, Equals, nil)
+		c.Check(content, DeepEquals, []byte("foo"))
+	}
+
+	{
+		n, url2, err := kc.Ask(hash)
+		c.Check(err, Equals, nil)
+		c.Check(n, Equals, int64(3))
+		c.Check(url2, Equals, fmt.Sprintf("http://localhost:25108/%s", hash))
+	}
 }
diff --git a/services/keep/build.sh b/services/keep/build.sh
deleted file mode 100755
index 26648ef..0000000
--- a/services/keep/build.sh
+++ /dev/null
@@ -1,36 +0,0 @@
-#! /bin/sh
-
-# This script builds a Keep executable and installs it in
-# ./bin/keep.
-#
-# In idiomatic Go style, a user would install Keep with something
-# like:
-#
-#     go get arvados.org/keep
-#     go install arvados.org/keep
-#
-# which would download both the Keep source and any third-party
-# packages it depends on.
-#
-# Since the Keep source is bundled within the overall Arvados source,
-# "go get" is not the primary tool for delivering Keep source and this
-# process doesn't work.  Instead, this script sets the environment
-# properly and fetches any necessary dependencies by hand.
-
-if [ -z "$GOPATH" ]
-then
-    GOPATH=$(pwd)
-else
-    GOPATH=$(pwd):${GOPATH}
-fi
-
-export GOPATH
-
-set -o errexit   # fail if any command returns an error
-
-mkdir -p pkg
-mkdir -p bin
-go get github.com/gorilla/mux
-go install keep
-ls -l bin/keep
-echo "success!"
diff --git a/services/keep/go.sh b/services/keep/go.sh
new file mode 100755
index 0000000..fa6b5f6
--- /dev/null
+++ b/services/keep/go.sh
@@ -0,0 +1,12 @@
+#! /bin/sh
+
+rootdir=$(dirname $0)
+GOPATH=$rootdir:$rootdir/../../sdk/go:$GOPATH
+export GOPATH
+
+mkdir -p pkg
+mkdir -p bin
+
+go get github.com/gorilla/mux
+
+go $*
diff --git a/services/keep/src/arvados.org/keepproxy/keepproxy.go b/services/keep/src/arvados.org/keepproxy/keepproxy.go
new file mode 100644
index 0000000..c118f22
--- /dev/null
+++ b/services/keep/src/arvados.org/keepproxy/keepproxy.go
@@ -0,0 +1,298 @@
+package main
+
+import (
+	"arvados.org/keepclient"
+	"flag"
+	"fmt"
+	"github.com/gorilla/mux"
+	"io"
+	"log"
+	"net/http"
+	"sync"
+	"time"
+)
+
+// Default TCP address on which to listen for requests.
+// Initialized by the --listen flag.
+const DEFAULT_ADDR = ":25107"
+
+// main parses the command line flags, builds a Keep client with
+// keepclient.MakeKeepClient, routes every request through MakeRESTRouter
+// and serves on the -listen address.  It logs and returns on a startup
+// error or when the HTTP server stops.
+func main() {
+	var (
+		listen           string
+		no_get           bool
+		no_put           bool
+		no_head          bool
+		default_replicas int
+	)
+
+	flag.StringVar(
+		&listen,
+		"listen",
+		DEFAULT_ADDR,
+		"Interface on which to listen for requests, in the format "+
+			"ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
+			"to listen on all network interfaces.")
+
+	flag.BoolVar(&no_get, "no-get", true, "If true, disable GET operations")
+
+	// -no-put must bind to no_put: binding it to no_get would reset
+	// no_get's default and leave PUT impossible to disable.
+	flag.BoolVar(&no_put, "no-put", false, "If true, disable PUT operations")
+
+	flag.BoolVar(&no_head, "no-head", false, "If true, disable HEAD operations")
+
+	flag.IntVar(
+		&default_replicas,
+		"default-replicas",
+		2,
+		"Default number of replicas to write if not specified by the client.")
+
+	flag.Parse()
+
+	// Proxying GET is not allowed to be switched on: refuse to start
+	// unless GET is disabled.
+	if !no_get {
+		log.Print("Must specify --no-get")
+		return
+	}
+
+	kc, err := keepclient.MakeKeepClient()
+	if err != nil {
+		log.Print(err)
+		return
+	}
+
+	kc.Want_replicas = default_replicas
+
+	// Tell the built-in HTTP server to direct all requests to the REST
+	// router.
+	http.Handle("/", MakeRESTRouter(!no_get, !no_put, !no_head, kc))
+
+	// Start listening for requests.  ListenAndServe only returns on
+	// failure (e.g. the address is already in use), so report why.
+	if err := http.ListenAndServe(listen, nil); err != nil {
+		log.Print(err)
+	}
+}
+
+type ApiTokenCache struct {
+	tokens     map[string]int64 // token -> expiry as Unix seconds; 0 means not cached
+	lock       sync.Mutex       // held by RememberToken/RecallToken while touching tokens
+	expireTime int64            // seconds added to "now" when a token is remembered
+}
+
+// RememberToken records the token with an expiry of now + expireTime.  A
+// token that already has an expiry recorded keeps it unchanged.
+func (this *ApiTokenCache) RememberToken(token string) {
+	this.lock.Lock()
+	defer this.lock.Unlock()
+
+	if this.tokens[token] != 0 {
+		return
+	}
+	this.tokens[token] = time.Now().Unix() + this.expireTime
+}
+
+// RecallToken reports whether the token is cached and has not yet reached
+// its expiry time.  An expired entry is reset to 0 (unknown) as a side effect.
+func (this *ApiTokenCache) RecallToken(token string) bool {
+	this.lock.Lock()
+	defer this.lock.Unlock()
+
+	expiry := this.tokens[token]
+	switch {
+	case expiry == 0:
+		return false
+	case time.Now().Unix() < expiry:
+		// Cached and still inside its lifetime
+		return true
+	}
+	// Past its expiry: forget it
+	this.tokens[token] = 0
+	return false
+}
+
+// CheckAuthorizationHeader reports whether req carries an "Authorization:
+// OAuth2 <token>" header whose token is accepted.  A token still valid in
+// cache is accepted directly; otherwise it is checked with a GET of
+// /arvados/v1/users/current on kc.ApiServer and cached on a 200 OK reply.
+func CheckAuthorizationHeader(kc keepclient.KeepClient, cache *ApiTokenCache, req *http.Request) bool {
+	auth := req.Header.Get("Authorization")
+	if auth == "" {
+		return false
+	}
+
+	var tok string
+	if _, err := fmt.Sscanf(auth, "OAuth2 %s", &tok); err != nil {
+		return false
+	}
+
+	if cache.RecallToken(tok) {
+		// Valid in the cache, short circuit
+		return true
+	}
+
+	usersreq, err := http.NewRequest("GET", fmt.Sprintf("https://%s/arvados/v1/users/current", kc.ApiServer), nil)
+	if err != nil {
+		log.Printf("CheckAuthorizationHeader error: %v", err)
+		return false
+	}
+
+	// Add api token header
+	usersreq.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", tok))
+
+	// Actually make the request
+	resp, err := kc.Client.Do(usersreq)
+	if err != nil {
+		log.Printf("CheckAuthorizationHeader error: %v", err)
+		return false
+	}
+	// Only the status is needed; close the body so the connection is freed.
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return false
+	}
+
+	// Success!  Update cache
+	cache.RememberToken(tok)
+	return true
+}
+
+type GetBlockHandler struct {
+	keepclient.KeepClient // client whose AuthorizedGet/AuthorizedAsk serve the request
+	*ApiTokenCache        // token cache supplied by MakeRESTRouter
+}
+
+type PutBlockHandler struct {
+	keepclient.KeepClient // client whose PutHR stores the block
+	*ApiTokenCache        // token cache consulted by CheckAuthorizationHeader
+}
+
+// MakeRESTRouter
+//     Returns a mux.Router that passes GET and PUT requests to the
+//     appropriate handlers.
+//
+func MakeRESTRouter(
+	enable_get bool,
+	enable_put bool,
+	enable_head bool,
+	kc keepclient.KeepClient) *mux.Router {
+
+	t := &ApiTokenCache{tokens: make(map[string]int64), expireTime: 300}
+
+	rest := mux.NewRouter()
+	gh := rest.Handle(`/{hash:[0-9a-f]{32}}`, GetBlockHandler{kc, t})
+	ghsig := rest.Handle(
+		`/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
+		GetBlockHandler{kc, t})
+	ph := rest.Handle(`/{hash:[0-9a-f]{32}}`, PutBlockHandler{kc, t})
+
+	if enable_get {
+		gh.Methods("GET")
+		ghsig.Methods("GET")
+	}
+
+	if enable_put {
+		ph.Methods("PUT")
+	}
+
+	if enable_head {
+		gh.Methods("HEAD")
+		ghsig.Methods("HEAD")
+	}
+
+	return rest
+}
+
+// ServeHTTP proxies a block read: GET streams the block from AuthorizedGet,
+// HEAD only checks it with AuthorizedAsk.  BlockNotFound gives 404, else 502.
+func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	vars := mux.Vars(req)
+	hash, signature, timestamp := vars["hash"], vars["signature"], vars["timestamp"]
+	var reader io.ReadCloser
+	var err error
+	if req.Method == "GET" {
+		reader, _, _, err = this.KeepClient.AuthorizedGet(hash, signature, timestamp)
+	} else if req.Method == "HEAD" {
+		_, _, err = this.KeepClient.AuthorizedAsk(hash, signature, timestamp)
+	}
+	if reader != nil {
+		defer reader.Close()
+	}
+
+	switch err {
+	case nil:
+		if reader != nil { // nil for HEAD; io.Copy would panic on it
+			io.Copy(resp, reader)
+		}
+	case keepclient.BlockNotFound:
+		http.Error(resp, "Not found", http.StatusNotFound)
+	default:
+		http.Error(resp, err.Error(), http.StatusBadGateway)
+	}
+}
+
+// ServeHTTP proxies a block write.  The request must carry a valid
+// Authorization header (403 otherwise) and a positive Content-Length (411
+// otherwise).  X-Keep-Desired-Replicas, when it parses as an integer,
+// overrides Want_replicas for this request only (the receiver is a copy).
+// The number of replicas written is reported in X-Keep-Replicas-Stored.
+func (this PutBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	if !CheckAuthorizationHeader(this.KeepClient, this.ApiTokenCache, req) {
+		http.Error(resp, "Missing or invalid Authorization header", http.StatusForbidden)
+		// Stop here: an unauthorized request must not reach PutHR.
+		return
+	}
+
+	hash := mux.Vars(req)["hash"]
+	var contentLength int64 = -1
+	if cl := req.Header.Get("Content-Length"); cl != "" {
+		if _, err := fmt.Sscanf(cl, "%d", &contentLength); err != nil {
+			// An unparseable length is treated like a missing one.
+			contentLength = -1
+		}
+	}
+
+	if contentLength < 1 {
+		http.Error(resp, "Must include Content-Length header", http.StatusLengthRequired)
+		return
+	}
+
+	// Check if the client specified the number of replicas
+	if want := req.Header.Get("X-Keep-Desired-Replicas"); want != "" {
+		var r int
+		if _, err := fmt.Sscanf(want, "%d", &r); err == nil {
+			this.KeepClient.Want_replicas = r
+		}
+	}
+
+	// Now try to put the block through
+	replicas, err := this.KeepClient.PutHR(hash, req.Body, contentLength)
+
+	// Tell the client how many successful PUTs we accomplished
+	resp.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", replicas))
+
+	switch err {
+	case nil:
+		// Default will return http.StatusOK
+
+	case keepclient.OversizeBlockError:
+		http.Error(resp, fmt.Sprintf("Exceeded maximum blocksize %d", keepclient.BLOCKSIZE), http.StatusRequestEntityTooLarge)
+
+	case keepclient.InsufficientReplicasError:
+		// At least one write is considered success (default
+		// http.StatusOK); the client can decide if getting fewer
+		// replicas than it asked for is fatal.  Zero writes is an error.
+		if replicas < 1 {
+			http.Error(resp, "", http.StatusServiceUnavailable)
+		}
+
+	default:
+		http.Error(resp, err.Error(), http.StatusBadGateway)
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list