[ARVADOS] updated: 64efd1030538d59821ce288a7674e29d49c35744
From: Git user <git at public.curoverse.com>
Date: Fri Sep 30 17:15:38 EDT 2016
Summary of changes:
services/keepstore/azure_blob_volume.go | 138 ++++++++++++++-------------
services/keepstore/azure_blob_volume_test.go | 12 ++-
services/keepstore/config.go | 34 ++++---
services/keepstore/keepstore.go | 8 +-
services/keepstore/s3_volume.go | 32 ++++++-
services/keepstore/usage.go | 14 +++
services/keepstore/volume.go | 7 ++
services/keepstore/volume_unix.go | 36 ++++---
8 files changed, 177 insertions(+), 104 deletions(-)
discards 50eaa6771e3937436a409d201f3476160fd16703 (commit)
discards 002996388ceb070dbb36d8538a3e5358f535cd63 (commit)
discards 22d8f1bf10b2539aab6dddb0d42545198f78f099 (commit)
via 64efd1030538d59821ce288a7674e29d49c35744 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (50eaa6771e3937436a409d201f3476160fd16703)
            \
             N -- N -- N (64efd1030538d59821ce288a7674e29d49c35744)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 64efd1030538d59821ce288a7674e29d49c35744
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Sep 30 09:36:57 2016 -0400
9956: Load volume config from YAML file
diff --git a/sdk/go/arvados/duration.go b/sdk/go/arvados/duration.go
index 7b87aee..a64eaac 100644
--- a/sdk/go/arvados/duration.go
+++ b/sdk/go/arvados/duration.go
@@ -28,6 +28,11 @@ func (d Duration) String() string {
return time.Duration(d).String()
}
+// Duration returns a time.Duration
+func (d Duration) Duration() time.Duration {
+ return time.Duration(d)
+}
+
// Value implements flag.Value
func (d *Duration) Set(s string) error {
dur, err := time.ParseDuration(s)
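For context, here is a minimal sketch of how the new Duration() accessor combines with the existing flag.Value methods (a hypothetical standalone program, not part of this commit; it assumes only what the hunks above show):

package main

import (
	"fmt"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

func main() {
	var d arvados.Duration
	// Set parses the same syntax as time.ParseDuration; "336h" is 14 days.
	if err := d.Set("336h"); err != nil {
		panic(err)
	}
	// The new Duration() method converts back to a time.Duration,
	// e.g. for computing a signature expiry:
	expiry := time.Now().Add(d.Duration())
	fmt.Println(d.String(), expiry.Unix())
}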
diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
index 2217dd3..0c4d208 100644
--- a/sdk/go/streamer/streamer.go
+++ b/sdk/go/streamer/streamer.go
@@ -37,8 +37,11 @@ package streamer
import (
"io"
+ "errors"
)
+var ErrAlreadyClosed = errors.New("cannot close a stream twice")
+
type AsyncStream struct {
buffer []byte
requests chan sliceRequest
@@ -115,6 +118,9 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
// Close the responses channel
func (this *StreamReader) Close() error {
+ if this.stream == nil {
+ return ErrAlreadyClosed
+ }
this.stream.subtract_reader <- true
close(this.responses)
this.stream = nil
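The nil-stream guard added to Close() turns a second Close into an error return instead of a panic (closing an already-closed channel panics). The same pattern in a self-contained sketch, with hypothetical types standing in for the streamer internals:

package main

import (
	"errors"
	"fmt"
)

var ErrAlreadyClosed = errors.New("cannot close a stream twice")

type reader struct {
	responses chan int
}

func (r *reader) Close() error {
	if r.responses == nil {
		// Already closed: report it rather than panicking on close().
		return ErrAlreadyClosed
	}
	close(r.responses)
	r.responses = nil
	return nil
}

func main() {
	r := &reader{responses: make(chan int)}
	fmt.Println(r.Close()) // <nil>
	fmt.Println(r.Close()) // cannot close a stream twice
}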
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 48cb026..41b6c69 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -40,41 +40,29 @@ func readKeyFromFile(file string) (string, error) {
}
type azureVolumeAdder struct {
- *volumeSet
+ *Config
}
-func (s *azureVolumeAdder) Set(containerName string) error {
- if trashLifetime != 0 {
- return ErrNotImplemented
- }
+// String implements flag.Value
+func (s *azureVolumeAdder) String() string {
+ return "-"
+}
- if containerName == "" {
- return errors.New("no container name given")
- }
- if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
- return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
- }
- accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
- if err != nil {
- return err
- }
- azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
- if err != nil {
- return errors.New("creating Azure storage client: " + err.Error())
- }
- if flagSerializeIO {
- log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
- }
- v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
- if err := v.Check(); err != nil {
- return err
- }
- *s.volumeSet = append(*s.volumeSet, v)
+func (s *azureVolumeAdder) Set(containerName string) error {
+ s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
+ ContainerName: containerName,
+ StorageAccountName: azureStorageAccountName,
+ StorageAccountKeyFile: azureStorageAccountKeyFile,
+ AzureReplication: azureStorageReplication,
+ ReadOnly: flagReadonly,
+ })
return nil
}
func init() {
- flag.Var(&azureVolumeAdder{&volumes},
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
+
+ flag.Var(&azureVolumeAdder{theConfig},
"azure-storage-container-volume",
"Use the given container as a storage volume. Can be given multiple times.")
flag.StringVar(
@@ -86,7 +74,7 @@ func init() {
&azureStorageAccountKeyFile,
"azure-storage-account-key-file",
"",
- "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+ "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
flag.IntVar(
&azureStorageReplication,
"azure-storage-replication",
@@ -102,41 +90,64 @@ func init() {
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
// container.
type AzureBlobVolume struct {
- azClient storage.Client
- bsClient storage.BlobStorageClient
- containerName string
- readonly bool
- replication int
+ StorageAccountName string
+ StorageAccountKeyFile string
+ ContainerName string
+ AzureReplication int
+ ReadOnly bool
+
+ azClient storage.Client
+ bsClient storage.BlobStorageClient
}
-// NewAzureBlobVolume returns a new AzureBlobVolume using the given
-// client and container name. The replication argument specifies the
-// replication level to report when writing data.
-func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
- return &AzureBlobVolume{
- azClient: client,
- bsClient: client.GetBlobService(),
- containerName: containerName,
- readonly: readonly,
- replication: replication,
+// Examples implements VolumeWithExamples.
+func (*AzureBlobVolume) Examples() []Volume {
+ return []Volume{
+ &AzureBlobVolume{
+ StorageAccountName: "example-account-name",
+ StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
+ ContainerName: "example-container-name",
+ AzureReplication: 3,
+ },
}
}
-// Check returns nil if the volume is usable.
-func (v *AzureBlobVolume) Check() error {
- ok, err := v.bsClient.ContainerExists(v.containerName)
+// Type implements Volume.
+func (v *AzureBlobVolume) Type() string {
+ return "Azure"
+}
+
+// Start implements Volume.
+func (v *AzureBlobVolume) Start() error {
+ if v.ContainerName == "" {
+ return errors.New("no container name given")
+ }
+ if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
+ return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
+ }
+ accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
+ if err != nil {
+ return err
+ }
+ v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
+ if err != nil {
+ return fmt.Errorf("creating Azure storage client: %s", err)
+ }
+ v.bsClient = v.azClient.GetBlobService()
+
+ ok, err := v.bsClient.ContainerExists(v.ContainerName)
if err != nil {
return err
}
if !ok {
- return errors.New("container does not exist")
+ return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
return nil
}
// Return true if expires_at metadata attribute is found on the block
func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
- metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
if err != nil {
return false, metadata, v.translateError(err)
}
@@ -197,7 +208,7 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
if azureMaxGetBytes < BlockSize {
// Unfortunately the handler doesn't tell us how long the blob
// is expected to be, so we have to ask Azure.
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return 0, v.translateError(err)
}
@@ -228,9 +239,9 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
var rdr io.ReadCloser
var err error
if startPos == 0 && endPos == expectSize {
- rdr, err = v.bsClient.GetBlob(v.containerName, loc)
+ rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
} else {
- rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+ rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
}
if err != nil {
errors[p] = err
@@ -268,7 +279,7 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
if trashed {
return os.ErrNotExist
}
- rdr, err := v.bsClient.GetBlob(v.containerName, loc)
+ rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
if err != nil {
return v.translateError(err)
}
@@ -278,15 +289,15 @@ func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
// Put stores a Keep block as a block blob in the container.
func (v *AzureBlobVolume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
- return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
+ return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
}
// Touch updates the last-modified property of a block blob.
func (v *AzureBlobVolume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
trashed, metadata, err := v.checkTrashed(loc)
@@ -298,7 +309,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
}
metadata["touch"] = fmt.Sprintf("%d", time.Now())
- return v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+ return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
}
// Mtime returns the last-modified property of a block blob.
@@ -311,7 +322,7 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
return time.Time{}, os.ErrNotExist
}
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return time.Time{}, err
}
@@ -326,7 +337,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
Include: "metadata",
}
for {
- resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
if err != nil {
return err
}
@@ -361,7 +372,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
// Trash a Keep block.
func (v *AzureBlobVolume) Trash(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
@@ -370,26 +381,26 @@ func (v *AzureBlobVolume) Trash(loc string) error {
// we get the Etag before checking Mtime, and use If-Match to
// ensure we don't delete data if Put() or Touch() happens
// between our calls to Mtime() and DeleteBlob().
- props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
+ props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
if err != nil {
return err
}
if t, err := v.Mtime(loc); err != nil {
return err
- } else if time.Since(t) < blobSignatureTTL {
+ } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
return nil
}
- // If trashLifetime == 0, just delete it
- if trashLifetime == 0 {
- return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
+ // If TrashLifetime == 0, just delete it
+ if theConfig.TrashLifetime == 0 {
+ return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
"If-Match": props.Etag,
})
}
// Otherwise, mark as trash
- return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
- "expires_at": fmt.Sprintf("%d", time.Now().Add(trashLifetime).Unix()),
+ return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+ "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
}, map[string]string{
"If-Match": props.Etag,
})
@@ -399,7 +410,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
// Delete the expires_at metadata attribute
func (v *AzureBlobVolume) Untrash(loc string) error {
// if expires_at does not exist, return NotFoundError
- metadata, err := v.bsClient.GetBlobMetadata(v.containerName, loc)
+ metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
if err != nil {
return v.translateError(err)
}
@@ -409,7 +420,7 @@ func (v *AzureBlobVolume) Untrash(loc string) error {
// reset expires_at metadata attribute
metadata["expires_at"] = ""
- err = v.bsClient.SetBlobMetadata(v.containerName, loc, metadata, nil)
+ err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
return v.translateError(err)
}
@@ -424,19 +435,19 @@ func (v *AzureBlobVolume) Status() *VolumeStatus {
// String returns a volume label, including the container name.
func (v *AzureBlobVolume) String() string {
- return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
+ return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
}
// Writable returns true, unless the -readonly flag was on when the
// volume was added.
func (v *AzureBlobVolume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the replication level of the container, as
// specified by the -azure-storage-replication argument.
func (v *AzureBlobVolume) Replication() int {
- return v.replication
+ return v.AzureReplication
}
// If possible, translate an Azure SDK error to a recognizable error
@@ -459,7 +470,7 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
return keepBlockRegexp.MatchString(s)
}
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
func (v *AzureBlobVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
@@ -467,7 +478,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
params := storage.ListBlobsParameters{Include: "metadata"}
for {
- resp, err := v.bsClient.ListBlobs(v.containerName, params)
+ resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
if err != nil {
log.Printf("EmptyTrash: ListBlobs: %v", err)
break
@@ -491,7 +502,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
continue
}
- err = v.bsClient.DeleteBlob(v.containerName, b.Name, map[string]string{
+ err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
"If-Match": b.Properties.Etag,
})
if err != nil {
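With the credentials moved into exported struct fields, an Azure volume can now be declared directly in the new YAML config file. A sketch of what such an entry might look like, using the values from the Examples() method above (the surrounding file layout is an assumption, not shown in this commit):

Volumes:
- Type: Azure
  StorageAccountName: example-account-name
  StorageAccountKeyFile: /etc/azure_storage_account_key.txt
  ContainerName: example-container-name
  AzureReplication: 3
  ReadOnly: false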
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index 5d556b3..c8c898f 100644
--- a/services/keepstore/azure_blob_volume_test.go
+++ b/services/keepstore/azure_blob_volume_test.go
@@ -365,7 +365,13 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
}
}
- v := NewAzureBlobVolume(azClient, container, readonly, replication)
+ v := &AzureBlobVolume{
+ ContainerName: container,
+ ReadOnly: readonly,
+ AzureReplication: replication,
+ azClient: azClient,
+ bsClient: azClient.GetBlobService(),
+ }
return &TestableAzureBlobVolume{
AzureBlobVolume: v,
@@ -570,11 +576,11 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
}
func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
- v.azHandler.PutRaw(v.containerName, locator, data)
+ v.azHandler.PutRaw(v.ContainerName, locator, data)
}
func (v *TestableAzureBlobVolume) TouchWithDate(locator string, lastPut time.Time) {
- v.azHandler.TouchWithDate(v.containerName, locator, lastPut)
+ v.azHandler.TouchWithDate(v.ContainerName, locator, lastPut)
}
func (v *TestableAzureBlobVolume) Teardown() {
diff --git a/services/keepstore/bufferpool_test.go b/services/keepstore/bufferpool_test.go
index 7b51b64..bce8237 100644
--- a/services/keepstore/bufferpool_test.go
+++ b/services/keepstore/bufferpool_test.go
@@ -12,12 +12,12 @@ type BufferPoolSuite struct{}
// Initialize a default-sized buffer pool for the benefit of test
// suites that don't run main().
func init() {
- bufs = newBufferPool(maxBuffers, BlockSize)
+ bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
}
// Restore sane default after bufferpool's own tests
func (s *BufferPoolSuite) TearDownTest(c *C) {
- bufs = newBufferPool(maxBuffers, BlockSize)
+ bufs = newBufferPool(theConfig.MaxBuffers, BlockSize)
}
func (s *BufferPoolSuite) TestBufferPoolBufSize(c *C) {
diff --git a/services/keepstore/config.go b/services/keepstore/config.go
new file mode 100644
index 0000000..9c318d1
--- /dev/null
+++ b/services/keepstore/config.go
@@ -0,0 +1,179 @@
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type Config struct {
+ Listen string
+
+ PIDFile string
+
+ MaxBuffers int
+ MaxRequests int
+
+ BlobSignatureTTL arvados.Duration
+ BlobSigningKeyFile string
+ RequireSignatures bool
+ SystemAuthTokenFile string
+ EnableDelete bool
+ TrashLifetime arvados.Duration
+ TrashCheckInterval arvados.Duration
+
+ Volumes VolumeList
+
+ blobSigningKey []byte
+ systemAuthToken string
+}
+
+var theConfig = DefaultConfig()
+
+// DefaultConfig returns the default configuration.
+func DefaultConfig() *Config {
+ return &Config{
+ Listen: ":25107",
+ MaxBuffers: 128,
+ RequireSignatures: true,
+ BlobSignatureTTL: arvados.Duration(14 * 24 * time.Hour),
+ TrashLifetime: arvados.Duration(14 * 24 * time.Hour),
+ TrashCheckInterval: arvados.Duration(24 * time.Hour),
+ Volumes: []Volume{},
+ }
+}
+
+// Start should be called exactly once: after setting all public
+// fields, and before using the config.
+func (cfg *Config) Start() error {
+ if cfg.MaxBuffers < 1 {
+ return fmt.Errorf("MaxBuffers must be greater than zero")
+ }
+ bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
+
+ if cfg.MaxRequests < 1 {
+ cfg.MaxRequests = cfg.MaxBuffers * 2
+ log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
+ }
+
+ if cfg.BlobSigningKeyFile != "" {
+ buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
+ if err != nil {
+ return fmt.Errorf("reading blob signing key file: %s", err)
+ }
+ cfg.blobSigningKey = bytes.TrimSpace(buf)
+ if len(cfg.blobSigningKey) == 0 {
+ return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
+ }
+ } else if cfg.RequireSignatures {
+ return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
+ } else {
+ log.Println("Running without a blob signing key. Block locators " +
+ "returned by this server will not be signed, and will be rejected " +
+ "by a server that enforces permissions.")
+ log.Println("To fix this, use the BlobSigningKeyFile config entry.")
+ }
+
+ if fn := cfg.SystemAuthTokenFile; fn != "" {
+ buf, err := ioutil.ReadFile(fn)
+ if err != nil {
+ return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
+ }
+ cfg.systemAuthToken = strings.TrimSpace(string(buf))
+ }
+
+ if cfg.EnableDelete {
+ log.Print("Trash/delete features are enabled. WARNING: this has not " +
+ "been extensively tested. You should disable this unless you can afford to lose data.")
+ }
+
+ if len(cfg.Volumes) == 0 {
+ if (&unixVolumeAdder{cfg}).Discover() == 0 {
+ return fmt.Errorf("no volumes found")
+ }
+ }
+ for _, v := range cfg.Volumes {
+ if err := v.Start(); err != nil {
+ return fmt.Errorf("volume %s: %s", v, err)
+ }
+ log.Printf("Using volume %v (writable=%v)", v, v.Writable())
+ }
+ return nil
+}
+
+// VolumeTypes is built up by init() funcs in the source files that
+// define the volume types.
+var VolumeTypes = []func() VolumeWithExamples{}
+
+type VolumeList []Volume
+
+// UnmarshalJSON, given an array of objects, deserializes each object
+// as the volume type indicated by the object's Type field.
+func (vols *VolumeList) UnmarshalJSON(data []byte) error {
+ typeMap := map[string]func() VolumeWithExamples{}
+ for _, factory := range VolumeTypes {
+ t := factory().Type()
+ if _, ok := typeMap[t]; ok {
+ log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
+ }
+ typeMap[t] = factory
+ }
+
+ var mapList []map[string]interface{}
+ err := json.Unmarshal(data, &mapList)
+ if err != nil {
+ return err
+ }
+ for _, mapIn := range mapList {
+ typeIn, ok := mapIn["Type"].(string)
+ if !ok {
+ return fmt.Errorf("invalid volume type %+v", mapIn["Type"])
+ }
+ factory, ok := typeMap[typeIn]
+ if !ok {
+ return fmt.Errorf("unsupported volume type %+q", typeIn)
+ }
+ data, err := json.Marshal(mapIn)
+ if err != nil {
+ return err
+ }
+ vol := factory()
+ err = json.Unmarshal(data, vol)
+ if err != nil {
+ return err
+ }
+ *vols = append(*vols, vol)
+ }
+ return nil
+}
+
+// MarshalJSON adds a "Type" field to each volume corresponding to its
+// Type().
+func (vl *VolumeList) MarshalJSON() ([]byte, error) {
+ data := []byte{'['}
+ for _, vs := range *vl {
+ j, err := json.Marshal(vs)
+ if err != nil {
+ return nil, err
+ }
+ if len(data) > 1 {
+ data = append(data, byte(','))
+ }
+ t, err := json.Marshal(vs.Type())
+ if err != nil {
+ panic(err)
+ }
+ data = append(data, j[0])
+ data = append(data, []byte(`"Type":`)...)
+ data = append(data, t...)
+ data = append(data, byte(','))
+ data = append(data, j[1:]...)
+ }
+ return append(data, byte(']')), nil
+}
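UnmarshalJSON above is a small type registry: it decodes the array into generic maps first, reads each object's Type field, picks the matching factory, and then re-unmarshals the object into the concrete volume type. The two-pass technique in a self-contained sketch (minimal hypothetical types, not the real Volume interfaces):

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

type Volume interface{ Type() string }

type AzureBlobVolume struct{ ContainerName string }

func (*AzureBlobVolume) Type() string { return "Azure" }

// Factories keyed by the Type() string, as VolumeList.UnmarshalJSON
// builds them from the VolumeTypes slice.
var typeMap = map[string]func() Volume{
	"Azure": func() Volume { return &AzureBlobVolume{} },
}

func main() {
	data := []byte(`[{"Type":"Azure","ContainerName":"example-container-name"}]`)

	// Pass 1: decode into generic maps so the Type field can be inspected.
	var mapList []map[string]interface{}
	if err := json.Unmarshal(data, &mapList); err != nil {
		log.Fatal(err)
	}
	var vols []Volume
	for _, mapIn := range mapList {
		typeIn, ok := mapIn["Type"].(string)
		if !ok {
			log.Fatalf("invalid volume type %+v", mapIn["Type"])
		}
		factory, ok := typeMap[typeIn]
		if !ok {
			log.Fatalf("unsupported volume type %+q", typeIn)
		}
		// Pass 2: re-marshal the map and decode into the concrete type.
		buf, err := json.Marshal(mapIn)
		if err != nil {
			log.Fatal(err)
		}
		vol := factory()
		if err := json.Unmarshal(buf, vol); err != nil {
			log.Fatal(err)
		}
		vols = append(vols, vol)
	}
	fmt.Printf("%T %+v\n", vols[0], vols[0])
}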
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 7c17424..dc9bcb1 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -20,6 +20,8 @@ import (
"strings"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
// A RequestTester represents the parameters for an HTTP request to
@@ -52,13 +54,13 @@ func TestGetHandler(t *testing.T) {
// Create locators for testing.
// Turn on permission settings so we can generate signed locators.
- enforcePermissions = true
- PermissionSecret = []byte(knownKey)
- blobSignatureTTL = 300 * time.Second
+ theConfig.RequireSignatures = true
+ theConfig.blobSigningKey = []byte(knownKey)
+ theConfig.BlobSignatureTTL.Set("5m")
var (
unsignedLocator = "/" + TestHash
- validTimestamp = time.Now().Add(blobSignatureTTL)
+ validTimestamp = time.Now().Add(theConfig.BlobSignatureTTL.Duration())
expiredTimestamp = time.Now().Add(-time.Hour)
signedLocator = "/" + SignLocator(TestHash, knownToken, validTimestamp)
expiredLocator = "/" + SignLocator(TestHash, knownToken, expiredTimestamp)
@@ -66,7 +68,7 @@ func TestGetHandler(t *testing.T) {
// -----------------
// Test unauthenticated request with permissions off.
- enforcePermissions = false
+ theConfig.RequireSignatures = false
// Unauthenticated request, unsigned locator
// => OK
@@ -90,7 +92,7 @@ func TestGetHandler(t *testing.T) {
// ----------------
// Permissions: on.
- enforcePermissions = true
+ theConfig.RequireSignatures = true
// Authenticated request, signed locator
// => OK
@@ -175,8 +177,8 @@ func TestPutHandler(t *testing.T) {
// ------------------
// With a server key.
- PermissionSecret = []byte(knownKey)
- blobSignatureTTL = 300 * time.Second
+ theConfig.blobSigningKey = []byte(knownKey)
+ theConfig.BlobSignatureTTL.Set("5m")
// When a permission key is available, the locator returned
// from an authenticated PUT request will be signed.
@@ -220,7 +222,7 @@ func TestPutHandler(t *testing.T) {
func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
defer teardown()
- dataManagerToken = "fake-data-manager-token"
+ theConfig.systemAuthToken = "fake-data-manager-token"
vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
vols[0].Readonly = true
KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
@@ -232,15 +234,15 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
requestBody: TestBlock,
})
defer func(orig bool) {
- neverDelete = orig
- }(neverDelete)
- neverDelete = false
+ theConfig.EnableDelete = orig
+ }(theConfig.EnableDelete)
+ theConfig.EnableDelete = true
IssueRequest(
&RequestTester{
method: "DELETE",
uri: "/" + TestHash,
requestBody: TestBlock,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
})
type expect struct {
volnum int
@@ -274,7 +276,7 @@ func TestPutAndDeleteSkipReadonlyVolumes(t *testing.T) {
// - authenticated /index/prefix request | superuser
//
// The only /index requests that should succeed are those issued by the
-// superuser. They should pass regardless of the value of enforcePermissions.
+// superuser. They should pass regardless of the value of RequireSignatures.
//
func TestIndexHandler(t *testing.T) {
defer teardown()
@@ -291,7 +293,7 @@ func TestIndexHandler(t *testing.T) {
vols[0].Put(TestHash+".meta", []byte("metadata"))
vols[1].Put(TestHash2+".meta", []byte("metadata"))
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
unauthenticatedReq := &RequestTester{
method: "GET",
@@ -305,7 +307,7 @@ func TestIndexHandler(t *testing.T) {
superuserReq := &RequestTester{
method: "GET",
uri: "/index",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
unauthPrefixReq := &RequestTester{
method: "GET",
@@ -319,32 +321,32 @@ func TestIndexHandler(t *testing.T) {
superuserPrefixReq := &RequestTester{
method: "GET",
uri: "/index/" + TestHash[0:3],
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserNoSuchPrefixReq := &RequestTester{
method: "GET",
uri: "/index/abcd",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserInvalidPrefixReq := &RequestTester{
method: "GET",
uri: "/index/xyz",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
// -------------------------------------------------------------
// Only the superuser should be allowed to issue /index requests.
// ---------------------------
- // enforcePermissions enabled
+ // RequireSignatures enabled
// This setting should not affect tests passing.
- enforcePermissions = true
+ theConfig.RequireSignatures = true
// unauthenticated /index request
// => UnauthorizedError
response := IssueRequest(unauthenticatedReq)
ExpectStatusCode(t,
- "enforcePermissions on, unauthenticated request",
+ "RequireSignatures on, unauthenticated request",
UnauthorizedError.HTTPCode,
response)
@@ -381,9 +383,9 @@ func TestIndexHandler(t *testing.T) {
response)
// ----------------------------
- // enforcePermissions disabled
+ // RequireSignatures disabled
// Valid Request should still pass.
- enforcePermissions = false
+ theConfig.RequireSignatures = false
// superuser /index request
// => OK
@@ -477,15 +479,15 @@ func TestDeleteHandler(t *testing.T) {
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, TestBlock)
- // Explicitly set the blobSignatureTTL to 0 for these
+ // Explicitly set the BlobSignatureTTL to 0 for these
// tests, to ensure the MockVolume deletes the blocks
// even though they have just been created.
- blobSignatureTTL = time.Duration(0)
+ theConfig.BlobSignatureTTL = arvados.Duration(0)
var userToken = "NOT DATA MANAGER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
- neverDelete = false
+ theConfig.EnableDelete = true
unauthReq := &RequestTester{
method: "DELETE",
@@ -501,13 +503,13 @@ func TestDeleteHandler(t *testing.T) {
superuserExistingBlockReq := &RequestTester{
method: "DELETE",
uri: "/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
superuserNonexistentBlockReq := &RequestTester{
method: "DELETE",
uri: "/" + TestHash2,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
// Unauthenticated request returns PermissionError.
@@ -538,14 +540,14 @@ func TestDeleteHandler(t *testing.T) {
http.StatusNotFound,
response)
- // Authenticated admin request for existing block while neverDelete is set.
- neverDelete = true
+ // Authenticated admin request for existing block while EnableDelete is false.
+ theConfig.EnableDelete = false
response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
"authenticated request, existing block, method disabled",
MethodDisabledError.HTTPCode,
response)
- neverDelete = false
+ theConfig.EnableDelete = true
// Authenticated admin request for existing block.
response = IssueRequest(superuserExistingBlockReq)
@@ -568,10 +570,10 @@ func TestDeleteHandler(t *testing.T) {
t.Error("superuserExistingBlockReq: block not deleted")
}
- // A DELETE request on a block newer than blobSignatureTTL
+ // A DELETE request on a block newer than BlobSignatureTTL
// should return success but leave the block on the volume.
vols[0].Put(TestHash, TestBlock)
- blobSignatureTTL = time.Hour
+ theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
response = IssueRequest(superuserExistingBlockReq)
ExpectStatusCode(t,
@@ -623,7 +625,7 @@ func TestPullHandler(t *testing.T) {
defer teardown()
var userToken = "USER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
pullq = NewWorkQueue()
@@ -668,13 +670,13 @@ func TestPullHandler(t *testing.T) {
},
{
"Valid pull request from the data manager",
- RequestTester{"/pull", dataManagerToken, "PUT", goodJSON},
+ RequestTester{"/pull", theConfig.systemAuthToken, "PUT", goodJSON},
http.StatusOK,
"Received 3 pull requests\n",
},
{
"Invalid pull request from the data manager",
- RequestTester{"/pull", dataManagerToken, "PUT", badJSON},
+ RequestTester{"/pull", theConfig.systemAuthToken, "PUT", badJSON},
http.StatusBadRequest,
"",
},
@@ -729,7 +731,7 @@ func TestTrashHandler(t *testing.T) {
defer teardown()
var userToken = "USER TOKEN"
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
trashq = NewWorkQueue()
@@ -772,13 +774,13 @@ func TestTrashHandler(t *testing.T) {
},
{
"Valid trash list from the data manager",
- RequestTester{"/trash", dataManagerToken, "PUT", goodJSON},
+ RequestTester{"/trash", theConfig.systemAuthToken, "PUT", goodJSON},
http.StatusOK,
"Received 3 trash requests\n",
},
{
"Invalid trash list from the data manager",
- RequestTester{"/trash", dataManagerToken, "PUT", badJSON},
+ RequestTester{"/trash", theConfig.systemAuthToken, "PUT", badJSON},
http.StatusBadRequest,
"",
},
@@ -873,7 +875,7 @@ func TestPutNeedsOnlyOneBuffer(t *testing.T) {
select {
case <-ok:
case <-time.After(time.Second):
- t.Fatal("PUT deadlocks with maxBuffers==1")
+ t.Fatal("PUT deadlocks with MaxBuffers==1")
}
}
@@ -888,7 +890,7 @@ func TestPutHandlerNoBufferleak(t *testing.T) {
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i++ {
+ for i := 0; i < theConfig.MaxBuffers+1; i++ {
// Unauthenticated request, no server key
// => OK (unsigned response)
unsignedLocator := "/" + TestHash
@@ -925,9 +927,9 @@ func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
func TestGetHandlerClientDisconnect(t *testing.T) {
defer func(was bool) {
- enforcePermissions = was
- }(enforcePermissions)
- enforcePermissions = false
+ theConfig.RequireSignatures = was
+ }(theConfig.RequireSignatures)
+ theConfig.RequireSignatures = false
defer func(orig *bufferPool) {
bufs = orig
@@ -975,7 +977,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
// Invoke the GetBlockHandler a bunch of times to test for bufferpool resource
// leak.
-func TestGetHandlerNoBufferleak(t *testing.T) {
+func TestGetHandlerNoBufferLeak(t *testing.T) {
defer teardown()
// Prepare two test Keep volumes. Our block is stored on the second volume.
@@ -989,7 +991,7 @@ func TestGetHandlerNoBufferleak(t *testing.T) {
ok := make(chan bool)
go func() {
- for i := 0; i < maxBuffers+1; i++ {
+ for i := 0; i < theConfig.MaxBuffers+1; i++ {
// Unauthenticated request, unsigned locator
// => OK
unsignedLocator := "/" + TestHash
@@ -1040,7 +1042,7 @@ func TestUntrashHandler(t *testing.T) {
vols := KeepVM.AllWritable()
vols[0].Put(TestHash, TestBlock)
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
// unauthenticatedReq => UnauthorizedError
unauthenticatedReq := &RequestTester{
@@ -1070,7 +1072,7 @@ func TestUntrashHandler(t *testing.T) {
datamanagerWithBadHashReq := &RequestTester{
method: "PUT",
uri: "/untrash/thisisnotalocator",
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerWithBadHashReq)
ExpectStatusCode(t,
@@ -1082,7 +1084,7 @@ func TestUntrashHandler(t *testing.T) {
datamanagerWrongMethodReq := &RequestTester{
method: "GET",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerWrongMethodReq)
ExpectStatusCode(t,
@@ -1094,7 +1096,7 @@ func TestUntrashHandler(t *testing.T) {
datamanagerReq := &RequestTester{
method: "PUT",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response = IssueRequest(datamanagerReq)
ExpectStatusCode(t,
@@ -1119,13 +1121,13 @@ func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
defer KeepVM.Close()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
// datamanagerReq => StatusOK
datamanagerReq := &RequestTester{
method: "PUT",
uri: "/untrash/" + TestHash,
- apiToken: dataManagerToken,
+ apiToken: theConfig.systemAuthToken,
}
response := IssueRequest(datamanagerReq)
ExpectStatusCode(t,
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index a6798a9..54b8b48 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -71,7 +71,7 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
// GetBlockHandler is a HandleFunc to address Get block requests.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
- if enforcePermissions {
+ if theConfig.RequireSignatures {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
@@ -185,8 +185,8 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
// return it to the client.
returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
apiToken := GetAPIToken(req)
- if PermissionSecret != nil && apiToken != "" {
- expiry := time.Now().Add(blobSignatureTTL)
+ if theConfig.blobSigningKey != nil && apiToken != "" {
+ expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
returnHash = SignLocator(returnHash, apiToken, expiry)
}
resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
@@ -196,7 +196,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
// IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -334,7 +334,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
return
}
- if neverDelete {
+ if !theConfig.EnableDelete {
http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
return
}
@@ -419,7 +419,7 @@ type PullRequest struct {
// PullHandler processes "PUT /pull" requests for the data manager.
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -455,7 +455,7 @@ type TrashRequest struct {
// TrashHandler processes /trash requests.
func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -485,7 +485,7 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
- if !IsDataManagerToken(GetAPIToken(req)) {
+ if !IsSystemAuth(GetAPIToken(req)) {
http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
return
}
@@ -746,7 +746,7 @@ func CanDelete(apiToken string) bool {
}
// Blocks may be deleted only when Keep has been configured with a
// data manager.
- if IsDataManagerToken(apiToken) {
+ if IsSystemAuth(apiToken) {
return true
}
// TODO(twp): look up apiToken with the API server
@@ -755,8 +755,8 @@ func CanDelete(apiToken string) bool {
return false
}
-// IsDataManagerToken returns true if apiToken represents the data
-// manager's token.
-func IsDataManagerToken(apiToken string) bool {
- return dataManagerToken != "" && apiToken == dataManagerToken
+// IsSystemAuth returns true if the given token is allowed to perform
+// system level actions like deleting data.
+func IsSystemAuth(token string) bool {
+ return token != "" && token == theConfig.systemAuthToken
}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index 48b83de..9fc2db3 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -1,32 +1,23 @@
package main
import (
- "bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
- "strings"
"syscall"
"time"
-)
-// ======================
-// Configuration settings
-//
-// TODO(twp): make all of these configurable via command line flags
-// and/or configuration file settings.
-
-// Default TCP address on which to listen for requests.
-// Initialized by the --listen flag.
-const DefaultAddr = ":25107"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/ghodss/yaml"
+)
// A Keep "block" is 64MB.
const BlockSize = 64 * 1024 * 1024
@@ -38,36 +29,6 @@ const MinFreeKilobytes = BlockSize / 1024
// ProcMounts /proc/mounts
var ProcMounts = "/proc/mounts"
-// enforcePermissions controls whether permission signatures
-// should be enforced (affecting GET and DELETE requests).
-// Initialized by the -enforce-permissions flag.
-var enforcePermissions bool
-
-// blobSignatureTTL is the time duration for which new permission
-// signatures (returned by PUT requests) will be valid.
-// Initialized by the -permission-ttl flag.
-var blobSignatureTTL time.Duration
-
-// dataManagerToken represents the API token used by the
-// Data Manager, and is required on certain privileged operations.
-// Initialized by the -data-manager-token-file flag.
-var dataManagerToken string
-
-// neverDelete can be used to prevent the DELETE handler from
-// actually deleting anything.
-var neverDelete = true
-
-// trashLifetime is the time duration after a block is trashed
-// during which it can be recovered using an /untrash request
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashLifetime time.Duration
-
-// trashCheckInterval is the time duration at which the emptyTrash goroutine
-// will check and delete expired trashed blocks. Default is one day.
-// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
-var trashCheckInterval time.Duration
-
-var maxBuffers = 128
var bufs *bufferPool
// KeepError types.
@@ -121,132 +82,78 @@ var KeepVM VolumeManager
var pullq *WorkQueue
var trashq *WorkQueue
-type volumeSet []Volume
-
var (
flagSerializeIO bool
flagReadonly bool
- volumes volumeSet
)
-func (vs *volumeSet) String() string {
- return fmt.Sprintf("%+v", (*vs)[:])
-}
-
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
// of command line flags (identifying Keep volumes and initializing
// permission arguments).
func main() {
- log.Println("keepstore starting, pid", os.Getpid())
- defer log.Println("keepstore exiting, pid", os.Getpid())
-
- var (
- dataManagerTokenFile string
- listen string
- blobSigningKeyFile string
- permissionTTLSec int
- pidfile string
- maxRequests int
- )
+ neverDelete := !theConfig.EnableDelete
+ signatureTTLSeconds := int(theConfig.BlobSignatureTTL.Duration() / time.Second)
+ flag.StringVar(&theConfig.Listen, "listen", theConfig.Listen, "see Listen configuration")
+ flag.IntVar(&theConfig.MaxBuffers, "max-buffers", theConfig.MaxBuffers, "see MaxBuffers configuration")
+ flag.IntVar(&theConfig.MaxRequests, "max-requests", theConfig.MaxRequests, "see MaxRequests configuration")
+ flag.BoolVar(&neverDelete, "never-delete", neverDelete, "see EnableDelete configuration")
+ flag.BoolVar(&theConfig.RequireSignatures, "enforce-permissions", theConfig.RequireSignatures, "see RequireSignatures configuration")
+ flag.StringVar(&theConfig.BlobSigningKeyFile, "permission-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+ flag.StringVar(&theConfig.BlobSigningKeyFile, "blob-signing-key-file", theConfig.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
+ flag.StringVar(&theConfig.SystemAuthTokenFile, "data-manager-token-file", theConfig.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
+ flag.IntVar(&signatureTTLSeconds, "permission-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+ flag.IntVar(&signatureTTLSeconds, "blob-signature-ttl", signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
+ flag.Var(&theConfig.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
+ flag.BoolVar(&flagSerializeIO, "serialize", false, "serialize read and write operations on the following volumes.")
+ flag.BoolVar(&flagReadonly, "readonly", false, "do not write, delete, or touch anything on the following volumes.")
+ flag.StringVar(&theConfig.PIDFile, "pid", theConfig.PIDFile, "see `PIDFile` configuration")
+ flag.Var(&theConfig.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
+
+ dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+
+ defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
+ var configPath string
flag.StringVar(
- &dataManagerTokenFile,
- "data-manager-token-file",
- "",
- "File with the API token used by the Data Manager. All DELETE "+
- "requests or GET /index requests must carry this token.")
- flag.BoolVar(
- &enforcePermissions,
- "enforce-permissions",
- false,
- "Enforce permission signatures on requests.")
- flag.StringVar(
- &listen,
- "listen",
- DefaultAddr,
- "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
- flag.IntVar(
- &maxRequests,
- "max-requests",
- 0,
- "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
- flag.BoolVar(
- &neverDelete,
- "never-delete",
- true,
- "If true, nothing will be deleted. "+
- "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
- "You should leave this option alone unless you can afford to lose data.")
- flag.StringVar(
- &blobSigningKeyFile,
- "permission-key-file",
- "",
- "Synonym for -blob-signing-key-file.")
- flag.StringVar(
- &blobSigningKeyFile,
- "blob-signing-key-file",
- "",
- "File containing the secret key for generating and verifying "+
- "blob permission signatures.")
- flag.IntVar(
- &permissionTTLSec,
- "permission-ttl",
- 0,
- "Synonym for -blob-signature-ttl.")
- flag.IntVar(
- &permissionTTLSec,
- "blob-signature-ttl",
- 2*7*24*3600,
- "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
- "See services/api/config/application.default.yml.")
- flag.BoolVar(
- &flagSerializeIO,
- "serialize",
- false,
- "Serialize read and write operations on the following volumes.")
- flag.BoolVar(
- &flagReadonly,
- "readonly",
- false,
- "Do not write, delete, or touch anything on the following volumes.")
- flag.StringVar(
- &pidfile,
- "pid",
- "",
- "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
- flag.IntVar(
- &maxBuffers,
- "max-buffers",
- maxBuffers,
- fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
- flag.DurationVar(
- &trashLifetime,
- "trash-lifetime",
- 0,
- "Time duration after a block is trashed during which it can be recovered using an /untrash request")
- flag.DurationVar(
- &trashCheckInterval,
- "trash-check-interval",
- 24*time.Hour,
- "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
-
+ &configPath,
+ "config",
+ defaultConfigPath,
+ "YAML or JSON configuration file `path`")
+ flag.Usage = usage
flag.Parse()
- if maxBuffers < 0 {
- log.Fatal("-max-buffers must be greater than zero.")
+ theConfig.BlobSignatureTTL = arvados.Duration(signatureTTLSeconds) * arvados.Duration(time.Second)
+ theConfig.EnableDelete = !neverDelete
+
+ // Load the configuration file, if it exists.
+ err := config.LoadFile(theConfig, configPath)
+ if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
+ log.Fatal(err)
}
- bufs = newBufferPool(maxBuffers, BlockSize)
- if pidfile != "" {
+ if *dumpConfig {
+ y, err := yaml.Marshal(theConfig)
+ if err != nil {
+ log.Fatal(err)
+ }
+ os.Stdout.Write(y)
+ os.Exit(0)
+ }
+
+ err = theConfig.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ if pidfile := theConfig.PIDFile; pidfile != "" {
f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
if err != nil {
log.Fatalf("open pidfile (%s): %s", pidfile, err)
}
+ defer f.Close()
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
log.Fatalf("flock pidfile (%s): %s", pidfile, err)
}
+ defer os.Remove(pidfile)
err = f.Truncate(0)
if err != nil {
log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
@@ -259,74 +166,22 @@ func main() {
if err != nil {
log.Fatalf("sync pidfile (%s): %s", pidfile, err)
}
- defer f.Close()
- defer os.Remove(pidfile)
- }
-
- if len(volumes) == 0 {
- if (&unixVolumeAdder{&volumes}).Discover() == 0 {
- log.Fatal("No volumes found.")
- }
- }
-
- for _, v := range volumes {
- log.Printf("Using volume %v (writable=%v)", v, v.Writable())
- }
-
- // Initialize data manager token and permission key.
- // If these tokens are specified but cannot be read,
- // raise a fatal error.
- if dataManagerTokenFile != "" {
- if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
- dataManagerToken = strings.TrimSpace(string(buf))
- } else {
- log.Fatalf("reading data manager token: %s\n", err)
- }
- }
-
- if neverDelete != true {
- log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
- "been extensively tested. You should leave this option alone unless you can afford to lose data.")
- }
-
- if blobSigningKeyFile != "" {
- if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
- PermissionSecret = bytes.TrimSpace(buf)
- } else {
- log.Fatalf("reading permission key: %s\n", err)
- }
- }
-
- blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
-
- if PermissionSecret == nil {
- if enforcePermissions {
- log.Fatal("-enforce-permissions requires a permission key")
- } else {
- log.Println("Running without a PermissionSecret. Block locators " +
- "returned by this server will not be signed, and will be rejected " +
- "by a server that enforces permissions.")
- log.Println("To fix this, use the -blob-signing-key-file flag " +
- "to specify the file containing the permission key.")
- }
}
- if maxRequests <= 0 {
- maxRequests = maxBuffers * 2
- log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
- }
+ log.Println("keepstore starting, pid", os.Getpid())
+ defer log.Println("keepstore exiting, pid", os.Getpid())
// Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(volumes)
+ KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, maxRequests limiter, method handlers
+ // Middleware stack: logger, MaxRequests limiter, method handlers
http.Handle("/", &LoggingRESTRouter{
- httpserver.NewRequestLimiter(maxRequests,
+ httpserver.NewRequestLimiter(theConfig.MaxRequests,
MakeRESTRouter()),
})
// Set up a TCP listener.
- listener, err := net.Listen("tcp", listen)
+ listener, err := net.Listen("tcp", theConfig.Listen)
if err != nil {
log.Fatal(err)
}
@@ -348,7 +203,7 @@ func main() {
// Start emptyTrash goroutine
doneEmptyingTrash := make(chan bool)
- go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+ go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
@@ -362,24 +217,24 @@ func main() {
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- log.Println("listening at", listen)
- srv := &http.Server{Addr: listen}
+ log.Println("listening at", listener.Addr)
+ srv := &http.Server{}
srv.Serve(listener)
}
-// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
- ticker := time.NewTicker(trashCheckInterval)
+// Periodically (once per interval) invoke EmptyTrash on all volumes.
+func emptyTrash(done <-chan bool, interval time.Duration) {
+ ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
- for _, v := range volumes {
+ for _, v := range theConfig.Volumes {
if v.Writable() {
v.EmptyTrash()
}
}
- case <-doneEmptyingTrash:
+ case <-done:
ticker.Stop()
return
}
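Taken together, the old flags keep working while -config and -dump-config provide the migration path mentioned in the help text. A hypothetical invocation (paths are examples only):

  keepstore -enforce-permissions \
      -blob-signing-key-file /etc/keepstore/blob-signing.key \
      -dump-config >/etc/arvados/keepstore/keepstore.yml

writes the effective configuration as YAML; after that, keepstore can be started with no flags and will read the default /etc/arvados/keepstore/keepstore.yml.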
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index c0adbc0..dc6af0f 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -341,23 +341,23 @@ func TestDiscoverTmpfs(t *testing.T) {
f.Close()
ProcMounts = f.Name()
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
+ cfg := &Config{}
+ added := (&unixVolumeAdder{cfg}).Discover()
- if added != len(resultVols) {
+ if added != len(cfg.Volumes) {
t.Errorf("Discover returned %d, but added %d volumes",
- added, len(resultVols))
+ added, len(cfg.Volumes))
}
if added != len(tempVols) {
t.Errorf("Discover returned %d but we set up %d volumes",
added, len(tempVols))
}
for i, tmpdir := range tempVols {
- if tmpdir != resultVols[i].(*UnixVolume).root {
+ if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
t.Errorf("Discover returned %s, expected %s\n",
- resultVols[i].(*UnixVolume).root, tmpdir)
+ cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
}
- if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
+ if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
t.Errorf("Discover added %s with readonly=%v, should be %v",
tmpdir, !expectReadonly, expectReadonly)
}
@@ -381,10 +381,10 @@ func TestDiscoverNone(t *testing.T) {
f.Close()
ProcMounts = f.Name()
- resultVols := volumeSet{}
- added := (&unixVolumeAdder{&resultVols}).Discover()
- if added != 0 || len(resultVols) != 0 {
- t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
+ cfg := &Config{}
+ added := (&unixVolumeAdder{cfg}).Discover()
+ if added != 0 || len(cfg.Volumes) != 0 {
+ t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
}
}
@@ -442,8 +442,8 @@ func MakeTestVolumeManager(numVolumes int) VolumeManager {
// teardown cleans up after each test.
func teardown() {
- dataManagerToken = ""
- enforcePermissions = false
- PermissionSecret = nil
+ theConfig.systemAuthToken = ""
+ theConfig.RequireSignatures = false
+ theConfig.blobSigningKey = nil
KeepVM = nil
}
diff --git a/services/keepstore/perms.go b/services/keepstore/perms.go
index 9cd97bd..38445d9 100644
--- a/services/keepstore/perms.go
+++ b/services/keepstore/perms.go
@@ -5,15 +5,10 @@ import (
"time"
)
-// The PermissionSecret is the secret key used to generate SHA1
-// digests for permission hints. apiserver and Keep must use the same
-// key.
-var PermissionSecret []byte
-
// SignLocator takes a blobLocator, an apiToken and an expiry time, and
// returns a signed locator string.
func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
- return keepclient.SignLocator(blobLocator, apiToken, expiry, blobSignatureTTL, PermissionSecret)
+ return keepclient.SignLocator(blobLocator, apiToken, expiry, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
}
// VerifySignature returns nil if the signature on the signedLocator
@@ -22,7 +17,7 @@ func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
// something the client could have figured out independently) or
// PermissionError.
func VerifySignature(signedLocator, apiToken string) error {
- err := keepclient.VerifySignature(signedLocator, apiToken, blobSignatureTTL, PermissionSecret)
+ err := keepclient.VerifySignature(signedLocator, apiToken, theConfig.BlobSignatureTTL.Duration(), theConfig.blobSigningKey)
if err == keepclient.ErrSignatureExpired {
return ExpiredError
} else if err != nil {
diff --git a/services/keepstore/perms_test.go b/services/keepstore/perms_test.go
index 43717b2..8e47e4a 100644
--- a/services/keepstore/perms_test.go
+++ b/services/keepstore/perms_test.go
@@ -4,6 +4,8 @@ import (
"strconv"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
const (
@@ -17,7 +19,7 @@ const (
"gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
"vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
"786u5rw2a9gx743dj3fgq2irk"
- knownSignatureTTL = 1209600 * time.Second
+ knownSignatureTTL = arvados.Duration(24 * 14 * time.Hour)
knownSignature = "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
knownTimestamp = "7fffffff"
knownSigHint = "+A" + knownSignature + "@" + knownTimestamp
@@ -26,8 +28,8 @@ const (
func TestSignLocator(t *testing.T) {
defer func(b []byte) {
- PermissionSecret = b
- }(PermissionSecret)
+ theConfig.blobSigningKey = b
+ }(theConfig.blobSigningKey)
tsInt, err := strconv.ParseInt(knownTimestamp, 16, 0)
if err != nil {
@@ -35,33 +37,33 @@ func TestSignLocator(t *testing.T) {
}
t0 := time.Unix(tsInt, 0)
- blobSignatureTTL = knownSignatureTTL
+ theConfig.BlobSignatureTTL = knownSignatureTTL
- PermissionSecret = []byte(knownKey)
+ theConfig.blobSigningKey = []byte(knownKey)
if x := SignLocator(knownLocator, knownToken, t0); x != knownSignedLocator {
t.Fatalf("Got %+q, expected %+q", x, knownSignedLocator)
}
- PermissionSecret = []byte("arbitrarykey")
+ theConfig.blobSigningKey = []byte("arbitrarykey")
if x := SignLocator(knownLocator, knownToken, t0); x == knownSignedLocator {
- t.Fatalf("Got same signature %+q, even though PermissionSecret changed", x)
+ t.Fatalf("Got same signature %+q, even though blobSigningKey changed", x)
}
}
func TestVerifyLocator(t *testing.T) {
defer func(b []byte) {
- PermissionSecret = b
- }(PermissionSecret)
+ theConfig.blobSigningKey = b
+ }(theConfig.blobSigningKey)
- blobSignatureTTL = knownSignatureTTL
+ theConfig.BlobSignatureTTL = knownSignatureTTL
- PermissionSecret = []byte(knownKey)
+ theConfig.blobSigningKey = []byte(knownKey)
if err := VerifySignature(knownSignedLocator, knownToken); err != nil {
t.Fatal(err)
}
- PermissionSecret = []byte("arbitrarykey")
+ theConfig.blobSigningKey = []byte("arbitrarykey")
if err := VerifySignature(knownSignedLocator, knownToken); err == nil {
- t.Fatal("Verified signature even with wrong PermissionSecret")
+ t.Fatal("Verified signature even with wrong blobSigningKey")
}
}
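For reference, the constants above compose into a signed locator of the form <hash>+<size>+A<signature>@<hex expiry>. A sketch using the values visible in this hunk (the base locator is a hypothetical stand-in for knownLocator, which is defined earlier in the file):

package main

import "fmt"

func main() {
	locator := "acbd18db4cc2f85cedef654fccc4a4d8+3" // hypothetical base locator
	knownSignature := "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
	knownTimestamp := "7fffffff"
	fmt.Println(locator + "+A" + knownSignature + "@" + knownTimestamp)
	// acbd18db4cc2f85cedef654fccc4a4d8+3+A89118b78732c33104a4d6231e8b5a5fa1e4301e3@7fffffff
}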
diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go
index 4d85d5f..43a6de6 100644
--- a/services/keepstore/pull_worker_test.go
+++ b/services/keepstore/pull_worker_test.go
@@ -84,10 +84,10 @@ type PullWorkerTestData struct {
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello",
@@ -101,10 +101,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_locators(c *C) {
func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola",
@@ -118,10 +118,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_one_locator(c *C) {
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "unused",
@@ -135,10 +135,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_one_locator(c *C) {
func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_get_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "unused",
@@ -152,10 +152,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_get_two_locators(c *C) {
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_one_locator",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hello hello",
@@ -169,10 +169,10 @@ func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_one_locator(c *C) {
func (s *PullWorkerTestSuite) TestPullWorker_error_on_put_two_locators(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorker_error_on_put_two_locators",
- req: RequestTester{"/pull", dataManagerToken, "PUT", firstPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", firstPullList},
responseCode: http.StatusOK,
responseBody: "Received 2 pull requests\n",
readContent: "hello again",
@@ -195,10 +195,10 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_repla
pullq.ReplaceQueue(makeTestWorkList(firstInput))
testPullLists["Added_before_actual_test_item"] = string(1)
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_items_latest_replacing_old",
- req: RequestTester{"/pull", dataManagerToken, "PUT", secondPullList},
+ req: RequestTester{"/pull", theConfig.systemAuthToken, "PUT", secondPullList},
responseCode: http.StatusOK,
responseBody: "Received 1 pull requests\n",
readContent: "hola de nuevo",
@@ -210,14 +210,14 @@ func (s *PullWorkerTestSuite) TestPullWorkerPullList_with_two_items_latest_repla
}
// In this case, the item will not be placed on pullq
-func (s *PullWorkerTestSuite) TestPullWorker_invalid_dataManagerToken(c *C) {
+func (s *PullWorkerTestSuite) TestPullWorker_invalidToken(c *C) {
defer teardown()
- dataManagerToken = "DATA MANAGER TOKEN"
+ theConfig.systemAuthToken = "DATA MANAGER TOKEN"
testData := PullWorkerTestData{
name: "TestPullWorkerPullList_with_two_locators",
- req: RequestTester{"/pull", "invalid_dataManagerToken", "PUT", firstPullList},
+ req: RequestTester{"/pull", "invalidToken", "PUT", firstPullList},
responseCode: http.StatusUnauthorized,
responseBody: "Unauthorized\n",
readContent: "hello",
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 1a2a47b..7f2c3f0 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -11,8 +11,10 @@ import (
"os"
"regexp"
"strings"
+ "sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
)
@@ -39,7 +41,12 @@ const (
)
type s3VolumeAdder struct {
- *volumeSet
+ *Config
+}
+
+// String implements flag.Value
+func (s *s3VolumeAdder) String() string {
+ return "-"
}
func (s *s3VolumeAdder) Set(bucketName string) error {
@@ -49,39 +56,21 @@ func (s *s3VolumeAdder) Set(bucketName string) error {
if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
}
- region, ok := aws.Regions[s3RegionName]
- if s3Endpoint == "" {
- if !ok {
- return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
- }
- } else {
- if ok {
- return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
- "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
- }
- region = aws.Region{
- Name: s3RegionName,
- S3Endpoint: s3Endpoint,
- }
- }
- var err error
- var auth aws.Auth
- auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
- if err != nil {
- return err
- }
- auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
- if err != nil {
- return err
- }
if flagSerializeIO {
log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
}
- v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
- if err := v.Check(); err != nil {
- return err
- }
- *s.volumeSet = append(*s.volumeSet, v)
+ s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
+ Bucket: bucketName,
+ AccessKeyFile: s3AccessKeyFile,
+ SecretKeyFile: s3SecretKeyFile,
+ Endpoint: s3Endpoint,
+ Region: s3RegionName,
+ RaceWindow: arvados.Duration(s3RaceWindow),
+ S3Replication: s3Replication,
+ UnsafeDelete: s3UnsafeDelete,
+ ReadOnly: flagReadonly,
+ IndexPageSize: 1000,
+ })
return nil
}
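Both volume adders can be passed to flag.Var because they satisfy the standard library's flag.Value interface, which requires exactly the two methods added in this patch:

    // flag.Value, as defined in package flag:
    type Value interface {
        String() string   // used when printing flag defaults
        Set(string) error // called once per occurrence on the command line
    }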
@@ -93,7 +82,9 @@ func s3regions() (okList []string) {
}
func init() {
- flag.Var(&s3VolumeAdder{&volumes},
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
+
+ flag.Var(&s3VolumeAdder{theConfig},
"s3-bucket-volume",
"Use the given bucket as a storage volume. Can be given multiple times.")
flag.StringVar(
@@ -110,12 +101,12 @@ func init() {
&s3AccessKeyFile,
"s3-access-key-file",
"",
- "File containing the access key used for subsequent -s3-bucket-volume arguments.")
+ "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
flag.StringVar(
&s3SecretKeyFile,
"s3-secret-key-file",
"",
- "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
+ "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
flag.DurationVar(
&s3RaceWindow,
"s3-race-window",
@@ -135,32 +126,87 @@ func init() {
// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
- *s3.Bucket
- raceWindow time.Duration
- readonly bool
- replication int
- indexPageSize int
-}
-
-// NewS3Volume returns a new S3Volume using the given auth, region,
-// and bucket name. The replication argument specifies the replication
-// level to report when writing data.
-func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
- return &S3Volume{
- Bucket: &s3.Bucket{
- S3: s3.New(auth, region),
- Name: bucket,
+ AccessKeyFile string
+ SecretKeyFile string
+ Endpoint string
+ Region string
+ Bucket string
+ LocationConstraint bool
+ IndexPageSize int
+ S3Replication int
+ RaceWindow arvados.Duration
+ ReadOnly bool
+ UnsafeDelete bool
+
+ bucket *s3.Bucket
+
+ startOnce sync.Once
+}
+
+// Examples implements VolumeWithExamples.
+func (*S3Volume) Examples() []Volume {
+ return []Volume{
+ &S3Volume{
+ AccessKeyFile: "/etc/aws_s3_access_key.txt",
+ SecretKeyFile: "/etc/aws_s3_secret_key.txt",
+ Endpoint: "",
+ Region: "us-east-1",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
+ },
+ &S3Volume{
+ AccessKeyFile: "/etc/gce_s3_access_key.txt",
+ SecretKeyFile: "/etc/gce_s3_secret_key.txt",
+ Endpoint: "https://storage.googleapis.com",
+ Region: "",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
},
- raceWindow: raceWindow,
- readonly: readonly,
- replication: replication,
- indexPageSize: 1000,
}
}
-// Check returns an error if the volume is inaccessible (e.g., config
-// error).
-func (v *S3Volume) Check() error {
+// Type implements Volume.
+func (*S3Volume) Type() string {
+ return "S3"
+}
+
+// Start populates private fields and verifies the configuration is
+// valid.
+func (v *S3Volume) Start() error {
+ region, ok := aws.Regions[v.Region]
+ if v.Endpoint == "" {
+ if !ok {
+ return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
+ }
+ } else if ok {
+ return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
+ "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
+ } else {
+ region = aws.Region{
+ Name: v.Region,
+ S3Endpoint: v.Endpoint,
+ S3LocationConstraint: v.LocationConstraint,
+ }
+ }
+
+ var err error
+ var auth aws.Auth
+ auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
+ if err != nil {
+ return err
+ }
+ auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
+ if err != nil {
+ return err
+ }
+ v.bucket = &s3.Bucket{
+ S3: s3.New(auth, region),
+ Name: v.Bucket,
+ }
return nil
}
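The new unexported startOnce field suggests Start is meant to run at most once per volume even with concurrent callers. A hypothetical guard, not part of this patch (a real version would also store the first error on the struct so later callers see it; this sketch returns nil on repeat calls):

    func (v *S3Volume) start() error {
        var err error
        v.startOnce.Do(func() { err = v.Start() })
        return err
    }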
@@ -170,12 +216,12 @@ func (v *S3Volume) Check() error {
// disappeared in a Trash race, getReader calls fixRace to recover the
// data, and tries again.
func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
- rdr, err = v.Bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(loc)
err = v.translateError(err)
if err == nil || !os.IsNotExist(err) {
return
}
- _, err = v.Bucket.Head("recent/"+loc, nil)
+ _, err = v.bucket.Head("recent/"+loc, nil)
err = v.translateError(err)
if err != nil {
// If we can't read recent/X, there's no point in
@@ -186,7 +232,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
err = os.ErrNotExist
return
}
- rdr, err = v.Bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(loc)
if err != nil {
log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
err = v.translateError(err)
@@ -223,7 +269,7 @@ func (v *S3Volume) Compare(loc string, expect []byte) error {
// Put writes a block.
func (v *S3Volume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
var opts s3.Options
@@ -234,20 +280,20 @@ func (v *S3Volume) Put(loc string, block []byte) error {
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
}
- err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
+ err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
if err != nil {
return v.translateError(err)
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Touch sets the timestamp for the given locator to the current time.
func (v *S3Volume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
- _, err := v.Bucket.Head(loc, nil)
+ _, err := v.bucket.Head(loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) && v.fixRace(loc) {
// The data object got trashed in a race, but fixRace
@@ -255,27 +301,27 @@ func (v *S3Volume) Touch(loc string) error {
} else if err != nil {
return err
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
- _, err := v.Bucket.Head(loc, nil)
+ _, err := v.bucket.Head(loc, nil)
if err != nil {
return zeroTime, v.translateError(err)
}
- resp, err := v.Bucket.Head("recent/"+loc, nil)
+ resp, err := v.bucket.Head("recent/"+loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("error: creating %q: %s", "recent/"+loc, err)
return zeroTime, v.translateError(err)
}
log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
- resp, err = v.Bucket.Head("recent/"+loc, nil)
+ resp, err = v.bucket.Head("recent/"+loc, nil)
if err != nil {
log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
return zeroTime, v.translateError(err)
@@ -292,14 +338,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: prefix,
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
recentL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: "recent/" + prefix,
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
if data.Key >= "g" {
@@ -346,19 +392,19 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Trash a Keep block.
func (v *S3Volume) Trash(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if t, err := v.Mtime(loc); err != nil {
return err
- } else if time.Since(t) < blobSignatureTTL {
+ } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
return nil
}
- if trashLifetime == 0 {
+ if theConfig.TrashLifetime == 0 {
if !s3UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.Bucket.Del(loc)
+ return v.bucket.Del(loc)
}
err := v.checkRaceWindow(loc)
if err != nil {
@@ -368,13 +414,13 @@ func (v *S3Volume) Trash(loc string) error {
if err != nil {
return err
}
- return v.translateError(v.Bucket.Del(loc))
+ return v.translateError(v.bucket.Del(loc))
}
// checkRaceWindow returns a non-nil error if trash/loc is, or might
// be, in the race window (i.e., it's not safe to trash loc).
func (v *S3Volume) checkRaceWindow(loc string) error {
- resp, err := v.Bucket.Head("trash/"+loc, nil)
+ resp, err := v.bucket.Head("trash/"+loc, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// OK, trash/X doesn't exist so we're not in the race
@@ -390,7 +436,7 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
// Can't parse timestamp
return err
}
- safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
+ safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
if safeWindow <= 0 {
// We can't count on "touch trash/X" to prolong
// trash/X's lifetime. The new timestamp might not
@@ -408,10 +454,10 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
// (PutCopy returns 200 OK if the request was received, even if the
// copy failed).
func (v *S3Volume) safeCopy(dst, src string) error {
- resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+ resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
MetadataDirective: "REPLACE",
- }, v.Bucket.Name+"/"+src)
+ }, v.bucket.Name+"/"+src)
err = v.translateError(err)
if err != nil {
return err
@@ -446,7 +492,7 @@ func (v *S3Volume) Untrash(loc string) error {
if err != nil {
return err
}
- err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
@@ -463,19 +509,19 @@ func (v *S3Volume) Status() *VolumeStatus {
// String implements fmt.Stringer.
func (v *S3Volume) String() string {
- return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
+ return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
}
// Writable returns false if all future Put, Mtime, and Delete calls
// are expected to fail.
func (v *S3Volume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the storage redundancy of the underlying
// device. Configured via command line flag.
func (v *S3Volume) Replication() int {
- return v.replication
+ return v.S3Replication
}
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
@@ -489,7 +535,7 @@ func (v *S3Volume) isKeepBlock(s string) bool {
// there was a race between Put and Trash, fixRace recovers from the
// race by Untrashing the block.
func (v *S3Volume) fixRace(loc string) bool {
- trash, err := v.Bucket.Head("trash/"+loc, nil)
+ trash, err := v.bucket.Head("trash/"+loc, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
@@ -502,7 +548,7 @@ func (v *S3Volume) fixRace(loc string) bool {
return false
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil {
log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
return false
@@ -514,13 +560,13 @@ func (v *S3Volume) fixRace(loc string) bool {
}
ageWhenTrashed := trashTime.Sub(recentTime)
- if ageWhenTrashed >= blobSignatureTTL {
+ if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
// No evidence of a race: block hasn't been written
// since it became eligible for Trash. No fix needed.
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
+ log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
err = v.safeCopy(loc, "trash/"+loc)
if err != nil {
@@ -545,16 +591,16 @@ func (v *S3Volume) translateError(err error) error {
return err
}
-// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
func (v *S3Volume) EmptyTrash() {
var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
// Use a merge sort to find matching sets of trash/X and recent/X.
trashL := s3Lister{
- Bucket: v.Bucket,
+ Bucket: v.bucket,
Prefix: "trash/",
- PageSize: v.indexPageSize,
+ PageSize: v.IndexPageSize,
}
// Define "ready to delete" as "...when EmptyTrash started".
startT := time.Now()
@@ -571,7 +617,7 @@ func (v *S3Volume) EmptyTrash() {
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
continue
}
- recent, err := v.Bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
@@ -588,21 +634,21 @@ func (v *S3Volume) EmptyTrash() {
log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
continue
}
- if trashT.Sub(recentT) < blobSignatureTTL {
- if age := startT.Sub(recentT); age >= blobSignatureTTL-v.raceWindow {
+ if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
+ if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
// recent/loc is too old to protect
// loc from being Trashed again during
// the raceWindow that starts if we
// delete trash/X now.
//
- // Note this means (trashCheckInterval
- // < blobSignatureTTL - raceWindow) is
+ // Note this means (TrashCheckInterval
+ // < BlobSignatureTTL - raceWindow) is
// necessary to avoid starvation.
log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
v.fixRace(loc)
v.Touch(loc)
continue
- } else if _, err := v.Bucket.Head(loc, nil); os.IsNotExist(err) {
+ } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
v.fixRace(loc)
continue
@@ -611,10 +657,10 @@ func (v *S3Volume) EmptyTrash() {
continue
}
}
- if startT.Sub(trashT) < trashLifetime {
+ if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
continue
}
- err = v.Bucket.Del(trash.Key)
+ err = v.bucket.Del(trash.Key)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
continue
@@ -622,9 +668,9 @@ func (v *S3Volume) EmptyTrash() {
bytesDeleted += trash.Size
blocksDeleted++
- _, err = v.Bucket.Head(loc, nil)
+ _, err = v.bucket.Head(loc, nil)
if os.IsNotExist(err) {
- err = v.Bucket.Del("recent/" + loc)
+ err = v.bucket.Del("recent/" + loc)
if err != nil {
log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
}
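Concretely, with the example settings shown elsewhere in this patch (a 14-day BlobSignatureTTL, as in the tests above, and the 24-hour RaceWindow from Examples()), the constraint in the comment means TrashCheckInterval must stay below 13 days (336h − 24h = 312h), or a repeatedly re-trashed block could never be rescued.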
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 6ba3904..76dcbc9 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -4,23 +4,17 @@ import (
"bytes"
"crypto/md5"
"fmt"
+ "io/ioutil"
"log"
"os"
"time"
- "github.com/AdRoll/goamz/aws"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
check "gopkg.in/check.v1"
)
-type TestableS3Volume struct {
- *S3Volume
- server *s3test.Server
- c *check.C
- serverClock *fakeClock
-}
-
const (
TestBucketName = "testbucket"
)
@@ -42,30 +36,6 @@ func init() {
s3UnsafeDelete = true
}
-func NewTestableS3Volume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
- clock := &fakeClock{}
- srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
- c.Assert(err, check.IsNil)
- auth := aws.Auth{}
- region := aws.Region{
- Name: "test-region-1",
- S3Endpoint: srv.URL(),
- S3LocationConstraint: true,
- }
- bucket := &s3.Bucket{
- S3: s3.New(auth, region),
- Name: TestBucketName,
- }
- err = bucket.PutBucket(s3.ACL("private"))
- c.Assert(err, check.IsNil)
-
- return &TestableS3Volume{
- S3Volume: NewS3Volume(auth, region, TestBucketName, raceWindow, readonly, replication),
- server: srv,
- serverClock: clock,
- }
-}
-
var _ = check.Suite(&StubbedS3Suite{})
type StubbedS3Suite struct {
@@ -76,19 +46,19 @@ func (s *StubbedS3Suite) TestGeneric(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
// Use a negative raceWindow so s3test's 1-second
// timestamp precision doesn't confuse fixRace.
- return NewTestableS3Volume(c, -2*time.Second, false, 2)
+ return s.newTestableVolume(c, -2*time.Second, false, 2)
})
}
func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
DoGenericVolumeTests(c, func(t TB) TestableVolume {
- return NewTestableS3Volume(c, -2*time.Second, true, 2)
+ return s.newTestableVolume(c, -2*time.Second, true, 2)
})
}
func (s *StubbedS3Suite) TestIndex(c *check.C) {
- v := NewTestableS3Volume(c, 0, false, 2)
- v.indexPageSize = 3
+ v := s.newTestableVolume(c, 0, false, 2)
+ v.IndexPageSize = 3
for i := 0; i < 256; i++ {
v.PutRaw(fmt.Sprintf("%02x%030x", i, i), []byte{102, 111, 111})
}
@@ -112,14 +82,14 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
}
func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
- defer func(tl, bs time.Duration) {
- trashLifetime = tl
- blobSignatureTTL = bs
- }(trashLifetime, blobSignatureTTL)
- trashLifetime = time.Hour
- blobSignatureTTL = time.Hour
+ defer func(tl, bs arvados.Duration) {
+ theConfig.TrashLifetime = tl
+ theConfig.BlobSignatureTTL = bs
+ }(theConfig.TrashLifetime, theConfig.BlobSignatureTTL)
+ theConfig.TrashLifetime.Set("1h")
+ theConfig.BlobSignatureTTL.Set("1h")
- v := NewTestableS3Volume(c, 5*time.Minute, false, 2)
+ v := s.newTestableVolume(c, 5*time.Minute, false, 2)
var none time.Time
putS3Obj := func(t time.Time, key string, data []byte) {
@@ -127,7 +97,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
return
}
v.serverClock.now = &t
- v.Bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+ v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
}
t0 := time.Now()
@@ -214,12 +184,12 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
false, false, false, true, false, false,
},
{
- "Erroneously trashed during a race, detected before trashLifetime",
+ "Erroneously trashed during a race, detected before TrashLifetime",
none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
true, false, true, true, true, false,
},
{
- "Erroneously trashed during a race, rescue during EmptyTrash despite reaching trashLifetime",
+ "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
true, false, true, true, true, false,
},
@@ -286,7 +256,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
// freshAfterEmpty
loc, blk = setupScenario()
v.EmptyTrash()
- _, err = v.Bucket.Head("trash/"+loc, nil)
+ _, err = v.bucket.Head("trash/"+loc, nil)
c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
if scenario.freshAfterEmpty {
t, err := v.Mtime(loc)
@@ -307,9 +277,51 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
}
}
+type TestableS3Volume struct {
+ *S3Volume
+ server *s3test.Server
+ c *check.C
+ serverClock *fakeClock
+}
+
+func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration, readonly bool, replication int) *TestableS3Volume {
+ clock := &fakeClock{}
+ srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
+ c.Assert(err, check.IsNil)
+
+ tmp, err := ioutil.TempFile("", "keepstore")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(tmp.Name())
+ _, err = tmp.Write([]byte("xxx\n"))
+ c.Assert(err, check.IsNil)
+ c.Assert(tmp.Close(), check.IsNil)
+
+ v := &TestableS3Volume{
+ S3Volume: &S3Volume{
+ Bucket: TestBucketName,
+ AccessKeyFile: tmp.Name(),
+ SecretKeyFile: tmp.Name(),
+ Endpoint: srv.URL(),
+ Region: "test-region-1",
+ LocationConstraint: true,
+ RaceWindow: arvados.Duration(raceWindow),
+ S3Replication: replication,
+ UnsafeDelete: s3UnsafeDelete,
+ ReadOnly: readonly,
+ IndexPageSize: 1000,
+ },
+ server: srv,
+ serverClock: clock,
+ }
+ c.Assert(v.Start(), check.IsNil)
+ err = v.bucket.PutBucket(s3.ACL("private"))
+ c.Assert(err, check.IsNil)
+ return v
+}
+
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
- err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("PutRaw: %+v", err)
}
@@ -320,7 +332,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
// while we do this.
func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
v.serverClock.now = &lastPut
- err := v.Bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
panic(err)
}
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index d11bc05..27d6216 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -4,6 +4,8 @@ import (
"errors"
"log"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
@@ -23,13 +25,13 @@ func RunTrashWorker(trashq *WorkQueue) {
// TrashItem deletes the indicated block from every writable volume.
func TrashItem(trashRequest TrashRequest) {
reqMtime := time.Unix(0, trashRequest.BlockMtime)
- if time.Since(reqMtime) < blobSignatureTTL {
+ if time.Since(reqMtime) < theConfig.BlobSignatureTTL.Duration() {
log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
- time.Since(reqMtime),
+ arvados.Duration(time.Since(reqMtime)),
trashRequest.Locator,
trashRequest.BlockMtime,
reqMtime,
- blobSignatureTTL)
+ theConfig.BlobSignatureTTL)
return
}
@@ -44,8 +46,8 @@ func TrashItem(trashRequest TrashRequest) {
continue
}
- if neverDelete {
- err = errors.New("did not delete block because neverDelete is true")
+ if !theConfig.EnableDelete {
+ err = errors.New("did not delete block because EnableDelete is false")
} else {
err = volume.Trash(trashRequest.Locator)
}
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index 94798d9..5ec413d 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -31,7 +31,7 @@ type TrashWorkerTestData struct {
Expect no errors.
*/
func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: "5d41402abc4b2a76b9719d911017c592",
Block1: []byte("hello"),
@@ -53,7 +53,7 @@ func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
Expect the second locator in volume 2 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
@@ -75,7 +75,7 @@ func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
Expect the first locator in volume 1 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
@@ -97,7 +97,7 @@ func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
Expect locator to be deleted from both volumes.
*/
func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
@@ -119,7 +119,7 @@ func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
Delete the second and expect the first to be still around.
*/
func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
@@ -143,7 +143,7 @@ func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *test
Expect the other unaffected.
*/
func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
@@ -166,7 +166,7 @@ func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
will not be deleted because its Mtime is within the trash lifetime.
*/
func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
- neverDelete = false
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
@@ -188,11 +188,11 @@ func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(
performTrashWorkerTest(testData, t)
}
-/* Delete a block with matching mtime for locator in both volumes, but neverDelete is true,
+/* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
so block won't be deleted.
*/
-func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
- neverDelete = true
+func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
+ theConfig.EnableDelete = false
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
@@ -231,7 +231,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
}
}
- oldBlockTime := time.Now().Add(-blobSignatureTTL - time.Minute)
+ oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
// Create TrashRequest for the test
trashRequest := TrashRequest{
diff --git a/services/keepstore/usage.go b/services/keepstore/usage.go
new file mode 100644
index 0000000..748ef38
--- /dev/null
+++ b/services/keepstore/usage.go
@@ -0,0 +1,114 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+
+ "github.com/ghodss/yaml"
+)
+
+func usage() {
+ c := DefaultConfig()
+ for _, vt := range VolumeTypes {
+ c.Volumes = append(c.Volumes, vt().Examples()...)
+ }
+ exampleConfigFile, err := yaml.Marshal(c)
+ if err != nil {
+ panic(err)
+ }
+ fmt.Fprintf(os.Stderr, `
+
+keepstore provides a content-addressed data store backed by a local filesystem or networked storage.
+
+Usage: keepstore -config path/to/keepstore.yml
+ keepstore [OPTIONS] -dump-config
+
+NOTE: All options (other than -config) are deprecated in favor of YAML
+ configuration. Use -dump-config to translate existing
+ configurations to YAML format.
+
+Options:
+`)
+ flag.PrintDefaults()
+ fmt.Fprintf(os.Stderr, `
+Example config file:
+
+%s
+
+Listen:
+
+ Local address and port to listen on. Can be "address", "address:port", or
+ ":port", where "address" is a host IP address or name and "port"
+ is a port number or name.
+
+PIDFile:
+
+ Path to write PID file during startup. This file is kept open and
+ locked with LOCK_EX until keepstore exits, so "fuser -k pidfile" is
+ one way to shut down. Exit immediately if there is an error
+ opening, locking, or writing the PID file.
+
+MaxBuffers:
+
+ Maximum RAM to use for data buffers, given in multiples of block
+ size (64 MiB). When this limit is reached, HTTP requests requiring
+ buffers (like GET and PUT) will wait for buffer space to be
+ released.
+
+MaxRequests:
+
+ Maximum concurrent requests. When this limit is reached, new
+ requests will receive 503 responses. Note: this limit does not
+ include idle connections from clients using HTTP keepalive, so it
+ does not strictly limit the number of concurrent connections. If
+ omitted or zero, the default is 2 * MaxBuffers.
+
+BlobSigningKeyFile:
+
+ Local file containing the secret blob signing key (used to
+ generate and verify blob signatures). This key should be
+ identical to the API server's blob_signing_key configuration
+ entry.
+
+RequireSignatures:
+
+ Honor read requests only if a valid signature is provided. This
+ should be true, except for development use and when migrating from
+ a very old version.
+
+BlobSignatureTTL:
+
+ Duration for which new permission signatures (returned in PUT
+ responses) will be valid. This should be equal to the API
+ server's blob_signature_ttl configuration entry.
+
+SystemAuthTokenFile:
+
+ Local file containing the Arvados API token used by keep-balance
+ or data manager. Delete, trash, and index requests are honored
+ only for this token.
+
+EnableDelete:
+
+ Enable trash and delete features. If false, trash lists will be
+ accepted but blocks will not be trashed or deleted.
+
+TrashLifetime:
+
+ Time duration after a block is trashed during which it can be
+ recovered using an /untrash request.
+
+TrashCheckInterval:
+
+ How often to check for (and delete) trashed blocks whose
+ TrashLifetime has expired.
+
+Volumes:
+
+ List of storage volumes. If omitted or empty, the default is to
+ use all directories named "keep" that exist in the top level
+ directory of a mount point at startup time.
+
+`, exampleConfigFile)
+}
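usage() builds its example config by marshaling a populated Config with github.com/ghodss/yaml, which round-trips through encoding/json semantics. A self-contained sketch of the same call (struct and values are illustrative, not keepstore's real Config):

    package main

    import (
        "fmt"

        "github.com/ghodss/yaml"
    )

    type demoConfig struct {
        Listen     string
        MaxBuffers int
    }

    func main() {
        buf, err := yaml.Marshal(demoConfig{Listen: ":25107", MaxBuffers: 128})
        if err != nil {
            panic(err)
        }
        fmt.Printf("%s", buf) // prints the two fields as YAML keys
    }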
diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index 8ae6660..6e01e75 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -10,6 +10,15 @@ import (
// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
// etc.
type Volume interface {
+ // Volume type as specified in config file. Examples: "S3",
+ // "Directory".
+ Type() string
+
+ // Do whatever private setup tasks and configuration checks
+ // are needed. Return non-nil if the volume is unusable (e.g.,
+ // invalid config).
+ Start() error
+
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
//
@@ -150,7 +159,7 @@ type Volume interface {
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // blobSignatureTTL, Trash must not trash the data.
+ // BlobSignatureTTL, Trash must not trash the data.
//
// If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
@@ -171,7 +180,7 @@ type Volume interface {
// reliably or fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be trashed for at least blobSignatureTTL
+ // will not be trashed for at least BlobSignatureTTL
// seconds.
Trash(loc string) error
@@ -204,11 +213,18 @@ type Volume interface {
// responses to PUT requests.
Replication() int
- // EmptyTrash looks for trashed blocks that exceeded trashLifetime
+ // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
EmptyTrash()
}
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+ Volume
+ Examples() []Volume
+}
+
// A VolumeManager tells callers which volumes can read, which volumes
// can write, and on which volume the next write should be attempted.
type VolumeManager interface {
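A new driver plugs into both the -help output and config loading by registering a factory in VolumeTypes, exactly as the S3 and Directory init() functions do elsewhere in this patch; schematically (DemoVolume is hypothetical):

    func init() {
        VolumeTypes = append(VolumeTypes, func() VolumeWithExamples {
            return &DemoVolume{} // zero value; usage() calls its Examples()
        })
    }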
diff --git a/services/keepstore/volume_generic_test.go b/services/keepstore/volume_generic_test.go
index bc3e537..1738fe9 100644
--- a/services/keepstore/volume_generic_test.go
+++ b/services/keepstore/volume_generic_test.go
@@ -11,6 +11,7 @@ import (
"strings"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
)
@@ -430,7 +431,7 @@ func testIndexTo(t TB, factory TestableVolumeFactory) {
func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- blobSignatureTTL = 300 * time.Second
+ theConfig.BlobSignatureTTL.Set("5m")
if v.Writable() == false {
return
@@ -451,19 +452,19 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
}
// Calling Delete() for a block with a timestamp older than
-// blobSignatureTTL seconds in the past should delete the data.
+// BlobSignatureTTL seconds in the past should delete the data.
// Test is intended for only writable volumes
func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- blobSignatureTTL = 300 * time.Second
+ theConfig.BlobSignatureTTL.Set("5m")
if v.Writable() == false {
return
}
v.Put(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
if err := v.Trash(TestHash); err != nil {
t.Error(err)
@@ -733,7 +734,7 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
}
}
-// With trashLifetime != 0, perform:
+// With TrashLifetime != 0, perform:
// Trash an old block - which either raises ErrNotImplemented or succeeds
// Untrash - which either raises ErrNotImplemented or succeeds
// Get - which must succeed
@@ -741,14 +742,14 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
defer func() {
- trashLifetime = 0
+ theConfig.TrashLifetime = 0
}()
- trashLifetime = 3600 * time.Second
+ theConfig.TrashLifetime.Set("1h")
// put block and backdate it
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
buf := make([]byte, BlockSize)
n, err := v.Get(TestHash, buf)
@@ -795,9 +796,9 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
v := factory(t)
defer v.Teardown()
- defer func(orig time.Duration) {
- trashLifetime = orig
- }(trashLifetime)
+ defer func(orig arvados.Duration) {
+ theConfig.TrashLifetime = orig
+ }(theConfig.TrashLifetime)
checkGet := func() error {
buf := make([]byte, BlockSize)
@@ -830,10 +831,10 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
// First set: EmptyTrash before reaching the trash deadline.
- trashLifetime = time.Hour
+ theConfig.TrashLifetime.Set("1h")
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
err := checkGet()
if err != nil {
@@ -844,7 +845,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
err = v.Trash(TestHash)
if err == MethodDisabledError || err == ErrNotImplemented {
// Skip the trash tests for read-only volumes, and
- // volume types that don't support trashLifetime>0.
+ // volume types that don't support TrashLifetime>0.
return
}
@@ -878,7 +879,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
}
// Because we Touch'ed, need to backdate again for next set of tests
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// If the only block in the trash has already been untrashed,
// most volumes will fail a subsequent Untrash with a 404, but
@@ -896,11 +897,11 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
}
// Untrash might have updated the timestamp, so backdate again
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// Second set: EmptyTrash after the trash deadline has passed.
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
@@ -925,7 +926,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
// Trash it again, and this time call EmptyTrash so it really
// goes away.
// (In Azure volumes, un/trash changes Mtime, so first backdate again)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
err = v.Trash(TestHash)
err = checkGet()
if err == nil || !os.IsNotExist(err) {
@@ -950,9 +951,9 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
// un-trashed copy doesn't get deleted along with it.
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
@@ -963,7 +964,7 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
}
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
// EmptyTrash should not delete the untrashed copy.
v.EmptyTrash()
@@ -978,18 +979,18 @@ func testTrashEmptyTrashUntrash(t TB, factory TestableVolumeFactory) {
// untrash the block whose deadline is "C".
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Nanosecond
+ theConfig.TrashLifetime.Set("1ns")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
}
v.PutRaw(TestHash, TestBlock)
- v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+ v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- trashLifetime = time.Hour
+ theConfig.TrashLifetime.Set("1h")
err = v.Trash(TestHash)
if err != nil {
t.Fatal(err)
diff --git a/services/keepstore/volume_test.go b/services/keepstore/volume_test.go
index 5671b8d..6ab386a 100644
--- a/services/keepstore/volume_test.go
+++ b/services/keepstore/volume_test.go
@@ -189,7 +189,7 @@ func (v *MockVolume) Trash(loc string) error {
return MethodDisabledError
}
if _, ok := v.Store[loc]; ok {
- if time.Since(v.Timestamps[loc]) < blobSignatureTTL {
+ if time.Since(v.Timestamps[loc]) < time.Duration(theConfig.BlobSignatureTTL) {
return nil
}
delete(v.Store, loc)
@@ -198,7 +198,14 @@ func (v *MockVolume) Trash(loc string) error {
return os.ErrNotExist
}
-// TBD
+func (v *MockVolume) Type() string {
+ return "Mock"
+}
+
+func (v *MockVolume) Start() error {
+ return nil
+}
+
func (v *MockVolume) Untrash(loc string) error {
return nil
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 5982fb0..1866c7b 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -2,7 +2,6 @@ package main
import (
"bufio"
- "errors"
"flag"
"fmt"
"io"
@@ -19,11 +18,16 @@ import (
)
type unixVolumeAdder struct {
- *volumeSet
+ *Config
}
-func (vs *unixVolumeAdder) Set(value string) error {
- if dirs := strings.Split(value, ","); len(dirs) > 1 {
+// String implements flag.Value
+func (s *unixVolumeAdder) String() string {
+ return "-"
+}
+
+func (vs *unixVolumeAdder) Set(path string) error {
+ if dirs := strings.Split(path, ","); len(dirs) > 1 {
log.Print("DEPRECATED: using comma-separated volume list.")
for _, dir := range dirs {
if err := vs.Set(dir); err != nil {
@@ -32,33 +36,19 @@ func (vs *unixVolumeAdder) Set(value string) error {
}
return nil
}
- if len(value) == 0 || value[0] != '/' {
- return errors.New("Invalid volume: must begin with '/'.")
- }
- if _, err := os.Stat(value); err != nil {
- return err
- }
- var locker sync.Locker
- if flagSerializeIO {
- locker = &sync.Mutex{}
- }
- *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
- root: value,
- locker: locker,
- readonly: flagReadonly,
+ vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
+ Root: path,
+ ReadOnly: flagReadonly,
+ Serialize: flagSerializeIO,
})
return nil
}
func init() {
- flag.Var(
- &unixVolumeAdder{&volumes},
- "volumes",
- "Deprecated synonym for -volume.")
- flag.Var(
- &unixVolumeAdder{&volumes},
- "volume",
- "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+ VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
+
+ flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
+ flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
}
// Discover adds a UnixVolume for every directory named "keep" that is
@@ -111,17 +101,49 @@ func (vs *unixVolumeAdder) Discover() int {
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
- // path to the volume's root directory
- root string
+ Root string // path to the volume's root directory
+ ReadOnly bool
+ Serialize bool
+
// something to lock during IO, typically a sync.Mutex (or nil
// to skip locking)
- locker sync.Locker
- readonly bool
+ locker sync.Locker
+}
+
+// Examples implements VolumeWithExamples.
+func (*UnixVolume) Examples() []Volume {
+ return []Volume{
+ &UnixVolume{
+ Root: "/mnt/local-disk",
+ Serialize: true,
+ },
+ &UnixVolume{
+ Root: "/mnt/network-disk",
+ Serialize: false,
+ },
+ }
+}
+
+// Type implements Volume
+func (v *UnixVolume) Type() string {
+ return "Directory"
+}
+
+// Start implements Volume
+func (v *UnixVolume) Start() error {
+ if v.Serialize {
+ v.locker = &sync.Mutex{}
+ }
+ if !strings.HasPrefix(v.Root, "/") {
+ return fmt.Errorf("volume root does not start with '/': %q", v.Root)
+ }
+ _, err := os.Stat(v.Root)
+ return err
}
// Touch sets the timestamp for the given locator to the current time
func (v *UnixVolume) Touch(loc string) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
p := v.blockPath(loc)
@@ -218,7 +240,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
// returns a FullError. If the write fails due to some other error,
// that error is returned.
func (v *UnixVolume) Put(loc string, block []byte) error {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if v.IsFull() {
@@ -268,14 +290,14 @@ func (v *UnixVolume) Status() *VolumeStatus {
var fs syscall.Statfs_t
var devnum uint64
- if fi, err := os.Stat(v.root); err == nil {
+ if fi, err := os.Stat(v.Root); err == nil {
devnum = fi.Sys().(*syscall.Stat_t).Dev
} else {
log.Printf("%s: os.Stat: %s\n", v, err)
return nil
}
- err := syscall.Statfs(v.root, &fs)
+ err := syscall.Statfs(v.Root, &fs)
if err != nil {
log.Printf("%s: statfs: %s\n", v, err)
return nil
@@ -285,7 +307,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
// uses fs.Blocks - fs.Bfree.
free := fs.Bavail * uint64(fs.Bsize)
used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
- return &VolumeStatus{v.root, devnum, free, used}
+ return &VolumeStatus{v.Root, devnum, free, used}
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@ -307,7 +329,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
//
func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
var lastErr error
- rootdir, err := os.Open(v.root)
+ rootdir, err := os.Open(v.Root)
if err != nil {
return err
}
@@ -326,7 +348,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
if !blockDirRe.MatchString(names[0]) {
continue
}
- blockdirpath := filepath.Join(v.root, names[0])
+ blockdirpath := filepath.Join(v.Root, names[0])
blockdir, err := os.Open(blockdirpath)
if err != nil {
log.Print("Error reading ", blockdirpath, ": ", err)
@@ -360,9 +382,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
}
// Trash trashes the block data from the unix storage
-// If trashLifetime == 0, the block is deleted
+// If TrashLifetime == 0, the block is deleted
// Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + trashLifetime
+// where deadline = now + TrashLifetime
func (v *UnixVolume) Trash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
@@ -372,7 +394,7 @@ func (v *UnixVolume) Trash(loc string) error {
// Trash() will read the correct up-to-date timestamp and choose not to
// trash the file.
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
if v.locker != nil {
@@ -397,21 +419,21 @@ func (v *UnixVolume) Trash(loc string) error {
// anyway (because the permission signatures have expired).
if fi, err := os.Stat(p); err != nil {
return err
- } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+ } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
return nil
}
- if trashLifetime == 0 {
+ if theConfig.TrashLifetime == 0 {
return os.Remove(p)
}
- return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+ return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
}
// Untrash moves block from trash back into store
// Look for path/{loc}.trash.{deadline} in storage,
// and rename the first such file as path/{loc}
func (v *UnixVolume) Untrash(loc string) (err error) {
- if v.readonly {
+ if v.ReadOnly {
return MethodDisabledError
}
@@ -446,7 +468,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
// blockDir returns the fully qualified directory name for the directory
// where loc is (or would be) stored on this volume.
func (v *UnixVolume) blockDir(loc string) string {
- return filepath.Join(v.root, loc[0:3])
+ return filepath.Join(v.Root, loc[0:3])
}
// blockPath returns the fully qualified pathname for the path to loc
@@ -459,7 +481,7 @@ func (v *UnixVolume) blockPath(loc string) string {
// MinFreeKilobytes.
//
func (v *UnixVolume) IsFull() (isFull bool) {
- fullSymlink := v.root + "/full"
+ fullSymlink := v.Root + "/full"
// Check if the volume has been marked as full in the last hour.
if link, err := os.Readlink(fullSymlink); err == nil {
@@ -491,7 +513,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
//
func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
var fs syscall.Statfs_t
- err = syscall.Statfs(v.root, &fs)
+ err = syscall.Statfs(v.Root, &fs)
if err == nil {
// Statfs output is not guaranteed to measure free
// space in terms of 1K blocks.
@@ -501,13 +523,13 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
}
func (v *UnixVolume) String() string {
- return fmt.Sprintf("[UnixVolume %s]", v.root)
+ return fmt.Sprintf("[UnixVolume %s]", v.Root)
}
// Writable returns false if all future Put, Mtime, and Delete calls
// are expected to fail.
func (v *UnixVolume) Writable() bool {
- return !v.readonly
+ return !v.ReadOnly
}
// Replication returns the number of replicas promised by the
@@ -546,7 +568,7 @@ func (v *UnixVolume) EmptyTrash() {
var bytesDeleted, bytesInTrash int64
var blocksDeleted, blocksInTrash int
- err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+ err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
return nil
diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go
index c95538b..ac0a492 100644
--- a/services/keepstore/volume_unix_test.go
+++ b/services/keepstore/volume_unix_test.go
@@ -30,9 +30,9 @@ func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVol
}
return &TestableUnixVolume{
UnixVolume: UnixVolume{
- root: d,
+ Root: d,
+ ReadOnly: readonly,
locker: locker,
- readonly: readonly,
},
t: t,
}
@@ -42,9 +42,9 @@ func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVol
// the volume is readonly.
func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
defer func(orig bool) {
- v.readonly = orig
- }(v.readonly)
- v.readonly = false
+ v.ReadOnly = orig
+ }(v.ReadOnly)
+ v.ReadOnly = false
err := v.Put(locator, data)
if err != nil {
v.t.Fatal(err)
@@ -59,7 +59,7 @@ func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
}
func (v *TestableUnixVolume) Teardown() {
- if err := os.RemoveAll(v.root); err != nil {
+ if err := os.RemoveAll(v.Root); err != nil {
v.t.Fatal(err)
}
}
@@ -126,7 +126,7 @@ func TestPut(t *testing.T) {
if err != nil {
t.Error(err)
}
- p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+ p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
if buf, err := ioutil.ReadFile(p); err != nil {
t.Error(err)
} else if bytes.Compare(buf, TestBlock) != 0 {
@@ -139,7 +139,7 @@ func TestPutBadVolume(t *testing.T) {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- os.Chmod(v.root, 000)
+ os.Chmod(v.Root, 000)
err := v.Put(TestHash, TestBlock)
if err == nil {
t.Error("Write should have failed")
@@ -178,7 +178,7 @@ func TestIsFull(t *testing.T) {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- fullPath := v.root + "/full"
+ fullPath := v.Root + "/full"
now := fmt.Sprintf("%d", time.Now().Unix())
os.Symlink(now, fullPath)
if !v.IsFull() {
@@ -200,8 +200,8 @@ func TestNodeStatus(t *testing.T) {
// Get node status and make a basic sanity check.
volinfo := v.Status()
- if volinfo.MountPoint != v.root {
- t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
+ if volinfo.MountPoint != v.Root {
+ t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
}
if volinfo.DeviceNum == 0 {
t.Errorf("uninitialized device_num in %v", volinfo)
@@ -301,7 +301,7 @@ func TestUnixVolumeCompare(t *testing.T) {
t.Errorf("Got err %q, expected %q", err, DiskHashError)
}
- p := fmt.Sprintf("%s/%s/%s", v.root, TestHash[:3], TestHash)
+ p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
os.Chmod(p, 000)
err = v.Compare(TestHash, TestBlock)
if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
-----------------------------------------------------------------------
hooks/post-receive