[arvados] created: 2.5.0-11-gde36befd0
git repository hosting
git at public.arvados.org
Wed Jan 25 22:04:24 UTC 2023
at de36befd0c2d9d92a41e1f90f6353229d13b725a (commit)
commit de36befd0c2d9d92a41e1f90f6353229d13b725a
Author: Tom Clegg <tom at curii.com>
Date: Wed Jan 25 17:03:37 2023 -0500
19320: Fix live test.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 2b4b19850..38ada13ed 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -85,13 +85,13 @@ func (e *ec2stub) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.Des
InstanceLifecycle: aws.String("spot"),
InstanceType: aws.String("t2.micro"),
PrivateIpAddress: aws.String("10.1.2.3"),
- State: &ec2.InstanceState{Name: aws.String("running")},
+ State: &ec2.InstanceState{Name: aws.String("running"), Code: aws.Int64(16)},
}, {
InstanceId: aws.String("i-124"),
InstanceLifecycle: aws.String("spot"),
InstanceType: aws.String("t2.micro"),
PrivateIpAddress: aws.String("10.1.2.4"),
- State: &ec2.InstanceState{Name: aws.String("running")},
+ State: &ec2.InstanceState{Name: aws.String("running"), Code: aws.Int64(16)},
}},
}},
}, nil
@@ -191,9 +191,6 @@ func GetInstanceSet(c *check.C) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster
return ap.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
}
ap := ec2InstanceSet{
- ec2config: ec2InstanceSetConfig{
- SpotPriceUpdateInterval: arvados.Duration(time.Hour),
- },
instanceSetID: "test123",
logger: logrus.StandardLogger(),
client: &ec2stub{c: c, reftime: time.Now().UTC()},
@@ -299,6 +296,7 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
}
}()
+ ap.ec2config.SpotPriceUpdateInterval = arvados.Duration(time.Hour)
ap.ec2config.EBSPrice = 0.1 // $/GiB/month
inst1, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, tags, "true", pk)
c.Assert(err, check.IsNil)
@@ -318,7 +316,8 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
instances, err = ap.Instances(tags)
running := 0
for _, inst := range instances {
- if *inst.(*ec2Instance).instance.InstanceLifecycle == "spot" {
+ ec2i := inst.(*ec2Instance).instance
+ if *ec2i.InstanceLifecycle == "spot" && *ec2i.State.Code&16 != 0 {
running++
}
}
@@ -326,7 +325,7 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
c.Logf("instances are running, and identifiable as spot instances")
break
}
- c.Logf("waiting for instances to be identifiable as spot instances...")
+ c.Logf("waiting for instances to reach running state so their availability zone becomes visible...")
time.Sleep(10 * time.Second)
}
commit 6a9efc57d57968d7504327c0569a3acab3925db3
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 24 18:04:07 2023 -0500
19320: Deduplicate instance types in spot price request.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index a4f6b2da7..b90eff6d5 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -337,7 +337,8 @@ func (instanceSet *ec2InstanceSet) updateSpotPrices(instances []cloud.Instance)
updateTime := time.Now()
staleTime := updateTime.Add(-instanceSet.ec2config.SpotPriceUpdateInterval.Duration())
needUpdate := false
- var typeFilterValues []*string
+ allTypes := map[string]bool{}
+
for _, inst := range instances {
ec2inst := inst.(*ec2Instance).instance
if aws.StringValue(ec2inst.InstanceLifecycle) == "spot" {
@@ -349,12 +350,16 @@ func (instanceSet *ec2InstanceSet) updateSpotPrices(instances []cloud.Instance)
if instanceSet.pricesUpdated[pk].Before(staleTime) {
needUpdate = true
}
- typeFilterValues = append(typeFilterValues, ec2inst.InstanceType)
+ allTypes[*ec2inst.InstanceType] = true
}
}
if !needUpdate {
return
}
+ var typeFilterValues []*string
+ for instanceType := range allTypes {
+ typeFilterValues = append(typeFilterValues, aws.String(instanceType))
+ }
// Get 3x update interval worth of pricing data. (Ideally the
// AWS API would tell us "we have shown you all of the price
// changes up to time T", but it doesn't, so we'll just ask
commit 99a4dc213a58af8eb019ca270b0982286beeb5a2
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 24 17:44:27 2023 -0500
19320: Comment re future use of spot attr in priceKey.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 7c7c48b9b..a4f6b2da7 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -506,6 +506,9 @@ func (inst *ec2Instance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
func (inst *ec2Instance) PriceHistory(instType arvados.InstanceType) []cloud.InstancePrice {
inst.provider.pricesLock.Lock()
defer inst.provider.pricesLock.Unlock()
+ // Note updateSpotPrices currently populates
+ // inst.provider.prices only for spot instances, so if
+ // spot==false here, we will return no data.
pk := priceKey{
instanceType: *inst.instance.InstanceType,
spot: aws.StringValue(inst.instance.InstanceLifecycle) == "spot",
commit 36a6bcdb43e4add604bd8ac6409eda6abc248c8c
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 24 17:40:34 2023 -0500
19320: Disable spot price checks if configured update interval <= 0.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 93c308bcb..7c7c48b9b 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -246,10 +246,6 @@ func (instanceSet *ec2InstanceSet) Create(
}
}
- if instanceSet.ec2config.SpotPriceUpdateInterval <= 0 {
- instanceSet.ec2config.SpotPriceUpdateInterval = arvados.Duration(24 * time.Hour)
- }
-
rsv, err := instanceSet.client.RunInstances(&rii)
err = wrapError(err, &instanceSet.throttleDelayCreate)
if err != nil {
@@ -296,7 +292,7 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
}
dii.NextToken = dio.NextToken
}
- if needAZs {
+ if needAZs && instanceSet.ec2config.SpotPriceUpdateInterval > 0 {
az := map[string]string{}
err := instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{
IncludeAllInstances: aws.Bool(true),
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 79ee0d04e..2b4b19850 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -191,7 +191,9 @@ func GetInstanceSet(c *check.C) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster
return ap.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
}
ap := ec2InstanceSet{
- ec2config: ec2InstanceSetConfig{},
+ ec2config: ec2InstanceSetConfig{
+ SpotPriceUpdateInterval: arvados.Duration(time.Hour),
+ },
instanceSetID: "test123",
logger: logrus.StandardLogger(),
client: &ec2stub{c: c, reftime: time.Now().UTC()},
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 078dbf494..f6cf0c91a 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1397,7 +1397,8 @@ Clusters:
# (ec2) how often to look up spot instance pricing data
# (only while running spot instances) for the purpose of
- # calculating container cost estimates.
+ # calculating container cost estimates. A value of 0
+ # disables spot price lookups entirely.
SpotPriceUpdateInterval: 24h
# (ec2) per-GiB-month cost of EBS volumes. Matches
commit 48697cdb28fca37f1e420855a1bf1af2446184c7
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 24 17:31:33 2023 -0500
19320: Account for AddedScratch in spot instance cost estimates.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/azure/azure.go b/lib/cloud/azure/azure.go
index c27800592..7b170958b 100644
--- a/lib/cloud/azure/azure.go
+++ b/lib/cloud/azure/azure.go
@@ -785,7 +785,7 @@ func (ai *azureInstance) Address() string {
}
}
-func (ai *azureInstance) PriceHistory() []cloud.InstancePrice {
+func (ai *azureInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
return nil
}
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 2a5eea484..93c308bcb 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -48,6 +48,7 @@ type ec2InstanceSetConfig struct {
SubnetID string
AdminUsername string
EBSVolumeType string
+ EBSPrice float64
IAMInstanceProfile string
SpotPriceUpdateInterval arvados.Duration
}
@@ -506,7 +507,7 @@ func (inst *ec2Instance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
// Spot price that is in effect when your Spot Instance is running."
// (The use of the phrase "is running", as opposed to "was launched",
// hints that pricing is dynamic.)
-func (inst *ec2Instance) PriceHistory() []cloud.InstancePrice {
+func (inst *ec2Instance) PriceHistory(instType arvados.InstanceType) []cloud.InstancePrice {
inst.provider.pricesLock.Lock()
defer inst.provider.pricesLock.Unlock()
pk := priceKey{
@@ -514,7 +515,16 @@ func (inst *ec2Instance) PriceHistory() []cloud.InstancePrice {
spot: aws.StringValue(inst.instance.InstanceLifecycle) == "spot",
availabilityZone: inst.availabilityZone,
}
- return inst.provider.prices[pk]
+ var prices []cloud.InstancePrice
+ for _, price := range inst.provider.prices[pk] {
+ // ceil(added scratch space in GiB)
+ gib := (instType.AddedScratch + 1<<30 - 1) >> 30
+ monthly := inst.provider.ec2config.EBSPrice * float64(gib)
+ hourly := monthly / 30 / 24
+ price.Price += hourly
+ prices = append(prices, price)
+ }
+ return prices
}
type rateLimitError struct {
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index e7534a7b6..79ee0d04e 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -150,7 +150,7 @@ func (e *ec2stub) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.T
return nil, nil
}
-func GetInstanceSet(c *check.C) (cloud.InstanceSet, cloud.ImageID, arvados.Cluster) {
+func GetInstanceSet(c *check.C) (*ec2InstanceSet, cloud.ImageID, arvados.Cluster) {
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
@@ -188,7 +188,7 @@ func GetInstanceSet(c *check.C) (cloud.InstanceSet, cloud.ImageID, arvados.Clust
ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
c.Assert(err, check.IsNil)
- return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
+ return ap.(*ec2InstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
}
ap := ec2InstanceSet{
ec2config: ec2InstanceSetConfig{},
@@ -297,6 +297,7 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
}
}()
+ ap.ec2config.EBSPrice = 0.1 // $/GiB/month
inst1, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, tags, "true", pk)
c.Assert(err, check.IsNil)
defer inst1.Destroy()
@@ -328,14 +329,19 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
}
for _, inst := range instances {
- hist := inst.PriceHistory()
+ hist := inst.PriceHistory(arvados.InstanceType{})
c.Logf("%s price history: %v", inst.ID(), hist)
c.Check(len(hist) > 0, check.Equals, true)
+
+ histWithScratch := inst.PriceHistory(arvados.InstanceType{AddedScratch: 640 << 30})
+ c.Logf("%s price history with 640 GiB scratch: %v", inst.ID(), histWithScratch)
+
for i, ip := range hist {
c.Check(ip.Price, check.Not(check.Equals), 0.0)
if i > 0 {
c.Check(ip.StartTime.Before(hist[i-1].StartTime), check.Equals, true)
}
+ c.Check(ip.Price < histWithScratch[i].Price, check.Equals, true)
}
}
}
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
index 7f5904968..27cf26152 100644
--- a/lib/cloud/interfaces.go
+++ b/lib/cloud/interfaces.go
@@ -102,8 +102,11 @@ type Instance interface {
// Replace tags with the given tags
SetTags(InstanceTags) error
- // Get recent price history, if available
- PriceHistory() []InstancePrice
+ // Get recent price history, if available. The InstanceType is
+ // supplied as an argument so the driver implementation can
+ // account for AddedScratch cost without requesting the volume
+ // attachment information from the provider's API.
+ PriceHistory(arvados.InstanceType) []InstancePrice
// Shut down the node
Destroy() error
diff --git a/lib/cloud/loopback/loopback.go b/lib/cloud/loopback/loopback.go
index ed2a0050f..8afaa4525 100644
--- a/lib/cloud/loopback/loopback.go
+++ b/lib/cloud/loopback/loopback.go
@@ -130,13 +130,13 @@ type instance struct {
sshService test.SSHService
}
-func (i *instance) ID() cloud.InstanceID { return cloud.InstanceID(i.instanceType.ProviderType) }
-func (i *instance) String() string { return i.instanceType.ProviderType }
-func (i *instance) ProviderType() string { return i.instanceType.ProviderType }
-func (i *instance) Address() string { return i.sshService.Address() }
-func (i *instance) PriceHistory() []cloud.InstancePrice { return nil }
-func (i *instance) RemoteUser() string { return i.adminUser }
-func (i *instance) Tags() cloud.InstanceTags { return i.tags }
+func (i *instance) ID() cloud.InstanceID { return cloud.InstanceID(i.instanceType.ProviderType) }
+func (i *instance) String() string { return i.instanceType.ProviderType }
+func (i *instance) ProviderType() string { return i.instanceType.ProviderType }
+func (i *instance) Address() string { return i.sshService.Address() }
+func (i *instance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice { return nil }
+func (i *instance) RemoteUser() string { return i.adminUser }
+func (i *instance) Tags() cloud.InstanceTags { return i.tags }
func (i *instance) SetTags(tags cloud.InstanceTags) error {
i.tags = tags
return nil
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a8adaeff8..078dbf494 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1400,6 +1400,14 @@ Clusters:
# calculating container cost estimates.
SpotPriceUpdateInterval: 24h
+ # (ec2) per-GiB-month cost of EBS volumes. Matches
+ # EBSVolumeType. Used to account for AddedScratch when
+ # calculating container cost estimates. Note that
+ # https://aws.amazon.com/ebs/pricing/ defines GB to mean
+ # GiB, so an advertised price $0.10/GB indicates a real
+ # price of $0.10/GiB and can be entered here as 0.10.
+ EBSPrice: 0.10
+
# (azure) Credentials.
SubscriptionID: ""
ClientID: ""
@@ -1453,6 +1461,13 @@ Clusters:
RAM: 128MiB
IncludedScratch: 16GB
AddedScratch: 0
+ # Hourly price ($), used to select node types for containers,
+ # and to calculate estimated container costs. For spot
+ # instances on EC2, this is also used as the maximum price
+ # when launching spot instances, while the estimated container
+ # cost is computed based on the current spot price according
+ # to AWS. On Azure, and on-demand instances on EC2, the price
+ # given here is used to compute container cost estimates.
Price: 0.1
Preemptible: false
# Include this section if the node type includes GPU (CUDA) support
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 1f130a5b1..507dfc4dd 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -2298,6 +2298,12 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
spanEnd := now
for _, ip := range prices {
spanStart := ip.StartTime
+ if spanStart.After(now) {
+ // pricing information from the future -- not
+ // expected from AWS, but possible in
+ // principle, and exercised by tests.
+ continue
+ }
last := false
if spanStart.Before(cr.costStartTime) {
spanStart = cr.costStartTime
@@ -2309,5 +2315,6 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
}
spanEnd = spanStart
}
+
return cost
}
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index a5045863b..f4cd6e609 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -2119,6 +2119,9 @@ func (s *TestSuite) TestCalculateCost(c *C) {
cost = cr.calculateCost(now)
c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+ cost = cr.calculateCost(now.Add(-time.Hour / 2))
+ c.Check(cost, Equals, 0.5)
+
c.Logf("%s", logbuf.String())
c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index bb134e454..01af8e6d5 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -471,6 +471,6 @@ func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
return dst
}
-func (si stubInstance) PriceHistory() []cloud.InstancePrice {
+func (si stubInstance) PriceHistory(arvados.InstanceType) []cloud.InstancePrice {
return nil
}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 397a46292..b2ed6c2bf 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -384,7 +384,7 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
}
before := time.Now()
var stdin io.Reader
- if prices := wkr.instance.PriceHistory(); len(prices) > 0 {
+ if prices := wkr.instance.PriceHistory(wkr.instType); len(prices) > 0 {
j, _ := json.Marshal(prices)
stdin = bytes.NewReader(j)
}
commit c138f58b21edb574b101588f6fc61dce8a98ed3e
Author: Tom Clegg <tom at curii.com>
Date: Mon Jan 23 10:31:41 2023 -0500
19320: Log instance price changes in crunch-run.txt.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/price.go b/lib/cloud/price.go
index 234564b68..59f5afc94 100644
--- a/lib/cloud/price.go
+++ b/lib/cloud/price.go
@@ -10,9 +10,9 @@ import (
// NormalizePriceHistory de-duplicates and sorts instance prices, most
// recent first.
-//
-// The provided slice is modified in place.
func NormalizePriceHistory(prices []InstancePrice) []InstancePrice {
+ // copy provided slice instead of modifying it in place
+ prices = append([]InstancePrice(nil), prices...)
// sort by timestamp, newest first
sort.Slice(prices, func(i, j int) bool {
return prices[i].StartTime.After(prices[j].StartTime)
diff --git a/lib/cloud/price_test.go b/lib/cloud/price_test.go
new file mode 100644
index 000000000..e2a4a7e13
--- /dev/null
+++ b/lib/cloud/price_test.go
@@ -0,0 +1,32 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+ "testing"
+ "time"
+
+ . "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) { TestingT(t) }
+
+type cloudSuite struct{}
+
+var _ = Suite(&cloudSuite{})
+
+func (s *cloudSuite) TestNormalizePriceHistory(c *C) {
+ t0, err := time.Parse(time.RFC3339, "2023-01-01T01:00:00Z")
+ c.Assert(err, IsNil)
+ h := []InstancePrice{
+ {t0.Add(1 * time.Minute), 1.0},
+ {t0.Add(4 * time.Minute), 1.2}, // drop: unchanged price
+ {t0.Add(5 * time.Minute), 1.1},
+ {t0.Add(3 * time.Minute), 1.2},
+ {t0.Add(5 * time.Minute), 1.1}, // drop: duplicate
+ {t0.Add(2 * time.Minute), 1.0}, // drop: out of order, unchanged price
+ }
+ c.Check(NormalizePriceHistory(h), DeepEquals, []InstancePrice{h[2], h[3], h[0]})
+}
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 1dd232d3e..1f130a5b1 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -2251,7 +2251,17 @@ func (cr *ContainerRunner) loadPrices() {
}
cr.pricesLock.Lock()
defer cr.pricesLock.Unlock()
+ var lastKnown time.Time
+ if len(cr.prices) > 0 {
+ lastKnown = cr.prices[0].StartTime
+ }
cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+ for i := len(cr.prices) - 1; i >= 0; i-- {
+ price := cr.prices[i]
+ if price.StartTime.After(lastKnown) {
+ cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", price.Price, price.StartTime.UTC())
+ }
+ }
}
func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 1e3b2eb5c..a5045863b 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -12,6 +12,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "log"
"os"
"os/exec"
"regexp"
@@ -2076,7 +2077,10 @@ func (s *TestSuite) TestCalculateCost(c *C) {
defer func(s string) { lockdir = s }(lockdir)
lockdir = c.MkDir()
now := time.Now()
- cr := ContainerRunner{costStartTime: now.Add(-time.Hour)}
+ cr := s.runner
+ cr.costStartTime = now.Add(-time.Hour)
+ var logbuf bytes.Buffer
+ cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
// if there's no InstanceType env var, cost is calculated as 0
os.Unsetenv("InstanceType")
@@ -2106,6 +2110,7 @@ func (s *TestSuite) TestCalculateCost(c *C) {
// next update (via --list + SIGUSR2) tells us the spot price
// increased to $3/h 15 minutes ago
j, err = json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price
{StartTime: now.Add(-time.Hour / 4), Price: 3.0},
})
c.Assert(err, IsNil)
@@ -2113,6 +2118,10 @@ func (s *TestSuite) TestCalculateCost(c *C) {
cr.loadPrices()
cost = cr.calculateCost(now)
c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+
+ c.Logf("%s", logbuf.String())
+ c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
+ c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
}
type FakeProcess struct {
commit 032450eaf50691bd981a5abb3d7fc73a75c62881
Author: Tom Clegg <tom at curii.com>
Date: Wed Jan 18 09:58:49 2023 -0500
19320: Fix AWS InstanceStatus and SpotPrice API usage.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index f80e9bd1a..2a5eea484 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -297,12 +297,17 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
}
if needAZs {
az := map[string]string{}
- instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{}, func(page *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
+ err := instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{
+ IncludeAllInstances: aws.Bool(true),
+ }, func(page *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
for _, ent := range page.InstanceStatuses {
az[*ent.InstanceId] = *ent.AvailabilityZone
}
return true
})
+ if err != nil {
+ instanceSet.logger.Warnf("error getting instance statuses: %s", err)
+ }
for _, inst := range instances {
inst := inst.(*ec2Instance)
inst.availabilityZone = az[*inst.instance.InstanceId]
@@ -363,7 +368,8 @@ func (instanceSet *ec2InstanceSet) updateSpotPrices(instances []cloud.Instance)
dsphi := &ec2.DescribeSpotPriceHistoryInput{
StartTime: aws.Time(updateTime.Add(-3 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration())),
Filters: []*ec2.Filter{
- &ec2.Filter{Name: aws.String("InstanceType"), Values: typeFilterValues},
+ &ec2.Filter{Name: aws.String("instance-type"), Values: typeFilterValues},
+ &ec2.Filter{Name: aws.String("product-description"), Values: []*string{aws.String("Linux/UNIX")}},
},
}
err := instanceSet.client.DescribeSpotPriceHistoryPages(dsphi, func(page *ec2.DescribeSpotPriceHistoryOutput, lastPage bool) bool {
@@ -503,11 +509,12 @@ func (inst *ec2Instance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
func (inst *ec2Instance) PriceHistory() []cloud.InstancePrice {
inst.provider.pricesLock.Lock()
defer inst.provider.pricesLock.Unlock()
- return inst.provider.prices[priceKey{
+ pk := priceKey{
instanceType: *inst.instance.InstanceType,
spot: aws.StringValue(inst.instance.InstanceLifecycle) == "spot",
availabilityZone: inst.availabilityZone,
- }]
+ }
+ return inst.provider.prices[pk]
}
type rateLimitError struct {
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 103642258..e7534a7b6 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -287,6 +287,16 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
ap, img, cluster := GetInstanceSet(c)
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
tags := cloud.InstanceTags{"arvados-ec2-driver": "test"}
+
+ defer func() {
+ instances, err := ap.Instances(tags)
+ c.Assert(err, check.IsNil)
+ for _, inst := range instances {
+ c.Logf("cleanup: destroy instance %s", inst)
+ c.Check(inst.Destroy(), check.IsNil)
+ }
+ }()
+
inst1, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, tags, "true", pk)
c.Assert(err, check.IsNil)
defer inst1.Destroy()
@@ -305,13 +315,15 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
instances, err = ap.Instances(tags)
running := 0
for _, inst := range instances {
- if inst.Address() != "" {
+ if *inst.(*ec2Instance).instance.InstanceLifecycle == "spot" {
running++
}
}
if running >= 2 {
+ c.Logf("instances are running, and identifiable as spot instances")
break
}
+ c.Logf("waiting for instances to be identifiable as spot instances...")
time.Sleep(10 * time.Second)
}
commit a98114557438e69fc7fc088d5d4c19ac2d4c5274
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 17 17:51:23 2023 -0500
19320: Fix env var leak in test.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 91a46e10e..1e3b2eb5c 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -2087,6 +2087,7 @@ func (s *TestSuite) TestCalculateCost(c *C) {
// hasn't found any data), cost is calculated based on
// InstanceType env var
os.Setenv("InstanceType", `{"Price":1.2}`)
+ defer os.Unsetenv("InstanceType")
cost = cr.calculateCost(now)
c.Check(cost, Equals, 1.2)
commit 458436270ce8fb80d421d55e192236c5ac4a225e
Author: Tom Clegg <tom at curii.com>
Date: Tue Jan 17 11:16:56 2023 -0500
19320: Use AWS spot price data to calculate container cost.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/go.mod b/go.mod
index aced60dbc..eb867f8c5 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,7 @@ require (
github.com/Azure/go-autorest/autorest/azure/auth v0.5.9
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/arvados/cgofuse v1.2.0-arvados1
- github.com/aws/aws-sdk-go v1.25.30
+ github.com/aws/aws-sdk-go v1.44.174
github.com/aws/aws-sdk-go-v2 v0.23.0
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092
github.com/coreos/go-oidc v2.1.0+incompatible
@@ -36,9 +36,9 @@ require (
github.com/prometheus/common v0.10.0
github.com/sirupsen/logrus v1.8.1
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871
- golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
+ golang.org/x/net v0.5.0
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
- golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e
+ golang.org/x/sys v0.4.0
google.golang.org/api v0.20.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
gopkg.in/square/go-jose.v2 v2.5.1
@@ -74,7 +74,7 @@ require (
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
- github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
+ github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.1.0 // indirect
@@ -94,8 +94,8 @@ require (
github.com/src-d/gcfg v1.3.0 // indirect
github.com/xanzy/ssh-agent v0.1.0 // indirect
go.opencensus.io v0.22.3 // indirect
- golang.org/x/text v0.3.6 // indirect
- golang.org/x/tools v0.1.7 // indirect
+ golang.org/x/text v0.6.0 // indirect
+ golang.org/x/tools v0.1.12 // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a // indirect
google.golang.org/grpc v1.33.2 // indirect
diff --git a/go.sum b/go.sum
index 422a891e0..1936e7978 100644
--- a/go.sum
+++ b/go.sum
@@ -107,6 +107,8 @@ github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZo
github.com/aws/aws-sdk-go v1.17.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.30 h1:I9qj6zW3mMfsg91e+GMSN/INcaX9tTFvr/l/BAHKaIY=
github.com/aws/aws-sdk-go v1.25.30/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go v1.44.174 h1:9lR4a6MKQW/t6YCG0ZKAt1GAkjdEPP8sWch/pfcuR0c=
+github.com/aws/aws-sdk-go v1.44.174/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v0.23.0 h1:+E1q1LLSfHSDn/DzOtdJOX+pLZE2HiNV2yO5AjZINwM=
github.com/aws/aws-sdk-go-v2 v0.23.0/go.mod h1:2LhT7UgHOXK3UXONKI5OMgIyoQL6zTAw/jwIeX6yqzw=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -453,6 +455,9 @@ github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht
github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc h1:JJPhSHowepOF2+ElJVyb9jgt5ZyBkPMkPuhS0uODSFs=
@@ -701,6 +706,7 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs=
github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA=
github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg=
@@ -765,6 +771,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -803,6 +810,10 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
+golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
+golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -819,6 +830,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -884,8 +896,17 @@ golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
+golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.1.0 h1:g6Z6vPFA9dYBAF7DWcH6sCcOntplXsDKcliusYijMlw=
+golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -894,6 +915,11 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
+golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
+golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -940,6 +966,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
+golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/lib/cloud/azure/azure.go b/lib/cloud/azure/azure.go
index 1ff0798ea..c27800592 100644
--- a/lib/cloud/azure/azure.go
+++ b/lib/cloud/azure/azure.go
@@ -785,6 +785,10 @@ func (ai *azureInstance) Address() string {
}
}
+func (ai *azureInstance) PriceHistory() []cloud.InstancePrice {
+ return nil
+}
+
func (ai *azureInstance) RemoteUser() string {
return ai.provider.azconfig.AdminUsername
}
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 52b73f781..f80e9bd1a 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"math/big"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -40,14 +41,15 @@ const (
)
type ec2InstanceSetConfig struct {
- AccessKeyID string
- SecretAccessKey string
- Region string
- SecurityGroupIDs arvados.StringSet
- SubnetID string
- AdminUsername string
- EBSVolumeType string
- IAMInstanceProfile string
+ AccessKeyID string
+ SecretAccessKey string
+ Region string
+ SecurityGroupIDs arvados.StringSet
+ SubnetID string
+ AdminUsername string
+ EBSVolumeType string
+ IAMInstanceProfile string
+ SpotPriceUpdateInterval arvados.Duration
}
type ec2Interface interface {
@@ -55,6 +57,8 @@ type ec2Interface interface {
ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error)
RunInstances(input *ec2.RunInstancesInput) (*ec2.Reservation, error)
DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
+ DescribeInstanceStatusPages(input *ec2.DescribeInstanceStatusInput, fn func(*ec2.DescribeInstanceStatusOutput, bool) bool) error
+ DescribeSpotPriceHistoryPages(input *ec2.DescribeSpotPriceHistoryInput, fn func(*ec2.DescribeSpotPriceHistoryOutput, bool) bool) error
CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error)
TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error)
}
@@ -68,6 +72,10 @@ type ec2InstanceSet struct {
keys map[string]string
throttleDelayCreate atomic.Value
throttleDelayInstances atomic.Value
+
+ prices map[priceKey][]cloud.InstancePrice
+ pricesLock sync.Mutex
+ pricesUpdated map[priceKey]time.Time
}
func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
@@ -237,12 +245,15 @@ func (instanceSet *ec2InstanceSet) Create(
}
}
+ if instanceSet.ec2config.SpotPriceUpdateInterval <= 0 {
+ instanceSet.ec2config.SpotPriceUpdateInterval = arvados.Duration(24 * time.Hour)
+ }
+
rsv, err := instanceSet.client.RunInstances(&rii)
err = wrapError(err, &instanceSet.throttleDelayCreate)
if err != nil {
return nil, err
}
-
return &ec2Instance{
provider: instanceSet,
instance: rsv.Instances[0],
@@ -257,6 +268,7 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
Values: []*string{aws.String(v)},
})
}
+ needAZs := false
dii := &ec2.DescribeInstancesInput{Filters: filters}
for {
dio, err := instanceSet.client.DescribeInstances(dii)
@@ -268,23 +280,139 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
for _, rsv := range dio.Reservations {
for _, inst := range rsv.Instances {
if *inst.State.Name != "shutting-down" && *inst.State.Name != "terminated" {
- instances = append(instances, &ec2Instance{instanceSet, inst})
+ instances = append(instances, &ec2Instance{
+ provider: instanceSet,
+ instance: inst,
+ })
+ if aws.StringValue(inst.InstanceLifecycle) == "spot" {
+ needAZs = true
+ }
}
}
}
if dio.NextToken == nil {
- return instances, err
+ break
}
dii.NextToken = dio.NextToken
}
+ if needAZs {
+ az := map[string]string{}
+ instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{}, func(page *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
+ for _, ent := range page.InstanceStatuses {
+ az[*ent.InstanceId] = *ent.AvailabilityZone
+ }
+ return true
+ })
+ for _, inst := range instances {
+ inst := inst.(*ec2Instance)
+ inst.availabilityZone = az[*inst.instance.InstanceId]
+ }
+ instanceSet.updateSpotPrices(instances)
+ }
+ return instances, err
+}
+
+type priceKey struct {
+ instanceType string
+ spot bool
+ availabilityZone string
+}
+
+// Refresh recent spot instance pricing data for the given instances,
+// unless we already have recent pricing data for all relevant types.
+func (instanceSet *ec2InstanceSet) updateSpotPrices(instances []cloud.Instance) {
+ if len(instances) == 0 {
+ return
+ }
+
+ instanceSet.pricesLock.Lock()
+ defer instanceSet.pricesLock.Unlock()
+ if instanceSet.prices == nil {
+ instanceSet.prices = map[priceKey][]cloud.InstancePrice{}
+ instanceSet.pricesUpdated = map[priceKey]time.Time{}
+ }
+
+ updateTime := time.Now()
+ staleTime := updateTime.Add(-instanceSet.ec2config.SpotPriceUpdateInterval.Duration())
+ needUpdate := false
+ var typeFilterValues []*string
+ for _, inst := range instances {
+ ec2inst := inst.(*ec2Instance).instance
+ if aws.StringValue(ec2inst.InstanceLifecycle) == "spot" {
+ pk := priceKey{
+ instanceType: *ec2inst.InstanceType,
+ spot: true,
+ availabilityZone: inst.(*ec2Instance).availabilityZone,
+ }
+ if instanceSet.pricesUpdated[pk].Before(staleTime) {
+ needUpdate = true
+ }
+ typeFilterValues = append(typeFilterValues, ec2inst.InstanceType)
+ }
+ }
+ if !needUpdate {
+ return
+ }
+ // Get 3x update interval worth of pricing data. (Ideally the
+ // AWS API would tell us "we have shown you all of the price
+ // changes up to time T", but it doesn't, so we'll just ask
+ // for 3 intervals worth of data on each update, de-duplicate
+ // the data points, and not worry too much about occasionally
+ // missing some data points when our lookups fail twice in a
+ // row.
+ dsphi := &ec2.DescribeSpotPriceHistoryInput{
+ StartTime: aws.Time(updateTime.Add(-3 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration())),
+ Filters: []*ec2.Filter{
+ &ec2.Filter{Name: aws.String("InstanceType"), Values: typeFilterValues},
+ },
+ }
+ err := instanceSet.client.DescribeSpotPriceHistoryPages(dsphi, func(page *ec2.DescribeSpotPriceHistoryOutput, lastPage bool) bool {
+ for _, ent := range page.SpotPriceHistory {
+ if ent.InstanceType == nil || ent.SpotPrice == nil || ent.Timestamp == nil {
+ // bogus record?
+ continue
+ }
+ price, err := strconv.ParseFloat(*ent.SpotPrice, 64)
+ if err != nil {
+ // bogus record?
+ continue
+ }
+ pk := priceKey{
+ instanceType: *ent.InstanceType,
+ spot: true,
+ availabilityZone: *ent.AvailabilityZone,
+ }
+ instanceSet.prices[pk] = append(instanceSet.prices[pk], cloud.InstancePrice{
+ StartTime: *ent.Timestamp,
+ Price: price,
+ })
+ instanceSet.pricesUpdated[pk] = updateTime
+ }
+ return true
+ })
+ if err != nil {
+ instanceSet.logger.Warnf("error retrieving spot instance prices: %s", err)
+ }
+
+ expiredTime := updateTime.Add(-64 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration())
+ for pk, last := range instanceSet.pricesUpdated {
+ if last.Before(expiredTime) {
+ delete(instanceSet.pricesUpdated, pk)
+ delete(instanceSet.prices, pk)
+ }
+ }
+ for pk, prices := range instanceSet.prices {
+ instanceSet.prices[pk] = cloud.NormalizePriceHistory(prices)
+ }
}
func (instanceSet *ec2InstanceSet) Stop() {
}
type ec2Instance struct {
- provider *ec2InstanceSet
- instance *ec2.Instance
+ provider *ec2InstanceSet
+ instance *ec2.Instance
+ availabilityZone string // sometimes available for spot instances
}
func (inst *ec2Instance) ID() cloud.InstanceID {
@@ -348,6 +476,40 @@ func (inst *ec2Instance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
return cloud.ErrNotImplemented
}
+// PriceHistory returns the price history for this specific instance.
+//
+// AWS documentation is elusive about whether the hourly cost of a
+// given spot instance changes as the current spot price changes for
+// the corresponding instance type and availability zone. Our
+// implementation assumes the answer is yes, based on the following
+// hints.
+//
+// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-requests.html
+// says: "After your Spot Instance is running, if the Spot price rises
+// above your maximum price, Amazon EC2 interrupts your Spot
+// Instance." (This doesn't address what happens when the spot price
+// rises *without* exceeding your maximum price.)
+//
+// https://docs.aws.amazon.com/whitepapers/latest/cost-optimization-leveraging-ec2-spot-instances/how-spot-instances-work.html
+// says: "You pay the Spot price that's in effect, billed to the
+// nearest second." (But it's not explicitly stated whether "the price
+// in effect" changes over time for a given instance.)
+//
+// The same page also says, in a discussion about the effect of
+// specifying a maximum price: "Note that you never pay more than the
+// Spot price that is in effect when your Spot Instance is running."
+// (The use of the phrase "is running", as opposed to "was launched",
+// hints that pricing is dynamic.)
+func (inst *ec2Instance) PriceHistory() []cloud.InstancePrice {
+ inst.provider.pricesLock.Lock()
+ defer inst.provider.pricesLock.Unlock()
+ return inst.provider.prices[priceKey{
+ instanceType: *inst.instance.InstanceType,
+ spot: aws.StringValue(inst.instance.InstanceLifecycle) == "spot",
+ availabilityZone: inst.availabilityZone,
+ }]
+}
+
type rateLimitError struct {
error
earliestRetry time.Time
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 3cd238ded..103642258 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -9,7 +9,7 @@
//
// Tests should be run individually and in the order they are listed in the file:
//
-// Example azconfig.yml:
+// Example ec2config.yml:
//
// ImageIDForTestSuite: ami-xxxxxxxxxxxxxxxxx
// DriverParameters:
@@ -27,6 +27,7 @@ import (
"flag"
"sync/atomic"
"testing"
+ "time"
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
@@ -56,6 +57,8 @@ type testConfig struct {
}
type ec2stub struct {
+ c *check.C
+ reftime time.Time
}
func (e *ec2stub) ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error) {
@@ -68,13 +71,75 @@ func (e *ec2stub) DescribeKeyPairs(input *ec2.DescribeKeyPairsInput) (*ec2.Descr
func (e *ec2stub) RunInstances(input *ec2.RunInstancesInput) (*ec2.Reservation, error) {
return &ec2.Reservation{Instances: []*ec2.Instance{{
- InstanceId: aws.String("i-123"),
- Tags: input.TagSpecifications[0].Tags,
+ InstanceId: aws.String("i-123"),
+ InstanceType: aws.String("t2.micro"),
+ Tags: input.TagSpecifications[0].Tags,
}}}, nil
}
func (e *ec2stub) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
- return &ec2.DescribeInstancesOutput{}, nil
+ return &ec2.DescribeInstancesOutput{
+ Reservations: []*ec2.Reservation{{
+ Instances: []*ec2.Instance{{
+ InstanceId: aws.String("i-123"),
+ InstanceLifecycle: aws.String("spot"),
+ InstanceType: aws.String("t2.micro"),
+ PrivateIpAddress: aws.String("10.1.2.3"),
+ State: &ec2.InstanceState{Name: aws.String("running")},
+ }, {
+ InstanceId: aws.String("i-124"),
+ InstanceLifecycle: aws.String("spot"),
+ InstanceType: aws.String("t2.micro"),
+ PrivateIpAddress: aws.String("10.1.2.4"),
+ State: &ec2.InstanceState{Name: aws.String("running")},
+ }},
+ }},
+ }, nil
+}
+
+func (e *ec2stub) DescribeInstanceStatusPages(input *ec2.DescribeInstanceStatusInput, fn func(*ec2.DescribeInstanceStatusOutput, bool) bool) error {
+ fn(&ec2.DescribeInstanceStatusOutput{
+ InstanceStatuses: []*ec2.InstanceStatus{{
+ InstanceId: aws.String("i-123"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ }, {
+ InstanceId: aws.String("i-124"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ }},
+ }, true)
+ return nil
+}
+
+func (e *ec2stub) DescribeSpotPriceHistoryPages(input *ec2.DescribeSpotPriceHistoryInput, fn func(*ec2.DescribeSpotPriceHistoryOutput, bool) bool) error {
+ if !fn(&ec2.DescribeSpotPriceHistoryOutput{
+ SpotPriceHistory: []*ec2.SpotPrice{
+ &ec2.SpotPrice{
+ InstanceType: aws.String("t2.micro"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ SpotPrice: aws.String("0.005"),
+ Timestamp: aws.Time(e.reftime.Add(-9 * time.Minute)),
+ },
+ &ec2.SpotPrice{
+ InstanceType: aws.String("t2.micro"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ SpotPrice: aws.String("0.015"),
+ Timestamp: aws.Time(e.reftime.Add(-5 * time.Minute)),
+ },
+ },
+ }, false) {
+ return nil
+ }
+ fn(&ec2.DescribeSpotPriceHistoryOutput{
+ SpotPriceHistory: []*ec2.SpotPrice{
+ &ec2.SpotPrice{
+ InstanceType: aws.String("t2.micro"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ SpotPrice: aws.String("0.01"),
+ Timestamp: aws.Time(e.reftime.Add(-2 * time.Minute)),
+ },
+ },
+ }, true)
+ return nil
}
func (e *ec2stub) CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) {
@@ -85,7 +150,7 @@ func (e *ec2stub) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.T
return nil, nil
}
-func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error) {
+func GetInstanceSet(c *check.C) (cloud.InstanceSet, cloud.ImageID, arvados.Cluster) {
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
@@ -98,7 +163,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
Preemptible: false,
},
"tiny-with-extra-scratch": {
- Name: "tiny",
+ Name: "tiny-with-extra-scratch",
ProviderType: "t2.micro",
VCPUs: 1,
RAM: 4000000000,
@@ -107,7 +172,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
AddedScratch: 20000000000,
},
"tiny-preemptible": {
- Name: "tiny",
+ Name: "tiny-preemptible",
ProviderType: "t2.micro",
VCPUs: 1,
RAM: 4000000000,
@@ -119,37 +184,30 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
if *live != "" {
var exampleCfg testConfig
err := config.LoadFile(&exampleCfg, *live)
- if err != nil {
- return nil, cloud.ImageID(""), cluster, err
- }
+ c.Assert(err, check.IsNil)
ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
- return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
+ c.Assert(err, check.IsNil)
+ return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
}
ap := ec2InstanceSet{
ec2config: ec2InstanceSetConfig{},
instanceSetID: "test123",
logger: logrus.StandardLogger(),
- client: &ec2stub{},
+ client: &ec2stub{c: c, reftime: time.Now().UTC()},
keys: make(map[string]string),
}
- return &ap, cloud.ImageID("blob"), cluster, nil
+ return &ap, cloud.ImageID("blob"), cluster
}
func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
- ap, img, cluster, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, img, cluster := GetInstanceSet(c)
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
- c.Assert(err, check.IsNil)
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
img, map[string]string{
"TestTagName": "test tag value",
}, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
-
c.Assert(err, check.IsNil)
tags := inst.Tags()
@@ -159,13 +217,8 @@ func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
}
func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
- ap, img, cluster, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, img, cluster := GetInstanceSet(c)
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
- c.Assert(err, check.IsNil)
inst, err := ap.Create(cluster.InstanceTypes["tiny-with-extra-scratch"],
img, map[string]string{
@@ -181,13 +234,8 @@ func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
}
func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
- ap, img, cluster, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, img, cluster := GetInstanceSet(c)
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
- c.Assert(err, check.IsNil)
inst, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"],
img, map[string]string{
@@ -203,11 +251,7 @@ func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
}
func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
- ap, _, _, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, _, _ := GetInstanceSet(c)
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
@@ -219,13 +263,8 @@ func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
}
func (*EC2InstanceSetSuite) TestListInstances(c *check.C) {
- ap, _, _, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider: ", err)
- }
-
+ ap, _, _ := GetInstanceSet(c)
l, err := ap.Instances(nil)
-
c.Assert(err, check.IsNil)
for _, i := range l {
@@ -235,11 +274,7 @@ func (*EC2InstanceSetSuite) TestListInstances(c *check.C) {
}
func (*EC2InstanceSetSuite) TestDestroyInstances(c *check.C) {
- ap, _, _, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, _, _ := GetInstanceSet(c)
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
@@ -248,6 +283,51 @@ func (*EC2InstanceSetSuite) TestDestroyInstances(c *check.C) {
}
}
+func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
+ ap, img, cluster := GetInstanceSet(c)
+ pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
+ tags := cloud.InstanceTags{"arvados-ec2-driver": "test"}
+ inst1, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, tags, "true", pk)
+ c.Assert(err, check.IsNil)
+ defer inst1.Destroy()
+ inst2, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, tags, "true", pk)
+ c.Assert(err, check.IsNil)
+ defer inst2.Destroy()
+
+ // in live mode, we need to wait for the instances to reach
+ // running state before we can discover their availability
+ // zones and look up the appropriate prices.
+ var instances []cloud.Instance
+ for deadline := time.Now().Add(5 * time.Minute); ; {
+ if deadline.Before(time.Now()) {
+ c.Fatal("timed out")
+ }
+ instances, err = ap.Instances(tags)
+ running := 0
+ for _, inst := range instances {
+ if inst.Address() != "" {
+ running++
+ }
+ }
+ if running >= 2 {
+ break
+ }
+ time.Sleep(10 * time.Second)
+ }
+
+ for _, inst := range instances {
+ hist := inst.PriceHistory()
+ c.Logf("%s price history: %v", inst.ID(), hist)
+ c.Check(len(hist) > 0, check.Equals, true)
+ for i, ip := range hist {
+ c.Check(ip.Price, check.Not(check.Equals), 0.0)
+ if i > 0 {
+ c.Check(ip.StartTime.Before(hist[i-1].StartTime), check.Equals, true)
+ }
+ }
+ }
+}
+
func (*EC2InstanceSetSuite) TestWrapError(c *check.C) {
retryError := awserr.New("Throttling", "", nil)
wrapped := wrapError(retryError, &atomic.Value{})
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
index 2d53a49c5..7f5904968 100644
--- a/lib/cloud/interfaces.go
+++ b/lib/cloud/interfaces.go
@@ -102,6 +102,9 @@ type Instance interface {
// Replace tags with the given tags
SetTags(InstanceTags) error
+ // Get recent price history, if available
+ PriceHistory() []InstancePrice
+
// Shut down the node
Destroy() error
}
@@ -141,6 +144,11 @@ type InstanceSet interface {
Stop()
}
+type InstancePrice struct {
+ StartTime time.Time
+ Price float64
+}
+
type InitCommand string
// A Driver returns an InstanceSet that uses the given InstanceSetID
diff --git a/lib/cloud/loopback/loopback.go b/lib/cloud/loopback/loopback.go
index fb7a35bea..ed2a0050f 100644
--- a/lib/cloud/loopback/loopback.go
+++ b/lib/cloud/loopback/loopback.go
@@ -130,12 +130,13 @@ type instance struct {
sshService test.SSHService
}
-func (i *instance) ID() cloud.InstanceID { return cloud.InstanceID(i.instanceType.ProviderType) }
-func (i *instance) String() string { return i.instanceType.ProviderType }
-func (i *instance) ProviderType() string { return i.instanceType.ProviderType }
-func (i *instance) Address() string { return i.sshService.Address() }
-func (i *instance) RemoteUser() string { return i.adminUser }
-func (i *instance) Tags() cloud.InstanceTags { return i.tags }
+func (i *instance) ID() cloud.InstanceID { return cloud.InstanceID(i.instanceType.ProviderType) }
+func (i *instance) String() string { return i.instanceType.ProviderType }
+func (i *instance) ProviderType() string { return i.instanceType.ProviderType }
+func (i *instance) Address() string { return i.sshService.Address() }
+func (i *instance) PriceHistory() []cloud.InstancePrice { return nil }
+func (i *instance) RemoteUser() string { return i.adminUser }
+func (i *instance) Tags() cloud.InstanceTags { return i.tags }
func (i *instance) SetTags(tags cloud.InstanceTags) error {
i.tags = tags
return nil
diff --git a/lib/cloud/price.go b/lib/cloud/price.go
new file mode 100644
index 000000000..234564b68
--- /dev/null
+++ b/lib/cloud/price.go
@@ -0,0 +1,28 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+ "sort"
+)
+
+// NormalizePriceHistory de-duplicates and sorts instance prices, most
+// recent first.
+//
+// The provided slice is modified in place.
+func NormalizePriceHistory(prices []InstancePrice) []InstancePrice {
+ // sort by timestamp, newest first
+ sort.Slice(prices, func(i, j int) bool {
+ return prices[i].StartTime.After(prices[j].StartTime)
+ })
+ // remove duplicate data points, keeping the oldest
+ for i := 0; i < len(prices)-1; i++ {
+ if prices[i].StartTime == prices[i+1].StartTime || prices[i].Price == prices[i+1].Price {
+ prices = append(prices[:i], prices[i+1:]...)
+ i--
+ }
+ }
+ return prices
+}
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 2d9119adf..a8adaeff8 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1395,6 +1395,11 @@ Clusters:
# the cloud dispatcher. Leave blank when not needed.
IAMInstanceProfile: ""
+ # (ec2) how often to look up spot instance pricing data
+ # (only while running spot instances) for the purpose of
+ # calculating container cost estimates.
+ SpotPriceUpdateInterval: 24h
+
# (azure) Credentials.
SubscriptionID: ""
ClientID: ""
diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go
index 8a919bc5e..adb65324b 100644
--- a/lib/crunchrun/background.go
+++ b/lib/crunchrun/background.go
@@ -21,6 +21,7 @@ var (
lockprefix = "crunch-run-"
locksuffix = ".lock"
brokenfile = "crunch-run-broken"
+ pricesfile = "crunch-run-prices.json"
)
// procinfo is saved in each process's lockfile.
@@ -183,7 +184,20 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
}
// ListProcesses lists UUIDs of active crunch-run processes.
-func ListProcesses(stdout, stderr io.Writer) int {
+func ListProcesses(stdin io.Reader, stdout, stderr io.Writer) int {
+ if buf, err := io.ReadAll(stdin); err == nil && len(buf) > 0 {
+ // write latest pricing data to disk where
+ // current/future crunch-run processes can load it
+ fnm := filepath.Join(lockdir, pricesfile)
+ fnmtmp := fmt.Sprintf("%s~%d", fnm, os.Getpid())
+ err := os.WriteFile(fnmtmp, buf, 0777)
+ if err != nil {
+ fmt.Fprintf(stderr, "error writing price data to %s: %s", fnmtmp, err)
+ } else if err = os.Rename(fnmtmp, fnm); err != nil {
+ fmt.Fprintf(stderr, "error renaming %s to %s: %s", fnmtmp, fnm, err)
+ os.Remove(fnmtmp)
+ }
+ }
// filepath.Walk does not follow symlinks, so we must walk
// lockdir+"/." in case lockdir itself is a symlink.
walkdir := lockdir + "/."
@@ -245,7 +259,7 @@ func ListProcesses(stdout, stderr io.Writer) int {
fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
return nil
}
- err = proc.Signal(syscall.Signal(0))
+ err = proc.Signal(syscall.SIGUSR2)
if err != nil {
// Process is dead, even though lockfile was
// still locked. Most likely a stuck arv-mount
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 52e1bf63a..1dd232d3e 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -31,6 +31,7 @@ import (
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/crunchstat"
@@ -177,6 +178,9 @@ type ContainerRunner struct {
containerWatchdogInterval time.Duration
gateway Gateway
+
+ prices []cloud.InstancePrice
+ pricesLock sync.Mutex
}
// setupSignals sets up signal handling to gracefully terminate the
@@ -1469,10 +1473,7 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
if runner.finalState == "Complete" && runner.OutputPDH != nil {
update["output"] = *runner.OutputPDH
}
- var it arvados.InstanceType
- if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
- update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
- }
+ update["cost"] = runner.calculateCost(time.Now())
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
@@ -1514,6 +1515,16 @@ func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
+ sigusr2 := make(chan os.Signal, 1)
+ signal.Notify(sigusr2, syscall.SIGUSR2)
+ defer signal.Stop(sigusr2)
+ runner.loadPrices()
+ go func() {
+ for range sigusr2 {
+ runner.loadPrices()
+ }
+ }()
+
runner.finalState = "Queued"
defer func() {
@@ -1756,7 +1767,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
- list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+ list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)")
enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
@@ -1792,11 +1803,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
+ return Detach(containerUUID, prog, args, stdin, stdout, stderr)
case *kill >= 0:
- return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+ return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr)
case *list:
- return ListProcesses(os.Stdout, os.Stderr)
+ return ListProcesses(stdin, stdout, stderr)
}
if len(containerUUID) != 27 {
@@ -2223,3 +2234,70 @@ func localKeepstoreAddr() string {
})
return ips[0].String()
}
+
+func (cr *ContainerRunner) loadPrices() {
+ buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+ if err != nil {
+ if !os.IsNotExist(err) {
+ cr.CrunchLog.Printf("loadPrices: read: %s", err)
+ }
+ return
+ }
+ var prices []cloud.InstancePrice
+ err = json.Unmarshal(buf, &prices)
+ if err != nil {
+ cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+ return
+ }
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+ cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+}
+
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+
+ // First, make a "prices" slice with the real data as far back
+ // as it goes, and (if needed) a "since the beginning of time"
+ // placeholder containing a reasonable guess about what the
+ // price was between cr.costStartTime and the earliest real
+ // data point.
+ prices := cr.prices
+ if len(prices) == 0 {
+ // use price info in InstanceType record initially
+ // provided by cloud dispatcher
+ var p float64
+ var it arvados.InstanceType
+ if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+ p = it.Price
+ }
+ prices = []cloud.InstancePrice{{Price: p}}
+ } else if prices[len(prices)-1].StartTime.After(cr.costStartTime) {
+ // guess earlier pricing was the same as the earliest
+ // price we know about
+ filler := prices[len(prices)-1]
+ filler.StartTime = time.Time{}
+ prices = append(prices, filler)
+ }
+
+ // Now that our history of price changes goes back at least as
+ // far as cr.costStartTime, add up the costs for each
+ // interval.
+ cost := 0.0
+ spanEnd := now
+ for _, ip := range prices {
+ spanStart := ip.StartTime
+ last := false
+ if spanStart.Before(cr.costStartTime) {
+ spanStart = cr.costStartTime
+ last = true
+ }
+ cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
+ if last {
+ break
+ }
+ spanEnd = spanStart
+ }
+ return cost
+}
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 8da933a73..91a46e10e 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -2071,6 +2072,48 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"), NotNil)
}
+func (s *TestSuite) TestCalculateCost(c *C) {
+ defer func(s string) { lockdir = s }(lockdir)
+ lockdir = c.MkDir()
+ now := time.Now()
+ cr := ContainerRunner{costStartTime: now.Add(-time.Hour)}
+
+ // if there's no InstanceType env var, cost is calculated as 0
+ os.Unsetenv("InstanceType")
+ cost := cr.calculateCost(now)
+ c.Check(cost, Equals, 0.0)
+
+ // with InstanceType env var and loadPrices() hasn't run (or
+ // hasn't found any data), cost is calculated based on
+ // InstanceType env var
+ os.Setenv("InstanceType", `{"Price":1.2}`)
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.2)
+
+ // first update tells us the spot price was $1/h until 30
+ // minutes ago when it increased to $2/h
+ j, err := json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-4 * time.Hour), Price: 1.0},
+ {StartTime: now.Add(-time.Hour / 2), Price: 2.0},
+ })
+ c.Assert(err, IsNil)
+ os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+ cr.loadPrices()
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.5)
+
+ // next update (via --list + SIGUSR2) tells us the spot price
+ // increased to $3/h 15 minutes ago
+ j, err = json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-time.Hour / 4), Price: 3.0},
+ })
+ c.Assert(err, IsNil)
+ os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+ cr.loadPrices()
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+}
+
type FakeProcess struct {
cmdLine []string
}
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index f57db0f09..bb134e454 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -470,3 +470,7 @@ func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
}
return dst
}
+
+func (si stubInstance) PriceHistory() []cloud.InstancePrice {
+ return nil
+}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index b01a820cd..397a46292 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,7 +6,9 @@ package worker
import (
"bytes"
+ "encoding/json"
"fmt"
+ "io"
"path/filepath"
"strings"
"sync"
@@ -381,7 +383,12 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
cmd = "sudo " + cmd
}
before := time.Now()
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ var stdin io.Reader
+ if prices := wkr.instance.PriceHistory(); len(prices) > 0 {
+ j, _ := json.Marshal(prices)
+ stdin = bytes.NewReader(j)
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list