[arvados] created: 2.5.0-3-g265470fc9
git repository hosting
git at public.arvados.org
Fri Jan 13 05:25:37 UTC 2023
at 265470fc92264107f3d1949111cc1230f16690de (commit)
commit 265470fc92264107f3d1949111cc1230f16690de
Author: Tom Clegg <tom at curii.com>
Date: Fri Jan 13 00:25:26 2023 -0500
19320: Use AWS spot price data to calculate container cost.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/go.mod b/go.mod
index aced60dbc..eb867f8c5 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,7 @@ require (
github.com/Azure/go-autorest/autorest/azure/auth v0.5.9
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/arvados/cgofuse v1.2.0-arvados1
- github.com/aws/aws-sdk-go v1.25.30
+ github.com/aws/aws-sdk-go v1.44.174
github.com/aws/aws-sdk-go-v2 v0.23.0
github.com/bradleypeabody/godap v0.0.0-20170216002349-c249933bc092
github.com/coreos/go-oidc v2.1.0+incompatible
@@ -36,9 +36,9 @@ require (
github.com/prometheus/common v0.10.0
github.com/sirupsen/logrus v1.8.1
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871
- golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2
+ golang.org/x/net v0.5.0
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
- golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e
+ golang.org/x/sys v0.4.0
google.golang.org/api v0.20.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15
gopkg.in/square/go-jose.v2 v2.5.1
@@ -74,7 +74,7 @@ require (
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
- github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
+ github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
github.com/kr/pretty v0.2.1 // indirect
github.com/kr/text v0.1.0 // indirect
@@ -94,8 +94,8 @@ require (
github.com/src-d/gcfg v1.3.0 // indirect
github.com/xanzy/ssh-agent v0.1.0 // indirect
go.opencensus.io v0.22.3 // indirect
- golang.org/x/text v0.3.6 // indirect
- golang.org/x/tools v0.1.7 // indirect
+ golang.org/x/text v0.6.0 // indirect
+ golang.org/x/tools v0.1.12 // indirect
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a // indirect
google.golang.org/grpc v1.33.2 // indirect
diff --git a/go.sum b/go.sum
index 422a891e0..1936e7978 100644
--- a/go.sum
+++ b/go.sum
@@ -107,6 +107,8 @@ github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZo
github.com/aws/aws-sdk-go v1.17.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.30 h1:I9qj6zW3mMfsg91e+GMSN/INcaX9tTFvr/l/BAHKaIY=
github.com/aws/aws-sdk-go v1.25.30/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
+github.com/aws/aws-sdk-go v1.44.174 h1:9lR4a6MKQW/t6YCG0ZKAt1GAkjdEPP8sWch/pfcuR0c=
+github.com/aws/aws-sdk-go v1.44.174/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v0.23.0 h1:+E1q1LLSfHSDn/DzOtdJOX+pLZE2HiNV2yO5AjZINwM=
github.com/aws/aws-sdk-go-v2 v0.23.0/go.mod h1:2LhT7UgHOXK3UXONKI5OMgIyoQL6zTAw/jwIeX6yqzw=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -453,6 +455,9 @@ github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht
github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
+github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/johannesboyne/gofakes3 v0.0.0-20200716060623-6b2b4cb092cc h1:JJPhSHowepOF2+ElJVyb9jgt5ZyBkPMkPuhS0uODSFs=
@@ -701,6 +706,7 @@ github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs=
github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA=
github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg=
@@ -765,6 +771,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -803,6 +810,10 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
+golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
+golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -819,6 +830,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -884,8 +896,17 @@ golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
+golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.1.0 h1:g6Z6vPFA9dYBAF7DWcH6sCcOntplXsDKcliusYijMlw=
+golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -894,6 +915,11 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
+golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
+golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -940,6 +966,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
+golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 52b73f781..2d0d4ca3f 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -13,6 +13,7 @@ import (
"encoding/json"
"fmt"
"math/big"
+ "strconv"
"sync"
"sync/atomic"
"time"
@@ -40,14 +41,15 @@ const (
)
type ec2InstanceSetConfig struct {
- AccessKeyID string
- SecretAccessKey string
- Region string
- SecurityGroupIDs arvados.StringSet
- SubnetID string
- AdminUsername string
- EBSVolumeType string
- IAMInstanceProfile string
+ AccessKeyID string
+ SecretAccessKey string
+ Region string
+ SecurityGroupIDs arvados.StringSet
+ SubnetID string
+ AdminUsername string
+ EBSVolumeType string
+ IAMInstanceProfile string
+ SpotPriceUpdateInterval arvados.Duration
}
type ec2Interface interface {
@@ -55,6 +57,8 @@ type ec2Interface interface {
ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error)
RunInstances(input *ec2.RunInstancesInput) (*ec2.Reservation, error)
DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error)
+ DescribeInstanceStatusPages(input *ec2.DescribeInstanceStatusInput, fn func(*ec2.DescribeInstanceStatusOutput, bool) bool) error
+ DescribeSpotPriceHistoryPages(input *ec2.DescribeSpotPriceHistoryInput, fn func(*ec2.DescribeSpotPriceHistoryOutput, bool) bool) error
CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error)
TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error)
}
@@ -68,6 +72,10 @@ type ec2InstanceSet struct {
keys map[string]string
throttleDelayCreate atomic.Value
throttleDelayInstances atomic.Value
+
+ prices map[priceKey][]cloud.InstancePrice
+ pricesLock sync.Mutex
+ pricesUpdated map[priceKey]time.Time
}
func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
@@ -237,12 +245,15 @@ func (instanceSet *ec2InstanceSet) Create(
}
}
+ if instanceSet.ec2config.SpotPriceUpdateInterval <= 0 {
+ instanceSet.ec2config.SpotPriceUpdateInterval = arvados.Duration(time.Hour)
+ }
+
rsv, err := instanceSet.client.RunInstances(&rii)
err = wrapError(err, &instanceSet.throttleDelayCreate)
if err != nil {
return nil, err
}
-
return &ec2Instance{
provider: instanceSet,
instance: rsv.Instances[0],
@@ -257,6 +268,7 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
Values: []*string{aws.String(v)},
})
}
+ needAZs := false
dii := &ec2.DescribeInstancesInput{Filters: filters}
for {
dio, err := instanceSet.client.DescribeInstances(dii)
@@ -268,23 +280,139 @@ func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances
for _, rsv := range dio.Reservations {
for _, inst := range rsv.Instances {
if *inst.State.Name != "shutting-down" && *inst.State.Name != "terminated" {
- instances = append(instances, &ec2Instance{instanceSet, inst})
+ instances = append(instances, &ec2Instance{
+ provider: instanceSet,
+ instance: inst,
+ })
+ if aws.StringValue(inst.InstanceLifecycle) == "spot" {
+ needAZs = true
+ }
}
}
}
if dio.NextToken == nil {
- return instances, err
+ break
}
dii.NextToken = dio.NextToken
}
+ if needAZs {
+ az := map[string]string{}
+ instanceSet.client.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{}, func(page *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
+ for _, ent := range page.InstanceStatuses {
+ az[*ent.InstanceId] = *ent.AvailabilityZone
+ }
+ return true
+ })
+ for _, inst := range instances {
+ inst := inst.(*ec2Instance)
+ inst.availabilityZone = az[*inst.instance.InstanceId]
+ }
+ }
+ go instanceSet.updateSpotPrices(instances)
+ return instances, err
+}
+
+type priceKey struct {
+ instanceType string
+ spot bool
+ availabilityZone string
+}
+
+// Refresh recent spot instance pricing data for the given instances,
+// unless we already have recent pricing data for all relevant types.
+func (instanceSet *ec2InstanceSet) updateSpotPrices(instances []cloud.Instance) {
+ if len(instances) == 0 {
+ return
+ }
+
+ instanceSet.pricesLock.Lock()
+ defer instanceSet.pricesLock.Unlock()
+ if instanceSet.prices == nil {
+ instanceSet.prices = map[priceKey][]cloud.InstancePrice{}
+ instanceSet.pricesUpdated = map[priceKey]time.Time{}
+ }
+
+ updateTime := time.Now()
+ staleTime := updateTime.Add(-instanceSet.ec2config.SpotPriceUpdateInterval.Duration())
+ needUpdate := false
+ var typeFilterValues []*string
+ for _, inst := range instances {
+ ec2inst := inst.(*ec2Instance).instance
+ if aws.StringValue(ec2inst.InstanceLifecycle) == "spot" {
+ pk := priceKey{
+ instanceType: *ec2inst.InstanceType,
+ spot: true,
+ availabilityZone: inst.(*ec2Instance).availabilityZone,
+ }
+ if instanceSet.pricesUpdated[pk].Before(staleTime) {
+ needUpdate = true
+ }
+ typeFilterValues = append(typeFilterValues, ec2inst.InstanceType)
+ }
+ }
+ if !needUpdate {
+ return
+ }
+ // Get 3x update interval worth of pricing data. (Ideally the
+ // AWS API would tell us "we have shown you all of the price
+ // changes up to time T", but it doesn't, so we'll just ask
+ // for 3 intervals worth of data on each update, de-duplicate
+ // the data points, and not worry too much about occasionally
+ // missing some data points when our lookups fail twice in a
+ // row.
+ dsphi := &ec2.DescribeSpotPriceHistoryInput{
+ StartTime: aws.Time(updateTime.Add(-3 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration())),
+ Filters: []*ec2.Filter{
+ &ec2.Filter{Name: aws.String("InstanceType"), Values: typeFilterValues},
+ },
+ }
+ err := instanceSet.client.DescribeSpotPriceHistoryPages(dsphi, func(page *ec2.DescribeSpotPriceHistoryOutput, lastPage bool) bool {
+ for _, ent := range page.SpotPriceHistory {
+ if ent.InstanceType == nil || ent.SpotPrice == nil || ent.Timestamp == nil {
+ // bogus record?
+ continue
+ }
+ price, err := strconv.ParseFloat(*ent.SpotPrice, 64)
+ if err != nil {
+ // bogus record?
+ continue
+ }
+ pk := priceKey{
+ instanceType: *ent.InstanceType,
+ spot: true,
+ availabilityZone: *ent.AvailabilityZone,
+ }
+ instanceSet.prices[pk] = append(instanceSet.prices[pk], cloud.InstancePrice{
+ StartTime: *ent.Timestamp,
+ Price: price,
+ })
+ instanceSet.pricesUpdated[pk] = updateTime
+ }
+ return true
+ })
+ if err != nil {
+ instanceSet.logger.Warnf("error retrieving spot instance prices: %s", err)
+ }
+
+ expiredTime := updateTime.Add(-64 * instanceSet.ec2config.SpotPriceUpdateInterval.Duration())
+ for pk, last := range instanceSet.pricesUpdated {
+ if last.Before(expiredTime) {
+ delete(instanceSet.pricesUpdated, pk)
+ delete(instanceSet.prices, pk)
+ }
+ }
+ for pk, prices := range instanceSet.prices {
+ instanceSet.prices[pk] = cloud.NormalizePriceHistory(prices)
+ }
}
func (instanceSet *ec2InstanceSet) Stop() {
}
type ec2Instance struct {
- provider *ec2InstanceSet
- instance *ec2.Instance
+ provider *ec2InstanceSet
+ instance *ec2.Instance
+ availabilityZone string // sometimes available for spot instances
}
func (inst *ec2Instance) ID() cloud.InstanceID {
@@ -348,6 +476,40 @@ func (inst *ec2Instance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
return cloud.ErrNotImplemented
}
+// PriceHistory returns the price history for this specific instance.
+//
+// AWS documentation is elusive about whether the hourly cost of a
+// given spot instance changes as the current spot price changes for
+// the corresponding instance type and availability zone. Our
+// implementation assumes the answer is yes, based on the following
+// hints.
+//
+// https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-requests.html
+// says: "After your Spot Instance is running, if the Spot price rises
+// above your maximum price, Amazon EC2 interrupts your Spot
+// Instance." (This doesn't address what happens when the spot price
+// rises *without* exceeding your maximum price.)
+//
+// https://docs.aws.amazon.com/whitepapers/latest/cost-optimization-leveraging-ec2-spot-instances/how-spot-instances-work.html
+// says: "You pay the Spot price that's in effect, billed to the
+// nearest second." (But it's not explicitly stated whether "the price
+// in effect" changes over time for a given instance.)
+//
+// The same page also says, in a discussion about the effect of
+// specifying a maximum price: "Note that you never pay more than the
+// Spot price that is in effect when your Spot Instance is running."
+// (The use of the phrase "is running", as opposed to "was launched",
+// hints that pricing is dynamic.)
+func (inst *ec2Instance) PriceHistory() []cloud.InstancePrice {
+ inst.provider.pricesLock.Lock()
+ defer inst.provider.pricesLock.Unlock()
+ return inst.provider.prices[priceKey{
+ instanceType: *inst.instance.InstanceType,
+ spot: aws.StringValue(inst.instance.InstanceLifecycle) == "spot",
+ availabilityZone: inst.availabilityZone,
+ }]
+}
+
type rateLimitError struct {
error
earliestRetry time.Time
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 3cd238ded..90cbd664b 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -27,6 +27,7 @@ import (
"flag"
"sync/atomic"
"testing"
+ "time"
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
@@ -56,6 +57,8 @@ type testConfig struct {
}
type ec2stub struct {
+ c *check.C
+ reftime time.Time
}
func (e *ec2stub) ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error) {
@@ -68,13 +71,64 @@ func (e *ec2stub) DescribeKeyPairs(input *ec2.DescribeKeyPairsInput) (*ec2.Descr
func (e *ec2stub) RunInstances(input *ec2.RunInstancesInput) (*ec2.Reservation, error) {
return &ec2.Reservation{Instances: []*ec2.Instance{{
- InstanceId: aws.String("i-123"),
- Tags: input.TagSpecifications[0].Tags,
+ InstanceId: aws.String("i-123"),
+ InstanceType: aws.String("t2.micro"),
+ Tags: input.TagSpecifications[0].Tags,
}}}, nil
}
func (e *ec2stub) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
- return &ec2.DescribeInstancesOutput{}, nil
+ return &ec2.DescribeInstancesOutput{
+ Reservations: []*ec2.Reservation{{
+ Instances: []*ec2.Instance{{
+ InstanceId: aws.String("i-123"),
+ InstanceLifecycle: aws.String("spot"),
+ InstanceType: aws.String("t2.micro"),
+ }},
+ }},
+ }, nil
+}
+
+func (e *ec2stub) DescribeInstanceStatusPages(input *ec2.DescribeInstanceStatusInput, fn func(*ec2.DescribeInstanceStatusOutput, bool) bool) error {
+ fn(&ec2.DescribeInstanceStatusOutput{
+ InstanceStatuses: []*ec2.InstanceStatus{{
+ InstanceId: aws.String("i-123"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ }},
+ }, true)
+ return nil
+}
+
+func (e *ec2stub) DescribeSpotPriceHistoryPages(input *ec2.DescribeSpotPriceHistoryInput, fn func(*ec2.DescribeSpotPriceHistoryOutput, bool) bool) error {
+ if !fn(&ec2.DescribeSpotPriceHistoryOutput{
+ SpotPriceHistory: []*ec2.SpotPrice{
+ &ec2.SpotPrice{
+ InstanceType: aws.String("t2.micro"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ SpotPrice: aws.String("0.005"),
+ Timestamp: aws.Time(e.reftime.Add(-9 * time.Minute)),
+ },
+ &ec2.SpotPrice{
+ InstanceType: aws.String("t2.micro"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ SpotPrice: aws.String("0.015"),
+ Timestamp: aws.Time(e.reftime.Add(-5 * time.Minute)),
+ },
+ },
+ }, false) {
+ return nil
+ }
+ fn(&ec2.DescribeSpotPriceHistoryOutput{
+ SpotPriceHistory: []*ec2.SpotPrice{
+ &ec2.SpotPrice{
+ InstanceType: aws.String("t2.micro"),
+ AvailabilityZone: aws.String("aa-east-1a"),
+ SpotPrice: aws.String("0.01"),
+ Timestamp: aws.Time(e.reftime.Add(-2 * time.Minute)),
+ },
+ },
+ }, true)
+ return nil
}
func (e *ec2stub) CreateTags(input *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) {
@@ -85,7 +139,7 @@ func (e *ec2stub) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.T
return nil, nil
}
-func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error) {
+func GetInstanceSet(c *check.C) (cloud.InstanceSet, cloud.ImageID, arvados.Cluster) {
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
@@ -98,7 +152,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
Preemptible: false,
},
"tiny-with-extra-scratch": {
- Name: "tiny",
+ Name: "tiny-with-extra-scratch",
ProviderType: "t2.micro",
VCPUs: 1,
RAM: 4000000000,
@@ -107,7 +161,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
AddedScratch: 20000000000,
},
"tiny-preemptible": {
- Name: "tiny",
+ Name: "tiny-preemptible",
ProviderType: "t2.micro",
VCPUs: 1,
RAM: 4000000000,
@@ -119,37 +173,30 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
if *live != "" {
var exampleCfg testConfig
err := config.LoadFile(&exampleCfg, *live)
- if err != nil {
- return nil, cloud.ImageID(""), cluster, err
- }
+ c.Assert(err, check.IsNil)
ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
- return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
+ c.Assert(err, check.IsNil)
+ return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster
}
ap := ec2InstanceSet{
ec2config: ec2InstanceSetConfig{},
instanceSetID: "test123",
logger: logrus.StandardLogger(),
- client: &ec2stub{},
+ client: &ec2stub{c: c, reftime: time.Now()},
keys: make(map[string]string),
}
- return &ap, cloud.ImageID("blob"), cluster, nil
+ return &ap, cloud.ImageID("blob"), cluster
}
func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
- ap, img, cluster, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, img, cluster := GetInstanceSet(c)
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
- c.Assert(err, check.IsNil)
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
img, map[string]string{
"TestTagName": "test tag value",
}, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
-
c.Assert(err, check.IsNil)
tags := inst.Tags()
@@ -159,13 +206,8 @@ func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
}
func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
- ap, img, cluster, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, img, cluster := GetInstanceSet(c)
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
- c.Assert(err, check.IsNil)
inst, err := ap.Create(cluster.InstanceTypes["tiny-with-extra-scratch"],
img, map[string]string{
@@ -181,13 +223,8 @@ func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
}
func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
- ap, img, cluster, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, img, cluster := GetInstanceSet(c)
pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
- c.Assert(err, check.IsNil)
inst, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"],
img, map[string]string{
@@ -203,11 +240,7 @@ func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
}
func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
- ap, _, _, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, _, _ := GetInstanceSet(c)
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
@@ -219,13 +252,8 @@ func (*EC2InstanceSetSuite) TestTagInstances(c *check.C) {
}
func (*EC2InstanceSetSuite) TestListInstances(c *check.C) {
- ap, _, _, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider: ", err)
- }
-
+ ap, _, _ := GetInstanceSet(c)
l, err := ap.Instances(nil)
-
c.Assert(err, check.IsNil)
for _, i := range l {
@@ -235,11 +263,7 @@ func (*EC2InstanceSetSuite) TestListInstances(c *check.C) {
}
func (*EC2InstanceSetSuite) TestDestroyInstances(c *check.C) {
- ap, _, _, err := GetInstanceSet()
- if err != nil {
- c.Fatal("Error making provider", err)
- }
-
+ ap, _, _ := GetInstanceSet(c)
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
@@ -248,6 +272,22 @@ func (*EC2InstanceSetSuite) TestDestroyInstances(c *check.C) {
}
}
+func (*EC2InstanceSetSuite) TestInstancePriceHistory(c *check.C) {
+ ap, img, cluster := GetInstanceSet(c)
+ pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
+ is := ap.(*ec2InstanceSet)
+ inst1, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, nil, "true", pk)
+ c.Assert(err, check.IsNil)
+ inst2, err := ap.Create(cluster.InstanceTypes["tiny-preemptible"], img, nil, "true", pk)
+ c.Assert(err, check.IsNil)
+
+ time.Sleep(time.Millisecond)
+ is.updateSpotPrices([]cloud.Instance{inst1, inst2})
+ c.Check(inst1.PriceHistory(), check.HasLen, 1)
+ c.Check(inst2.PriceHistory(), check.HasLen, 1)
+ c.Error("TODO check prices")
+}
+
func (*EC2InstanceSetSuite) TestWrapError(c *check.C) {
retryError := awserr.New("Throttling", "", nil)
wrapped := wrapError(retryError, &atomic.Value{})
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
index 2d53a49c5..7f5904968 100644
--- a/lib/cloud/interfaces.go
+++ b/lib/cloud/interfaces.go
@@ -102,6 +102,9 @@ type Instance interface {
// Replace tags with the given tags
SetTags(InstanceTags) error
+ // Get recent price history, if available
+ PriceHistory() []InstancePrice
+
// Shut down the node
Destroy() error
}
@@ -141,6 +144,11 @@ type InstanceSet interface {
Stop()
}
+type InstancePrice struct {
+ StartTime time.Time
+ Price float64
+}
+
type InitCommand string
// A Driver returns an InstanceSet that uses the given InstanceSetID
diff --git a/lib/cloud/price.go b/lib/cloud/price.go
new file mode 100644
index 000000000..234564b68
--- /dev/null
+++ b/lib/cloud/price.go
@@ -0,0 +1,28 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+ "sort"
+)
+
+// NormalizePriceHistory de-duplicates and sorts instance prices, most
+// recent first.
+//
+// The provided slice is modified in place.
+func NormalizePriceHistory(prices []InstancePrice) []InstancePrice {
+ // sort by timestamp, newest first
+ sort.Slice(prices, func(i, j int) bool {
+ return prices[i].StartTime.After(prices[j].StartTime)
+ })
+ // remove duplicate data points, keeping the oldest
+ for i := 0; i < len(prices)-1; i++ {
+ if prices[i].StartTime == prices[i+1].StartTime || prices[i].Price == prices[i+1].Price {
+ prices = append(prices[:i], prices[i+1:]...)
+ i--
+ }
+ }
+ return prices
+}
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 2d9119adf..75aeb832b 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1395,6 +1395,11 @@ Clusters:
# the cloud dispatcher. Leave blank when not needed.
IAMInstanceProfile: ""
+ # (ec2) how often to look up spot instance pricing data
+ # (only while running spot instances) for the purpose of
+ # calculating container cost estimates.
+ SpotPriceUpdateInterval: 1h
+
# (azure) Credentials.
SubscriptionID: ""
ClientID: ""
diff --git a/lib/config/export.go b/lib/config/export.go
index 069e300c5..04bee6980 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -135,6 +135,7 @@ var whitelist = map[string]bool{
"Containers.MaxRetryAttempts": true,
"Containers.MinRetryPeriod": true,
"Containers.PreemptiblePriceFactor": false,
+ "Containers.PriceCheckInterval": false,
"Containers.ReserveExtraRAM": true,
"Containers.RuntimeEngine": true,
"Containers.ShellAccess": true,
diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go
index 8a919bc5e..adb65324b 100644
--- a/lib/crunchrun/background.go
+++ b/lib/crunchrun/background.go
@@ -21,6 +21,7 @@ var (
lockprefix = "crunch-run-"
locksuffix = ".lock"
brokenfile = "crunch-run-broken"
+ pricesfile = "crunch-run-prices.json"
)
// procinfo is saved in each process's lockfile.
@@ -183,7 +184,20 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
}
// ListProcesses lists UUIDs of active crunch-run processes.
-func ListProcesses(stdout, stderr io.Writer) int {
+func ListProcesses(stdin io.Reader, stdout, stderr io.Writer) int {
+ if buf, err := io.ReadAll(stdin); err == nil && len(buf) > 0 {
+ // write latest pricing data to disk where
+ // current/future crunch-run processes can load it
+ fnm := filepath.Join(lockdir, pricesfile)
+ fnmtmp := fmt.Sprintf("%s~%d", fnm, os.Getpid())
+ err := os.WriteFile(fnmtmp, buf, 0777)
+ if err != nil {
+ fmt.Fprintf(stderr, "error writing price data to %s: %s", fnmtmp, err)
+ } else if err = os.Rename(fnmtmp, fnm); err != nil {
+ fmt.Fprintf(stderr, "error renaming %s to %s: %s", fnmtmp, fnm, err)
+ os.Remove(fnmtmp)
+ }
+ }
// filepath.Walk does not follow symlinks, so we must walk
// lockdir+"/." in case lockdir itself is a symlink.
walkdir := lockdir + "/."
@@ -245,7 +259,7 @@ func ListProcesses(stdout, stderr io.Writer) int {
fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
return nil
}
- err = proc.Signal(syscall.Signal(0))
+ err = proc.Signal(syscall.SIGUSR2)
if err != nil {
// Process is dead, even though lockfile was
// still locked. Most likely a stuck arv-mount
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 52e1bf63a..1dd232d3e 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -31,6 +31,7 @@ import (
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/crunchstat"
@@ -177,6 +178,9 @@ type ContainerRunner struct {
containerWatchdogInterval time.Duration
gateway Gateway
+
+ prices []cloud.InstancePrice
+ pricesLock sync.Mutex
}
// setupSignals sets up signal handling to gracefully terminate the
@@ -1469,10 +1473,7 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
if runner.finalState == "Complete" && runner.OutputPDH != nil {
update["output"] = *runner.OutputPDH
}
- var it arvados.InstanceType
- if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
- update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
- }
+ update["cost"] = runner.calculateCost(time.Now())
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
@@ -1514,6 +1515,16 @@ func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("Executing on host '%s'", hostname)
}
+ sigusr2 := make(chan os.Signal, 1)
+ signal.Notify(sigusr2, syscall.SIGUSR2)
+ defer signal.Stop(sigusr2)
+ runner.loadPrices()
+ go func() {
+ for range sigusr2 {
+ runner.loadPrices()
+ }
+ }()
+
runner.finalState = "Queued"
defer func() {
@@ -1756,7 +1767,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
- list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+ list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)")
enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
@@ -1792,11 +1803,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
+ return Detach(containerUUID, prog, args, stdin, stdout, stderr)
case *kill >= 0:
- return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+ return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr)
case *list:
- return ListProcesses(os.Stdout, os.Stderr)
+ return ListProcesses(stdin, stdout, stderr)
}
if len(containerUUID) != 27 {
@@ -2223,3 +2234,70 @@ func localKeepstoreAddr() string {
})
return ips[0].String()
}
+
+func (cr *ContainerRunner) loadPrices() {
+ buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+ if err != nil {
+ if !os.IsNotExist(err) {
+ cr.CrunchLog.Printf("loadPrices: read: %s", err)
+ }
+ return
+ }
+ var prices []cloud.InstancePrice
+ err = json.Unmarshal(buf, &prices)
+ if err != nil {
+ cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+ return
+ }
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+ cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+}
+
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+
+ // First, make a "prices" slice with the real data as far back
+ // as it goes, and (if needed) a "since the beginning of time"
+ // placeholder containing a reasonable guess about what the
+ // price was between cr.costStartTime and the earliest real
+ // data point.
+ prices := cr.prices
+ if len(prices) == 0 {
+ // use price info in InstanceType record initially
+ // provided by cloud dispatcher
+ var p float64
+ var it arvados.InstanceType
+ if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+ p = it.Price
+ }
+ prices = []cloud.InstancePrice{{Price: p}}
+ } else if prices[len(prices)-1].StartTime.After(cr.costStartTime) {
+ // guess earlier pricing was the same as the earliest
+ // price we know about
+ filler := prices[len(prices)-1]
+ filler.StartTime = time.Time{}
+ prices = append(prices, filler)
+ }
+
+ // Now that our history of price changes goes back at least as
+ // far as cr.costStartTime, add up the costs for each
+ // interval.
+ cost := 0.0
+ spanEnd := now
+ for _, ip := range prices {
+ spanStart := ip.StartTime
+ last := false
+ if spanStart.Before(cr.costStartTime) {
+ spanStart = cr.costStartTime
+ last = true
+ }
+ cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
+ if last {
+ break
+ }
+ spanEnd = spanStart
+ }
+ return cost
+}
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 8da933a73..91a46e10e 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -2071,6 +2072,48 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"), NotNil)
}
+func (s *TestSuite) TestCalculateCost(c *C) {
+ defer func(s string) { lockdir = s }(lockdir)
+ lockdir = c.MkDir()
+ now := time.Now()
+ cr := ContainerRunner{costStartTime: now.Add(-time.Hour)}
+
+ // if there's no InstanceType env var, cost is calculated as 0
+ os.Unsetenv("InstanceType")
+ cost := cr.calculateCost(now)
+ c.Check(cost, Equals, 0.0)
+
+ // with InstanceType env var and loadPrices() hasn't run (or
+ // hasn't found any data), cost is calculated based on
+ // InstanceType env var
+ os.Setenv("InstanceType", `{"Price":1.2}`)
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.2)
+
+ // first update tells us the spot price was $1/h until 30
+ // minutes ago when it increased to $2/h
+ j, err := json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-4 * time.Hour), Price: 1.0},
+ {StartTime: now.Add(-time.Hour / 2), Price: 2.0},
+ })
+ c.Assert(err, IsNil)
+ os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+ cr.loadPrices()
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.5)
+
+ // next update (via --list + SIGUSR2) tells us the spot price
+ // increased to $3/h 15 minutes ago
+ j, err = json.Marshal([]cloud.InstancePrice{
+ {StartTime: now.Add(-time.Hour / 4), Price: 3.0},
+ })
+ c.Assert(err, IsNil)
+ os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+ cr.loadPrices()
+ cost = cr.calculateCost(now)
+ c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+}
+
type FakeProcess struct {
cmdLine []string
}
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index f57db0f09..bb134e454 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -470,3 +470,7 @@ func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
}
return dst
}
+
+func (si stubInstance) PriceHistory() []cloud.InstancePrice {
+ return nil
+}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index b01a820cd..397a46292 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,7 +6,9 @@ package worker
import (
"bytes"
+ "encoding/json"
"fmt"
+ "io"
"path/filepath"
"strings"
"sync"
@@ -381,7 +383,12 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
cmd = "sudo " + cmd
}
before := time.Now()
- stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
+ var stdin io.Reader
+ if prices := wkr.instance.PriceHistory(); len(prices) > 0 {
+ j, _ := json.Marshal(prices)
+ stdin = bytes.NewReader(j)
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list