[arvados] created: 2.5.0-114-ge53547d2b
git repository hosting
git at public.arvados.org
Mon Feb 13 21:20:46 UTC 2023
at e53547d2be566f3de202f61cbcfa1d9edab289f2 (commit)
commit e53547d2be566f3de202f61cbcfa1d9edab289f2
Author: Tom Clegg <tom at curii.com>
Date: Mon Feb 13 16:14:56 2023 -0500
19961: Detect and log EC2 spot interruption notices.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 3def8851c..df7bbb1e9 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -50,9 +50,10 @@ var Command = command{}
// ConfigData contains environment variables and (when needed) cluster
// configuration, passed from dispatchcloud to crunch-run on stdin.
type ConfigData struct {
- Env map[string]string
- KeepBuffers int
- Cluster *arvados.Cluster
+ Env map[string]string
+ KeepBuffers int
+ // EC2SpotCheck, when true, makes RunCommand start a
+ // checkSpotInterruptionNotices goroutine. dispatchcloud sets
+ // it when the cloud driver is "ec2" and the instance type is
+ // preemptible.
+ EC2SpotCheck bool
+ Cluster *arvados.Cluster
}
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -319,7 +320,9 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
"Block not found error",
"Unhandled exception during FUSE operation",
},
- ReportFunc: runner.reportArvMountWarning,
+ ReportFunc: func(pattern, text string) {
+ runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+ },
}
c.Stdout = runner.arvMountLog
c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
@@ -1197,16 +1200,98 @@ func (runner *ContainerRunner) updateLogs() {
}
}
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
- var updated arvados.Container
+// spotInterruptionCheckInterval is the period of the ticker that
+// drives checkSpotInterruptionNotices. It is a variable (not a
+// constant) so tests can shorten it.
+var spotInterruptionCheckInterval = 5 * time.Second
+// ec2MetadataBaseURL is the scheme and host prepended to the
+// "/latest/api/token" and "/latest/meta-data/spot/instance-action"
+// request paths. It is a variable so tests can point it at a stub
+// server.
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
+// checkSpotInterruptionNotices polls the EC2 instance metadata
+// endpoint once per spotInterruptionCheckInterval. Whenever the
+// reported spot instance-action changes, it writes a message to the
+// crunch-run log, records it as a runtime_status warning, and sends
+// SIGUSR1 to the current process to trigger updateLogs.
+//
+// It is meant to be run in its own goroutine. It returns only after
+// more than 3 consecutive checks have failed; a successful check
+// resets the failure count, so occasional transient errors over a
+// long-running container do not disable polling.
+//
+// NOTE(review): this is started with "go" from Run unconditionally
+// and again from RunCommand when EC2SpotCheck is set, so two pollers
+// can run in one process and the flag does not actually gate the
+// check -- confirm whether the call in Run is intended.
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+	type ec2metadata struct {
+		Action string    `json:"action"`
+		Time   time.Time `json:"time"`
+	}
+	// check performs one poll: it obtains a metadata session
+	// token, then uses it to fetch the spot instance-action
+	// document. It returns a zero ec2metadata when no action is
+	// scheduled (HTTP 404). Both requests share a one-minute
+	// deadline.
+	check := func() (ec2metadata, error) {
+		var metadata ec2metadata
+		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+		defer cancel()
+		req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+		if err != nil {
+			return metadata, err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return metadata, err
+		}
+		defer resp.Body.Close()
+		if resp.StatusCode != http.StatusOK {
+			return metadata, fmt.Errorf("%s", resp.Status)
+		}
+		token, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return metadata, err
+		}
+		req, err = http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+		if err != nil {
+			return metadata, err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token", strings.TrimSpace(string(token)))
+		resp, err = http.DefaultClient.Do(req)
+		if err != nil {
+			return metadata, err
+		}
+		defer resp.Body.Close()
+		switch resp.StatusCode {
+		case http.StatusOK:
+			// Decode the response body below.
+		case http.StatusNotFound:
+			// "If Amazon EC2 is not preparing to stop or
+			// terminate the instance, or if you
+			// terminated the instance yourself,
+			// instance-action is not present in the
+			// instance metadata and you receive an HTTP
+			// 404 error when you try to retrieve it."
+			return metadata, nil
+		default:
+			return metadata, fmt.Errorf("%s", resp.Status)
+		}
+		if err := json.NewDecoder(resp.Body).Decode(&metadata); err != nil {
+			return ec2metadata{}, err
+		}
+		return metadata, nil
+	}
+	ticker := time.NewTicker(spotInterruptionCheckInterval)
+	// Release the ticker if we give up and return.
+	defer ticker.Stop()
+	failures := 0
+	var lastmetadata ec2metadata
+	for range ticker.C {
+		metadata, err := check()
+		if err != nil {
+			runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+			failures++
+			if failures > 3 {
+				runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many errors")
+				return
+			}
+			continue
+		}
+		// Only consecutive failures should count toward giving up.
+		failures = 0
+		// Compare instants with Equal rather than ==, so the same
+		// notice is not reported again just because the decoded
+		// time.Time has a different internal representation.
+		if metadata.Action == lastmetadata.Action && metadata.Time.Equal(lastmetadata.Time) {
+			continue
+		}
+		lastmetadata = metadata
+		text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+		runner.CrunchLog.Printf("%s", text)
+		runner.updateRuntimeStatus("instance interruption", text)
+		if proc, err := os.FindProcess(os.Getpid()); err == nil {
+			// trigger updateLogs
+			proc.Signal(syscall.SIGUSR1)
+		}
+	}
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
"container": arvadosclient.Dict{
"runtime_status": arvadosclient.Dict{
- "warning": "arv-mount: " + pattern,
- "warningDetail": text,
+ "warning": warning,
+ "warningDetail": detail,
},
},
- }, &updated)
+ }, nil)
if err != nil {
runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
}
@@ -1535,6 +1620,7 @@ func (runner *ContainerRunner) Run() (err error) {
runner.loadPrices()
}
}()
+ go runner.checkSpotInterruptionNotices()
runner.finalState = "Queued"
@@ -2011,6 +2097,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
cr.expectCgroupParent = p
}
+ if conf.EC2SpotCheck {
+ go cr.checkSpotInterruptionNotices()
+ }
+
runerr := cr.Run()
if *memprofile != "" {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index aaba1c420..0e34c4954 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -13,6 +13,8 @@ import (
"io"
"io/ioutil"
"log"
+ "net/http"
+ "net/http/httptest"
"os"
"os/exec"
"regexp"
@@ -772,6 +774,59 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) {
c.Check(ran, Equals, false)
}
+// TestSpotInterruptionNotice runs a container while the metadata base
+// URL points at a stub HTTP server, then checks that the crunch-run
+// log contains both the error from a failed check and the "stop"
+// notice the stub starts returning partway through the run.
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+ // failedOnce makes the stub answer only its very first request
+ // with 503, regardless of path.
+ var failedOnce bool
+ // stoptime is zero until the callback below sets it; while it
+ // is zero the stub answers instance-action requests with 404.
+ // NOTE(review): stoptime and failedOnce are read/written by the
+ // stub handler and stoptime is assigned in the callback below
+ // with no synchronization -- if those run on different
+ // goroutines this is a data race; confirm under -race.
+ var stoptime time.Time
+ token := "fake-ec2-metadata-token"
+ stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if !failedOnce {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ failedOnce = true
+ return
+ }
+ switch r.URL.Path {
+ case "/latest/api/token":
+ fmt.Fprintln(w, token)
+ case "/latest/meta-data/spot/instance-action":
+ // Reject requests that do not present the token issued
+ // by the "/latest/api/token" case above.
+ if r.Header.Get("X-aws-ec2-metadata-token") != token {
+ w.WriteHeader(http.StatusUnauthorized)
+ } else if stoptime.IsZero() {
+ w.WriteHeader(http.StatusNotFound)
+ } else {
+ fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, stoptime.Format(time.RFC3339))
+ }
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+ defer stub.Close()
+
+ // Restore the package-level check interval and base URL when
+ // the test returns; the arguments capture the current values
+ // before they are overridden below.
+ defer func(i time.Duration, u string) {
+ spotInterruptionCheckInterval = i
+ ec2MetadataBaseURL = u
+ }(spotInterruptionCheckInterval, ec2MetadataBaseURL)
+ spotInterruptionCheckInterval = time.Second / 4
+ ec2MetadataBaseURL = stub.URL
+
+ // The callback waits one second, sets stoptime to one minute in
+ // the future (which switches the stub from 404 to a "stop"
+ // notice), then waits one more second.
+ // NOTE(review): presumably fullRunHelper invokes the callback
+ // while the "sleep 3" container is running -- confirm against
+ // the helper.
+ s.fullRunHelper(c, `{
+ "command": ["sleep", "3"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked"
+}`, nil, 0, func() {
+ time.Sleep(time.Second)
+ stoptime = time.Now().Add(time.Minute).UTC()
+ time.Sleep(time.Second)
+ })
+ // The stub's initial 503 must be logged as a failed check.
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+ // The notice must be logged with the same action and time the
+ // stub served.
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Cloud provider indicates instance action "stop" scheduled for time "`+stoptime.Format(time.RFC3339)+`".*`)
+}
+
func (s *TestSuite) TestRunTimeExceeded(c *C) {
s.fullRunHelper(c, `{
"command": ["sleep", "3"],
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index 29c4b8e0a..6a03deae5 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -63,6 +63,9 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
configData.Cluster = wkr.wp.cluster
configData.KeepBuffers = bufs * wkr.instType.VCPUs
}
+ if wkr.wp.cluster.CloudVMs.Driver == "ec2" && wkr.instType.Preemptible {
+ configData.EC2SpotCheck = true
+ }
configJSON, err := json.Marshal(configData)
if err != nil {
panic(err)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list