[ARVADOS] created: 1.3.0-1772-g9481ff4a2

Git user git at public.curoverse.com
Tue Oct 22 13:42:02 UTC 2019


        at  9481ff4a22314c0d5acffe78fbc7595278414e6f (commit)


commit 9481ff4a22314c0d5acffe78fbc7595278414e6f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Oct 22 09:40:11 2019 -0400

    15734: Save dispatcher's InstanceType record in container log.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index c30ff9f2b..9cd1d8d6b 100644
--- a/lib/dispatchcloud/worker/runner.go
+++ b/lib/dispatchcloud/worker/runner.go
@@ -11,7 +11,6 @@ import (
 	"syscall"
 	"time"
 
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"github.com/sirupsen/logrus"
 )
 
@@ -20,7 +19,7 @@ import (
 type remoteRunner struct {
 	uuid          string
 	executor      Executor
-	arvClient     *arvados.Client
+	envJSON       json.RawMessage
 	remoteUser    string
 	timeoutTERM   time.Duration
 	timeoutSignal time.Duration
@@ -36,10 +35,33 @@ type remoteRunner struct {
 // newRemoteRunner returns a new remoteRunner. Caller should ensure
 // Close() is called to release resources.
 func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
+	// In order to stay compatible with recent dev/experimental
+	// versions of crunch-run, we need to pass a map with string
+	// values only, so we JSON-encode the instance type
+	// record. Once worker images are updated we can skip this and
+	// just pass {"InstanceType": wkr.instType}.
+	var instJSON bytes.Buffer
+	enc := json.NewEncoder(&instJSON)
+	enc.SetIndent("", "    ")
+	if err := enc.Encode(wkr.instType); err != nil {
+		panic(err)
+	}
+	env := map[string]string{
+		"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+		"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+		"InstanceType":      instJSON.String(),
+	}
+	if wkr.wp.arvClient.Insecure {
+		env["ARVADOS_API_HOST_INSECURE"] = "1"
+	}
+	envJSON, err := json.Marshal(env)
+	if err != nil {
+		panic(err)
+	}
 	rr := &remoteRunner{
 		uuid:          uuid,
 		executor:      wkr.executor,
-		arvClient:     wkr.wp.arvClient,
+		envJSON:       envJSON,
 		remoteUser:    wkr.instance.RemoteUser(),
 		timeoutTERM:   wkr.wp.timeoutTERM,
 		timeoutSignal: wkr.wp.timeoutSignal,
@@ -57,22 +79,11 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-	env := map[string]string{
-		"ARVADOS_API_HOST":  rr.arvClient.APIHost,
-		"ARVADOS_API_TOKEN": rr.arvClient.AuthToken,
-	}
-	if rr.arvClient.Insecure {
-		env["ARVADOS_API_HOST_INSECURE"] = "1"
-	}
-	envJSON, err := json.Marshal(env)
-	if err != nil {
-		panic(err)
-	}
-	stdin := bytes.NewBuffer(envJSON)
 	cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
 	if rr.remoteUser != "root" {
 		cmd = "sudo " + cmd
 	}
+	stdin := bytes.NewBuffer(rr.envJSON)
 	stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
 	if err != nil {
 		rr.logger.WithField("stdout", string(stdout)).
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4f9ba911c..943fa7c71 100644
--- a/lib/dispatchcloud/worker/worker_test.go
+++ b/lib/dispatchcloud/worker/worker_test.go
@@ -25,6 +25,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 	bootTimeout := time.Minute
 	probeTimeout := time.Second
 
+	ac := arvados.NewClientFromEnv()
 	is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
 	c.Assert(err, check.IsNil)
 	inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
@@ -192,6 +193,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
 			"crunch-run --list": trial.respRun,
 		}
 		wp := &Pool{
+			arvClient:        ac,
 			newExecutor:      func(cloud.Instance) Executor { return exr },
 			bootProbeCommand: "bootprobe",
 			timeoutBooting:   bootTimeout,
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 3261291b5..082ca0ce9 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -95,7 +95,7 @@ type ContainerRunner struct {
 	Docker ThinDockerClient
 
 	// Dispatcher client is initialized with the Dispatcher token.
-	// This is a priviledged token used to manage container status
+	// This is a privileged token used to manage container status
 	// and logs.
 	//
 	// We have both dispatcherClient and DispatcherArvClient
@@ -116,6 +116,9 @@ type ContainerRunner struct {
 	ContainerArvClient  IArvadosClient
 	ContainerKeepClient IKeepClient
 
+	// environment provided by arvados-dispatch-cloud
+	dispatchEnv map[string]interface{}
+
 	Container       arvados.Container
 	ContainerConfig dockercontainer.Config
 	HostConfig      dockercontainer.HostConfig
@@ -850,21 +853,55 @@ func (runner *ContainerRunner) LogContainerRecord() error {
 	return err
 }
 
-// LogNodeRecord logs arvados#node record corresponding to the current host.
+// LogNodeRecord logs the current host's InstanceType config entry (or
+// the arvados#node record, if running via crunch-dispatch-slurm).
 func (runner *ContainerRunner) LogNodeRecord() error {
-	hostname := os.Getenv("SLURMD_NODENAME")
-	if hostname == "" {
-		hostname, _ = os.Hostname()
-	}
-	_, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
-		// The "info" field has admin-only info when obtained
-		// with a privileged token, and should not be logged.
-		node, ok := resp.(map[string]interface{})
-		if ok {
-			delete(node, "info")
-		}
-	})
-	return err
+	if it, ok := runner.dispatchEnv["InstanceType"]; ok {
+		// Dispatched via arvados-dispatch-cloud. Save
+		// InstanceType config fragment received from
+		// dispatcher on stdin.
+		w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+		if err != nil {
+			return err
+		}
+		defer w.Close()
+		if it, ok := it.(string); ok {
+			// dispatcher supplied JSON data (in order to
+			// stay compatible with old crunch-run
+			// versions)
+			_, err = io.WriteString(w, it)
+			if err != nil {
+				return err
+			}
+		} else {
+			// dispatcher supplied struct
+			enc := json.NewEncoder(w)
+			enc.SetIndent("", "    ")
+			err = enc.Encode(it)
+			if err != nil {
+				return err
+			}
+		}
+		return w.Close()
+	} else {
+		// Dispatched via crunch-dispatch-slurm. Look up
+		// apiserver's node record corresponding to
+		// $SLURMD_NODENAME.
+		hostname := os.Getenv("SLURMD_NODENAME")
+		if hostname == "" {
+			hostname, _ = os.Hostname()
+		}
+		_, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
+			// The "info" field has admin-only info when
+			// obtained with a privileged token, and
+			// should not be logged.
+			node, ok := resp.(map[string]interface{})
+			if ok {
+				delete(node, "info")
+			}
+		})
+		return err
+	}
 }
 
 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
@@ -1774,11 +1811,12 @@ func main() {
 
 	flag.Parse()
 
+	var env map[string]interface{}
 	if *stdinEnv && !ignoreDetachFlag {
 		// Load env vars on stdin if asked (but not in a
 		// detached child process, in which case stdin is
 		// /dev/null).
-		loadEnv(os.Stdin)
+		env = loadEnv(os.Stdin)
 	}
 
 	switch {
@@ -1833,6 +1871,8 @@ func main() {
 		os.Exit(1)
 	}
 
+	cr.dispatchEnv = env
+
 	parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
 	if tmperr != nil {
 		log.Fatalf("%s: %v", containerId, tmperr)
@@ -1872,20 +1912,23 @@ func main() {
 	}
 }
 
-func loadEnv(rdr io.Reader) {
+func loadEnv(rdr io.Reader) map[string]interface{} {
 	buf, err := ioutil.ReadAll(rdr)
 	if err != nil {
 		log.Fatalf("read stdin: %s", err)
 	}
-	var env map[string]string
+	var env map[string]interface{}
 	err = json.Unmarshal(buf, &env)
 	if err != nil {
 		log.Fatalf("decode stdin: %s", err)
 	}
 	for k, v := range env {
-		err = os.Setenv(k, v)
-		if err != nil {
-			log.Fatalf("setenv(%q): %s", k, err)
+		if v, ok := v.(string); ok {
+			err = os.Setenv(k, v)
+			if err != nil {
+				log.Fatalf("setenv(%q): %s", k, err)
+			}
 		}
 	}
+	return env
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list