[arvados] updated: 2.5.0-6-gc138f58b2

git repository hosting git at public.arvados.org
Mon Jan 23 15:32:54 UTC 2023


Summary of changes:
 lib/cloud/ec2/ec2.go            | 15 +++++++++++----
 lib/cloud/ec2/ec2_test.go       | 14 +++++++++++++-
 lib/cloud/price.go              |  4 ++--
 lib/cloud/price_test.go         | 32 ++++++++++++++++++++++++++++++++
 lib/crunchrun/crunchrun.go      | 10 ++++++++++
 lib/crunchrun/crunchrun_test.go | 11 ++++++++++-
 6 files changed, 78 insertions(+), 8 deletions(-)
 create mode 100644 lib/cloud/price_test.go

       via  c138f58b21edb574b101588f6fc61dce8a98ed3e (commit)
       via  032450eaf50691bd981a5abb3d7fc73a75c62881 (commit)
      from  a98114557438e69fc7fc088d5d4c19ac2d4c5274 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit c138f58b21edb574b101588f6fc61dce8a98ed3e
Author: Tom Clegg <tom at curii.com>
Date:   Mon Jan 23 10:31:41 2023 -0500

    19320: Log instance price changes in crunch-run.txt.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/cloud/price.go b/lib/cloud/price.go
index 234564b68..59f5afc94 100644
--- a/lib/cloud/price.go
+++ b/lib/cloud/price.go
@@ -10,9 +10,9 @@ import (
 
 // NormalizePriceHistory de-duplicates and sorts instance prices, most
 // recent first.
-//
-// The provided slice is modified in place.
 func NormalizePriceHistory(prices []InstancePrice) []InstancePrice {
+	// copy provided slice instead of modifying it in place
+	prices = append([]InstancePrice(nil), prices...)
 	// sort by timestamp, newest first
 	sort.Slice(prices, func(i, j int) bool {
 		return prices[i].StartTime.After(prices[j].StartTime)
diff --git a/lib/cloud/price_test.go b/lib/cloud/price_test.go
new file mode 100644
index 000000000..e2a4a7e13
--- /dev/null
+++ b/lib/cloud/price_test.go
@@ -0,0 +1,32 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+	"testing"
+	"time"
+
+	. "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) { TestingT(t) }
+
+type cloudSuite struct{}
+
+var _ = Suite(&cloudSuite{})
+
+func (s *cloudSuite) TestNormalizePriceHistory(c *C) {
+	t0, err := time.Parse(time.RFC3339, "2023-01-01T01:00:00Z")
+	c.Assert(err, IsNil)
+	h := []InstancePrice{
+		{t0.Add(1 * time.Minute), 1.0},
+		{t0.Add(4 * time.Minute), 1.2}, // drop: unchanged price
+		{t0.Add(5 * time.Minute), 1.1},
+		{t0.Add(3 * time.Minute), 1.2},
+		{t0.Add(5 * time.Minute), 1.1}, // drop: duplicate
+		{t0.Add(2 * time.Minute), 1.0}, // drop: out of order, unchanged price
+	}
+	c.Check(NormalizePriceHistory(h), DeepEquals, []InstancePrice{h[2], h[3], h[0]})
+}
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 1dd232d3e..1f130a5b1 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -2251,7 +2251,17 @@ func (cr *ContainerRunner) loadPrices() {
 	}
 	cr.pricesLock.Lock()
 	defer cr.pricesLock.Unlock()
+	var lastKnown time.Time
+	if len(cr.prices) > 0 {
+		lastKnown = cr.prices[0].StartTime
+	}
 	cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+	for i := len(cr.prices) - 1; i >= 0; i-- {
+		price := cr.prices[i]
+		if price.StartTime.After(lastKnown) {
+			cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", price.Price, price.StartTime.UTC())
+		}
+	}
 }
 
 func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 1e3b2eb5c..a5045863b 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -12,6 +12,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"log"
 	"os"
 	"os/exec"
 	"regexp"
@@ -2076,7 +2077,10 @@ func (s *TestSuite) TestCalculateCost(c *C) {
 	defer func(s string) { lockdir = s }(lockdir)
 	lockdir = c.MkDir()
 	now := time.Now()
-	cr := ContainerRunner{costStartTime: now.Add(-time.Hour)}
+	cr := s.runner
+	cr.costStartTime = now.Add(-time.Hour)
+	var logbuf bytes.Buffer
+	cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
 
 	// if there's no InstanceType env var, cost is calculated as 0
 	os.Unsetenv("InstanceType")
@@ -2106,6 +2110,7 @@ func (s *TestSuite) TestCalculateCost(c *C) {
 	// next update (via --list + SIGUSR2) tells us the spot price
 	// increased to $3/h 15 minutes ago
 	j, err = json.Marshal([]cloud.InstancePrice{
+		{StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price
 		{StartTime: now.Add(-time.Hour / 4), Price: 3.0},
 	})
 	c.Assert(err, IsNil)
@@ -2113,6 +2118,10 @@ func (s *TestSuite) TestCalculateCost(c *C) {
 	cr.loadPrices()
 	cost = cr.calculateCost(now)
 	c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+
+	c.Logf("%s", logbuf.String())
+	c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
+	c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
 }
 
 type FakeProcess struct {

commit 032450eaf50691bd981a5abb3d7fc73a75c62881
Author: Tom Clegg <tom at curii.com>
Date:   Wed Jan 18 09:58:49 2023 -0500

    19320: Fix AWS InstanceStatus and SpotPrice API usage.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index f80e9bd1a..2a5eea484 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -297,12 +297,17 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
 	}
 	if needAZs {
 		az := map[string]string{}
-		instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{}, func(page *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
+		err := instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{
+			IncludeAllInstances: aws.Bool(true),
+		}, func(page *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
 			for _, ent := range page.InstanceStatuses {
 				az[*ent.InstanceId] = *ent.AvailabilityZone
 			}
 			return true
 		})
+		if err != nil {
+			instanceSet.logger.Warnf("error getting instance statuses: %s", err)
+		}
 		for _, inst := range instances {
 			inst := inst.(*ec2Instance)
 			inst.availabilityZone = az[*inst.instance.InstanceId]
@@ -363,7 +368,8 @@ func (instanceSet *ec2InstanceSet) updateSpotPrices(instances []cloud.Instance)
 	dsphi := &ec2.DescribeSpotPriceHistoryInput{
 		StartTime: aws.Time(updateTime.Add(-3 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration())),
 		Filters: []*ec2.Filter{
-			&ec2.Filter{Name: aws.String("InstanceType"), Values: typeFilterValues},
+			&ec2.Filter{Name: aws.String("instance-type"), Values: typeFilterValues},
+			&ec2.Filter{Name: aws.String("product-description"), Values: []*string{aws.String("Linux/UNIX")}},
 		},
 	}
 	err := instanceSet.client.DescribeSpotPriceHistoryPages(dsphi, func(page *ec2.DescribeSpotPriceHistoryOutput, lastPage bool) bool {
@@ -503,11 +509,12 @@ func (inst *ec2Instance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
 func (inst *ec2Instance) PriceHistory() []cloud.InstancePrice {
 	inst.provider.pricesLock.Lock()
 	defer inst.provider.pricesLock.Unlock()
-	return inst.provider.prices[priceKey{
+	pk := priceKey{
 		instanceType:     *inst.instance.InstanceType,
 		spot:             aws.StringValue(inst.instance.InstanceLifecycle) == "spot",
 		availabilityZone: inst.availabilityZone,
-	}]
+	}
+	return inst.provider.prices[pk]
 }
 
 type rateLimitError struct {
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 103642258..e7534a7b6 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -287,6 +287,16 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
 	ap, img, cluster := GetInstanceSet(c)
 	pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
 	tags := cloud.InstanceTags{"arvados-ec2-driver": "test"}
+
+	defer func() {
+		instances, err := ap.Instances(tags)
+		c.Assert(err, check.IsNil)
+		for _, inst := range instances {
+			c.Logf("cleanup: destroy instance %s", inst)
+			c.Check(inst.Destroy(), check.IsNil)
+		}
+	}()
+
 	inst1, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, tags, "true", pk)
 	c.Assert(err, check.IsNil)
 	defer inst1.Destroy()
@@ -305,13 +315,15 @@ func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
 		instances, err = ap.Instances(tags)
 		running := 0
 		for _, inst := range instances {
-			if inst.Address() != "" {
+			if *inst.(*ec2Instance).instance.InstanceLifecycle == "spot" {
 				running++
 			}
 		}
 		if running >= 2 {
+			c.Logf("instances are running, and identifiable as spot instances")
 			break
 		}
+		c.Logf("waiting for instances to be identifiable as spot instances...")
 		time.Sleep(10 * time.Second)
 	}
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list