[ARVADOS] updated: 95d963b2d176e0ffe0797541368463fb5deaf3cf
git at public.curoverse.com
git at public.curoverse.com
Tue Jul 22 16:10:47 EDT 2014
Summary of changes:
services/keep/src/keep/{keep.go => handlers.go} | 283 +-------------
services/keep/src/keep/keep.go | 480 +-----------------------
2 files changed, 24 insertions(+), 739 deletions(-)
copy services/keep/src/keep/{keep.go => handlers.go} (63%)
via 95d963b2d176e0ffe0797541368463fb5deaf3cf (commit)
via 6a6fe8d14502fb47eb4e6b2c0b1cf6d2cdf7a551 (commit)
from f0abaec0c637772b781993a63b98aab85072eb97 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 95d963b2d176e0ffe0797541368463fb5deaf3cf
Merge: f0abaec 6a6fe8d
Author: Tim Pierce <twp at curoverse.com>
Date: Tue Jul 22 16:10:24 2014 -0400
Merge branch '2769-keep-delete-request'
Refs #2769
commit 6a6fe8d14502fb47eb4e6b2c0b1cf6d2cdf7a551
Author: Tim Pierce <twp at curoverse.com>
Date: Tue Jul 22 16:09:28 2014 -0400
2769: reorganize REST handlers
Reorganizing REST handlers into their own source file.
Refs #2769.
diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/handlers.go
similarity index 63%
copy from services/keep/src/keep/keep.go
copy to services/keep/src/keep/handlers.go
index a3f66e9..a5d5e47 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/handlers.go
@@ -1,20 +1,23 @@
package main
+// REST handlers for Keep are implemented here.
+//
+// GetBlockHandler (GET /locator)
+// PutBlockHandler (PUT /locator)
+// IndexHandler (GET /index, GET /index/prefix)
+// StatusHandler (GET /status.json)
+
import (
"bufio"
"bytes"
"crypto/md5"
"encoding/json"
- "flag"
"fmt"
"github.com/gorilla/mux"
"io"
- "io/ioutil"
"log"
- "net"
"net/http"
"os"
- "os/signal"
"regexp"
"runtime"
"strconv"
@@ -23,267 +26,8 @@ import (
"time"
)
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
-
-// A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
-
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
-// in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
-
-var PROC_MOUNTS = "/proc/mounts"
-
-// The Keep VolumeManager maintains a list of available volumes.
-// Initialized by the --volumes flag (or by FindKeepVolumes).
-var KeepVM VolumeManager
-
-// enforce_permissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
-var enforce_permissions bool
-
-// permission_ttl is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
-
-// data_manager_token represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
-var data_manager_token string
-
-// ==========
-// Error types.
-//
-type KeepError struct {
- HTTPCode int
- ErrMsg string
-}
-
-var (
- BadRequestError = &KeepError{400, "Bad Request"}
- CollisionError = &KeepError{500, "Collision"}
- RequestHashError= &KeepError{422, "Hash mismatch in request"}
- PermissionError = &KeepError{403, "Forbidden"}
- DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
- ExpiredError = &KeepError{401, "Expired permission signature"}
- NotFoundError = &KeepError{404, "Not Found"}
- GenericError = &KeepError{500, "Fail"}
- FullError = &KeepError{503, "Full"}
- TooLongError = &KeepError{504, "Timeout"}
-)
-
-func (e *KeepError) Error() string {
- return e.ErrMsg
-}
-
-// TODO(twp): continue moving as much code as possible out of main
-// so it can be effectively tested. Esp. handling and postprocessing
-// of command line flags (identifying Keep volumes and initializing
-// permission arguments).
-
-func main() {
- log.Println("Keep started: pid", os.Getpid())
-
- // Parse command-line flags:
- //
- // -listen=ipaddr:port
- // Interface on which to listen for requests. Use :port without
- // an ipaddr to listen on all network interfaces.
- // Examples:
- // -listen=127.0.0.1:4949
- // -listen=10.0.1.24:8000
- // -listen=:25107 (to listen to port 25107 on all interfaces)
- //
- // -volumes
- // A comma-separated list of directories to use as Keep volumes.
- // Example:
- // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
- //
- // If -volumes is empty or is not present, Keep will select volumes
- // by looking at currently mounted filesystems for /keep top-level
- // directories.
-
- var (
- data_manager_token_file string
- listen string
- permission_key_file string
- permission_ttl_sec int
- serialize_io bool
- volumearg string
- pidfile string
- )
- flag.StringVar(
- &data_manager_token_file,
- "data-manager-token-file",
- "",
- "File with the API token used by the Data Manager. All DELETE "+
- "requests or GET /index requests must carry this token.")
- flag.BoolVar(
- &enforce_permissions,
- "enforce-permissions",
- false,
- "Enforce permission signatures on requests.")
- flag.StringVar(
- &listen,
- "listen",
- DEFAULT_ADDR,
- "Interface on which to listen for requests, in the format "+
- "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
- "to listen on all network interfaces.")
- flag.StringVar(
- &permission_key_file,
- "permission-key-file",
- "",
- "File containing the secret key for generating and verifying "+
- "permission signatures.")
- flag.IntVar(
- &permission_ttl_sec,
- "permission-ttl",
- 1209600,
- "Expiration time (in seconds) for newly generated permission "+
- "signatures.")
- flag.BoolVar(
- &serialize_io,
- "serialize",
- false,
- "If set, all read and write operations on local Keep volumes will "+
- "be serialized.")
- flag.StringVar(
- &volumearg,
- "volumes",
- "",
- "Comma-separated list of directories to use for Keep volumes, "+
- "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
- "supplied, Keep will scan mounted filesystems for volumes "+
- "with a /keep top-level directory.")
-
- flag.StringVar(
- &pidfile,
- "pid",
- "",
- "Path to write pid file")
-
- flag.Parse()
-
- // Look for local keep volumes.
- var keepvols []string
- if volumearg == "" {
- // TODO(twp): decide whether this is desirable default behavior.
- // In production we may want to require the admin to specify
- // Keep volumes explicitly.
- keepvols = FindKeepVolumes()
- } else {
- keepvols = strings.Split(volumearg, ",")
- }
-
- // Check that the specified volumes actually exist.
- var goodvols []Volume = nil
- for _, v := range keepvols {
- if _, err := os.Stat(v); err == nil {
- log.Println("adding Keep volume:", v)
- newvol := MakeUnixVolume(v, serialize_io)
- goodvols = append(goodvols, &newvol)
- } else {
- log.Printf("bad Keep volume: %s\n", err)
- }
- }
-
- if len(goodvols) == 0 {
- log.Fatal("could not find any keep volumes")
- }
-
- // Initialize data manager token and permission key.
- // If these tokens are specified but cannot be read,
- // raise a fatal error.
- if data_manager_token_file != "" {
- if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
- data_manager_token = strings.TrimSpace(string(buf))
- } else {
- log.Fatalf("reading data manager token: %s\n", err)
- }
- }
- if permission_key_file != "" {
- if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
- PermissionSecret = bytes.TrimSpace(buf)
- } else {
- log.Fatalf("reading permission key: %s\n", err)
- }
- }
-
- // Initialize permission TTL
- permission_ttl = time.Duration(permission_ttl_sec) * time.Second
-
- // If --enforce-permissions is true, we must have a permission key
- // to continue.
- if PermissionSecret == nil {
- if enforce_permissions {
- log.Fatal("--enforce-permissions requires a permission key")
- } else {
- log.Println("Running without a PermissionSecret. Block locators " +
- "returned by this server will not be signed, and will be rejected " +
- "by a server that enforces permissions.")
- log.Println("To fix this, run Keep with --permission-key-file=<path> " +
- "to define the location of a file containing the permission key.")
- }
- }
-
- // Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(goodvols)
-
- // Tell the built-in HTTP server to direct all requests to the REST
- // router.
- http.Handle("/", MakeRESTRouter())
-
- // Set up a TCP listener.
- listener, err := net.Listen("tcp", listen)
- if err != nil {
- log.Fatal(err)
- }
-
- // Shut down the server gracefully (by closing the listener)
- // if SIGTERM is received.
- term := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- s := <-sig
- log.Println("caught signal:", s)
- listener.Close()
- }(term)
- signal.Notify(term, syscall.SIGTERM)
-
- if pidfile != "" {
- f, err := os.Create(pidfile)
- if err == nil {
- fmt.Fprint(f, os.Getpid())
- f.Close()
- } else {
- log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
- }
- }
-
- // Start listening for requests.
- srv := &http.Server{Addr: listen}
- srv.Serve(listener)
-
- log.Println("shutting down")
-
- if pidfile != "" {
- os.Remove(pidfile)
- }
-}
-
-// MakeRESTRouter
-// Returns a mux.Router that passes GET and PUT requests to the
-// appropriate handlers.
+// MakeRESTRouter returns a new mux.Router that forwards all Keep
+// requests to the appropriate handlers.
//
func MakeRESTRouter() *mux.Router {
rest := mux.NewRouter()
@@ -324,11 +68,12 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// FindKeepVolumes
-// Returns a list of Keep volumes mounted on this system.
+// FindKeepVolumes scans all mounted volumes on the system for Keep
+// volumes, and returns a list of matching paths.
//
-// A Keep volume is a normal or tmpfs volume with a /keep
-// directory at the top level of the mount point.
+// A device is assumed to be a Keep volume if it is a normal or tmpfs
+// volume and has a "/keep" directory directly underneath the mount
+// point.
//
func FindKeepVolumes() []string {
vols := make([]string, 0)
diff --git a/services/keep/src/keep/keep.go b/services/keep/src/keep/keep.go
index a3f66e9..c34c268 100644
--- a/services/keep/src/keep/keep.go
+++ b/services/keep/src/keep/keep.go
@@ -1,23 +1,15 @@
package main
import (
- "bufio"
"bytes"
- "crypto/md5"
- "encoding/json"
"flag"
"fmt"
- "github.com/gorilla/mux"
- "io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
- "regexp"
- "runtime"
- "strconv"
"strings"
"syscall"
"time"
@@ -70,16 +62,16 @@ type KeepError struct {
}
var (
- BadRequestError = &KeepError{400, "Bad Request"}
- CollisionError = &KeepError{500, "Collision"}
- RequestHashError= &KeepError{422, "Hash mismatch in request"}
- PermissionError = &KeepError{403, "Forbidden"}
- DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
- ExpiredError = &KeepError{401, "Expired permission signature"}
- NotFoundError = &KeepError{404, "Not Found"}
- GenericError = &KeepError{500, "Fail"}
- FullError = &KeepError{503, "Full"}
- TooLongError = &KeepError{504, "Timeout"}
+ BadRequestError = &KeepError{400, "Bad Request"}
+ CollisionError = &KeepError{500, "Collision"}
+ RequestHashError = &KeepError{422, "Hash mismatch in request"}
+ PermissionError = &KeepError{403, "Forbidden"}
+ DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
+ ExpiredError = &KeepError{401, "Expired permission signature"}
+ NotFoundError = &KeepError{404, "Not Found"}
+ GenericError = &KeepError{500, "Fail"}
+ FullError = &KeepError{503, "Full"}
+ TooLongError = &KeepError{504, "Timeout"}
)
func (e *KeepError) Error() string {
@@ -280,455 +272,3 @@ func main() {
os.Remove(pidfile)
}
}
-
-// MakeRESTRouter
-// Returns a mux.Router that passes GET and PUT requests to the
-// appropriate handlers.
-//
-func MakeRESTRouter() *mux.Router {
- rest := mux.NewRouter()
-
- rest.HandleFunc(
- `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/{hash:[0-9a-f]{32}}+{hints}`,
- GetBlockHandler).Methods("GET", "HEAD")
-
- rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
-
- // For IndexHandler we support:
- // /index - returns all locators
- // /index/{prefix} - returns all locators that begin with {prefix}
- // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
- // If {prefix} is the empty string, return an index of all locators
- // (so /index and /index/ behave identically)
- // A client may supply a full 32-digit locator string, in which
- // case the server will return an index with either zero or one
- // entries. This usage allows a client to check whether a block is
- // present, and its size and upload time, without retrieving the
- // entire block.
- //
- rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(
- `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
- rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
-
- // Any request which does not match any of these routes gets
- // 400 Bad Request.
- rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
-
- return rest
-}
-
-func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
- http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
-}
-
-// FindKeepVolumes
-// Returns a list of Keep volumes mounted on this system.
-//
-// A Keep volume is a normal or tmpfs volume with a /keep
-// directory at the top level of the mount point.
-//
-func FindKeepVolumes() []string {
- vols := make([]string, 0)
-
- if f, err := os.Open(PROC_MOUNTS); err != nil {
- log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
- } else {
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- dev, mount := args[0], args[1]
- if mount != "/" &&
- (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
- keep := mount + "/keep"
- if st, err := os.Stat(keep); err == nil && st.IsDir() {
- vols = append(vols, keep)
- }
- }
- }
- if err := scanner.Err(); err != nil {
- log.Fatal(err)
- }
- }
- return vols
-}
-
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
- hash := mux.Vars(req)["hash"]
-
- log.Printf("%s %s", req.Method, hash)
-
- hints := mux.Vars(req)["hints"]
-
- // Parse the locator string and hints from the request.
- // TODO(twp): implement a Locator type.
- var signature, timestamp string
- if hints != "" {
- signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
- for _, hint := range strings.Split(hints, "+") {
- if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
- // Server ignores size hints
- } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
- signature = m[1]
- timestamp = m[2]
- } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
- // Any unknown hint that starts with an uppercase letter is
- // presumed to be valid and ignored, to permit forward compatibility.
- } else {
- // Unknown format; not a valid locator.
- http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
- return
- }
- }
- }
-
- // If permission checking is in effect, verify this
- // request's permission signature.
- if enforce_permissions {
- if signature == "" || timestamp == "" {
- http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
- return
- } else if IsExpired(timestamp) {
- http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
- return
- } else {
- req_locator := req.URL.Path[1:] // strip leading slash
- if !VerifySignature(req_locator, GetApiToken(req)) {
- http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
- return
- }
- }
- }
-
- block, err := GetBlock(hash)
-
- // Garbage collect after each GET. Fixes #2865.
- // TODO(twp): review Keep memory usage and see if there's
- // a better way to do this than blindly garbage collecting
- // after every block.
- defer runtime.GC()
-
- if err != nil {
- // This type assertion is safe because the only errors
- // GetBlock can return are DiskHashError or NotFoundError.
- if err == NotFoundError {
- log.Printf("%s: not found, giving up\n", hash)
- }
- http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
- return
- }
-
- resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
-
- _, err = resp.Write(block)
- if err != nil {
- log.Printf("GetBlockHandler: writing response: %s", err)
- }
-
- return
-}
-
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
- // Garbage collect after each PUT. Fixes #2865.
- // See also GetBlockHandler.
- defer runtime.GC()
-
- hash := mux.Vars(req)["hash"]
-
- log.Printf("%s %s", req.Method, hash)
-
- // Read the block data to be stored.
- // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
- //
- if req.ContentLength > BLOCKSIZE {
- http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
- return
- }
-
- buf := make([]byte, req.ContentLength)
- nread, err := io.ReadFull(req.Body, buf)
- if err != nil {
- http.Error(resp, err.Error(), 500)
- } else if int64(nread) < req.ContentLength {
- http.Error(resp, "request truncated", 500)
- } else {
- if err := PutBlock(buf, hash); err == nil {
- // Success; add a size hint, sign the locator if
- // possible, and return it to the client.
- return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
- api_token := GetApiToken(req)
- if PermissionSecret != nil && api_token != "" {
- expiry := time.Now().Add(permission_ttl)
- return_hash = SignLocator(return_hash, api_token, expiry)
- }
- resp.Write([]byte(return_hash + "\n"))
- } else {
- ke := err.(*KeepError)
- http.Error(resp, ke.Error(), ke.HTTPCode)
- }
- }
- return
-}
-
-// IndexHandler
-// A HandleFunc to address /index and /index/{prefix} requests.
-//
-func IndexHandler(resp http.ResponseWriter, req *http.Request) {
- prefix := mux.Vars(req)["prefix"]
-
- // Only the data manager may issue /index requests,
- // and only if enforce_permissions is enabled.
- // All other requests return 403 Forbidden.
- api_token := GetApiToken(req)
- if !enforce_permissions ||
- api_token == "" ||
- data_manager_token != api_token {
- http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
- return
- }
- var index string
- for _, vol := range KeepVM.Volumes() {
- index = index + vol.Index(prefix)
- }
- resp.Write([]byte(index))
-}
-
-// StatusHandler
-// Responds to /status.json requests with the current node status,
-// described in a JSON structure.
-//
-// The data given in a status.json response includes:
-// volumes - a list of Keep volumes currently in use by this server
-// each volume is an object with the following fields:
-// * mount_point
-// * device_num (an integer identifying the underlying filesystem)
-// * bytes_free
-// * bytes_used
-//
-type VolumeStatus struct {
- MountPoint string `json:"mount_point"`
- DeviceNum uint64 `json:"device_num"`
- BytesFree uint64 `json:"bytes_free"`
- BytesUsed uint64 `json:"bytes_used"`
-}
-
-type NodeStatus struct {
- Volumes []*VolumeStatus `json:"volumes"`
-}
-
-func StatusHandler(resp http.ResponseWriter, req *http.Request) {
- st := GetNodeStatus()
- if jstat, err := json.Marshal(st); err == nil {
- resp.Write(jstat)
- } else {
- log.Printf("json.Marshal: %s\n", err)
- log.Printf("NodeStatus = %v\n", st)
- http.Error(resp, err.Error(), 500)
- }
-}
-
-// GetNodeStatus
-// Returns a NodeStatus struct describing this Keep
-// node's current status.
-//
-func GetNodeStatus() *NodeStatus {
- st := new(NodeStatus)
-
- st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
- for i, vol := range KeepVM.Volumes() {
- st.Volumes[i] = vol.Status()
- }
- return st
-}
-
-// GetVolumeStatus
-// Returns a VolumeStatus describing the requested volume.
-//
-func GetVolumeStatus(volume string) *VolumeStatus {
- var fs syscall.Statfs_t
- var devnum uint64
-
- if fi, err := os.Stat(volume); err == nil {
- devnum = fi.Sys().(*syscall.Stat_t).Dev
- } else {
- log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
- return nil
- }
-
- err := syscall.Statfs(volume, &fs)
- if err != nil {
- log.Printf("GetVolumeStatus: statfs: %s\n", err)
- return nil
- }
- // These calculations match the way df calculates disk usage:
- // "free" space is measured by fs.Bavail, but "used" space
- // uses fs.Blocks - fs.Bfree.
- free := fs.Bavail * uint64(fs.Bsize)
- used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
- return &VolumeStatus{volume, devnum, free, used}
-}
-
-func GetBlock(hash string) ([]byte, error) {
- // Attempt to read the requested hash from a keep volume.
- error_to_caller := NotFoundError
-
- for _, vol := range KeepVM.Volumes() {
- if buf, err := vol.Get(hash); err != nil {
- // IsNotExist is an expected error and may be ignored.
- // (If all volumes report IsNotExist, we return a NotFoundError)
- // All other errors should be logged but we continue trying to
- // read.
- switch {
- case os.IsNotExist(err):
- continue
- default:
- log.Printf("GetBlock: reading %s: %s\n", hash, err)
- }
- } else {
- // Double check the file checksum.
- //
- filehash := fmt.Sprintf("%x", md5.Sum(buf))
- if filehash != hash {
- // TODO(twp): this condition probably represents a bad disk and
- // should raise major alarm bells for an administrator: e.g.
- // they should be sent directly to an event manager at high
- // priority or logged as urgent problems.
- //
- log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
- vol, hash, filehash)
- error_to_caller = DiskHashError
- } else {
- // Success!
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
- vol, hash)
- }
- return buf, nil
- }
- }
- }
-
- if error_to_caller != NotFoundError {
- log.Printf("%s: checksum mismatch, no good copy found\n", hash)
- }
- return nil, error_to_caller
-}
-
-/* PutBlock(block, hash)
- Stores the BLOCK (identified by the content id HASH) in Keep.
-
- The MD5 checksum of the block must be identical to the content id HASH.
- If not, an error is returned.
-
- PutBlock stores the BLOCK on the first Keep volume with free space.
- A failure code is returned to the user only if all volumes fail.
-
- On success, PutBlock returns nil.
- On failure, it returns a KeepError with one of the following codes:
-
- 500 Collision
- A different block with the same hash already exists on this
- Keep server.
- 422 MD5Fail
- The MD5 hash of the BLOCK does not match the argument HASH.
- 503 Full
- There was not enough space left in any Keep volume to store
- the object.
- 500 Fail
- The object could not be stored for some other reason (e.g.
- all writes failed). The text of the error message should
- provide as much detail as possible.
-*/
-
-func PutBlock(block []byte, hash string) error {
- // Check that BLOCK's checksum matches HASH.
- blockhash := fmt.Sprintf("%x", md5.Sum(block))
- if blockhash != hash {
- log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
- return RequestHashError
- }
-
- // If we already have a block on disk under this identifier, return
- // success (but check for MD5 collisions).
- // The only errors that GetBlock can return are DiskHashError and NotFoundError.
- // In either case, we want to write our new (good) block to disk,
- // so there is nothing special to do if err != nil.
- if oldblock, err := GetBlock(hash); err == nil {
- if bytes.Compare(block, oldblock) == 0 {
- return nil
- } else {
- return CollisionError
- }
- }
-
- // Choose a Keep volume to write to.
- // If this volume fails, try all of the volumes in order.
- vol := KeepVM.Choose()
- if err := vol.Put(hash, block); err == nil {
- return nil // success!
- } else {
- allFull := true
- for _, vol := range KeepVM.Volumes() {
- err := vol.Put(hash, block)
- if err == nil {
- return nil // success!
- }
- if err != FullError {
- // The volume is not full but the write did not succeed.
- // Report the error and continue trying.
- allFull = false
- log.Printf("%s: Write(%s): %s\n", vol, hash, err)
- }
- }
-
- if allFull {
- log.Printf("all Keep volumes full")
- return FullError
- } else {
- log.Printf("all Keep volumes failed")
- return GenericError
- }
- }
-}
-
-// IsValidLocator
-// Return true if the specified string is a valid Keep locator.
-// When Keep is extended to support hash types other than MD5,
-// this should be updated to cover those as well.
-//
-func IsValidLocator(loc string) bool {
- match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
- if err == nil {
- return match
- }
- log.Printf("IsValidLocator: %s\n", err)
- return false
-}
-
-// GetApiToken returns the OAuth2 token from the Authorization
-// header of a HTTP request, or an empty string if no matching
-// token is found.
-func GetApiToken(req *http.Request) string {
- if auth, ok := req.Header["Authorization"]; ok {
- if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
- log.Println(err)
- } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
- return match[1]
- }
- }
- return ""
-}
-
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestamp_hex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestamp_hex string) bool {
- ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
- if err != nil {
- log.Printf("IsExpired: %s\n", err)
- return true
- }
- return time.Unix(ts, 0).Before(time.Now())
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list