[ARVADOS] created: 1.3.0-1455-g587f6f423
From: Git user <git at public.curoverse.com>
Date: Tue Aug 13 13:41:09 UTC 2019
at 587f6f4239087291f617159cc1daad468b949345 (commit)
commit 587f6f4239087291f617159cc1daad468b949345
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Aug 13 09:40:55 2019 -0400
13647: Use cluster config instead of custom keepstore config.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
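For orientation, each legacy keepstore.yml setting now maps to a cluster config entry. A rough sketch of the result (hand-assembled from the flag/field mappings and test cases in this commit; the hostname and values are hypothetical):

Clusters:
  zzzzz:
    Services:
      Keepstore:
        InternalURLs:
          "http://keep0.zzzzz.example:25107": {}   # was Listen
    API:
      MaxKeepBlockBuffers: 128                     # was MaxBuffers
      MaxConcurrentRequests: 0                     # was MaxRequests
    Collections:
      BlobSigning: true                            # was RequireSignatures
      BlobSigningTTL: 336h                         # was BlobSignatureTTL
      BlobTrash: true                              # was EnableDelete
      BlobTrashLifetime: 336h                      # was TrashLifetime
      BlobTrashCheckInterval: 24h                  # was TrashCheckInterval
      BlobTrashConcurrency: 4                      # was TrashWorkers
      BlobDeleteConcurrency: 4                     # was EmptyTrashWorkers
      BlobReplicateConcurrency: 4                  # was PullWorkers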
diff --git a/lib/config/cmd_test.go b/lib/config/cmd_test.go
index af7c57120..6d20a777f 100644
--- a/lib/config/cmd_test.go
+++ b/lib/config/cmd_test.go
@@ -85,7 +85,7 @@ func (s *CommandSuite) TestCheckOldKeepstoreConfigFile(c *check.C) {
c.Assert(err, check.IsNil)
defer os.Remove(f.Name())
- io.WriteString(f, "Debug: true\n")
+ io.WriteString(f, "Listen: :12345\nDebug: true\n")
var stdout, stderr bytes.Buffer
in := `
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 9ac4aeeb9..3e9ed4e9c 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -176,6 +176,15 @@ Clusters:
# parameter higher than this value, this value is used instead.
MaxItemsPerResponse: 1000
+ # Maximum number of concurrent requests to accept in a single
+ # service process, or 0 for no limit. Currently supported only
+ # by keepstore.
+ MaxConcurrentRequests: 0
+
+ # Maximum number of 64MiB memory buffers per keepstore server
+ # process, or 0 for no limit.
+ MaxKeepBlockBuffers: 128
+
# API methods to disable. Disabled methods are not listed in the
# discovery document, and respond 404 to all requests.
# Example: {"jobs.create":{}, "pipeline_instances.create": {}}
@@ -313,15 +322,44 @@ Clusters:
# BlobSigningKey is a string of alphanumeric characters used to
# generate permission signatures for Keep locators. It must be
- # identical to the permission key given to Keep. IMPORTANT: This is
- # a site secret. It should be at least 50 characters.
+ # identical to the permission key given to Keep. IMPORTANT: This
+ # is a site secret. It should be at least 50 characters.
#
- # Modifying blob_signing_key will invalidate all existing
+ # Modifying BlobSigningKey will invalidate all existing
# signatures, which can cause programs to fail (e.g., arv-put,
- # arv-get, and Crunch jobs). To avoid errors, rotate keys only when
- # no such processes are running.
+ # arv-get, and Crunch jobs). To avoid errors, rotate keys only
+ # when no such processes are running.
BlobSigningKey: ""
+ # Enable garbage collection of unreferenced blobs in Keep.
+ BlobTrash: true
+
+ # Time to leave unreferenced blobs in "trashed" state before
+ # deleting them, or 0 to skip the "trashed" state entirely and
+ # delete unreferenced blobs.
+ #
+ # If you use any Amazon S3 buckets as storage volumes, this
+ # must be at least 24h to avoid occasional data loss.
+ BlobTrashLifetime: 336h
+
+ # How often to check for (and delete) trashed blocks whose
+ # BlobTrashLifetime has expired.
+ BlobTrashCheckInterval: 24h
+
+ # Maximum number of concurrent "trash blob" and "delete trashed
+ # blob" operations conducted by a single keepstore process. Each
+ # of these can be set to 0 to disable the respective operation.
+ #
+ # If BlobTrashLifetime is zero, "trash" and "delete trash"
+ # happen at once, so only the lower of these two values is used.
+ BlobTrashConcurrency: 4
+ BlobDeleteConcurrency: 4
+
+ # Maximum number of concurrent "create additional replica of
+ # existing blob" operations conducted by a single keepstore
+ # process.
+ BlobReplicateConcurrency: 4
+
# Default replication level for collections. This is used when a
# collection's replication_desired attribute is nil.
DefaultReplication: 2
@@ -730,6 +768,44 @@ Clusters:
Price: 0.1
Preemptible: false
+ Volumes:
+ SAMPLE:
+ AccessViaHosts:
+ SAMPLE:
+ ReadOnly: false
+ ReadOnly: false
+ Replication: 1
+ StorageClasses:
+ default: true
+ SAMPLE: true
+ Driver: s3
+ DriverParameters:
+
+ # for s3 driver
+ AccessKey: aaaaa
+ SecretKey: aaaaa
+ Endpoint: ""
+ Region: us-east-1a
+ Bucket: aaaaa
+ LocationConstraint: false
+ IndexPageSize: 1000
+ ConnectTimeout: 1m
+ ReadTimeout: 10m
+ RaceWindow: 24h
+ UnsafeDelete: false
+
+ # for azure driver
+ StorageAccountName: aaaaa
+ StorageAccountKey: aaaaa
+ StorageBaseURL: ""
+ ContainerName: aaaaa
+ RequestTimeout: 30s
+ ListBlobsRetryDelay: 10s
+ ListBlobsMaxAttempts: 10
+
+ # for local directory driver
+ Root: /var/lib/arvados/keep-data
+
Mail:
MailchimpAPIKey: ""
MailchimpListID: ""
diff --git a/lib/config/deprecated.go b/lib/config/deprecated.go
index 12581ddff..10dfc98ae 100644
--- a/lib/config/deprecated.go
+++ b/lib/config/deprecated.go
@@ -102,12 +102,6 @@ func applyDeprecatedNodeProfile(hostname string, ssi systemServiceInstance, svc
svc.InternalURLs[arvados.URL{Scheme: scheme, Host: host}] = arvados.ServiceInstance{}
}
-const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
-
-type oldKeepstoreConfig struct {
- Debug *bool
-}
-
func (ldr *Loader) loadOldConfigHelper(component, path string, target interface{}) error {
if path == "" {
return nil
@@ -126,35 +120,6 @@ func (ldr *Loader) loadOldConfigHelper(component, path string, target interface{
return nil
}
-// update config using values from an old-style keepstore config file.
-func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
- if ldr.KeepstorePath == "" {
- return nil
- }
- var oc oldKeepstoreConfig
- err := ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
- if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
- return nil
- } else if err != nil {
- return err
- }
-
- cluster, err := cfg.GetCluster("")
- if err != nil {
- return err
- }
-
- if v := oc.Debug; v == nil {
- } else if *v && cluster.SystemLogs.LogLevel != "debug" {
- cluster.SystemLogs.LogLevel = "debug"
- } else if !*v && cluster.SystemLogs.LogLevel != "info" {
- cluster.SystemLogs.LogLevel = "info"
- }
-
- cfg.Clusters[cluster.ClusterID] = *cluster
- return nil
-}
-
type oldCrunchDispatchSlurmConfig struct {
Client *arvados.Client
diff --git a/lib/config/deprecated_keepstore.go b/lib/config/deprecated_keepstore.go
new file mode 100644
index 000000000..efa1ece9a
--- /dev/null
+++ b/lib/config/deprecated_keepstore.go
@@ -0,0 +1,470 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package config
+
+import (
+ "bytes"
+ "crypto/rand"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "math/big"
+ "net"
+ "os"
+ "strconv"
+ "strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+const defaultKeepstoreConfigPath = "/etc/arvados/keepstore/keepstore.yml"
+
+type oldKeepstoreConfig struct {
+ Debug *bool
+ Listen *string
+
+ LogFormat *string
+
+ PIDFile *string
+
+ MaxBuffers *int
+ MaxRequests *int
+
+ BlobSignatureTTL *arvados.Duration
+ BlobSigningKeyFile *string
+ RequireSignatures *bool
+ SystemAuthTokenFile *string
+ EnableDelete *bool
+ TrashLifetime *arvados.Duration
+ TrashCheckInterval *arvados.Duration
+ PullWorkers *int
+ TrashWorkers *int
+ EmptyTrashWorkers *int
+ TLSCertificateFile *string
+ TLSKeyFile *string
+
+ Volumes *oldKeepstoreVolumeList
+
+ ManagementToken *string
+}
+
+type oldKeepstoreVolumeList []oldKeepstoreVolume
+
+type oldKeepstoreVolume struct {
+ arvados.Volume
+ Type string `json:",omitempty"`
+
+ // Azure driver configs
+ StorageAccountName string `json:",omitempty"`
+ StorageAccountKeyFile string `json:",omitempty"`
+ StorageBaseURL string `json:",omitempty"`
+ ContainerName string `json:",omitempty"`
+ AzureReplication int `json:",omitempty"`
+ RequestTimeout arvados.Duration `json:",omitempty"`
+ ListBlobsRetryDelay arvados.Duration `json:",omitempty"`
+ ListBlobsMaxAttempts int `json:",omitempty"`
+
+ // S3 driver configs
+ AccessKeyFile string `json:",omitempty"`
+ SecretKeyFile string `json:",omitempty"`
+ Endpoint string `json:",omitempty"`
+ Region string `json:",omitempty"`
+ Bucket string `json:",omitempty"`
+ LocationConstraint bool `json:",omitempty"`
+ IndexPageSize int `json:",omitempty"`
+ S3Replication int `json:",omitempty"`
+ ConnectTimeout arvados.Duration `json:",omitempty"`
+ ReadTimeout arvados.Duration `json:",omitempty"`
+ RaceWindow arvados.Duration `json:",omitempty"`
+ UnsafeDelete bool `json:",omitempty"`
+
+ // Directory driver configs
+ Root string
+ DirectoryReplication int
+ Serialize bool
+
+ // Common configs
+ ReadOnly bool `json:",omitempty"`
+ StorageClasses []string `json:",omitempty"`
+}
+
+// loadOldKeepstoreConfig updates the cluster config using values from an old-style keepstore config file.
+func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return fmt.Errorf("getting hostname: %s", err)
+ }
+
+ var oc oldKeepstoreConfig
+ err = ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, &oc)
+ if os.IsNotExist(err) && (ldr.KeepstorePath == defaultKeepstoreConfigPath) {
+ return nil
+ } else if err != nil {
+ return err
+ }
+
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return err
+ }
+
+ myURL := arvados.URL{Scheme: "http"}
+ if oc.TLSCertificateFile != nil && oc.TLSKeyFile != nil {
+ myURL.Scheme = "https"
+ }
+
+ if v := oc.Debug; v == nil {
+ } else if *v && cluster.SystemLogs.LogLevel != "debug" {
+ cluster.SystemLogs.LogLevel = "debug"
+ } else if !*v && cluster.SystemLogs.LogLevel != "info" {
+ cluster.SystemLogs.LogLevel = "info"
+ }
+
+ if v := oc.TLSCertificateFile; v != nil && "file://"+*v != cluster.TLS.Certificate {
+ cluster.TLS.Certificate = "file://" + *v
+ }
+ if v := oc.TLSKeyFile; v != nil && "file://"+*v != cluster.TLS.Key {
+ cluster.TLS.Key = "file://" + *v
+ }
+ if v := oc.Listen; v != nil {
+ if _, ok := cluster.Services.Keepstore.InternalURLs[arvados.URL{Scheme: myURL.Scheme, Host: *v}]; ok {
+ // already listed
+ myURL.Host = *v
+ } else if len(*v) > 1 && (*v)[0] == ':' {
+ myURL.Host = net.JoinHostPort(hostname, (*v)[1:])
+ cluster.Services.Keepstore.InternalURLs[myURL] = arvados.ServiceInstance{}
+ } else {
+ return fmt.Errorf("unable to migrate Listen value %q from legacy keepstore config file -- remove after configuring Services.Keepstore.InternalURLs.", *v)
+ }
+ } else {
+ for url := range cluster.Services.Keepstore.InternalURLs {
+ if host, _, _ := net.SplitHostPort(url.Host); host == hostname {
+ myURL = url
+ break
+ }
+ }
+ if myURL.Host == "" {
+ return fmt.Errorf("unable to migrate legacy keepstore config: no 'Listen' key, and hostname %q does not match an entry in Services.Keepstore.InternalURLs", hostname)
+ }
+ }
+
+ if v := oc.LogFormat; v != nil && *v != cluster.SystemLogs.Format {
+ cluster.SystemLogs.Format = *v
+ }
+ if v := oc.MaxBuffers; v != nil && *v != cluster.API.MaxKeepBlockBuffers {
+ cluster.API.MaxKeepBlockBuffers = *v
+ }
+ if v := oc.MaxRequests; v != nil && *v != cluster.API.MaxConcurrentRequests {
+ cluster.API.MaxConcurrentRequests = *v
+ }
+ if v := oc.BlobSignatureTTL; v != nil && *v != cluster.Collections.BlobSigningTTL {
+ cluster.Collections.BlobSigningTTL = *v
+ }
+ if v := oc.BlobSigningKeyFile; v != nil {
+ buf, err := ioutil.ReadFile(*v)
+ if err != nil {
+ return fmt.Errorf("error reading BlobSigningKeyFile: %s", err)
+ }
+ if key := strings.TrimSpace(string(buf)); key != cluster.Collections.BlobSigningKey {
+ cluster.Collections.BlobSigningKey = key
+ }
+ }
+ if v := oc.RequireSignatures; v != nil && *v != cluster.Collections.BlobSigning {
+ cluster.Collections.BlobSigning = *v
+ }
+ if v := oc.SystemAuthTokenFile; v != nil {
+ f, err := os.Open(*v)
+ if err != nil {
+ return fmt.Errorf("error opening SystemAuthTokenFile: %s", err)
+ }
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ if err != nil {
+ return fmt.Errorf("error reading SystemAuthTokenFile: %s", err)
+ }
+ if key := strings.TrimSpace(string(buf)); key != cluster.SystemRootToken {
+ cluster.SystemRootToken = key
+ }
+ }
+ if v := oc.EnableDelete; v != nil && *v != cluster.Collections.BlobTrash {
+ cluster.Collections.BlobTrash = *v
+ }
+ if v := oc.TrashLifetime; v != nil && *v != cluster.Collections.BlobTrashLifetime {
+ cluster.Collections.BlobTrashLifetime = *v
+ }
+ if v := oc.TrashCheckInterval; v != nil && *v != cluster.Collections.BlobTrashCheckInterval {
+ cluster.Collections.BlobTrashCheckInterval = *v
+ }
+ if v := oc.TrashWorkers; v != nil && *v != cluster.Collections.BlobTrashConcurrency {
+ cluster.Collections.BlobTrashConcurrency = *v
+ }
+ if v := oc.EmptyTrashWorkers; v != nil && *v != cluster.Collections.BlobDeleteConcurrency {
+ cluster.Collections.BlobDeleteConcurrency = *v
+ }
+ if v := oc.PullWorkers; v != nil && *v != cluster.Collections.BlobReplicateConcurrency {
+ cluster.Collections.BlobReplicateConcurrency = *v
+ }
+ if v := oc.Volumes; v != nil {
+ for i, oldvol := range *v {
+ var accessViaHosts map[arvados.URL]arvados.VolumeAccess
+ oldUUID, found := ldr.alreadyMigrated(oldvol, cluster.Volumes, myURL)
+ if found {
+ accessViaHosts = cluster.Volumes[oldUUID].AccessViaHosts
+ writers := false
+ for _, va := range accessViaHosts {
+ if !va.ReadOnly {
+ writers = true
+ }
+ }
+ if writers || len(accessViaHosts) == 0 {
+ ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
+ if len(accessViaHosts) > 0 {
+ cluster.Volumes[oldUUID].AccessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
+ }
+ continue
+ }
+ }
+ var newvol arvados.Volume
+ if found {
+ ldr.Logger.Infof("ignoring volume #%d's parameters in legacy keepstore config: using matching entry in cluster config instead", i)
+ newvol = cluster.Volumes[oldUUID]
+ // Remove the old entry. It will be
+ // added back below, possibly with a
+ // new UUID.
+ delete(cluster.Volumes, oldUUID)
+ } else {
+ var params interface{}
+ switch oldvol.Type {
+ case "S3":
+ accesskeydata, err := ioutil.ReadFile(oldvol.AccessKeyFile)
+ if err != nil && oldvol.AccessKeyFile != "" {
+ return fmt.Errorf("error reading AccessKeyFile: %s", err)
+ }
+ secretkeydata, err := ioutil.ReadFile(oldvol.SecretKeyFile)
+ if err != nil && oldvol.SecretKeyFile != "" {
+ return fmt.Errorf("error reading SecretKeyFile: %s", err)
+ }
+ newvol = arvados.Volume{
+ Driver: "S3",
+ ReadOnly: oldvol.ReadOnly,
+ Replication: oldvol.S3Replication,
+ StorageClasses: array2boolmap(oldvol.StorageClasses),
+ }
+ params = arvados.S3VolumeDriverParameters{
+ AccessKey: string(bytes.TrimSpace(accesskeydata)),
+ SecretKey: string(bytes.TrimSpace(secretkeydata)),
+ Endpoint: oldvol.Endpoint,
+ Region: oldvol.Region,
+ Bucket: oldvol.Bucket,
+ LocationConstraint: oldvol.LocationConstraint,
+ IndexPageSize: oldvol.IndexPageSize,
+ ConnectTimeout: oldvol.ConnectTimeout,
+ ReadTimeout: oldvol.ReadTimeout,
+ RaceWindow: oldvol.RaceWindow,
+ UnsafeDelete: oldvol.UnsafeDelete,
+ }
+ case "Azure":
+ keydata, err := ioutil.ReadFile(oldvol.StorageAccountKeyFile)
+ if err != nil && oldvol.StorageAccountKeyFile != "" {
+ return fmt.Errorf("error reading StorageAccountKeyFile: %s", err)
+ }
+ newvol = arvados.Volume{
+ Driver: "Azure",
+ ReadOnly: oldvol.ReadOnly,
+ Replication: oldvol.AzureReplication,
+ StorageClasses: array2boolmap(oldvol.StorageClasses),
+ }
+ params = arvados.AzureVolumeDriverParameters{
+ StorageAccountName: oldvol.StorageAccountName,
+ StorageAccountKey: string(bytes.TrimSpace(keydata)),
+ StorageBaseURL: oldvol.StorageBaseURL,
+ ContainerName: oldvol.ContainerName,
+ RequestTimeout: oldvol.RequestTimeout,
+ ListBlobsRetryDelay: oldvol.ListBlobsRetryDelay,
+ ListBlobsMaxAttempts: oldvol.ListBlobsMaxAttempts,
+ }
+ case "Directory":
+ newvol = arvados.Volume{
+ Driver: "Directory",
+ ReadOnly: oldvol.ReadOnly,
+ Replication: oldvol.DirectoryReplication,
+ StorageClasses: array2boolmap(oldvol.StorageClasses),
+ }
+ params = arvados.DirectoryVolumeDriverParameters{
+ Root: oldvol.Root,
+ Serialize: oldvol.Serialize,
+ }
+ default:
+ return fmt.Errorf("unsupported volume type %q", oldvol.Type)
+ }
+ dp, err := json.Marshal(params)
+ if err != nil {
+ return err
+ }
+ newvol.DriverParameters = json.RawMessage(dp)
+ if newvol.Replication < 1 {
+ newvol.Replication = 1
+ }
+ }
+ if accessViaHosts == nil {
+ accessViaHosts = make(map[arvados.URL]arvados.VolumeAccess, 1)
+ }
+ accessViaHosts[myURL] = arvados.VolumeAccess{ReadOnly: oldvol.ReadOnly}
+ newvol.AccessViaHosts = accessViaHosts
+
+ volUUID := oldUUID
+ if oldvol.ReadOnly {
+ } else if oc.Listen == nil {
+ ldr.Logger.Warn("cannot find optimal volume UUID because Listen address is not given in legacy keepstore config")
+ } else if uuid, _, err := findKeepServicesItem(cluster, *oc.Listen); err != nil {
+ ldr.Logger.WithError(err).Warn("cannot find optimal volume UUID: failed to find a matching keep_service listing for this legacy keepstore config")
+ } else if len(uuid) != 27 {
+ ldr.Logger.WithField("UUID", uuid).Warn("cannot find optimal volume UUID: keep_service UUID does not have expected format")
+ } else {
+ rendezvousUUID := cluster.ClusterID + "-nyw5e-" + uuid[12:]
+ if _, ok := cluster.Volumes[rendezvousUUID]; ok {
+ ldr.Logger.Warn("suggesting a random volume UUID because the volume ID matching our keep_service UUID is already in use")
+ } else {
+ volUUID = rendezvousUUID
+ }
+ }
+ if volUUID == "" {
+ volUUID = newUUID(cluster.ClusterID, "nyw5e")
+ ldr.Logger.WithField("UUID", volUUID).Infof("suggesting a random volume UUID for volume #%d in legacy config", i)
+ }
+ cluster.Volumes[volUUID] = newvol
+ }
+ }
+
+ cfg.Clusters[cluster.ClusterID] = *cluster
+ return nil
+}
+
+func (ldr *Loader) alreadyMigrated(oldvol oldKeepstoreVolume, newvols map[string]arvados.Volume, myURL arvados.URL) (string, bool) {
+ for uuid, newvol := range newvols {
+ if oldvol.Type != newvol.Driver {
+ continue
+ }
+ switch oldvol.Type {
+ case "S3":
+ var params arvados.S3VolumeDriverParameters
+ if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
+ oldvol.Endpoint == params.Endpoint &&
+ oldvol.Region == params.Region &&
+ oldvol.Bucket == params.Bucket &&
+ oldvol.LocationConstraint == params.LocationConstraint {
+ return uuid, true
+ }
+ case "Azure":
+ var params arvados.AzureVolumeDriverParameters
+ if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
+ oldvol.StorageAccountName == params.StorageAccountName &&
+ oldvol.StorageBaseURL == params.StorageBaseURL &&
+ oldvol.ContainerName == params.ContainerName {
+ return uuid, true
+ }
+ case "Directory":
+ var params arvados.DirectoryVolumeDriverParameters
+ if err := json.Unmarshal(newvol.DriverParameters, &params); err == nil &&
+ oldvol.Root == params.Root {
+ if _, ok := newvol.AccessViaHosts[myURL]; ok {
+ return uuid, true
+ }
+ }
+ }
+ }
+ return "", false
+}
+
+func array2boolmap(keys []string) map[string]bool {
+ m := map[string]bool{}
+ for _, k := range keys {
+ m[k] = true
+ }
+ return m
+}
+
+func newUUID(clusterID, infix string) string {
+ randint, err := rand.Int(rand.Reader, big.NewInt(0).Exp(big.NewInt(36), big.NewInt(15), big.NewInt(0)))
+ if err != nil {
+ panic(err)
+ }
+ randstr := randint.Text(36)
+ for len(randstr) < 15 {
+ randstr = "0" + randstr
+ }
+ return fmt.Sprintf("%s-%s-%s", clusterID, infix, randstr)
+}
+
+// Return the UUID and URL for the controller's keep_services listing
+// corresponding to this host/process.
+func findKeepServicesItem(cluster *arvados.Cluster, listen string) (uuid string, url arvados.URL, err error) {
+ client, err := arvados.NewClientFromConfig(cluster)
+ if err != nil {
+ return
+ }
+ client.AuthToken = cluster.SystemRootToken
+ var svcList arvados.KeepServiceList
+ err = client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
+ if err != nil {
+ return
+ }
+ hostname, err := os.Hostname()
+ if err != nil {
+ err = fmt.Errorf("error getting hostname: %s", err)
+ return
+ }
+ for _, ks := range svcList.Items {
+ if ks.ServiceType != "proxy" && keepServiceIsMe(ks, hostname, listen) {
+ url := arvados.URL{
+ Scheme: "http",
+ Host: net.JoinHostPort(ks.ServiceHost, strconv.Itoa(ks.ServicePort)),
+ }
+ if ks.ServiceSSLFlag {
+ url.Scheme = "https"
+ }
+ return ks.UUID, url, nil
+ }
+ }
+ err = errors.New("failed to find a keep_services entry that matches the current host/port")
+ return
+}
+
+var localhostOrAllInterfaces = map[string]bool{
+ "localhost": true,
+ "127.0.0.1": true,
+ "::1": true,
+ "::": true,
+ "": true,
+}
+
+// Return true if the given KeepService entry matches the given
+// hostname and (keepstore config file) listen address.
+//
+// If the KeepService host is some variant of "localhost", we assume
+// this is a testing or single-node environment, ignore the given
+// hostname, and return true if the port numbers match.
+//
+// The hostname isn't assumed to be a FQDN: a hostname "foo.bar" will
+// match a KeepService host "foo.bar", but also "foo.bar.example",
+// "foo.bar.example.org", etc.
+func keepServiceIsMe(ks arvados.KeepService, hostname string, listen string) bool {
+ // Extract the port name/number from listen, and resolve it to
+ // a port number to compare with ks.ServicePort.
+ _, listenport, err := net.SplitHostPort(listen)
+ if err != nil && strings.HasPrefix(listen, ":") {
+ listenport = listen[1:]
+ }
+ if lp, err := net.LookupPort("tcp", listenport); err != nil {
+ return false
+ } else if !(lp == ks.ServicePort ||
+ (lp == 0 && ks.ServicePort == 80)) {
+ return false
+ }
+
+ kshost := strings.ToLower(ks.ServiceHost)
+ return localhostOrAllInterfaces[kshost] || strings.HasPrefix(kshost+".", strings.ToLower(hostname)+".")
+}
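A note on the volume-UUID logic above: the suggested "rendezvous" UUID reuses the 15-character random part of this server's keep_services UUID, presumably so rendezvous hashing keeps placing blocks the same way after migration. A standalone sketch with hypothetical UUIDs (the "-bi6l4-" keep_service and "-nyw5e-" volume infixes are as used in the code above):

package main

import "fmt"

// rendezvousVolumeUUID mirrors the rule in loadOldKeepstoreConfig:
// keep the random part (chars [12:]) of the keep_service UUID and
// swap in the volume infix.
func rendezvousVolumeUUID(clusterID, keepServiceUUID string) string {
	return clusterID + "-nyw5e-" + keepServiceUUID[12:]
}

func main() {
	fmt.Println(rendezvousVolumeUUID("zzzzz", "zzzzz-bi6l4-0123456789abcde"))
	// prints: zzzzz-nyw5e-0123456789abcde
}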
diff --git a/lib/config/deprecated_keepstore_test.go b/lib/config/deprecated_keepstore_test.go
new file mode 100644
index 000000000..59a09c925
--- /dev/null
+++ b/lib/config/deprecated_keepstore_test.go
@@ -0,0 +1,640 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package config
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "sort"
+ "strconv"
+ "text/template"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ check "gopkg.in/check.v1"
+)
+
+type KeepstoreMigrationSuite struct {
+ hostname string // blank = use test system's hostname
+}
+
+var _ = check.Suite(&KeepstoreMigrationSuite{})
+
+func (s *KeepstoreMigrationSuite) checkEquivalentWithKeepstoreConfig(c *check.C, keepstoreconfig, clusterconfig, expectedconfig string) {
+ keepstorefile, err := ioutil.TempFile("", "")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(keepstorefile.Name())
+ _, err = io.WriteString(keepstorefile, keepstoreconfig)
+ c.Assert(err, check.IsNil)
+ err = keepstorefile.Close()
+ c.Assert(err, check.IsNil)
+
+ gotldr := testLoader(c, clusterconfig, nil)
+ gotldr.KeepstorePath = keepstorefile.Name()
+ expectedldr := testLoader(c, expectedconfig, nil)
+ checkEquivalentLoaders(c, gotldr, expectedldr)
+}
+
+func (s *KeepstoreMigrationSuite) TestDeprecatedKeepstoreConfig(c *check.C) {
+ keyfile, err := ioutil.TempFile("", "")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(keyfile.Name())
+ io.WriteString(keyfile, "blobsigningkey\n")
+
+ hostname, err := os.Hostname()
+ c.Assert(err, check.IsNil)
+
+ s.checkEquivalentWithKeepstoreConfig(c, `
+Listen: ":12345"
+Debug: true
+LogFormat: text
+MaxBuffers: 1234
+MaxRequests: 2345
+BlobSignatureTTL: 123m
+BlobSigningKeyFile: `+keyfile.Name()+`
+`, `
+Clusters:
+ z1111:
+ {}
+`, `
+Clusters:
+ z1111:
+ Services:
+ Keepstore:
+ InternalURLs:
+ "http://`+hostname+`:12345": {}
+ SystemLogs:
+ Format: text
+ LogLevel: debug
+ API:
+ MaxKeepBlockBuffers: 1234
+ MaxConcurrentRequests: 2345
+ Collections:
+ BlobSigningTTL: 123m
+ BlobSigningKey: blobsigningkey
+`)
+}
+
+func (s *KeepstoreMigrationSuite) TestDeprecatedVolumes(c *check.C) {
+ accesskeyfile, err := ioutil.TempFile("", "")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(accesskeyfile.Name())
+ io.WriteString(accesskeyfile, "accesskeydata\n")
+
+ secretkeyfile, err := ioutil.TempFile("", "")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(secretkeyfile.Name())
+ io.WriteString(secretkeyfile, "secretkeydata\n")
+
+ // s3, empty/default
+ s.testDeprecatedVolume(c, `
+Volumes:
+- Type: S3
+`, arvados.Volume{
+ Driver: "S3",
+ Replication: 1,
+ }, &arvados.S3VolumeDriverParameters{}, &arvados.S3VolumeDriverParameters{})
+
+ // s3, fully configured
+ s.testDeprecatedVolume(c, `
+Volumes:
+- Type: S3
+ AccessKeyFile: `+accesskeyfile.Name()+`
+ SecretKeyFile: `+secretkeyfile.Name()+`
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: testbucket
+ LocationConstraint: true
+ IndexPageSize: 1234
+ S3Replication: 4
+ ConnectTimeout: 3m
+ ReadTimeout: 4m
+ RaceWindow: 5m
+ UnsafeDelete: true
+`, arvados.Volume{
+ Driver: "S3",
+ Replication: 4,
+ }, &arvados.S3VolumeDriverParameters{
+ AccessKey: "accesskeydata",
+ SecretKey: "secretkeydata",
+ Endpoint: "https://storage.googleapis.com",
+ Region: "us-east-1z",
+ Bucket: "testbucket",
+ LocationConstraint: true,
+ IndexPageSize: 1234,
+ ConnectTimeout: arvados.Duration(time.Minute * 3),
+ ReadTimeout: arvados.Duration(time.Minute * 4),
+ RaceWindow: arvados.Duration(time.Minute * 5),
+ UnsafeDelete: true,
+ }, &arvados.S3VolumeDriverParameters{})
+
+ // azure, empty/default
+ s.testDeprecatedVolume(c, `
+Volumes:
+- Type: Azure
+`, arvados.Volume{
+ Driver: "Azure",
+ Replication: 1,
+ }, &arvados.AzureVolumeDriverParameters{}, &arvados.AzureVolumeDriverParameters{})
+
+ // azure, fully configured
+ s.testDeprecatedVolume(c, `
+Volumes:
+- Type: Azure
+ ReadOnly: true
+ StorageAccountName: storageacctname
+ StorageAccountKeyFile: `+secretkeyfile.Name()+`
+ StorageBaseURL: https://example.example
+ ContainerName: testctr
+ LocationConstraint: true
+ AzureReplication: 4
+ RequestTimeout: 3m
+ ListBlobsRetryDelay: 4m
+ ListBlobsMaxAttempts: 5
+`, arvados.Volume{
+ Driver: "Azure",
+ ReadOnly: true,
+ Replication: 4,
+ }, &arvados.AzureVolumeDriverParameters{
+ StorageAccountName: "storageacctname",
+ StorageAccountKey: "secretkeydata",
+ StorageBaseURL: "https://example.example",
+ ContainerName: "testctr",
+ RequestTimeout: arvados.Duration(time.Minute * 3),
+ ListBlobsRetryDelay: arvados.Duration(time.Minute * 4),
+ ListBlobsMaxAttempts: 5,
+ }, &arvados.AzureVolumeDriverParameters{})
+
+ // directory, empty/default
+ s.testDeprecatedVolume(c, `
+Volumes:
+- Type: Directory
+ Root: /tmp/xyzzy
+`, arvados.Volume{
+ Driver: "Directory",
+ Replication: 1,
+ }, &arvados.DirectoryVolumeDriverParameters{
+ Root: "/tmp/xyzzy",
+ }, &arvados.DirectoryVolumeDriverParameters{})
+
+ // directory, fully configured
+ s.testDeprecatedVolume(c, `
+Volumes:
+- Type: Directory
+ ReadOnly: true
+ Root: /tmp/xyzzy
+ DirectoryReplication: 4
+ Serialize: true
+`, arvados.Volume{
+ Driver: "Directory",
+ ReadOnly: true,
+ Replication: 4,
+ }, &arvados.DirectoryVolumeDriverParameters{
+ Root: "/tmp/xyzzy",
+ Serialize: true,
+ }, &arvados.DirectoryVolumeDriverParameters{})
+}
+
+func (s *KeepstoreMigrationSuite) testDeprecatedVolume(c *check.C, oldconfigdata string, expectvol arvados.Volume, expectparams interface{}, paramsdst interface{}) {
+ hostname := s.hostname
+ if hostname == "" {
+ h, err := os.Hostname()
+ c.Assert(err, check.IsNil)
+ hostname = h
+ }
+
+ oldconfig, err := ioutil.TempFile("", "")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(oldconfig.Name())
+ io.WriteString(oldconfig, "Listen: :12345\n"+oldconfigdata)
+
+ ldr := testLoader(c, "Clusters: {z1111: {}}", nil)
+ ldr.KeepstorePath = oldconfig.Name()
+ cfg, err := ldr.Load()
+ c.Assert(err, check.IsNil)
+ cc := cfg.Clusters["z1111"]
+ c.Check(cc.Volumes, check.HasLen, 1)
+ for uuid, v := range cc.Volumes {
+ c.Check(uuid, check.HasLen, 27)
+ c.Check(v.Driver, check.Equals, expectvol.Driver)
+ c.Check(v.Replication, check.Equals, expectvol.Replication)
+
+ avh, ok := v.AccessViaHosts[arvados.URL{Scheme: "http", Host: hostname + ":12345"}]
+ c.Check(ok, check.Equals, true)
+ c.Check(avh.ReadOnly, check.Equals, expectvol.ReadOnly)
+
+ err := json.Unmarshal(v.DriverParameters, paramsdst)
+ c.Check(err, check.IsNil)
+ c.Check(paramsdst, check.DeepEquals, expectparams)
+ }
+}
+
+// How we handle a volume from a legacy keepstore config file depends
+// on whether it's writable, whether a volume using the same cloud
+// backend already exists in the cluster config, and (if so) whether
+// it already has an AccessViaHosts entry for this host.
+//
+// In all cases, we should end up with an AccessViaHosts entry for
+// this host, to indicate that the current host's volumes have been
+// migrated.
+
+// Same backend already referenced in cluster config, this host
+// already listed in AccessViaHosts --> no change, except possibly
+// updating the ReadOnly flag on the AccessViaHosts entry.
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_AlreadyMigrated(c *check.C) {
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :12345
+Volumes:
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: alreadymigrated
+ S3Replication: 3
+`)
+ checkEqualYAML(c, after, before)
+}
+
+// Writable volume, same cloud backend already referenced in cluster
+// config --> change UUID to match this keepstore's UUID.
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_UpdateUUID(c *check.C) {
+ port, expectUUID := s.getTestKeepstorePortAndMatchingVolumeUUID(c)
+
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :`+strconv.Itoa(port)+`
+Volumes:
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: readonlyonother
+ S3Replication: 3
+`)
+ c.Check(after, check.HasLen, len(before))
+ newuuids := s.findAddedVolumes(c, before, after, 1)
+ newvol := after[newuuids[0]]
+
+ var params arvados.S3VolumeDriverParameters
+ json.Unmarshal(newvol.DriverParameters, &params)
+ c.Check(params.Bucket, check.Equals, "readonlyonother")
+ c.Check(newuuids[0], check.Equals, expectUUID)
+}
+
+// Writable volume, same cloud backend not yet referenced --> add a
+// new volume, with UUID to match this keepstore's UUID.
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_AddCloudVolume(c *check.C) {
+ port, expectUUID := s.getTestKeepstorePortAndMatchingVolumeUUID(c)
+
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :`+strconv.Itoa(port)+`
+Volumes:
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: bucket-to-migrate
+ S3Replication: 3
+`)
+ newuuids := s.findAddedVolumes(c, before, after, 1)
+ newvol := after[newuuids[0]]
+
+ var params arvados.S3VolumeDriverParameters
+ json.Unmarshal(newvol.DriverParameters, &params)
+ c.Check(params.Bucket, check.Equals, "bucket-to-migrate")
+ c.Check(newvol.Replication, check.Equals, 3)
+
+ c.Check(newuuids[0], check.Equals, expectUUID)
+}
+
+// Writable volume, same filesystem backend already referenced in
+// cluster config, but this host isn't in AccessViaHosts --> add a new
+// volume, with UUID to match this keepstore's UUID (filesystem-backed
+// volumes are assumed to be different on different hosts, even if
+// paths are the same).
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_AddLocalVolume(c *check.C) {
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :12345
+Volumes:
+- Type: Directory
+ Root: /data/sdd
+ DirectoryReplication: 2
+`)
+ newuuids := s.findAddedVolumes(c, before, after, 1)
+ newvol := after[newuuids[0]]
+
+ var params arvados.DirectoryVolumeDriverParameters
+ json.Unmarshal(newvol.DriverParameters, &params)
+ c.Check(params.Root, check.Equals, "/data/sdd")
+ c.Check(newvol.Replication, check.Equals, 2)
+}
+
+// Writable volume, same filesystem backend already referenced in
+// cluster config, and this host is already listed in AccessViaHosts
+// --> already migrated, don't change anything.
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_LocalVolumeAlreadyMigrated(c *check.C) {
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :12345
+Volumes:
+- Type: Directory
+ Root: /data/sde
+ DirectoryReplication: 2
+`)
+ checkEqualYAML(c, after, before)
+}
+
+// Multiple writable cloud-backed volumes --> one of them will get a
+// UUID matching this keepstore's UUID.
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_AddMultipleCloudVolumes(c *check.C) {
+ port, expectUUID := s.getTestKeepstorePortAndMatchingVolumeUUID(c)
+
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :`+strconv.Itoa(port)+`
+Volumes:
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: first-bucket-to-migrate
+ S3Replication: 3
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: second-bucket-to-migrate
+ S3Replication: 3
+`)
+ newuuids := s.findAddedVolumes(c, before, after, 2)
+ // Sort by bucket name (so "first" comes before "second")
+ params := map[string]arvados.S3VolumeDriverParameters{}
+ for _, uuid := range newuuids {
+ var p arvados.S3VolumeDriverParameters
+ json.Unmarshal(after[uuid].DriverParameters, &p)
+ params[uuid] = p
+ }
+ sort.Slice(newuuids, func(i, j int) bool { return params[newuuids[i]].Bucket < params[newuuids[j]].Bucket })
+ newvol0, newvol1 := after[newuuids[0]], after[newuuids[1]]
+ params0, params1 := params[newuuids[0]], params[newuuids[1]]
+
+ c.Check(params0.Bucket, check.Equals, "first-bucket-to-migrate")
+ c.Check(newvol0.Replication, check.Equals, 3)
+
+ c.Check(params1.Bucket, check.Equals, "second-bucket-to-migrate")
+ c.Check(newvol1.Replication, check.Equals, 3)
+
+ // Don't care which one gets the special UUID
+ if newuuids[0] != expectUUID {
+ c.Check(newuuids[1], check.Equals, expectUUID)
+ }
+}
+
+// Non-writable volume, same cloud backend already referenced in
+// cluster config --> add this host to AccessViaHosts with
+// ReadOnly==true
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_UpdateWithReadOnly(c *check.C) {
+ port, _ := s.getTestKeepstorePortAndMatchingVolumeUUID(c)
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :`+strconv.Itoa(port)+`
+Volumes:
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: readonlyonother
+ S3Replication: 3
+ ReadOnly: true
+`)
+ hostname, err := os.Hostname()
+ c.Assert(err, check.IsNil)
+ url := arvados.URL{
+ Scheme: "http",
+ Host: fmt.Sprintf("%s:%d", hostname, port),
+ }
+ _, ok := before["zzzzz-nyw5e-readonlyonother"].AccessViaHosts[url]
+ c.Check(ok, check.Equals, false)
+ _, ok = after["zzzzz-nyw5e-readonlyonother"].AccessViaHosts[url]
+ c.Check(ok, check.Equals, true)
+}
+
+// Writable volume, same cloud backend already writable by another
+// keepstore server --> add this host to AccessViaHosts with
+// ReadOnly==true
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_UpdateAlreadyWritable(c *check.C) {
+ port, _ := s.getTestKeepstorePortAndMatchingVolumeUUID(c)
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :`+strconv.Itoa(port)+`
+Volumes:
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: writableonother
+ S3Replication: 3
+ ReadOnly: false
+`)
+ hostname, err := os.Hostname()
+ c.Assert(err, check.IsNil)
+ url := arvados.URL{
+ Scheme: "http",
+ Host: fmt.Sprintf("%s:%d", hostname, port),
+ }
+ _, ok := before["zzzzz-nyw5e-writableonother"].AccessViaHosts[url]
+ c.Check(ok, check.Equals, false)
+ _, ok = after["zzzzz-nyw5e-writableonother"].AccessViaHosts[url]
+ c.Check(ok, check.Equals, true)
+}
+
+// Non-writable volume, same cloud backend not already referenced in
+// cluster config --> assign a new random volume UUID.
+func (s *KeepstoreMigrationSuite) TestIncrementalVolumeMigration_AddReadOnly(c *check.C) {
+ port, _ := s.getTestKeepstorePortAndMatchingVolumeUUID(c)
+ before, after := s.loadWithKeepstoreConfig(c, `
+Listen: :`+strconv.Itoa(port)+`
+Volumes:
+- Type: S3
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: differentbucket
+ S3Replication: 3
+`)
+ newuuids := s.findAddedVolumes(c, before, after, 1)
+ newvol := after[newuuids[0]]
+
+ var params arvados.S3VolumeDriverParameters
+ json.Unmarshal(newvol.DriverParameters, &params)
+ c.Check(params.Bucket, check.Equals, "differentbucket")
+
+ hostname, err := os.Hostname()
+ c.Assert(err, check.IsNil)
+ _, ok := newvol.AccessViaHosts[arvados.URL{Scheme: "http", Host: fmt.Sprintf("%s:%d", hostname, port)}]
+ c.Check(ok, check.Equals, true)
+}
+
+const clusterConfigForKeepstoreMigrationTest = `
+Clusters:
+ zzzzz:
+ SystemRootToken: ` + arvadostest.AdminToken + `
+ Services:
+ Keepstore:
+ InternalURLs:
+ "http://{{.hostname}}:12345": {}
+ Controller:
+ ExternalURL: "https://{{.controller}}"
+ TLS:
+ Insecure: true
+ Volumes:
+
+ zzzzz-nyw5e-alreadymigrated:
+ AccessViaHosts:
+ "http://{{.hostname}}:12345": {}
+ Driver: S3
+ DriverParameters:
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: alreadymigrated
+ Replication: 3
+
+ zzzzz-nyw5e-readonlyonother:
+ AccessViaHosts:
+ "http://other.host.example:12345": {ReadOnly: true}
+ Driver: S3
+ DriverParameters:
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: readonlyonother
+ Replication: 3
+
+ zzzzz-nyw5e-writableonother:
+ AccessViaHosts:
+ "http://other.host.example:12345": {}
+ Driver: S3
+ DriverParameters:
+ Endpoint: https://storage.googleapis.com
+ Region: us-east-1z
+ Bucket: writableonother
+ Replication: 3
+
+ zzzzz-nyw5e-localfilesystem:
+ Driver: Directory
+ DriverParameters:
+ Root: /data/sdd
+ Replication: 1
+
+ zzzzz-nyw5e-localismigrated:
+ AccessViaHosts:
+ "http://{{.hostname}}:12345": {}
+ Driver: Directory
+ DriverParameters:
+ Root: /data/sde
+ Replication: 1
+`
+
+// Determine the effect of combining the given legacy keepstore config
+// YAML (just the "Volumes" entries of an old keepstore config file)
+// with the example clusterConfigForKeepstoreMigrationTest config.
+//
+// Return two Volumes configs -- one without loading
+// keepstoreconfigdata ("before") and one with ("after") -- for the
+// caller to compare.
+func (s *KeepstoreMigrationSuite) loadWithKeepstoreConfig(c *check.C, keepstoreVolumesYAML string) (before, after map[string]arvados.Volume) {
+ ldr := testLoader(c, s.clusterConfigYAML(c), nil)
+ cBefore, err := ldr.Load()
+ c.Assert(err, check.IsNil)
+
+ keepstoreconfig, err := ioutil.TempFile("", "")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(keepstoreconfig.Name())
+ io.WriteString(keepstoreconfig, keepstoreVolumesYAML)
+
+ ldr = testLoader(c, s.clusterConfigYAML(c), nil)
+ ldr.KeepstorePath = keepstoreconfig.Name()
+ cAfter, err := ldr.Load()
+ c.Assert(err, check.IsNil)
+
+ return cBefore.Clusters["zzzzz"].Volumes, cAfter.Clusters["zzzzz"].Volumes
+}
+
+func (s *KeepstoreMigrationSuite) clusterConfigYAML(c *check.C) string {
+ hostname, err := os.Hostname()
+ c.Assert(err, check.IsNil)
+
+ tmpl := template.Must(template.New("config").Parse(clusterConfigForKeepstoreMigrationTest))
+
+ var clusterconfigdata bytes.Buffer
+ err = tmpl.Execute(&clusterconfigdata, map[string]interface{}{
+ "hostname": hostname,
+ "controller": os.Getenv("ARVADOS_API_HOST"),
+ })
+ c.Assert(err, check.IsNil)
+
+ return clusterconfigdata.String()
+}
+
+// Return the uuids of volumes that appear in "after" but not
+// "before".
+//
+// Assert the returned slice has at least minAdded entries.
+func (s *KeepstoreMigrationSuite) findAddedVolumes(c *check.C, before, after map[string]arvados.Volume, minAdded int) (uuids []string) {
+ for uuid := range after {
+ if _, ok := before[uuid]; !ok {
+ uuids = append(uuids, uuid)
+ }
+ }
+ if len(uuids) < minAdded {
+ c.Assert(uuids, check.HasLen, minAdded)
+ }
+ return
+}
+
+func (s *KeepstoreMigrationSuite) getTestKeepstorePortAndMatchingVolumeUUID(c *check.C) (int, string) {
+ for uuid, port := range s.getTestKeepstorePorts(c) {
+ c.Assert(uuid, check.HasLen, 27)
+ return port, "zzzzz-nyw5e-" + uuid[12:]
+ }
+ c.Fatal("getTestKeepstorePorts() returned nothing")
+ return 0, ""
+}
+
+func (s *KeepstoreMigrationSuite) getTestKeepstorePorts(c *check.C) map[string]int {
+ client := arvados.NewClientFromEnv()
+ var svcList arvados.KeepServiceList
+ err := client.RequestAndDecode(&svcList, "GET", "arvados/v1/keep_services", nil, nil)
+ c.Assert(err, check.IsNil)
+ ports := map[string]int{}
+ for _, ks := range svcList.Items {
+ if ks.ServiceType == "disk" {
+ ports[ks.UUID] = ks.ServicePort
+ }
+ }
+ return ports
+}
+
+func (s *KeepstoreMigrationSuite) TestKeepServiceIsMe(c *check.C) {
+ for i, trial := range []struct {
+ match bool
+ hostname string
+ listen string
+ serviceHost string
+ servicePort int
+ }{
+ {true, "keep0", "keep0", "keep0", 80},
+ {true, "keep0", "[::1]:http", "keep0", 80},
+ {true, "keep0", "[::]:http", "keep0", 80},
+ {true, "keep0", "keep0:25107", "keep0", 25107},
+ {true, "keep0", ":25107", "keep0", 25107},
+ {true, "keep0.domain", ":25107", "keep0.domain.example", 25107},
+ {true, "keep0.domain.example", ":25107", "keep0.domain.example", 25107},
+ {true, "keep0", ":25107", "keep0.domain.example", 25107},
+ {true, "keep0", ":25107", "Keep0.domain.example", 25107},
+ {true, "keep0", ":http", "keep0.domain.example", 80},
+ {true, "keep0", ":25107", "localhost", 25107},
+ {true, "keep0", ":25107", "::1", 25107},
+ {false, "keep0", ":25107", "keep0", 1111}, // different port
+ {false, "keep0", ":25107", "localhost", 1111}, // different port
+ {false, "keep0", ":http", "keep0.domain.example", 443}, // different port
+ {false, "keep0", ":bogussss", "keep0", 25107}, // unresolvable port
+ {false, "keep0", ":25107", "keep1", 25107}, // different hostname
+ {false, "keep1", ":25107", "keep10", 25107}, // different hostname (prefix, but not on a "." boundary)
+ } {
+ c.Check(keepServiceIsMe(arvados.KeepService{ServiceHost: trial.serviceHost, ServicePort: trial.servicePort}, trial.hostname, trial.listen), check.Equals, trial.match, check.Commentf("trial #%d: %#v", i, trial))
+ }
+}
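To summarize what these tests pin down: a legacy stanza like the following (values hypothetical, as in testDeprecatedVolume)

Listen: :12345
Volumes:
- Type: Directory
  Root: /tmp/xyzzy
  DirectoryReplication: 2

loads into the cluster config as a volume entry along these lines, keyed by a matched or newly suggested UUID:

Volumes:
  zzzzz-nyw5e-xxxxxxxxxxxxxxx:
    AccessViaHosts:
      "http://HOSTNAME:12345": {}
    Driver: Directory
    DriverParameters:
      Root: /tmp/xyzzy
    Replication: 2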
diff --git a/lib/config/deprecated_test.go b/lib/config/deprecated_test.go
index 308b0cc35..55bb2193b 100644
--- a/lib/config/deprecated_test.go
+++ b/lib/config/deprecated_test.go
@@ -13,7 +13,7 @@ import (
func (s *LoadSuite) TestDeprecatedNodeProfilesToServices(c *check.C) {
hostname, err := os.Hostname()
c.Assert(err, check.IsNil)
- s.checkEquivalent(c, `
+ checkEquivalent(c, `
Clusters:
z1111:
NodeProfiles:
diff --git a/lib/config/export.go b/lib/config/export.go
index b125d7dc9..a349db516 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -63,8 +63,10 @@ var whitelist = map[string]bool{
"API": true,
"API.AsyncPermissionsUpdateInterval": false,
"API.DisabledAPIs": false,
+ "API.MaxConcurrentRequests": false,
"API.MaxIndexDatabaseRead": false,
"API.MaxItemsPerResponse": true,
+ "API.MaxKeepBlockBuffers": false,
"API.MaxRequestAmplification": false,
"API.MaxRequestSize": true,
"API.RailsSessionSecretToken": false,
@@ -80,6 +82,12 @@ var whitelist = map[string]bool{
"Collections.BlobSigning": true,
"Collections.BlobSigningKey": false,
"Collections.BlobSigningTTL": true,
+ "Collections.BlobTrash": false,
+ "Collections.BlobTrashLifetime": false,
+ "Collections.BlobTrashConcurrency": false,
+ "Collections.BlobTrashCheckInterval": false,
+ "Collections.BlobDeleteConcurrency": false,
+ "Collections.BlobReplicateConcurrency": false,
"Collections.CollectionVersioning": false,
"Collections.DefaultReplication": true,
"Collections.DefaultTrashLifetime": true,
@@ -153,6 +161,16 @@ var whitelist = map[string]bool{
"Users.NewUsersAreActive": false,
"Users.UserNotifierEmailFrom": false,
"Users.UserProfileNotificationAddress": false,
+ "Volumes": true,
+ "Volumes.*": true,
+ "Volumes.*.*": false,
+ "Volumes.*.AccessViaHosts": true,
+ "Volumes.*.AccessViaHosts.*": true,
+ "Volumes.*.AccessViaHosts.*.ReadOnly": true,
+ "Volumes.*.ReadOnly": true,
+ "Volumes.*.Replication": true,
+ "Volumes.*.StorageClasses": true,
+ "Volumes.*.StorageClasses.*": false,
"Workbench": true,
"Workbench.ActivationContactLink": false,
"Workbench.APIClientConnectTimeout": true,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 602f30e1d..c7d1ee85f 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -182,6 +182,15 @@ Clusters:
# parameter higher than this value, this value is used instead.
MaxItemsPerResponse: 1000
+ # Maximum number of concurrent requests to accept in a single
+ # service process, or 0 for no limit. Currently supported only
+ # by keepstore.
+ MaxConcurrentRequests: 0
+
+ # Maximum number of 64MiB memory buffers per keepstore server
+ # process, or 0 for no limit.
+ MaxKeepBlockBuffers: 128
+
# API methods to disable. Disabled methods are not listed in the
# discovery document, and respond 404 to all requests.
# Example: {"jobs.create":{}, "pipeline_instances.create": {}}
@@ -319,15 +328,44 @@ Clusters:
# BlobSigningKey is a string of alphanumeric characters used to
# generate permission signatures for Keep locators. It must be
- # identical to the permission key given to Keep. IMPORTANT: This is
- # a site secret. It should be at least 50 characters.
+ # identical to the permission key given to Keep. IMPORTANT: This
+ # is a site secret. It should be at least 50 characters.
#
- # Modifying blob_signing_key will invalidate all existing
+ # Modifying BlobSigningKey will invalidate all existing
# signatures, which can cause programs to fail (e.g., arv-put,
- # arv-get, and Crunch jobs). To avoid errors, rotate keys only when
- # no such processes are running.
+ # arv-get, and Crunch jobs). To avoid errors, rotate keys only
+ # when no such processes are running.
BlobSigningKey: ""
+ # Enable garbage collection of unreferenced blobs in Keep.
+ BlobTrash: true
+
+ # Time to leave unreferenced blobs in "trashed" state before
+ # deleting them, or 0 to skip the "trashed" state entirely and
+ # delete unreferenced blobs.
+ #
+ # If you use any Amazon S3 buckets as storage volumes, this
+ # must be at least 24h to avoid occasional data loss.
+ BlobTrashLifetime: 336h
+
+ # How often to check for (and delete) trashed blocks whose
+ # BlobTrashLifetime has expired.
+ BlobTrashCheckInterval: 24h
+
+ # Maximum number of concurrent "trash blob" and "delete trashed
+ # blob" operations conducted by a single keepstore process. Each
+ # of these can be set to 0 to disable the respective operation.
+ #
+ # If BlobTrashLifetime is zero, "trash" and "delete trash"
+ # happen at once, so only the lower of these two values is used.
+ BlobTrashConcurrency: 4
+ BlobDeleteConcurrency: 4
+
+ # Maximum number of concurrent "create additional replica of
+ # existing blob" operations conducted by a single keepstore
+ # process.
+ BlobReplicateConcurrency: 4
+
# Default replication level for collections. This is used when a
# collection's replication_desired attribute is nil.
DefaultReplication: 2
@@ -736,6 +774,44 @@ Clusters:
Price: 0.1
Preemptible: false
+ Volumes:
+ SAMPLE:
+ AccessViaHosts:
+ SAMPLE:
+ ReadOnly: false
+ ReadOnly: false
+ Replication: 1
+ StorageClasses:
+ default: true
+ SAMPLE: true
+ Driver: s3
+ DriverParameters:
+
+ # for s3 driver
+ AccessKey: aaaaa
+ SecretKey: aaaaa
+ Endpoint: ""
+ Region: us-east-1a
+ Bucket: aaaaa
+ LocationConstraint: false
+ IndexPageSize: 1000
+ ConnectTimeout: 1m
+ ReadTimeout: 10m
+ RaceWindow: 24h
+ UnsafeDelete: false
+
+ # for azure driver
+ StorageAccountName: aaaaa
+ StorageAccountKey: aaaaa
+ StorageBaseURL: ""
+ ContainerName: aaaaa
+ RequestTimeout: 30s
+ ListBlobsRetryDelay: 10s
+ ListBlobsMaxAttempts: 10
+
+ # for local directory driver
+ Root: /var/lib/arvados/keep-data
+
Mail:
MailchimpAPIKey: ""
MailchimpListID: ""
diff --git a/lib/config/load_test.go b/lib/config/load_test.go
index c7289350e..17e0af7ba 100644
--- a/lib/config/load_test.go
+++ b/lib/config/load_test.go
@@ -321,7 +321,7 @@ Clusters:
}
func (s *LoadSuite) TestMovedKeys(c *check.C) {
- s.checkEquivalent(c, `# config has old keys only
+ checkEquivalent(c, `# config has old keys only
Clusters:
zzzzz:
RequestLimits:
@@ -334,7 +334,7 @@ Clusters:
MaxRequestAmplification: 3
MaxItemsPerResponse: 999
`)
- s.checkEquivalent(c, `# config has both old and new keys; old values win
+ checkEquivalent(c, `# config has both old and new keys; old values win
Clusters:
zzzzz:
RequestLimits:
@@ -352,30 +352,45 @@ Clusters:
`)
}
-func (s *LoadSuite) checkEquivalent(c *check.C, goty, expectedy string) {
- got, err := testLoader(c, goty, nil).Load()
+func checkEquivalent(c *check.C, goty, expectedy string) {
+ gotldr := testLoader(c, goty, nil)
+ expectedldr := testLoader(c, expectedy, nil)
+ checkEquivalentLoaders(c, gotldr, expectedldr)
+}
+
+func checkEqualYAML(c *check.C, got, expected interface{}) {
+ expectedyaml, err := yaml.Marshal(expected)
c.Assert(err, check.IsNil)
- expected, err := testLoader(c, expectedy, nil).Load()
+ gotyaml, err := yaml.Marshal(got)
c.Assert(err, check.IsNil)
- if !c.Check(got, check.DeepEquals, expected) {
+ if !bytes.Equal(gotyaml, expectedyaml) {
cmd := exec.Command("diff", "-u", "--label", "expected", "--label", "got", "/dev/fd/3", "/dev/fd/4")
- for _, obj := range []interface{}{expected, got} {
- y, _ := yaml.Marshal(obj)
+ for _, y := range [][]byte{expectedyaml, gotyaml} {
pr, pw, err := os.Pipe()
c.Assert(err, check.IsNil)
defer pr.Close()
- go func() {
- io.Copy(pw, bytes.NewBuffer(y))
+ go func(data []byte) {
+ pw.Write(data)
pw.Close()
- }()
+ }(y)
cmd.ExtraFiles = append(cmd.ExtraFiles, pr)
}
diff, err := cmd.CombinedOutput()
+ // diff should report differences and exit non-zero.
+ c.Check(err, check.NotNil)
c.Log(string(diff))
- c.Check(err, check.IsNil)
+ c.Error("got != expected; see diff (-expected +got) above")
}
}
+func checkEquivalentLoaders(c *check.C, gotldr, expectedldr *Loader) {
+ got, err := gotldr.Load()
+ c.Assert(err, check.IsNil)
+ expected, err := expectedldr.Load()
+ c.Assert(err, check.IsNil)
+ checkEqualYAML(c, got, expected)
+}
+
func checkListKeys(path string, x interface{}) (err error) {
v := reflect.Indirect(reflect.ValueOf(x))
switch v.Kind() {
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index b6737bc55..abb315a8d 100644
--- a/lib/service/cmd.go
+++ b/lib/service/cmd.go
@@ -68,7 +68,6 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
loader := config.NewLoader(stdin, log)
loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
-
err = flags.Parse(args)
if err == flag.ErrHelp {
err = nil
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f6b736d58..7cfc94279 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -72,6 +72,8 @@ type Cluster struct {
DisabledAPIs StringSet
MaxIndexDatabaseRead int
MaxItemsPerResponse int
+ MaxConcurrentRequests int
+ MaxKeepBlockBuffers int
MaxRequestAmplification int
MaxRequestSize int
RailsSessionSecretToken string
@@ -86,13 +88,19 @@ type Cluster struct {
UnloggedAttributes StringSet
}
Collections struct {
- BlobSigning bool
- BlobSigningKey string
- BlobSigningTTL Duration
- CollectionVersioning bool
- DefaultTrashLifetime Duration
- DefaultReplication int
- ManagedProperties map[string]struct {
+ BlobSigning bool
+ BlobSigningKey string
+ BlobSigningTTL Duration
+ BlobTrash bool
+ BlobTrashLifetime Duration
+ BlobTrashCheckInterval Duration
+ BlobTrashConcurrency int
+ BlobDeleteConcurrency int
+ BlobReplicateConcurrency int
+ CollectionVersioning bool
+ DefaultTrashLifetime Duration
+ DefaultReplication int
+ ManagedProperties map[string]struct {
Value interface{}
Function string
Protected bool
@@ -143,6 +151,7 @@ type Cluster struct {
UserNotifierEmailFrom string
UserProfileNotificationAddress string
}
+ Volumes map[string]Volume
Workbench struct {
ActivationContactLink string
APIClientConnectTimeout Duration
@@ -182,6 +191,48 @@ type Cluster struct {
EnableBetaController14287 bool
}
+type Volume struct {
+ AccessViaHosts map[URL]VolumeAccess
+ ReadOnly bool
+ Replication int
+ StorageClasses map[string]bool
+ Driver string
+ DriverParameters json.RawMessage
+}
+
+type S3VolumeDriverParameters struct {
+ AccessKey string
+ SecretKey string
+ Endpoint string
+ Region string
+ Bucket string
+ LocationConstraint bool
+ IndexPageSize int
+ ConnectTimeout Duration
+ ReadTimeout Duration
+ RaceWindow Duration
+ UnsafeDelete bool
+}
+
+type AzureVolumeDriverParameters struct {
+ StorageAccountName string
+ StorageAccountKey string
+ StorageBaseURL string
+ ContainerName string
+ RequestTimeout Duration
+ ListBlobsRetryDelay Duration
+ ListBlobsMaxAttempts int
+}
+
+type DirectoryVolumeDriverParameters struct {
+ Root string
+ Serialize bool
+}
+
+type VolumeAccess struct {
+ ReadOnly bool
+}
+
type Services struct {
Composer Service
Controller Service
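Because DriverParameters is a json.RawMessage, the generic config loader can carry driver-specific settings opaquely and defer decoding until the Driver name is known -- the same pattern alreadyMigrated uses above. A minimal self-contained sketch of the idea (not code from this commit):

package main

import (
	"encoding/json"
	"fmt"
)

// volume mimics arvados.Volume: DriverParameters stays raw until we
// know which parameter struct to decode it into.
type volume struct {
	Driver           string
	DriverParameters json.RawMessage
}

type s3Params struct {
	Region string
	Bucket string
}

func main() {
	raw := []byte(`{"Driver":"S3","DriverParameters":{"Region":"us-east-1","Bucket":"examplebucket"}}`)
	var v volume
	if err := json.Unmarshal(raw, &v); err != nil {
		panic(err)
	}
	switch v.Driver {
	case "S3":
		var p s3Params
		if err := json.Unmarshal(v.DriverParameters, &p); err != nil {
			panic(err)
		}
		fmt.Printf("S3 volume: bucket %q in region %q\n", p.Bucket, p.Region)
	default:
		fmt.Printf("unsupported driver %q\n", v.Driver)
	}
}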
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 3c17b3bd0..5e207cdf9 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -8,10 +8,10 @@ import (
"bytes"
"context"
"errors"
- "flag"
"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"os"
"regexp"
@@ -53,52 +53,8 @@ func readKeyFromFile(file string) (string, error) {
return accountKey, nil
}
-type azureVolumeAdder struct {
- *Config
-}
-
-// String implements flag.Value
-func (s *azureVolumeAdder) String() string {
- return "-"
-}
-
-func (s *azureVolumeAdder) Set(containerName string) error {
- s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
- ContainerName: containerName,
- StorageAccountName: azureStorageAccountName,
- StorageAccountKeyFile: azureStorageAccountKeyFile,
- AzureReplication: azureStorageReplication,
- ReadOnly: deprecated.flagReadonly,
- })
- return nil
-}
-
func init() {
VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
-
- flag.Var(&azureVolumeAdder{theConfig},
- "azure-storage-container-volume",
- "Use the given container as a storage volume. Can be given multiple times.")
- flag.StringVar(
- &azureStorageAccountName,
- "azure-storage-account-name",
- "",
- "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
- flag.StringVar(
- &azureStorageAccountKeyFile,
- "azure-storage-account-key-file",
- "",
- "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
- flag.IntVar(
- &azureStorageReplication,
- "azure-storage-replication",
- 3,
- "Replication level to report to clients when data is stored in an Azure container.")
- flag.IntVar(
- &azureMaxGetBytes,
- "azure-max-get-bytes",
- BlockSize,
- fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
}
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
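With these flag definitions removed, keepstore no longer accepts the old Azure (or S3) command-line options; convertKeepstoreFlagsToServiceFlags in the new command.go below rejects each one with a pointer to its cluster-config replacement. A hypothetical invocation and the resulting log line (message format taken from the code below):

  keepstore -azure-storage-account-name myacct
  => command line flag -azure-storage-account-name is no longer supported -- use Clusters.*.Volumes.*.DriverParameters.StorageAccountName in cluster config file instead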
diff --git a/services/keepstore/command.go b/services/keepstore/command.go
new file mode 100644
index 000000000..db65fc361
--- /dev/null
+++ b/services/keepstore/command.go
@@ -0,0 +1,224 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "net/http"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+
+ "git.curoverse.com/arvados.git/lib/service"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/coreos/go-systemd/daemon"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+)
+
+var (
+ version = "dev"
+ Command = service.Command(arvados.ServiceNameKeepstore, newHandler)
+)
+
+func main() {
+ os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
+}
+
+func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ args, ok := convertKeepstoreFlagsToServiceFlags(args, ctxlog.FromContext(context.Background()))
+ if !ok {
+ return 2
+ }
+ return Command.RunCommand(prog, args, stdin, stdout, stderr)
+}
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string) service.Handler {
+ return &handler{
+ Cluster: cluster,
+ Logger: ctxlog.FromContext(ctx),
+ }
+}
+
+// Parse keepstore command line flags, and return equivalent
+// service.Command flags. The second return value ("ok") is true if
+// all provided flags were successfully converted.
+func convertKeepstoreFlagsToServiceFlags(args []string, lgr logrus.FieldLogger) ([]string, bool) {
+ flags := flag.NewFlagSet("", flag.ContinueOnError)
+ flags.String("listen", cfg.Listen, "Services.Keepstore.InternalURLs")
+ flags.Int("max-buffers", cfg.MaxBuffers, "API.MaxKeepBlockBuffers")
+ flags.Int("max-requests", cfg.MaxRequests, "API.MaxConcurrentRequests")
+ flags.Bool("never-delete", depr.neverDelete, "Collections.BlobTrash")
+ flags.Bool("enforce-permissions", cfg.RequireSignatures, "Collections.BlobSigning")
+ flags.String("permission-key-file", cfg.BlobSigningKeyFile, "Collections.BlobSigningKey")
+ flags.String("blob-signing-key-file", cfg.BlobSigningKeyFile, "Collections.BlobSigningKey")
+ flags.String("data-manager-token-file", cfg.SystemAuthTokenFile, "SystemRootToken")
+ flags.Int("permission-ttl", depr.signatureTTLSeconds, "Collections.BlobSigningTTL")
+ flags.Int("blob-signature-ttl", depr.signatureTTLSeconds, "Collections.BlobSigningTTL")
+ flags.String("trash-lifetime", "Collections.BlobTrashLifetime")
+ flags.Bool("serialize", depr.flagSerializeIO, "Volumes.*.DriverParameters.Serialize")
+ flags.Bool("readonly", depr.flagReadonly, "Volumes.*.ReadOnly")
+ flags.String("pid", cfg.PIDFile, "-")
+ flags.String("trash-check-interval", "Collections.BlobTrashCheckInterval")
+
+ flags.String("azure-storage-container-volume", "Volumes.*.Driver")
+ flags.String("azure-storage-account-name", "Volumes.*.DriverParameters.StorageAccountName")
+ flags.String("azure-storage-account-key-file", "Volumes.*.DriverParameters.StorageAccountKey")
+ flags.String("azure-storage-replication", "Volumes.*.Replication")
+ flags.String("azure-max-get-bytes", "Volumes.*.DriverParameters.MaxDataReadSize")
+
+ flag.String("s3-bucket-volume", "Volumes.*.DriverParameters.Bucket")
+ flag.String("s3-region", "Volumes.*.DriverParameters.Region")
+ flag.String("s3-endpoint", "Volumes.*.DriverParameters.Endpoint")
+ flag.String("s3-access-key-file", "Volumes.*.DriverParameters.AccessKey")
+ flag.String("s3-secret-key-file", "Volumes.*.DriverParameters.SecretKey")
+ flag.String("s3-race-window", "Volumes.*.DriverParameters.RaceWindow")
+ flag.String("s3-replication", "Volumes.*.Replication")
+ flag.String("s3-unsafe-delete", "Volumes.*.DriverParameters.UnsafeDelete")
+
+ err := flags.Parse(args)
+ if err == flag.ErrHelp {
+ return []string{"-help"}, true
+ } else if err != nil {
+ return nil, false
+ }
+
+ args = nil
+ ok := true
+ flags.Visit(func(f *flag.Flag) {
+ if f.Name == "config" {
+ args = []string{"-legacy-keepstore-config-file", f.Value.String()}
+ } else if f.Usage == "-" {
+ ok = false
+ lgr.Errorf("command line flag -%s is no longer supported", f.Name)
+ } else {
+ ok = false
+ lgr.Errorf("command line flag -%s is no longer supported -- use Clusters.*.%s in cluster config file instead", f.Name, f.Usage)
+ }
+ })
+ return args, ok
+}
+
+type handler struct {
+ Cluster *arvados.Cluster
+ Logger logrus.FieldLogger
+
+ volumes []Volume
+ volmgr VolumeManager
+ router http.Handler
+
+ err error
+ setupOnce sync.Once
+}
+
+func (h *handler) CheckHealth() error {
+ h.setupOnce.Do(h.setup)
+ return h.err
+}
+
+func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ h.setupOnce.Do(h.setup)
+ if h.err != nil {
+ http.Error(w, h.err.Error(), http.StatusInternalServerError)
+ return
+ }
+ h.router.ServeHTTP(w, r)
+}
+
+func (h *handler) setup() {
+ reg := prometheus.NewRegistry()
+
+ if h.Cluster.API.MaxKeepBlockBuffers < 0 {
+ h.err = fmt.Errorf("API.MaxKeepBlockBuffers must not be negative")
+ return
+ }
+ bufs = newBufferPool(h.Cluster.API.MaxKeepBlockBuffers, BlockSize)
+
+ if h.Cluster.API.MaxConcurrentRequests < 1 {
+ h.Cluster.API.MaxConcurrentRequests = h.Cluster.API.MaxKeepBlockBuffers * 2
+ h.Logger.Warnf("API.MaxConcurrentRequests <1 or not specified; defaulting to MaxKeepBlockBuffers * 2 == %d", h.Cluster.API.MaxConcurrentRequests)
+ }
+
+ if h.Cluster.Collections.BlobSigningKey == "" {
+ if h.Cluster.Collections.BlobSigning {
+ h.err = errors.New("cannot enable Collections.BlobSigning with no Collections.BlobSigningKey")
+ return
+ }
+ h.Logger.Warn("Running without a blob signing key. Block locators returned by this server will not be signed, and will be rejected by a server that enforces permissions. To fix this, configure Collections.BlobSigning and Collections.BlobSigningKey.")
+ }
+
+ // Build volumes from the cluster config, using the driver
+ // registry populated by each volume type's init() (see
+ // s3_volume.go).
+ for _, cfgvol := range h.Cluster.Volumes {
+ factory, ok := Driver[cfgvol.Driver]
+ if !ok {
+ h.err = fmt.Errorf("unsupported volume driver %q", cfgvol.Driver)
+ return
+ }
+ v, err := factory(cfgvol)
+ if err != nil {
+ h.err = fmt.Errorf("creating %s volume: %s", cfgvol.Driver, err)
+ return
+ }
+ h.volumes = append(h.volumes, v)
+ }
+ if len(h.volumes) == 0 {
+ h.err = errors.New("no volumes found")
+ return
+ }
+ vm := newVolumeMetricsVecs(reg)
+ for _, v := range h.volumes {
+ if err := v.Start(vm); err != nil {
+ h.err = fmt.Errorf("volume %s: %s", v, err)
+ return
+ }
+ h.Logger.Printf("Using volume %v (writable=%v)", v, v.Writable())
+ }
+
+ log.Printf("keepstore %s starting, pid %d", version, os.Getpid())
+ defer log.Println("keepstore exiting, pid", os.Getpid())
+
+ // Start a round-robin VolumeManager with the volumes we have found.
+ h.volmgr = MakeRRVolumeManager(h.volumes)
+
+ // Middleware/handler stack
+ router := MakeRESTRouter(cluster, reg)
+
+ // Set up a TCP listener.
+ listener, err := net.Listen("tcp", theConfig.Listen)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Initialize keepclient for pull workers
+ keepClient := &keepclient.KeepClient{
+ Arvados: &arvadosclient.ArvadosClient{},
+ Want_replicas: 1,
+ }
+
+ // Initialize the pullq and workers
+ pullq = NewWorkQueue()
+ for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
+ go RunPullWorker(pullq, keepClient)
+ }
+
+ // Initialize the trashq and workers
+ trashq = NewWorkQueue()
+ for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
+ go RunTrashWorker(trashq)
+ }
+
+ // Start emptyTrash goroutine
+ doneEmptyingTrash := make(chan bool)
+ go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
+
+ // Shut down the server gracefully (by closing the listener)
+ // if SIGTERM is received.
+ term := make(chan os.Signal, 1)
+ go func(sig <-chan os.Signal) {
+ s := <-sig
+ log.Println("caught signal:", s)
+ doneEmptyingTrash <- true
+ listener.Close()
+ }(term)
+ signal.Notify(term, syscall.SIGTERM)
+ signal.Notify(term, syscall.SIGINT)
+
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
+ log.Printf("Error notifying init daemon: %v", err)
+ }
+ log.Println("listening at", listener.Addr())
+ srv := &server{}
+ srv.Handler = router
+ srv.Serve(listener)
+}
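
A note on the conversion approach above: the legacy FlagSet is used
only to recognize old option names. Each flag's usage string is
repurposed to carry the path of the replacement cluster-config entry,
and flag.Visit (which visits only the flags the caller actually set)
turns any use of an old option into a pointed migration error. A
minimal, self-contained sketch of the same technique (the flag names
here are made up, not keepstore's real set):

    package main

    import (
        "flag"
        "fmt"
    )

    // convert recognizes legacy flags and reports their new config
    // paths; -config is rewritten to the new flag instead.
    func convert(args []string) ([]string, bool) {
        flags := flag.NewFlagSet("", flag.ContinueOnError)
        flags.String("listen", "", "Services.Keepstore.InternalURLs")
        flags.String("config", "", "")
        if err := flags.Parse(args); err != nil {
            return nil, false
        }
        var out []string
        ok := true
        flags.Visit(func(f *flag.Flag) {
            if f.Name == "config" {
                out = []string{"-legacy-keepstore-config-file", f.Value.String()}
                return
            }
            // The usage string doubles as the new config path.
            ok = false
            fmt.Printf("flag -%s is now Clusters.*.%s\n", f.Name, f.Usage)
        })
        return out, ok
    }

    func main() {
        fmt.Println(convert([]string{"-listen", ":25107"}))
        fmt.Println(convert([]string{"-config", "/etc/arvados/keepstore/keepstore.yml"}))
    }

The first call prints the migration hint and returns ok=false; the
second returns the rewritten -legacy-keepstore-config-file arguments.
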
diff --git a/services/keepstore/command_test.go b/services/keepstore/command_test.go
new file mode 100644
index 000000000..a7a403da7
--- /dev/null
+++ b/services/keepstore/command_test.go
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "bytes"
+ "io/ioutil"
+ "os"
+
+ "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&CommandSuite{})
+
+type CommandSuite struct{}
+
+func (*CommandSuite) TestLegacyConfigPath(c *check.C) {
+ var stdin, stdout, stderr bytes.Buffer
+ tmp, err := ioutil.TempFile("", "")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(tmp.Name())
+ tmp.Write([]byte("Listen: \"1.2.3.4.5:invalidport\"\n"))
+ tmp.Close()
+ exited := runCommand("keepstore", []string{"-config", tmp.Name()}, &stdin, &stdout, &stderr)
+ c.Check(exited, check.Equals, 1)
+ c.Check(stderr.String(), check.Matches, `(?ms).*unable to migrate Listen value.*`)
+}
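
One caveat for the new test file: gopkg.in/check.v1 suites only run
under "go test" if something in the package hands control to gocheck.
Keepstore's existing test files presumably already provide the
bridge; if not, the conventional one-time hookup (in any _test.go
file in the package) is:

    package main

    import (
        "testing"

        "gopkg.in/check.v1"
    )

    // Gocheck boilerplate: run all registered check.v1 suites.
    func Test(t *testing.T) {
        check.TestingT(t)
    }
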
diff --git a/services/keepstore/config.go b/services/keepstore/config.go
index 43a219111..0ca10970b 100644
--- a/services/keepstore/config.go
+++ b/services/keepstore/config.go
@@ -5,155 +5,13 @@
package main
import (
- "bytes"
"encoding/json"
"fmt"
- "io/ioutil"
- "strings"
- "time"
-
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/sirupsen/logrus"
-)
-
-type Config struct {
- Debug bool
- Listen string
-
- LogFormat string
-
- PIDFile string
-
- MaxBuffers int
- MaxRequests int
-
- BlobSignatureTTL arvados.Duration
- BlobSigningKeyFile string
- RequireSignatures bool
- SystemAuthTokenFile string
- EnableDelete bool
- TrashLifetime arvados.Duration
- TrashCheckInterval arvados.Duration
- PullWorkers int
- TrashWorkers int
- EmptyTrashWorkers int
- TLSCertificateFile string
- TLSKeyFile string
-
- Volumes VolumeList
-
- blobSigningKey []byte
- systemAuthToken string
- debugLogf func(string, ...interface{})
-
- ManagementToken string
-}
-
-var (
- theConfig = DefaultConfig()
- formatter = map[string]logrus.Formatter{
- "text": &logrus.TextFormatter{
- FullTimestamp: true,
- TimestampFormat: rfc3339NanoFixed,
- },
- "json": &logrus.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- },
- }
- log = logrus.StandardLogger()
+ "log"
)
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-// DefaultConfig returns the default configuration.
-func DefaultConfig() *Config {
- return &Config{
- Listen: ":25107",
- LogFormat: "json",
- MaxBuffers: 128,
- RequireSignatures: true,
- BlobSignatureTTL: arvados.Duration(14 * 24 * time.Hour),
- TrashLifetime: arvados.Duration(14 * 24 * time.Hour),
- TrashCheckInterval: arvados.Duration(24 * time.Hour),
- Volumes: []Volume{},
- }
-}
-
-// Start should be called exactly once: after setting all public
-// fields, and before using the config.
-func (cfg *Config) Start(reg *prometheus.Registry) error {
- if cfg.Debug {
- log.Level = logrus.DebugLevel
- cfg.debugLogf = log.Printf
- cfg.debugLogf("debugging enabled")
- } else {
- log.Level = logrus.InfoLevel
- cfg.debugLogf = func(string, ...interface{}) {}
- }
-
- f := formatter[strings.ToLower(cfg.LogFormat)]
- if f == nil {
- return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
- }
- log.Formatter = f
-
- if cfg.MaxBuffers < 0 {
- return fmt.Errorf("MaxBuffers must be greater than zero")
- }
- bufs = newBufferPool(cfg.MaxBuffers, BlockSize)
-
- if cfg.MaxRequests < 1 {
- cfg.MaxRequests = cfg.MaxBuffers * 2
- log.Printf("MaxRequests <1 or not specified; defaulting to MaxBuffers * 2 == %d", cfg.MaxRequests)
- }
-
- if cfg.BlobSigningKeyFile != "" {
- buf, err := ioutil.ReadFile(cfg.BlobSigningKeyFile)
- if err != nil {
- return fmt.Errorf("reading blob signing key file: %s", err)
- }
- cfg.blobSigningKey = bytes.TrimSpace(buf)
- if len(cfg.blobSigningKey) == 0 {
- return fmt.Errorf("blob signing key file %q is empty", cfg.BlobSigningKeyFile)
- }
- } else if cfg.RequireSignatures {
- return fmt.Errorf("cannot enable RequireSignatures (-enforce-permissions) without a blob signing key")
- } else {
- log.Println("Running without a blob signing key. Block locators " +
- "returned by this server will not be signed, and will be rejected " +
- "by a server that enforces permissions.")
- log.Println("To fix this, use the BlobSigningKeyFile config entry.")
- }
-
- if fn := cfg.SystemAuthTokenFile; fn != "" {
- buf, err := ioutil.ReadFile(fn)
- if err != nil {
- return fmt.Errorf("cannot read system auth token file %q: %s", fn, err)
- }
- cfg.systemAuthToken = strings.TrimSpace(string(buf))
- }
-
- if cfg.EnableDelete {
- log.Print("Trash/delete features are enabled. WARNING: this has not " +
- "been extensively tested. You should disable this unless you can afford to lose data.")
- }
-
- if len(cfg.Volumes) == 0 {
- if (&unixVolumeAdder{cfg}).Discover() == 0 {
- return fmt.Errorf("no volumes found")
- }
- }
- vm := newVolumeMetricsVecs(reg)
- for _, v := range cfg.Volumes {
- if err := v.Start(vm); err != nil {
- return fmt.Errorf("volume %s: %s", v, err)
- }
- log.Printf("Using volume %v (writable=%v)", v, v.Writable())
- }
- return nil
-}
-
// VolumeTypes is built up by init() funcs in the source files that
// define the volume types.
var VolumeTypes = []func() VolumeWithExamples{}
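
The comment above describes the registration pattern keepstore uses
for volume types: each volume source file appends a factory to a
package-level slice from its own init(), so compiling a volume type
into the binary is all it takes to make it available. With stand-in
types (the real interfaces are richer), the pattern is just:

    package main

    import "fmt"

    // Stand-in for the real VolumeWithExamples interface.
    type volumeExample interface {
        Type() string
    }

    // Built up by init() funcs in each volume type's source file.
    var volumeTypes []func() volumeExample

    type fakeVolume struct{}

    func (fakeVolume) Type() string { return "Fake" }

    func init() {
        volumeTypes = append(volumeTypes, func() volumeExample { return fakeVolume{} })
    }

    func main() {
        for _, vt := range volumeTypes {
            fmt.Println("registered volume type:", vt().Type())
        }
    }
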
diff --git a/services/keepstore/deprecated.go b/services/keepstore/deprecated.go
index d1377978a..1fd8a0668 100644
--- a/services/keepstore/deprecated.go
+++ b/services/keepstore/deprecated.go
@@ -3,45 +3,3 @@
// SPDX-License-Identifier: AGPL-3.0
package main
-
-import (
- "flag"
- "time"
-
- "git.curoverse.com/arvados.git/sdk/go/arvados"
-)
-
-type deprecatedOptions struct {
- flagSerializeIO bool
- flagReadonly bool
- neverDelete bool
- signatureTTLSeconds int
-}
-
-var deprecated = deprecatedOptions{
- neverDelete: !theConfig.EnableDelete,
- signatureTTLSeconds: int(theConfig.BlobSignatureTTL.Duration() / time.Second),
-}
-
-func (depr *deprecatedOptions) beforeFlagParse(cfg *Config) {
- flag.StringVar(&cfg.Listen, "listen", cfg.Listen, "see Listen configuration")
- flag.IntVar(&cfg.MaxBuffers, "max-buffers", cfg.MaxBuffers, "see MaxBuffers configuration")
- flag.IntVar(&cfg.MaxRequests, "max-requests", cfg.MaxRequests, "see MaxRequests configuration")
- flag.BoolVar(&depr.neverDelete, "never-delete", depr.neverDelete, "see EnableDelete configuration")
- flag.BoolVar(&cfg.RequireSignatures, "enforce-permissions", cfg.RequireSignatures, "see RequireSignatures configuration")
- flag.StringVar(&cfg.BlobSigningKeyFile, "permission-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
- flag.StringVar(&cfg.BlobSigningKeyFile, "blob-signing-key-file", cfg.BlobSigningKeyFile, "see BlobSigningKey`File` configuration")
- flag.StringVar(&cfg.SystemAuthTokenFile, "data-manager-token-file", cfg.SystemAuthTokenFile, "see SystemAuthToken`File` configuration")
- flag.IntVar(&depr.signatureTTLSeconds, "permission-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
- flag.IntVar(&depr.signatureTTLSeconds, "blob-signature-ttl", depr.signatureTTLSeconds, "signature TTL in seconds; see BlobSignatureTTL configuration")
- flag.Var(&cfg.TrashLifetime, "trash-lifetime", "see TrashLifetime configuration")
- flag.BoolVar(&depr.flagSerializeIO, "serialize", depr.flagSerializeIO, "serialize read and write operations on the following volumes.")
- flag.BoolVar(&depr.flagReadonly, "readonly", depr.flagReadonly, "do not write, delete, or touch anything on the following volumes.")
- flag.StringVar(&cfg.PIDFile, "pid", cfg.PIDFile, "see `PIDFile` configuration")
- flag.Var(&cfg.TrashCheckInterval, "trash-check-interval", "see TrashCheckInterval configuration")
-}
-
-func (depr *deprecatedOptions) afterFlagParse(cfg *Config) {
- cfg.BlobSignatureTTL = arvados.Duration(depr.signatureTTLSeconds) * arvados.Duration(time.Second)
- cfg.EnableDelete = !depr.neverDelete
-}
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index fcbdddacb..d82d0f913 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -5,24 +5,9 @@
package main
import (
- "flag"
- "fmt"
- "net"
- "os"
- "os/signal"
- "syscall"
"time"
-
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/config"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "github.com/coreos/go-systemd/daemon"
- "github.com/prometheus/client_golang/prometheus"
)
-var version = "dev"
-
// A Keep "block" is 64MB.
const BlockSize = 64 * 1024 * 1024
@@ -87,147 +72,6 @@ var KeepVM VolumeManager
var pullq *WorkQueue
var trashq *WorkQueue
-func main() {
- deprecated.beforeFlagParse(theConfig)
-
- dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
- getVersion := flag.Bool("version", false, "Print version information and exit.")
-
- defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
- var configPath string
- flag.StringVar(
- &configPath,
- "config",
- defaultConfigPath,
- "YAML or JSON configuration file `path`")
- flag.Usage = usage
- flag.Parse()
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("keepstore %s\n", version)
- return
- }
-
- deprecated.afterFlagParse(theConfig)
-
- err := config.LoadFile(theConfig, configPath)
- if err != nil && (!os.IsNotExist(err) || configPath != defaultConfigPath) {
- log.Fatal(err)
- }
-
- if *dumpConfig {
- log.Fatal(config.DumpAndExit(theConfig))
- }
-
- log.Printf("keepstore %s started", version)
-
- metricsRegistry := prometheus.NewRegistry()
-
- err = theConfig.Start(metricsRegistry)
- if err != nil {
- log.Fatal(err)
- }
-
- if pidfile := theConfig.PIDFile; pidfile != "" {
- f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
- if err != nil {
- log.Fatalf("open pidfile (%s): %s", pidfile, err)
- }
- defer f.Close()
- err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
- if err != nil {
- log.Fatalf("flock pidfile (%s): %s", pidfile, err)
- }
- defer os.Remove(pidfile)
- err = f.Truncate(0)
- if err != nil {
- log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
- }
- _, err = fmt.Fprint(f, os.Getpid())
- if err != nil {
- log.Fatalf("write pidfile (%s): %s", pidfile, err)
- }
- err = f.Sync()
- if err != nil {
- log.Fatalf("sync pidfile (%s): %s", pidfile, err)
- }
- }
-
- var cluster *arvados.Cluster
- cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
- if err != nil && os.IsNotExist(err) {
- log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
- cluster = &arvados.Cluster{
- ClusterID: "xxxxx",
- }
- } else if err != nil {
- log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
- } else {
- cluster, err = cfg.GetCluster("")
- if err != nil {
- log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
- }
- }
-
- log.Println("keepstore starting, pid", os.Getpid())
- defer log.Println("keepstore exiting, pid", os.Getpid())
-
- // Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(theConfig.Volumes)
-
- // Middleware/handler stack
- router := MakeRESTRouter(cluster, metricsRegistry)
-
- // Set up a TCP listener.
- listener, err := net.Listen("tcp", theConfig.Listen)
- if err != nil {
- log.Fatal(err)
- }
-
- // Initialize keepclient for pull workers
- keepClient := &keepclient.KeepClient{
- Arvados: &arvadosclient.ArvadosClient{},
- Want_replicas: 1,
- }
-
- // Initialize the pullq and workers
- pullq = NewWorkQueue()
- for i := 0; i < 1 || i < theConfig.PullWorkers; i++ {
- go RunPullWorker(pullq, keepClient)
- }
-
- // Initialize the trashq and workers
- trashq = NewWorkQueue()
- for i := 0; i < 1 || i < theConfig.TrashWorkers; i++ {
- go RunTrashWorker(trashq)
- }
-
- // Start emptyTrash goroutine
- doneEmptyingTrash := make(chan bool)
- go emptyTrash(doneEmptyingTrash, theConfig.TrashCheckInterval.Duration())
-
- // Shut down the server gracefully (by closing the listener)
- // if SIGTERM is received.
- term := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- s := <-sig
- log.Println("caught signal:", s)
- doneEmptyingTrash <- true
- listener.Close()
- }(term)
- signal.Notify(term, syscall.SIGTERM)
- signal.Notify(term, syscall.SIGINT)
-
- if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
- log.Printf("Error notifying init daemon: %v", err)
- }
- log.Println("listening at", listener.Addr())
- srv := &server{}
- srv.Handler = router
- srv.Serve(listener)
-}
-
// Periodically (once per interval) invoke EmptyTrash on all volumes.
func emptyTrash(done <-chan bool, interval time.Duration) {
ticker := time.NewTicker(interval)
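
The emptyTrash function whose opening is visible above follows the
standard ticker-plus-done-channel loop. A self-contained version of
the shape (the interval and task here are placeholders, not keepstore
values):

    package main

    import (
        "fmt"
        "time"
    )

    // periodically invokes task once per interval until a value
    // arrives on done; the same shape as keepstore's emptyTrash,
    // which calls EmptyTrash on every volume each tick.
    func periodically(done <-chan bool, interval time.Duration, task func()) {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                task()
            case <-done:
                return
            }
        }
    }

    func main() {
        done := make(chan bool)
        go periodically(done, 10*time.Millisecond, func() { fmt.Println("tick") })
        time.Sleep(35 * time.Millisecond)
        done <- true
    }
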
diff --git a/services/keepstore/keepstore.service b/services/keepstore/keepstore.service
index 8b448e72c..728c6fded 100644
--- a/services/keepstore/keepstore.service
+++ b/services/keepstore/keepstore.service
@@ -6,7 +6,6 @@
Description=Arvados Keep Storage Daemon
Documentation=https://doc.arvados.org/
After=network.target
-AssertPathExists=/etc/arvados/keepstore/keepstore.yml
# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
StartLimitInterval=0
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 4c39dcd5c..f56d062c7 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -10,10 +10,12 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
- "flag"
+ "encoding/json"
+ "errors"
"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"os"
"regexp"
@@ -44,7 +46,6 @@ var (
s3Endpoint string
s3Replication int
s3UnsafeDelete bool
- s3RaceWindow time.Duration
s3ACL = s3.Private
@@ -56,38 +57,26 @@ const (
nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
)
-type s3VolumeAdder struct {
- *Config
-}
-
-// String implements flag.Value
-func (s *s3VolumeAdder) String() string {
- return "-"
+func init() {
+ Driver["S3"] = NewS3Volume
}
-func (s *s3VolumeAdder) Set(bucketName string) error {
- if bucketName == "" {
- return fmt.Errorf("no container name given")
- }
- if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
- return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
- }
- if deprecated.flagSerializeIO {
- log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
- }
- s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
- Bucket: bucketName,
- AccessKeyFile: s3AccessKeyFile,
- SecretKeyFile: s3SecretKeyFile,
- Endpoint: s3Endpoint,
- Region: s3RegionName,
- RaceWindow: arvados.Duration(s3RaceWindow),
- S3Replication: s3Replication,
- UnsafeDelete: s3UnsafeDelete,
- ReadOnly: deprecated.flagReadonly,
- IndexPageSize: 1000,
- })
- return nil
+func NewS3Volume(volume arvados.Volume) (Volume, error) {
+ v := S3Volume{volume: volume}
+ err := json.Unmarshal(volume.DriverParameters, &v)
+ if err != nil {
+ return nil, err
+ }
+ if v.Bucket == "" || v.AccessKey == "" || v.SecretKey == "" {
+ return nil, errors.New("DriverParameters: Bucket, AccessKey, and SecretKey must be provided")
+ }
+ if v.IndexPageSize == 0 {
+ v.IndexPageSize = 1000
+ }
+ if v.RaceWindow < 0 {
+ return nil, errors.New("DriverParameters: RaceWindow must not be negative")
+ }
+ return &v, nil
}
func s3regions() (okList []string) {
@@ -97,53 +86,10 @@ func s3regions() (okList []string) {
return
}
-func init() {
- VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
-
- flag.Var(&s3VolumeAdder{theConfig},
- "s3-bucket-volume",
- "Use the given bucket as a storage volume. Can be given multiple times.")
- flag.StringVar(
- &s3RegionName,
- "s3-region",
- "",
- fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
- flag.StringVar(
- &s3Endpoint,
- "s3-endpoint",
- "",
- "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
- flag.StringVar(
- &s3AccessKeyFile,
- "s3-access-key-file",
- "",
- "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
- flag.StringVar(
- &s3SecretKeyFile,
- "s3-secret-key-file",
- "",
- "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
- flag.DurationVar(
- &s3RaceWindow,
- "s3-race-window",
- 24*time.Hour,
- "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
- flag.IntVar(
- &s3Replication,
- "s3-replication",
- 2,
- "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
- flag.BoolVar(
- &s3UnsafeDelete,
- "s3-unsafe-delete",
- false,
- "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
-}
-
// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
- AccessKeyFile string
- SecretKeyFile string
+ AccessKey string
+ SecretKey string
Endpoint string
Region string
Bucket string
@@ -153,61 +99,24 @@ type S3Volume struct {
ConnectTimeout arvados.Duration
ReadTimeout arvados.Duration
RaceWindow arvados.Duration
- ReadOnly bool
UnsafeDelete bool
- StorageClasses []string
-
- bucket *s3bucket
+ volume arvados.Volume
+ bucket *s3bucket
startOnce sync.Once
}
-// Examples implements VolumeWithExamples.
-func (*S3Volume) Examples() []Volume {
- return []Volume{
- &S3Volume{
- AccessKeyFile: "/etc/aws_s3_access_key.txt",
- SecretKeyFile: "/etc/aws_s3_secret_key.txt",
- Endpoint: "",
- Region: "us-east-1",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
- ConnectTimeout: arvados.Duration(time.Minute),
- ReadTimeout: arvados.Duration(5 * time.Minute),
- },
- &S3Volume{
- AccessKeyFile: "/etc/gce_s3_access_key.txt",
- SecretKeyFile: "/etc/gce_s3_secret_key.txt",
- Endpoint: "https://storage.googleapis.com",
- Region: "",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
- ConnectTimeout: arvados.Duration(time.Minute),
- ReadTimeout: arvados.Duration(5 * time.Minute),
- },
- }
-}
-
-// Type implements Volume.
-func (*S3Volume) Type() string {
- return "S3"
-}
-
// Start populates private fields and verifies the configuration is
// valid.
func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
region, ok := aws.Regions[v.Region]
if v.Endpoint == "" {
if !ok {
- return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
+ return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
}
} else if ok {
return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
- "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
+ "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
} else {
region = aws.Region{
Name: v.Region,
@@ -216,15 +125,9 @@ func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
}
}
- var err error
- var auth aws.Auth
- auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
- if err != nil {
- return err
- }
- auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
- if err != nil {
- return err
+ auth := aws.Auth{
+ AccessKey: v.AccessKey,
+ SecretKey: v.SecretKey,
}
// Zero timeouts mean "wait forever", which is a bad
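
NewS3Volume above shows the driver-registry shape this branch is
converging on: a constructor keyed by driver name decodes its own
DriverParameters from raw JSON and validates them, so adding a
backend never touches the generic config code. A reduced sketch of
the pattern (the types and field set are illustrative, not the real
keepstore ones):

    package main

    import (
        "encoding/json"
        "errors"
        "fmt"
    )

    type volumeConfig struct {
        Driver           string
        DriverParameters json.RawMessage
    }

    type volume interface{ String() string }

    type s3Volume struct {
        Bucket    string
        AccessKey string
        SecretKey string
    }

    func (v *s3Volume) String() string { return "s3:" + v.Bucket }

    // Driver registry: each backend registers a constructor that
    // unmarshals and validates its own DriverParameters.
    var drivers = map[string]func(volumeConfig) (volume, error){
        "S3": func(cfg volumeConfig) (volume, error) {
            var v s3Volume
            if err := json.Unmarshal(cfg.DriverParameters, &v); err != nil {
                return nil, err
            }
            if v.Bucket == "" {
                return nil, errors.New("DriverParameters: Bucket must be provided")
            }
            return &v, nil
        },
    }

    func main() {
        cfg := volumeConfig{
            Driver:           "S3",
            DriverParameters: json.RawMessage(`{"Bucket": "example"}`),
        }
        v, err := drivers[cfg.Driver](cfg)
        fmt.Println(v, err)
    }
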
diff --git a/services/keepstore/usage.go b/services/keepstore/usage.go
deleted file mode 100644
index 8e83f6ce5..000000000
--- a/services/keepstore/usage.go
+++ /dev/null
@@ -1,162 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "flag"
- "fmt"
- "os"
- "sort"
- "strings"
-
- "github.com/ghodss/yaml"
-)
-
-func usage() {
- c := DefaultConfig()
- knownTypes := []string{}
- for _, vt := range VolumeTypes {
- c.Volumes = append(c.Volumes, vt().Examples()...)
- knownTypes = append(knownTypes, vt().Type())
- }
- exampleConfigFile, err := yaml.Marshal(c)
- if err != nil {
- panic(err)
- }
- sort.Strings(knownTypes)
- knownTypeList := strings.Join(knownTypes, ", ")
- fmt.Fprintf(os.Stderr, `
-
-keepstore provides a content-addressed data store backed by a local filesystem or networked storage.
-
-Usage: keepstore -config path/to/keepstore.yml
- keepstore [OPTIONS] -dump-config
-
-NOTE: All options (other than -config) are deprecated in favor of YAML
- configuration. Use -dump-config to translate existing
- configurations to YAML format.
-
-Options:
-`)
- flag.PrintDefaults()
- fmt.Fprintf(os.Stderr, `
-Example config file:
-
-%s
-
-Listen:
-
- Local port to listen on. Can be "address:port" or ":port", where
- "address" is a host IP address or name and "port" is a port number
- or name.
-
-LogFormat:
-
- Format of request/response and error logs: "json" or "text".
-
-PIDFile:
-
- Path to write PID file during startup. This file is kept open and
- locked with LOCK_EX until keepstore exits, so "fuser -k pidfile" is
- one way to shut down. Exit immediately if there is an error
- opening, locking, or writing the PID file.
-
-MaxBuffers:
-
- Maximum RAM to use for data buffers, given in multiples of block
- size (64 MiB). When this limit is reached, HTTP requests requiring
- buffers (like GET and PUT) will wait for buffer space to be
- released.
-
-MaxRequests:
-
- Maximum concurrent requests. When this limit is reached, new
- requests will receive 503 responses. Note: this limit does not
- include idle connections from clients using HTTP keepalive, so it
- does not strictly limit the number of concurrent connections. If
- omitted or zero, the default is 2 * MaxBuffers.
-
-BlobSigningKeyFile:
-
- Local file containing the secret blob signing key (used to
- generate and verify blob signatures). This key should be
- identical to the API server's blob_signing_key configuration
- entry.
-
-RequireSignatures:
-
- Honor read requests only if a valid signature is provided. This
- should be true, except for development use and when migrating from
- a very old version.
-
-BlobSignatureTTL:
-
- Duration for which new permission signatures (returned in PUT
- responses) will be valid. This should be equal to the API
- server's blob_signature_ttl configuration entry.
-
-SystemAuthTokenFile:
-
- Local file containing the Arvados API token used by keep-balance
- or data manager. Delete, trash, and index requests are honored
- only for this token.
-
-EnableDelete:
-
- Enable trash and delete features. If false, trash lists will be
- accepted but blocks will not be trashed or deleted.
-
-TrashLifetime:
-
- Time duration after a block is trashed during which it can be
- recovered using an /untrash request.
-
-TrashCheckInterval:
-
- How often to check for (and delete) trashed blocks whose
- TrashLifetime has expired.
-
-TrashWorkers:
-
- Maximum number of concurrent trash operations. Default is 1, i.e.,
- trash lists are processed serially.
-
-EmptyTrashWorkers:
-
- Maximum number of concurrent block deletion operations (per
- volume) when emptying trash. Default is 1.
-
-PullWorkers:
-
- Maximum number of concurrent pull operations. Default is 1, i.e.,
- pull lists are processed serially.
-
-TLSCertificateFile:
-
- Path to server certificate file in X509 format. Enables TLS mode.
-
- Example: /var/lib/acme/live/keep0.example.com/fullchain
-
-TLSKeyFile:
-
- Path to server key file in X509 format. Enables TLS mode.
-
- The key pair is read from disk during startup, and whenever SIGHUP
- is received.
-
- Example: /var/lib/acme/live/keep0.example.com/privkey
-
-Volumes:
-
- List of storage volumes. If omitted or empty, the default is to
- use all directories named "keep" that exist in the top level
- directory of a mount point at startup time.
-
- Volume types: %s
-
- (See volume configuration examples above.)
-
-`, exampleConfigFile, knownTypeList)
-}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list