[arvados] created: 2.5.0-114-ge53547d2b

git repository hosting git at public.arvados.org
Mon Feb 13 21:20:46 UTC 2023

        at  e53547d2be566f3de202f61cbcfa1d9edab289f2 (commit)

commit e53547d2be566f3de202f61cbcfa1d9edab289f2
Author: Tom Clegg <tom at curii.com>
Date:   Mon Feb 13 16:14:56 2023 -0500

    19961: Detect and log EC2 spot interruption notices.
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 3def8851c..df7bbb1e9 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -50,9 +50,10 @@ var Command = command{}
 // ConfigData contains environment variables and (when needed) cluster
 // configuration, passed from dispatchcloud to crunch-run on stdin.
 type ConfigData struct {
-	Env         map[string]string
-	KeepBuffers int
-	Cluster     *arvados.Cluster
+	Env          map[string]string
+	KeepBuffers  int
+	EC2SpotCheck bool
+	Cluster      *arvados.Cluster
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -319,7 +320,9 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
 			"Block not found error",
 			"Unhandled exception during FUSE operation",
-		ReportFunc: runner.reportArvMountWarning,
+		ReportFunc: func(pattern, text string) {
+			runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+		},
 	c.Stdout = runner.arvMountLog
 	c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
@@ -1197,16 +1200,98 @@ func (runner *ContainerRunner) updateLogs() {
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
-	var updated arvados.Container
+var spotInterruptionCheckInterval = 5 * time.Second
+var ec2MetadataBaseURL = ""
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+	type ec2metadata struct {
+		Action string    `json:"action"`
+		Time   time.Time `json:"time"`
+	}
+	var metadata ec2metadata
+	check := func() error {
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+		defer cancel()
+		req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+		if err != nil {
+			return err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+		if resp.StatusCode != http.StatusOK {
+			return fmt.Errorf("%s", resp.Status)
+		}
+		token, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		req, err = http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+		if err != nil {
+			return err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token", strings.TrimSpace(string(token)))
+		resp, err = http.DefaultClient.Do(req)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+		metadata = ec2metadata{}
+		if resp.StatusCode == http.StatusNotFound {
+			// "If Amazon EC2 is not preparing to stop or
+			// terminate the instance, or if you
+			// terminated the instance yourself,
+			// instance-action is not present in the
+			// instance metadata and you receive an HTTP
+			// 404 error when you try to retrieve it."
+			return nil
+		} else if resp.StatusCode != http.StatusOK {
+			return fmt.Errorf("%s", resp.Status)
+		}
+		err = json.NewDecoder(resp.Body).Decode(&metadata)
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+	failures := 0
+	var lastmetadata ec2metadata
+	for range time.NewTicker(spotInterruptionCheckInterval).C {
+		err := check()
+		if err != nil {
+			runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+			failures++
+			if failures > 3 {
+				runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many errors")
+				return
+			}
+			continue
+		}
+		if metadata != lastmetadata {
+			lastmetadata = metadata
+			text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+			runner.CrunchLog.Printf("%s", text)
+			runner.updateRuntimeStatus("instance interruption", text)
+			if proc, err := os.FindProcess(os.Getpid()); err == nil {
+				// trigger updateLogs
+				proc.Signal(syscall.SIGUSR1)
+			}
+		}
+	}
+func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
 	err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
 		"container": arvadosclient.Dict{
 			"runtime_status": arvadosclient.Dict{
-				"warning":       "arv-mount: " + pattern,
-				"warningDetail": text,
+				"warning":       warning,
+				"warningDetail": detail,
-	}, &updated)
+	}, nil)
 	if err != nil {
 		runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
@@ -1535,6 +1620,7 @@ func (runner *ContainerRunner) Run() (err error) {
+	go runner.checkSpotInterruptionNotices()
 	runner.finalState = "Queued"
@@ -2011,6 +2097,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		cr.expectCgroupParent = p
+	if conf.EC2SpotCheck {
+		go cr.checkSpotInterruptionNotices()
+	}
 	runerr := cr.Run()
 	if *memprofile != "" {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index aaba1c420..0e34c4954 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -13,6 +13,8 @@ import (
+	"net/http"
+	"net/http/httptest"
@@ -772,6 +774,59 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) {
 	c.Check(ran, Equals, false)
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+	var failedOnce bool
+	var stoptime time.Time
+	token := "fake-ec2-metadata-token"
+	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if !failedOnce {
+			w.WriteHeader(http.StatusServiceUnavailable)
+			failedOnce = true
+			return
+		}
+		switch r.URL.Path {
+		case "/latest/api/token":
+			fmt.Fprintln(w, token)
+		case "/latest/meta-data/spot/instance-action":
+			if r.Header.Get("X-aws-ec2-metadata-token") != token {
+				w.WriteHeader(http.StatusUnauthorized)
+			} else if stoptime.IsZero() {
+				w.WriteHeader(http.StatusNotFound)
+			} else {
+				fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, stoptime.Format(time.RFC3339))
+			}
+		default:
+			w.WriteHeader(http.StatusNotFound)
+		}
+	}))
+	defer stub.Close()
+	defer func(i time.Duration, u string) {
+		spotInterruptionCheckInterval = i
+		ec2MetadataBaseURL = u
+	}(spotInterruptionCheckInterval, ec2MetadataBaseURL)
+	spotInterruptionCheckInterval = time.Second / 4
+	ec2MetadataBaseURL = stub.URL
+	s.fullRunHelper(c, `{
+    "command": ["sleep", "3"],
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {},
+    "state": "Locked"
+}`, nil, 0, func() {
+		time.Sleep(time.Second)
+		stoptime = time.Now().Add(time.Minute).UTC()
+		time.Sleep(time.Second)
+	})
+	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+	c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Cloud provider indicates instance action "stop" scheduled for time "`+stoptime.Format(time.RFC3339)+`".*`)
 func (s *TestSuite) TestRunTimeExceeded(c *C) {
 	s.fullRunHelper(c, `{
     "command": ["sleep", "3"],
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index 29c4b8e0a..6a03deae5 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -63,6 +63,9 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 		configData.Cluster = wkr.wp.cluster
 		configData.KeepBuffers = bufs * wkr.instType.VCPUs
+	if wkr.wp.cluster.CloudVMs.Driver == "ec2" && wkr.instType.Preemptible {
+		configData.EC2SpotCheck = true
+	}
 	configJSON, err := json.Marshal(configData)
 	if err != nil {



More information about the arvados-commits mailing list