[ARVADOS] created: 2.1.0-2311-gace3f32b7

Git user git at public.arvados.org
Thu Apr 14 06:35:00 UTC 2022


        at  ace3f32b75b7ba599d76450bd9b3b6fee42c7799 (commit)


commit ace3f32b75b7ba599d76450bd9b3b6fee42c7799
Author: Tom Clegg <tom at curii.com>
Date:   Thu Apr 14 02:33:03 2022 -0400

    18992: Load cluster config file if present in HPC environment.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 219ed3b98..6ac7a5f0c 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -32,9 +32,11 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/crunchstat"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	"git.arvados.org/arvados.git/sdk/go/manifest"
 	"golang.org/x/sys/unix"
@@ -1722,6 +1724,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
 	detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
 	stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+	configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
 	sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
 	kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
 	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1768,6 +1771,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
+	var keepstoreLogbuf bufThenWrite
 	var conf ConfigData
 	if *stdinConfig {
 		err := json.NewDecoder(stdin).Decode(&conf)
@@ -1789,6 +1793,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 			// fill it using the container UUID prefix.
 			conf.Cluster.ClusterID = containerUUID[:5]
 		}
+	} else {
+		conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
 	}
 
 	log.Printf("crunch-run %s started", cmd.Version.String())
@@ -1798,7 +1804,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		arvadosclient.CertFiles = []string{*caCertsPath}
 	}
 
-	var keepstoreLogbuf bufThenWrite
 	keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
 	if err != nil {
 		log.Print(err)
@@ -1959,8 +1964,61 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	return 0
 }
 
+// hpcConfData tries to load ConfigData in an HPC (slurm/lsf)
+// environment: load the cluster config from the specified file and,
+// if that succeeds, fetch the container's runtime_constraints field
+// from the controller to get the VCPU count for calculating KeepBuffers.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+	var conf ConfigData
+	conf.Cluster = loadClusterConfigFile(configFile, stderr)
+	if conf.Cluster == nil {
+		// skip loading the container record -- we won't be
+		// able to start local keepstore anyway.
+		return conf
+	}
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+		return conf
+	}
+	arv.Retries = 8
+	var ctr arvados.Container
+	err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+	if err != nil {
+		fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+		return conf
+	}
+	if ctr.RuntimeConstraints.VCPUs > 0 {
+		conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+	}
+	return conf
+}
+
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+	ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+	ldr.Path = path
+	cfg, err := ldr.Load()
+	if err != nil {
+		fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+		return nil
+	}
+	cluster, err := cfg.GetCluster("")
+	if err != nil {
+		fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+		return nil
+	}
+	return cluster
+}
+
 func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
-	if configData.Cluster == nil || configData.KeepBuffers < 1 {
+	if configData.KeepBuffers < 1 {
+		fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+		return nil, nil
+	}
+	if configData.Cluster == nil {
+		fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
 		return nil, nil
 	}
 	for uuid, vol := range configData.Cluster.Volumes {
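
Taken together, the two changes above mean: no local keepstore process unless both a cluster config and a positive buffer count are available, where the buffer count is the container's VCPUs times the cluster's Containers.LocalKeepBlobBuffersPerVCPU setting. A minimal sketch of that sizing rule (keepBuffers below is an illustrative stand-in, not the real ConfigData plumbing):

    package main

    import "fmt"

    // keepBuffers is a hypothetical stand-in for the sizing logic in
    // hpcConfData: one set of block buffers per VCPU, scaled by the
    // cluster's Containers.LocalKeepBlobBuffersPerVCPU setting.
    func keepBuffers(vcpus, buffersPerVCPU int) int {
        if vcpus <= 0 || buffersPerVCPU <= 0 {
            // startLocalKeepstore treats KeepBuffers < 1 as "do not
            // start a local keepstore process".
            return 0
        }
        return vcpus * buffersPerVCPU
    }

    func main() {
        fmt.Println(keepBuffers(16, 1)) // a 16-VCPU container, 1 buffer/VCPU -> 16
    }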
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 62df0032b..1d2c7b09f 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -50,7 +50,6 @@ type TestSuite struct {
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
-	*brokenNodeHook = ""
 	s.client = arvados.NewClientFromEnv()
 	s.executor = &stubExecutor{}
 	var err error
@@ -1914,8 +1913,8 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 		func() {
 			c.Log("// loadErr = cannot connect")
 			s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
-			*brokenNodeHook = c.MkDir() + "/broken-node-hook"
-			err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+			s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+			err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
 			c.Assert(err, IsNil)
 			nextState = "Queued"
 		},
@@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 }`, nil, 0, func() {})
 		c.Check(s.api.CalledWith("container.state", nextState), NotNil)
 		c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-		if *brokenNodeHook != "" {
+		if s.runner.brokenNodeHook != "" {
 			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
 			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
 			c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go
index acbb11a36..e888f3151 100644
--- a/sdk/go/ctxlog/log.go
+++ b/sdk/go/ctxlog/log.go
@@ -93,6 +93,11 @@ func setFormat(logger *logrus.Logger, format string) {
 			FullTimestamp:   true,
 			TimestampFormat: rfc3339NanoFixed,
 		}
+	case "plain":
+		logger.Formatter = &logrus.TextFormatter{
+			DisableColors:    true,
+			DisableTimestamp: true,
+		}
 	case "json", "":
 		logger.Formatter = &logrus.JSONFormatter{
 			TimestampFormat: rfc3339NanoFixed,
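
The new "plain" format is what loadClusterConfigFile selects above, presumably so the config loader's messages can be folded into the keepstore log buffer without ANSI colors or a second layer of timestamps. A minimal sketch of the formatter it installs:

    package main

    import "github.com/sirupsen/logrus"

    func main() {
        logger := logrus.New()
        // Same formatter the new "plain" case installs: text output
        // with colors and timestamps disabled.
        logger.Formatter = &logrus.TextFormatter{
            DisableColors:    true,
            DisableTimestamp: true,
        }
        logger.Info("this line appears as plain text")
        // Output: level=info msg="this line appears as plain text"
    }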

commit f93ba744038d79211a48105bd691e26e1770cc61
Author: Tom Clegg <tom at curii.com>
Date:   Thu Apr 14 02:32:09 2022 -0400

    18992: Fix abrupt exit on cgroup test failure.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/cgroup.go b/lib/crunchrun/cgroup.go
index 0b254f5bd..48ec93b87 100644
--- a/lib/crunchrun/cgroup.go
+++ b/lib/crunchrun/cgroup.go
@@ -6,16 +6,16 @@ package crunchrun
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
-	"log"
 )
 
 // Return the current process's cgroup for the given subsystem.
-func findCgroup(subsystem string) string {
+func findCgroup(subsystem string) (string, error) {
 	subsys := []byte(subsystem)
 	cgroups, err := ioutil.ReadFile("/proc/self/cgroup")
 	if err != nil {
-		log.Fatal(err)
+		return "", err
 	}
 	for _, line := range bytes.Split(cgroups, []byte("\n")) {
 		toks := bytes.SplitN(line, []byte(":"), 4)
@@ -24,10 +24,9 @@ func findCgroup(subsystem string) string {
 		}
 		for _, s := range bytes.Split(toks[1], []byte(",")) {
 			if bytes.Compare(s, subsys) == 0 {
-				return string(toks[2])
+				return string(toks[2]), nil
 			}
 		}
 	}
-	log.Fatalf("subsystem %q not found in /proc/self/cgroup", subsystem)
-	return ""
+	return "", fmt.Errorf("subsystem %q not found in /proc/self/cgroup", subsystem)
 }
diff --git a/lib/crunchrun/cgroup_test.go b/lib/crunchrun/cgroup_test.go
index b43479a3b..eb87456d1 100644
--- a/lib/crunchrun/cgroup_test.go
+++ b/lib/crunchrun/cgroup_test.go
@@ -14,8 +14,10 @@ var _ = Suite(&CgroupSuite{})
 
 func (s *CgroupSuite) TestFindCgroup(c *C) {
 	for _, s := range []string{"devices", "cpu", "cpuset"} {
-		g := findCgroup(s)
-		c.Check(g, Not(Equals), "")
+		g, err := findCgroup(s)
+		if c.Check(err, IsNil) {
+			c.Check(g, Not(Equals), "", Commentf("subsys %q", s))
+		}
 		c.Logf("cgroup(%q) == %q", s, g)
 	}
 }
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 84b153554..219ed3b98 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -1926,7 +1926,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	cr.enableNetwork = *enableNetwork
 	cr.networkMode = *networkMode
 	if *cgroupParentSubsystem != "" {
-		p := findCgroup(*cgroupParentSubsystem)
+		p, err := findCgroup(*cgroupParentSubsystem)
+		if err != nil {
+			log.Printf("fatal: cgroup parent subsystem: %s", err)
+			return 1
+		}
 		cr.setCgroupParent = p
 		cr.expectCgroupParent = p
 	}
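
The effect of the new signature is that callers decide how to handle a failed lookup instead of the process exiting inside findCgroup. A self-contained sketch of the same /proc/self/cgroup scan (cgroup v1 line format, e.g. "4:cpu,cpuacct:/user.slice"; it uses os.ReadFile and bytes.Equal in place of the older ioutil.ReadFile and bytes.Compare idioms):

    package main

    import (
        "bytes"
        "fmt"
        "os"
    )

    // findCgroup mirrors the patched function: scan /proc/self/cgroup
    // for the requested subsystem and return its cgroup path, reporting
    // failures as errors rather than calling log.Fatal.
    func findCgroup(subsystem string) (string, error) {
        subsys := []byte(subsystem)
        cgroups, err := os.ReadFile("/proc/self/cgroup")
        if err != nil {
            return "", err
        }
        for _, line := range bytes.Split(cgroups, []byte("\n")) {
            toks := bytes.SplitN(line, []byte(":"), 4)
            if len(toks) < 3 {
                continue
            }
            for _, s := range bytes.Split(toks[1], []byte(",")) {
                if bytes.Equal(s, subsys) {
                    return string(toks[2]), nil
                }
            }
        }
        return "", fmt.Errorf("subsystem %q not found in /proc/self/cgroup", subsystem)
    }

    func main() {
        for _, subsys := range []string{"cpu", "memory", "devices"} {
            path, err := findCgroup(subsys)
            if err != nil {
                fmt.Fprintln(os.Stderr, err) // caller's choice; no abrupt exit
                continue
            }
            fmt.Printf("cgroup(%q) == %q\n", subsys, path)
        }
    }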

commit 61c742b48d46f6e21b8993ff66c72cecfbae7f49
Author: Tom Clegg <tom at curii.com>
Date:   Thu Apr 14 02:31:35 2022 -0400

    18992: Fix broken-node-hook global flag.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 65f43e964..84b153554 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -167,6 +167,7 @@ type ContainerRunner struct {
 	enableMemoryLimit bool
 	enableNetwork     string // one of "default" or "always"
 	networkMode       string // "none", "host", or "" -- passed through to executor
+	brokenNodeHook    string // script to run if node appears to be broken
 	arvMountLog       *ThrottledLogger
 
 	containerWatchdogInterval time.Duration
@@ -210,10 +211,9 @@ var errorBlacklist = []string{
 	"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
 	"(?ms).*grpc: the connection is unavailable.*",
 }
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
-	if *brokenNodeHook == "" {
+	if runner.brokenNodeHook == "" {
 		path := filepath.Join(lockdir, brokenfile)
 		runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
 		f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
@@ -223,9 +223,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() {
 		}
 		f.Close()
 	} else {
-		runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+		runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
 		// run killme script
-		c := exec.Command(*brokenNodeHook)
+		c := exec.Command(runner.brokenNodeHook)
 		c.Stdout = runner.CrunchLog
 		c.Stderr = runner.CrunchLog
 		err := c.Run()
@@ -1730,6 +1730,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
 	memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
 	runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+	brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
 	flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
 	ignoreDetachFlag := false
@@ -1883,6 +1884,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	}
 	defer cr.executor.Close()
 
+	cr.brokenNodeHook = *brokenNodeHook
+
 	gwAuthSecret := os.Getenv("GatewayAuthSecret")
 	os.Unsetenv("GatewayAuthSecret")
 	if gwAuthSecret == "" {

-----------------------------------------------------------------------


hooks/post-receive