[ARVADOS] created: 1.3.0-1602-g47afcd759

Git user git at public.curoverse.com
Thu Sep 12 14:25:26 UTC 2019


        at  47afcd7595b49cf8a1756cb8f00139cd6269f544 (commit)


commit 47afcd7595b49cf8a1756cb8f00139cd6269f544
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Sep 12 10:23:50 2019 -0400

    15599: Support IAM role credentials in keepstore.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/doc/install/configure-s3-object-storage.html.textile.liquid b/doc/install/configure-s3-object-storage.html.textile.liquid
index 9becc9a58..ff2f59db7 100644
--- a/doc/install/configure-s3-object-storage.html.textile.liquid
+++ b/doc/install/configure-s3-object-storage.html.textile.liquid
@@ -31,7 +31,13 @@ Volumes are configured in the @Volumes@ section of the cluster configuration fil
 
         Driver: S3
         DriverParameters:
-          # The credentials to use to access the bucket.
+          # If an IAM role name is given here, credentials to access
+          # the bucket will be retrieved at runtime from instance
+          # metadata.
+          IAMRole: s3access
+
+          # The credentials to use to access the bucket. Omit or leave
+          # blank if IAMRole is configured.
           AccessKey: aaaaa
           SecretKey: aaaaa
 
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index c1d79ff27..ceb6570a7 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -803,6 +803,7 @@ Clusters:
         DriverParameters:
 
           # for s3 driver
+          IAMRole: aaaaa
           AccessKey: aaaaa
           SecretKey: aaaaa
           Endpoint: ""
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index ecc2e8ea0..6fe16d15d 100644
--- a/lib/config/generated_config.go
+++ b/lib/config/generated_config.go
@@ -809,6 +809,7 @@ Clusters:
         DriverParameters:
 
           # for s3 driver
+          IAMRole: aaaaa
           AccessKey: aaaaa
           SecretKey: aaaaa
           Endpoint: ""
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 22a38e208..b21b39ce2 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -45,8 +45,11 @@ func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.
 }
 
 func (v *S3Volume) check() error {
-	if v.Bucket == "" || v.AccessKey == "" || v.SecretKey == "" {
-		return errors.New("DriverParameters: Bucket, AccessKey, and SecretKey must be provided")
+	if v.Bucket == "" {
+		return errors.New("DriverParameters: Bucket must be provided")
+	}
+	if v.IAMRole == "" && (v.AccessKey == "" || v.SecretKey == "") {
+		return errors.New("DriverParameters: either IAMRole or literal credentials (AccessKey and SecretKey) must be provided")
 	}
 	if v.IndexPageSize == 0 {
 		v.IndexPageSize = 1000
@@ -55,7 +58,8 @@ func (v *S3Volume) check() error {
 		return errors.New("DriverParameters: RaceWindow must not be negative")
 	}
 
-	region, ok := aws.Regions[v.Region]
+	var ok bool
+	v.region, ok = aws.Regions[v.Region]
 	if v.Endpoint == "" {
 		if !ok {
 			return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
@@ -64,18 +68,13 @@ func (v *S3Volume) check() error {
 		return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
 			"specify empty endpoint or use a different region name", v.Region, v.Endpoint)
 	} else {
-		region = aws.Region{
+		v.region = aws.Region{
 			Name:                 v.Region,
 			S3Endpoint:           v.Endpoint,
 			S3LocationConstraint: v.LocationConstraint,
 		}
 	}
 
-	auth := aws.Auth{
-		AccessKey: v.AccessKey,
-		SecretKey: v.SecretKey,
-	}
-
 	// Zero timeouts mean "wait forever", which is a bad
 	// default. Default to long timeouts instead.
 	if v.ConnectTimeout == 0 {
@@ -85,16 +84,9 @@ func (v *S3Volume) check() error {
 		v.ReadTimeout = s3DefaultReadTimeout
 	}
 
-	client := s3.New(auth, region)
-	if region.EC2Endpoint.Signer == aws.V4Signature {
-		// Currently affects only eu-central-1
-		client.Signature = aws.V4Signature
-	}
-	client.ConnectTimeout = time.Duration(v.ConnectTimeout)
-	client.ReadTimeout = time.Duration(v.ReadTimeout)
 	v.bucket = &s3bucket{
-		Bucket: &s3.Bucket{
-			S3:   client,
+		bucket: &s3.Bucket{
+			S3:   v.newS3Client(),
 			Name: v.Bucket,
 		},
 	}
@@ -102,6 +94,11 @@ func (v *S3Volume) check() error {
 	lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
 	v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
 
+	err := v.bootstrapIAMCredentials()
+	if err != nil {
+		return fmt.Errorf("error getting IAM credentials: %s", err)
+	}
+
 	return nil
 }
 
@@ -136,6 +133,7 @@ func s3regions() (okList []string) {
 type S3Volume struct {
 	AccessKey          string
 	SecretKey          string
+	IAMRole            string
 	Endpoint           string
 	Region             string
 	Bucket             string
@@ -151,6 +149,7 @@ type S3Volume struct {
 	logger    logrus.FieldLogger
 	metrics   *volumeMetricsVecs
 	bucket    *s3bucket
+	region    aws.Region
 	startOnce sync.Once
 }
 
@@ -159,6 +158,106 @@ func (v *S3Volume) GetDeviceID() string {
 	return "s3://" + v.Endpoint + "/" + v.Bucket
 }
 
+func (v *S3Volume) bootstrapIAMCredentials() error {
+	if v.IAMRole == "" {
+		return nil
+	}
+	ttl, err := v.updateIAMCredentials() // synchronous, so check() reports failures
+	if err != nil {
+		return err
+	}
+	go func() {
+		for {
+			time.Sleep(ttl) // credentials were just fetched; wait before refreshing
+			ttl, err = v.updateIAMCredentials()
+			if err != nil {
+				v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
+				ttl = time.Second
+			} else if ttl < time.Second {
+				v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
+				ttl = time.Second
+			}
+		}
+	}()
+	return nil
+}
+
+func (v *S3Volume) newS3Client() *s3.S3 {
+	auth := aws.Auth{
+		AccessKey: v.AccessKey,
+		SecretKey: v.SecretKey,
+	}
+	client := s3.New(auth, v.region)
+	if v.region.EC2Endpoint.Signer == aws.V4Signature {
+		// Currently affects only eu-central-1
+		client.Signature = aws.V4Signature
+	}
+	client.ConnectTimeout = time.Duration(v.ConnectTimeout)
+	client.ReadTimeout = time.Duration(v.ReadTimeout)
+	return client
+}
+
+type iamCredentials struct {
+	Code            string
+	LastUpdated     time.Time
+	Type            string
+	AccessKeyID     string
+	SecretAccessKey string
+	Token           string
+	Expiration      time.Time
+}
+
+// updateIAMCredentials fetches role credentials from instance metadata, installs
+// them in v.bucket, and returns the time to sleep until the next update.
+func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+	defer cancel()
+
+	var url string
+	if strings.Contains(v.IAMRole, "://") {
+		// Configuration provides complete URL (used by tests)
+		url = v.IAMRole
+	} else {
+		// Configuration provides IAM role name and we use the
+		// AWS metadata endpoint
+		url = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + v.IAMRole
+	}
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return 0, err
+	}
+	resp, err := http.DefaultClient.Do(req.WithContext(ctx))
+	if err != nil {
+		return 0, err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return 0, fmt.Errorf("%s: HTTP status %s", url, resp.Status)
+	}
+	var cred iamCredentials
+	err = json.NewDecoder(resp.Body).Decode(&cred)
+	if err != nil {
+		return 0, err
+	}
+	v.AccessKey, v.SecretKey = cred.AccessKeyID, cred.SecretAccessKey
+	// Role credentials are temporary: requests must also carry the session Token.
+	client := v.newS3Client()
+	client.Auth = *aws.NewAuth(cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration)
+	v.bucket.SetBucket(&s3.Bucket{S3: client, Name: v.Bucket})
+	// TTL is time from now to expiration, minus 5m.  "We make new
+	// credentials available at least five minutes before the
+	// expiration of the old credentials."  --
+	// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+	// (If that's not true, the returned ttl might be zero or
+	// negative, which the caller can handle.)
+	ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
+	v.logger.WithFields(logrus.Fields{
+		"Expiration": cred.Expiration,
+		"TTL":        ttl,
+	}).Debug("updated credentials")
+	return ttl, nil
+}
+
 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
 	ready := make(chan bool)
 	go func() {
@@ -410,13 +509,13 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 	// Use a merge sort to find matching sets of X and recent/X.
 	dataL := s3Lister{
-		Bucket:   v.bucket.Bucket,
+		Bucket:   v.bucket.Bucket(),
 		Prefix:   prefix,
 		PageSize: v.IndexPageSize,
 		Stats:    &v.bucket.stats,
 	}
 	recentL := s3Lister{
-		Bucket:   v.bucket.Bucket,
+		Bucket:   v.bucket.Bucket(),
 		Prefix:   "recent/" + prefix,
 		PageSize: v.IndexPageSize,
 		Stats:    &v.bucket.stats,
@@ -531,15 +630,15 @@ func (v *S3Volume) checkRaceWindow(loc string) error {
 // (PutCopy returns 200 OK if the request was received, even if the
 // copy failed).
 func (v *S3Volume) safeCopy(dst, src string) error {
-	resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+	resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
 		ContentType:       "application/octet-stream",
 		MetadataDirective: "REPLACE",
-	}, v.bucket.Name+"/"+src)
+	}, v.bucket.Bucket().Name+"/"+src)
 	err = v.translateError(err)
 	if os.IsNotExist(err) {
 		return err
 	} else if err != nil {
-		return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
+		return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
 	}
 	if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
 		return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
@@ -769,7 +868,7 @@ func (v *S3Volume) EmptyTrash() {
 	}
 
 	trashL := s3Lister{
-		Bucket:   v.bucket.Bucket,
+		Bucket:   v.bucket.Bucket(),
 		Prefix:   "trash/",
 		PageSize: v.IndexPageSize,
 		Stats:    &v.bucket.stats,
@@ -848,14 +947,28 @@ func (lister *s3Lister) pop() (k *s3.Key) {
 	return
 }
 
-// s3bucket wraps s3.bucket and counts I/O and API usage stats.
+// s3bucket wraps s3.Bucket and counts I/O and API usage stats. The
+// wrapped bucket can be swapped out atomically with SetBucket.
 type s3bucket struct {
-	*s3.Bucket
-	stats s3bucketStats
+	bucket *s3.Bucket
+	stats  s3bucketStats
+	sync.Mutex
+}
+
+func (b *s3bucket) Bucket() *s3.Bucket {
+	b.Lock()
+	defer b.Unlock()
+	return b.bucket
+}
+
+func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
+	b.Lock()
+	defer b.Unlock()
+	b.bucket = bucket
 }
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
-	rdr, err := b.Bucket.GetReader(path)
+	rdr, err := b.Bucket().GetReader(path)
 	b.stats.TickOps("get")
 	b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
 	b.stats.TickErr(err)
@@ -863,7 +976,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
 }
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
-	resp, err := b.Bucket.Head(path, headers)
+	resp, err := b.Bucket().Head(path, headers)
 	b.stats.TickOps("head")
 	b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
 	b.stats.TickErr(err)
@@ -882,7 +995,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 	} else {
 		r = NewCountingReader(r, b.stats.TickOutBytes)
 	}
-	err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+	err := b.Bucket().PutReader(path, r, length, contType, perm, options)
 	b.stats.TickOps("put")
 	b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
 	b.stats.TickErr(err)
@@ -890,7 +1003,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 }
 
 func (b *s3bucket) Del(path string) error {
-	err := b.Bucket.Del(path)
+	err := b.Bucket().Del(path)
 	b.stats.TickOps("delete")
 	b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
 	b.stats.TickErr(err)
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index b8c4458a5..e276c273f 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -10,6 +10,7 @@ import (
 	"crypto/md5"
 	"encoding/json"
 	"fmt"
+	"io"
 	"log"
 	"net/http"
 	"net/http/httptest"
@@ -45,6 +46,7 @@ var _ = check.Suite(&StubbedS3Suite{})
 
 type StubbedS3Suite struct {
 	s3server *httptest.Server
+	metadata *httptest.Server
 	cluster  *arvados.Cluster
 	handler  *handler
 	volumes  []*TestableS3Volume
@@ -52,6 +54,7 @@ type StubbedS3Suite struct {
 
 func (s *StubbedS3Suite) SetUpTest(c *check.C) {
 	s.s3server = nil
+	s.metadata = nil
 	s.cluster = testCluster(c)
 	s.cluster.Volumes = map[string]arvados.Volume{
 		"zzzzz-nyw5e-000000000000000": {Driver: "S3"},
@@ -99,6 +102,40 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
 	}
 }
 
+func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+	s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+		exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+		// Literal example from
+		// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+		// but with updated timestamps
+		io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+	}))
+	defer s.metadata.Close()
+
+	v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+	c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+	c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+	c.Check(v.bucket.Bucket().S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+	c.Check(v.bucket.Bucket().S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+
+	s.metadata = httptest.NewServer(http.NotFoundHandler())
+	defer s.metadata.Close()
+	// A metadata endpoint that always answers 404 must make check() fail.
+	deadv := &S3Volume{
+		IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
+		Endpoint: "http://localhost:12345",
+		Region:   "test-region-1",
+		Bucket:   "test-bucket-name",
+		cluster:  s.cluster,
+		logger:   ctxlog.TestLogger(c),
+		metrics:  newVolumeMetricsVecs(prometheus.NewRegistry()),
+	}
+	err := deadv.check()
+	c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
+	c.Check(err, check.ErrorMatches, `.*404.*`)
+}
+
 func (s *StubbedS3Suite) TestStats(c *check.C) {
 	v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
 	stats := func() string {
@@ -229,7 +266,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
 			return
 		}
 		v.serverClock.now = &t
-		v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+		v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
 	}
 
 	t0 := time.Now()
@@ -425,10 +462,16 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 		endpoint = s.s3server.URL
 	}
 
+	iamRole, accessKey, secretKey := "", "xxx", "xxx"
+	if s.metadata != nil {
+		iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+	}
+
 	v := &TestableS3Volume{
 		S3Volume: &S3Volume{
-			AccessKey:          "xxx",
-			SecretKey:          "xxx",
+			AccessKey:          accessKey,
+			SecretKey:          secretKey,
+			IAMRole:            iamRole,
 			Bucket:             TestBucketName,
 			Endpoint:           endpoint,
 			Region:             "test-region-1",
@@ -445,7 +488,7 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 		serverClock: clock,
 	}
 	c.Assert(v.S3Volume.check(), check.IsNil)
-	c.Assert(v.bucket.PutBucket(s3.ACL("private")), check.IsNil)
+	c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
 	// We couldn't set RaceWindow until now because check()
 	// rejects negative values.
 	v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
@@ -454,11 +497,11 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-	err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+	err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
 		log.Printf("PutRaw: %s: %+v", loc, err)
 	}
-	err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+	err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
 		log.Printf("PutRaw: recent/%s: %+v", loc, err)
 	}
@@ -469,7 +512,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
 	v.serverClock.now = &lastPut
-	err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+	err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
 		panic(err)
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list