[ARVADOS] updated: 2.4.0-10-gde3df01f3

Git user git@public.arvados.org
Tue Apr 19 15:23:47 UTC 2022


Summary of changes:
 lib/config/config.default.yml                      | 26 ++++---
 lib/crunchrun/cgroup.go                            | 11 ++-
 lib/crunchrun/cgroup_test.go                       |  6 +-
 lib/crunchrun/crunchrun.go                         | 80 ++++++++++++++++++++--
 lib/crunchrun/crunchrun_test.go                    |  7 +-
 lib/crunchrun/integration_test.go                  | 52 ++++++++++----
 sdk/cwl/arvados_cwl/executor.py                    |  3 +-
 sdk/cwl/arvados_cwl/pathmapper.py                  | 52 ++++++++------
 sdk/cwl/arvados_cwl/runner.py                      | 30 ++++----
 sdk/cwl/tests/18994-basename/check.cwl             | 22 ++++++
 sdk/cwl/tests/18994-basename/rename.cwl            | 16 +++++
 sdk/cwl/tests/18994-basename/wf_ren.cwl            | 33 +++++++++
 .../cwl/tests/18994-basename/whale.txt             |  2 +-
 sdk/cwl/tests/arvados-tests.yml                    |  5 ++
 sdk/go/ctxlog/log.go                               |  5 ++
 15 files changed, 271 insertions(+), 79 deletions(-)
 create mode 100644 sdk/cwl/tests/18994-basename/check.cwl
 create mode 100644 sdk/cwl/tests/18994-basename/rename.cwl
 create mode 100644 sdk/cwl/tests/18994-basename/wf_ren.cwl
 copy tools/salt-install/tests/test.txt => sdk/cwl/tests/18994-basename/whale.txt (94%)

       via  de3df01f392c72f589b3e29f0080da437d19a011 (commit)
       via  88b0b3bf854380dd349003fdcb75a52623218028 (commit)
      from  865c356c6ab258fc5ff6a20bf976e548811638d9 (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so they are listed in full
below.


commit de3df01f392c72f589b3e29f0080da437d19a011
Author: Peter Amstutz <peter.amstutz@curii.com>
Date:   Tue Apr 19 10:13:32 2022 -0400

    Merge branch '18994-cwl-basename' refs #18994
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 6e23d80a8..5f24d2407 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -18,6 +18,7 @@ import json
 import re
 from functools import partial
 import time
+import urllib
 
 from cwltool.errors import WorkflowException
 import cwltool.workflow
@@ -450,7 +451,7 @@ The 'jobs' API is no longer supported.
             srccollection = sp[0][5:]
             try:
                 reader = self.collection_cache.get(srccollection)
-                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
             except arvados.errors.ArgumentError as e:
                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
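
A note on the unquote() added above: the keep: references handed to this code can carry percent-encoded basenames, while Collection.copy() needs the literal path stored in the collection. A minimal sketch of the decoding step (the collection hash and filename here are made up):

    import urllib.parse

    k = "keep:99999999999999999999999999999999+99/badger%20file.txt"
    sp = k.split("/")
    srccollection = sp[0][5:]   # portable data hash, "keep:" prefix stripped
    # Decode percent-escapes so the path matches what the collection actually stores.
    srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
    print(srcpath)  # badger file.txt
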
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 4a91a7a83..64fdfa0d0 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -52,7 +52,8 @@ class ArvPathMapper(PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
     def __init__(self, arvrunner, referenced_files, input_basedir,
-                 collection_pattern, file_pattern, name=None, single_collection=False):
+                 collection_pattern, file_pattern, name=None, single_collection=False,
+                 optional_deps=None):
         self.arvrunner = arvrunner
         self.input_basedir = input_basedir
         self.collection_pattern = collection_pattern
@@ -61,6 +62,7 @@ class ArvPathMapper(PathMapper):
         self.referenced_files = [r["location"] for r in referenced_files]
         self.single_collection = single_collection
         self.pdh_to_uuid = {}
+        self.optional_deps = optional_deps or []
         super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
 
     def visit(self, srcobj, uploadfiles):
@@ -73,6 +75,7 @@ class ArvPathMapper(PathMapper):
         if isinstance(src, basestring) and src.startswith("keep:"):
             if collection_pdh_pattern.match(src):
                 self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+
                 if arvados_cwl.util.collectionUUID in srcobj:
                     self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
             elif not collection_uuid_pattern.match(src):
@@ -135,6 +138,9 @@ class ArvPathMapper(PathMapper):
                 f.write(obj["contents"])
             remap.append((obj["location"], path + "/" + obj["basename"]))
         else:
+            for opt in self.optional_deps:
+                if obj["location"] == opt["location"]:
+                    return
             raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
 
     def needs_new_collection(self, srcobj, prefix=""):
@@ -149,22 +155,33 @@ class ArvPathMapper(PathMapper):
         loc = srcobj["location"]
         if loc.startswith("_:"):
             return True
-        if prefix:
-            if loc != prefix+srcobj["basename"]:
-                return True
+
+        i = loc.rfind("/")
+        if i > -1:
+            loc_prefix = loc[:i+1]
+            if not prefix:
+                prefix = loc_prefix
+            # quote/unquote to ensure consistent quoting
+            suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
         else:
-            i = loc.rfind("/")
-            if i > -1:
-                prefix = loc[:i+1]
-            else:
-                prefix = loc+"/"
+            # no '/' found
+            loc_prefix = loc+"/"
+            prefix = loc+"/"
+            suffix = ""
+
+        if prefix != loc_prefix:
+            return True
+
+        if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
+            return True
+
         if srcobj["class"] == "File" and loc not in self._pathmap:
             return True
         for s in srcobj.get("secondaryFiles", []):
             if self.needs_new_collection(s, prefix):
                 return True
         if srcobj.get("listing"):
-            prefix = "%s%s/" % (prefix, srcobj["basename"])
+            prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
             for l in srcobj["listing"]:
                 if self.needs_new_collection(l, prefix):
                     return True
@@ -194,7 +211,7 @@ class ArvPathMapper(PathMapper):
                                          packed=False)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+            self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
                                            "Directory" if os.path.isdir(ab) else "File", True)
 
         for srcobj in referenced_files:
@@ -217,19 +234,10 @@ class ArvPathMapper(PathMapper):
 
                 ab = self.collection_pattern % c.portable_data_hash()
                 self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
-            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
-                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
-
-                # If all secondary files/directories are located in
-                # the same collection as the primary file and the
-                # paths and names that are consistent with staging,
-                # don't create a new collection.
-                if not self.needs_new_collection(srcobj):
-                    continue
-
+            elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
-                                                  num_retries=self.arvrunner.num_retries                                                  )
+                                                  num_retries=self.arvrunner.num_retries)
                 self.addentry(srcobj, c, ".", remap)
 
                 container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
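
A note on the quote/unquote round trip above: needs_new_collection() now compares a canonical form of the basename against the canonical form of the location suffix, so the comparison works whether either side arrived percent-encoded or not, and a basename that was changed by the workflow (the #18994 case) is detected reliably. A small illustration with made-up names:

    import urllib.parse

    def canonical(name):
        # Decode any existing escapes, then re-encode with the same safe set
        # used in pathmapper.py ("/+@"), so both sides compare consistently.
        return urllib.parse.quote(urllib.parse.unquote(name), "/+@")

    print(canonical("whale copy.txt"))    # whale%20copy.txt
    print(canonical("whale%20copy.txt"))  # whale%20copy.txt
    # A renamed basename no longer matches the location suffix, so the file
    # gets staged into a new collection under its new name.
    print(canonical("badger.txt") == canonical("whale copy.txt"))  # False
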
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 38e2c4d80..e5a81cdc7 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -285,10 +285,18 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     sc_result = scandeps(uri, scanobj,
                          loadref_fields,
-                         set(("$include", "$schemas", "location")),
+                         set(("$include", "location")),
                          loadref, urljoin=document_loader.fetcher.urljoin,
                          nestdirs=False)
 
+    optional_deps = scandeps(uri, scanobj,
+                                  loadref_fields,
+                                  set(("$schemas",)),
+                                  loadref, urljoin=document_loader.fetcher.urljoin,
+                                  nestdirs=False)
+
+    sc_result.extend(optional_deps)
+
     sc = []
     uuids = {}
 
@@ -345,24 +353,13 @@ def upload_dependencies(arvrunner, name, document_loader,
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
-
     def visit_default(obj):
-        remove = [False]
-        def ensure_default_location(f):
+        def defaults_are_optional(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
-        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+            optional_deps.append(f)
+        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
     find_defaults(workflowobj, visit_default)
 
@@ -398,7 +395,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
 
     def setloc(p):
         loc = p.get("location")
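
Note on the optional_deps plumbing above: $schemas URLs and File defaults that cannot be resolved are now collected as optional dependencies and passed through to ArvPathMapper (optional_deps=optional_deps), instead of being dropped from the workflow or failing the upload. A small runnable sketch of the resulting skip behavior, with a hypothetical schema URL:

    optional_deps = [{"class": "File", "location": "https://example.com/missing-schema.rdf"}]

    def addentry_sketch(obj):
        # Mirrors the new early return in ArvPathMapper.addentry(): anything whose
        # location is listed as optional is skipped instead of raising
        # WorkflowException("Don't know what to do with ...").
        for opt in optional_deps:
            if obj["location"] == opt["location"]:
                return "skipped"
        raise Exception("Don't know what to do with %r" % obj["location"])

    print(addentry_sketch({"class": "File",
                           "location": "https://example.com/missing-schema.rdf"}))  # skipped
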
diff --git a/sdk/cwl/tests/18994-basename/check.cwl b/sdk/cwl/tests/18994-basename/check.cwl
new file mode 100644
index 000000000..0046ce66c
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/check.cwl
@@ -0,0 +1,22 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: CommandLineTool
+cwlVersion: v1.2
+inputs:
+  p: File
+  checkname: string
+outputs: []
+arguments:
+  - sh
+  - "-c"
+  - |
+    name=`basename $(inputs.p.path)`
+    ls -l $(inputs.p.path)
+    if test $name = $(inputs.checkname) ; then
+      echo success
+    else
+      echo expected basename to be $(inputs.checkname) but was $name
+      exit 1
+    fi
diff --git a/sdk/cwl/tests/18994-basename/rename.cwl b/sdk/cwl/tests/18994-basename/rename.cwl
new file mode 100644
index 000000000..026555973
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/rename.cwl
@@ -0,0 +1,16 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: ExpressionTool
+cwlVersion: v1.2
+inputs:
+  f1: File
+  newname: string
+outputs:
+  out: File
+expression: |
+  ${
+  inputs.f1.basename = inputs.newname;
+  return {"out": inputs.f1};
+  }
diff --git a/sdk/cwl/tests/18994-basename/wf_ren.cwl b/sdk/cwl/tests/18994-basename/wf_ren.cwl
new file mode 100644
index 000000000..b0177494e
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/wf_ren.cwl
@@ -0,0 +1,33 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.2
+inputs:
+  f1:
+    type: File
+    default:
+      class: File
+      location: whale.txt
+  newname:
+    type: string
+    default:  "badger.txt"
+outputs: []
+requirements:
+  StepInputExpressionRequirement: {}
+  InlineJavascriptRequirement: {}
+steps:
+  rename:
+    in:
+      f1: f1
+      newname: newname
+    run: rename.cwl
+    out: [out]
+
+  echo:
+    in:
+      p: rename/out
+      checkname: newname
+    out: []
+    run: check.cwl
diff --git a/sdk/cwl/tests/18994-basename/whale.txt b/sdk/cwl/tests/18994-basename/whale.txt
new file mode 100644
index 000000000..9dfd0a6ab
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/whale.txt
@@ -0,0 +1,5 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+whale
diff --git a/sdk/cwl/tests/arvados-tests.yml b/sdk/cwl/tests/arvados-tests.yml
index 5282e9392..9e691bdba 100644
--- a/sdk/cwl/tests/arvados-tests.yml
+++ b/sdk/cwl/tests/arvados-tests.yml
@@ -444,3 +444,8 @@
   output: {}
   tool: chipseq/cwl-packed.json
   doc: "Test issue 18723 - correctly upload two directories with the same basename"
+
+- job: null
+  output: {}
+  tool: 18994-basename/wf_ren.cwl
+  doc: "Test issue 18994 - correctly stage file with modified basename"

commit 88b0b3bf854380dd349003fdcb75a52623218028
Author: Tom Clegg <tom@curii.com>
Date:   Fri Apr 15 15:28:21 2022 -0400

    Merge branch '18992-hpc-local-keepstore'
    
    closes #18992
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 651238981..e60880c21 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -969,15 +969,25 @@ Clusters:
       # A zero value disables this feature.
       #
       # In order for this feature to be activated, no volume may use
-      # AccessViaHosts, and each volume must have Replication higher
-      # than Collections.DefaultReplication. If these requirements are
-      # not satisfied, the feature is disabled automatically
-      # regardless of the value given here.
+      # AccessViaHosts, and no writable volume may have Replication
+      # lower than Collections.DefaultReplication. If these
+      # requirements are not satisfied, the feature is disabled
+      # automatically regardless of the value given here.
       #
-      # Note that when this configuration is enabled, the entire
-      # cluster configuration file, including the system root token,
-      # is copied to the worker node and held in memory for the
-      # duration of the container.
+      # When an HPC dispatcher is in use (see SLURM and LSF sections),
+      # this feature depends on the operator to ensure an up-to-date
+      # cluster configuration file (/etc/arvados/config.yml) is
+      # available on all compute nodes. If it is missing or not
+      # readable by the crunch-run user, the feature will be disabled
+      # automatically. To read it from a different location, add a
+      # "-config=/path/to/config.yml" argument to
+      # CrunchRunArgumentsList above.
+      #
+      # When the cloud dispatcher is in use (see CloudVMs section) and
+      # this configuration is enabled, the entire cluster
+      # configuration file, including the system root token, is copied
+      # to the worker node and held in memory for the duration of the
+      # container.
       LocalKeepBlobBuffersPerVCPU: 1
 
       # When running a dedicated keepstore process for a container
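
The comment block above describes when the local-keepstore feature is actually allowed to run. A rough paraphrase of that decision in Python (the real implementation is the Go code in lib/crunchrun below; the volume field names here are approximations):

    def local_keepstore_buffers(cluster, vcpus):
        """Number of Keep buffers to allocate, or 0 if the feature stays disabled."""
        per_vcpu = cluster["Containers"]["LocalKeepBlobBuffersPerVCPU"]
        if per_vcpu == 0:
            return 0  # a zero value disables the feature outright
        default_repl = cluster["Collections"]["DefaultReplication"]
        for vol in cluster["Volumes"].values():
            if vol.get("AccessViaHosts"):
                return 0  # any AccessViaHosts volume disables it
            if not vol.get("ReadOnly") and vol.get("Replication", 1) < default_repl:
                return 0  # writable volume replicating below the default
        return vcpus * per_vcpu  # e.g. 4 VCPUs x 1 buffer/VCPU = 4 buffers
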
diff --git a/lib/crunchrun/cgroup.go b/lib/crunchrun/cgroup.go
index 0b254f5bd..48ec93b87 100644
--- a/lib/crunchrun/cgroup.go
+++ b/lib/crunchrun/cgroup.go
@@ -6,16 +6,16 @@ package crunchrun
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
-	"log"
 )
 
 // Return the current process's cgroup for the given subsystem.
-func findCgroup(subsystem string) string {
+func findCgroup(subsystem string) (string, error) {
 	subsys := []byte(subsystem)
 	cgroups, err := ioutil.ReadFile("/proc/self/cgroup")
 	if err != nil {
-		log.Fatal(err)
+		return "", err
 	}
 	for _, line := range bytes.Split(cgroups, []byte("\n")) {
 		toks := bytes.SplitN(line, []byte(":"), 4)
@@ -24,10 +24,9 @@ func findCgroup(subsystem string) string {
 		}
 		for _, s := range bytes.Split(toks[1], []byte(",")) {
 			if bytes.Compare(s, subsys) == 0 {
-				return string(toks[2])
+				return string(toks[2]), nil
 			}
 		}
 	}
-	log.Fatalf("subsystem %q not found in /proc/self/cgroup", subsystem)
-	return ""
+	return "", fmt.Errorf("subsystem %q not found in /proc/self/cgroup", subsystem)
 }
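
For reference, each line of /proc/self/cgroup has the form "hierarchy-id:controller-list:path", and the lookup above returns the path for the line whose controller list contains the requested subsystem (now reported as an error instead of a fatal log). The same scan, sketched in Python:

    def find_cgroup(subsystem):
        # Mirror of findCgroup() above: return the cgroup path for the line whose
        # comma-separated controller list names `subsystem`.
        with open("/proc/self/cgroup") as f:
            for line in f:
                toks = line.rstrip("\n").split(":", 2)
                if len(toks) < 3:
                    continue
                if subsystem in toks[1].split(","):
                    return toks[2]
        raise LookupError("subsystem %r not found in /proc/self/cgroup" % subsystem)
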
diff --git a/lib/crunchrun/cgroup_test.go b/lib/crunchrun/cgroup_test.go
index b43479a3b..eb87456d1 100644
--- a/lib/crunchrun/cgroup_test.go
+++ b/lib/crunchrun/cgroup_test.go
@@ -14,8 +14,10 @@ var _ = Suite(&CgroupSuite{})
 
 func (s *CgroupSuite) TestFindCgroup(c *C) {
 	for _, s := range []string{"devices", "cpu", "cpuset"} {
-		g := findCgroup(s)
-		c.Check(g, Not(Equals), "")
+		g, err := findCgroup(s)
+		if c.Check(err, IsNil) {
+			c.Check(g, Not(Equals), "", Commentf("subsys %q", s))
+		}
 		c.Logf("cgroup(%q) == %q", s, g)
 	}
 }
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 65f43e964..474fbf4ad 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -32,9 +32,11 @@ import (
 	"time"
 
 	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/crunchstat"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	"git.arvados.org/arvados.git/sdk/go/manifest"
 	"golang.org/x/sys/unix"
@@ -167,6 +169,7 @@ type ContainerRunner struct {
 	enableMemoryLimit bool
 	enableNetwork     string // one of "default" or "always"
 	networkMode       string // "none", "host", or "" -- passed through to executor
+	brokenNodeHook    string // script to run if node appears to be broken
 	arvMountLog       *ThrottledLogger
 
 	containerWatchdogInterval time.Duration
@@ -210,10 +213,9 @@ var errorBlacklist = []string{
 	"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
 	"(?ms).*grpc: the connection is unavailable.*",
 }
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
-	if *brokenNodeHook == "" {
+	if runner.brokenNodeHook == "" {
 		path := filepath.Join(lockdir, brokenfile)
 		runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
 		f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
@@ -223,9 +225,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() {
 		}
 		f.Close()
 	} else {
-		runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+		runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
 		// run killme script
-		c := exec.Command(*brokenNodeHook)
+		c := exec.Command(runner.brokenNodeHook)
 		c.Stdout = runner.CrunchLog
 		c.Stderr = runner.CrunchLog
 		err := c.Run()
@@ -1722,6 +1724,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
 	detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
 	stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+	configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
 	sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
 	kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
 	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1730,6 +1733,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
 	memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
 	runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+	brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
 	flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
 	ignoreDetachFlag := false
@@ -1767,6 +1771,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		return 1
 	}
 
+	var keepstoreLogbuf bufThenWrite
 	var conf ConfigData
 	if *stdinConfig {
 		err := json.NewDecoder(stdin).Decode(&conf)
@@ -1788,6 +1793,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 			// fill it using the container UUID prefix.
 			conf.Cluster.ClusterID = containerUUID[:5]
 		}
+	} else {
+		conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
 	}
 
 	log.Printf("crunch-run %s started", cmd.Version.String())
@@ -1797,7 +1804,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		arvadosclient.CertFiles = []string{*caCertsPath}
 	}
 
-	var keepstoreLogbuf bufThenWrite
 	keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
 	if err != nil {
 		log.Print(err)
@@ -1883,6 +1889,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	}
 	defer cr.executor.Close()
 
+	cr.brokenNodeHook = *brokenNodeHook
+
 	gwAuthSecret := os.Getenv("GatewayAuthSecret")
 	os.Unsetenv("GatewayAuthSecret")
 	if gwAuthSecret == "" {
@@ -1923,7 +1931,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	cr.enableNetwork = *enableNetwork
 	cr.networkMode = *networkMode
 	if *cgroupParentSubsystem != "" {
-		p := findCgroup(*cgroupParentSubsystem)
+		p, err := findCgroup(*cgroupParentSubsystem)
+		if err != nil {
+			log.Printf("fatal: cgroup parent subsystem: %s", err)
+			return 1
+		}
 		cr.setCgroupParent = p
 		cr.expectCgroupParent = p
 	}
@@ -1952,8 +1964,62 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	return 0
 }
 
+// Try to load ConfigData in hpc (slurm/lsf) environment. This means
+// loading the cluster config from the specified file and (if that
+// works) getting the runtime_constraints container field from
+// controller to determine # VCPUs so we can calculate KeepBuffers.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+	var conf ConfigData
+	conf.Cluster = loadClusterConfigFile(configFile, stderr)
+	if conf.Cluster == nil {
+		// skip loading the container record -- we won't be
+		// able to start local keepstore anyway.
+		return conf
+	}
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+		return conf
+	}
+	arv.Retries = 8
+	var ctr arvados.Container
+	err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+	if err != nil {
+		fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+		return conf
+	}
+	if ctr.RuntimeConstraints.VCPUs > 0 {
+		conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+	}
+	return conf
+}
+
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+	ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+	ldr.Path = path
+	cfg, err := ldr.Load()
+	if err != nil {
+		fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+		return nil
+	}
+	cluster, err := cfg.GetCluster("")
+	if err != nil {
+		fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+		return nil
+	}
+	fmt.Fprintf(stderr, "loaded config file %s\n", path)
+	return cluster
+}
+
 func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
-	if configData.Cluster == nil || configData.KeepBuffers < 1 {
+	if configData.KeepBuffers < 1 {
+		fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+		return nil, nil
+	}
+	if configData.Cluster == nil {
+		fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
 		return nil, nil
 	}
 	for uuid, vol := range configData.Cluster.Volumes {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 62df0032b..1d2c7b09f 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -50,7 +50,6 @@ type TestSuite struct {
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
-	*brokenNodeHook = ""
 	s.client = arvados.NewClientFromEnv()
 	s.executor = &stubExecutor{}
 	var err error
@@ -1914,8 +1913,8 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 		func() {
 			c.Log("// loadErr = cannot connect")
 			s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
-			*brokenNodeHook = c.MkDir() + "/broken-node-hook"
-			err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+			s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+			err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
 			c.Assert(err, IsNil)
 			nextState = "Queued"
 		},
@@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 }`, nil, 0, func() {})
 		c.Check(s.api.CalledWith("container.state", nextState), NotNil)
 		c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-		if *brokenNodeHook != "" {
+		if s.runner.brokenNodeHook != "" {
 			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
 			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
 			c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index 9b797fd86..0b139dd97 100644
--- a/lib/crunchrun/integration_test.go
+++ b/lib/crunchrun/integration_test.go
@@ -32,6 +32,7 @@ type integrationSuite struct {
 	stdin  bytes.Buffer
 	stdout bytes.Buffer
 	stderr bytes.Buffer
+	args   []string
 	cr     arvados.ContainerRequest
 	client *arvados.Client
 	ac     *arvadosclient.ArvadosClient
@@ -39,6 +40,7 @@ type integrationSuite struct {
 
 	logCollection    arvados.Collection
 	outputCollection arvados.Collection
+	logFiles         map[string]string // filename => contents
 }
 
 func (s *integrationSuite) SetUpSuite(c *C) {
@@ -102,11 +104,13 @@ func (s *integrationSuite) TearDownSuite(c *C) {
 func (s *integrationSuite) SetUpTest(c *C) {
 	os.Unsetenv("ARVADOS_KEEP_SERVICES")
 	s.engine = "docker"
+	s.args = nil
 	s.stdin = bytes.Buffer{}
 	s.stdout = bytes.Buffer{}
 	s.stderr = bytes.Buffer{}
 	s.logCollection = arvados.Collection{}
 	s.outputCollection = arvados.Collection{}
+	s.logFiles = map[string]string{}
 	s.cr = arvados.ContainerRequest{
 		Priority:       1,
 		State:          "Committed",
@@ -201,20 +205,42 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
 		s.engine = "docker"
 		s.testRunTrivialContainer(c)
 
-		fs, err := s.logCollection.FileSystem(s.client, s.kc)
-		c.Assert(err, IsNil)
-		f, err := fs.Open("keepstore.txt")
+		log, logExists := s.logFiles["keepstore.txt"]
 		if trial.logConfig == "none" {
-			c.Check(err, NotNil)
-			c.Check(os.IsNotExist(err), Equals, true)
+			c.Check(logExists, Equals, false)
 		} else {
-			c.Assert(err, IsNil)
-			buf, err := ioutil.ReadAll(f)
-			c.Assert(err, IsNil)
-			c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
-			c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+			c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+			c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
 		}
 	}
+
+	// Check that (1) config is loaded from $ARVADOS_CONFIG when
+	// not provided on stdin and (2) if a local keepstore is not
+	// started, crunch-run.txt explains why not.
+	s.SetUpTest(c)
+	s.stdin.Reset()
+	s.testRunTrivialContainer(c)
+	c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-000000000000000\) uses AccessViaHosts\n.*`)
+
+	// Check that config read errors are logged
+	s.SetUpTest(c)
+	s.args = []string{"-config", c.MkDir() + "/config-error.yaml"}
+	s.stdin.Reset()
+	s.testRunTrivialContainer(c)
+	c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* no such file or directory\n.*`)
+
+	s.SetUpTest(c)
+	s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
+	s.stdin.Reset()
+	err := ioutil.WriteFile(s.args[1], []byte{}, 0)
+	c.Check(err, IsNil)
+	s.testRunTrivialContainer(c)
+	c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
+
+	s.SetUpTest(c)
+	s.stdin.Reset()
+	s.testRunTrivialContainer(c)
+	c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*loaded config file \Q`+os.Getenv("ARVADOS_CONFIG")+`\E\n.*`)
 }
 
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
@@ -227,11 +253,12 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
 	args := []string{
 		"-runtime-engine=" + s.engine,
 		"-enable-memory-limit=false",
-		s.cr.ContainerUUID,
 	}
 	if s.stdin.Len() > 0 {
-		args = append([]string{"-stdin-config=true"}, args...)
+		args = append(args, "-stdin-config=true")
 	}
+	args = append(args, s.args...)
+	args = append(args, s.cr.ContainerUUID)
 	code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
 	c.Logf("\n===== stdout =====\n%s", s.stdout.String())
 	c.Logf("\n===== stderr =====\n%s", s.stderr.String())
@@ -257,6 +284,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
 			buf, err := ioutil.ReadAll(f)
 			c.Assert(err, IsNil)
 			c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
+			s.logFiles[fi.Name()] = string(buf)
 		}
 	}
 	s.logCollection = log
diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go
index acbb11a36..e888f3151 100644
--- a/sdk/go/ctxlog/log.go
+++ b/sdk/go/ctxlog/log.go
@@ -93,6 +93,11 @@ func setFormat(logger *logrus.Logger, format string) {
 			FullTimestamp:   true,
 			TimestampFormat: rfc3339NanoFixed,
 		}
+	case "plain":
+		logger.Formatter = &logrus.TextFormatter{
+			DisableColors:    true,
+			DisableTimestamp: true,
+		}
 	case "json", "":
 		logger.Formatter = &logrus.JSONFormatter{
 			TimestampFormat: rfc3339NanoFixed,
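
The new "plain" format exists so the cluster-config-loading messages (see loadClusterConfigFile above) can be folded into crunch-run.txt without logrus colors or timestamps getting in the way. For comparison only, a roughly equivalent setup with Python's stdlib logging, not part of this change:

    import logging, sys

    handler = logging.StreamHandler(sys.stderr)
    # No timestamp, no color: just level and message, like the TextFormatter above.
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    log = logging.getLogger("plain-demo")
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    log.info("loaded config file /etc/arvados/config.yml")
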

-----------------------------------------------------------------------


hooks/post-receive