[ARVADOS] updated: 2.4.0-10-gde3df01f3
Git user
git at public.arvados.org
Tue Apr 19 15:23:47 UTC 2022
Summary of changes:
lib/config/config.default.yml | 26 ++++---
lib/crunchrun/cgroup.go | 11 ++-
lib/crunchrun/cgroup_test.go | 6 +-
lib/crunchrun/crunchrun.go | 80 ++++++++++++++++++++--
lib/crunchrun/crunchrun_test.go | 7 +-
lib/crunchrun/integration_test.go | 52 ++++++++++----
sdk/cwl/arvados_cwl/executor.py | 3 +-
sdk/cwl/arvados_cwl/pathmapper.py | 52 ++++++++------
sdk/cwl/arvados_cwl/runner.py | 30 ++++----
sdk/cwl/tests/18994-basename/check.cwl | 22 ++++++
sdk/cwl/tests/18994-basename/rename.cwl | 16 +++++
sdk/cwl/tests/18994-basename/wf_ren.cwl | 33 +++++++++
.../cwl/tests/18994-basename/whale.txt | 2 +-
sdk/cwl/tests/arvados-tests.yml | 5 ++
sdk/go/ctxlog/log.go | 5 ++
15 files changed, 271 insertions(+), 79 deletions(-)
create mode 100644 sdk/cwl/tests/18994-basename/check.cwl
create mode 100644 sdk/cwl/tests/18994-basename/rename.cwl
create mode 100644 sdk/cwl/tests/18994-basename/wf_ren.cwl
copy tools/salt-install/tests/test.txt => sdk/cwl/tests/18994-basename/whale.txt (94%)
via de3df01f392c72f589b3e29f0080da437d19a011 (commit)
via 88b0b3bf854380dd349003fdcb75a52623218028 (commit)
from 865c356c6ab258fc5ff6a20bf976e548811638d9 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit de3df01f392c72f589b3e29f0080da437d19a011
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Tue Apr 19 10:13:32 2022 -0400
Merge branch '18994-cwl-basename' refs #18994
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 6e23d80a8..5f24d2407 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -18,6 +18,7 @@ import json
import re
from functools import partial
import time
+import urllib
from cwltool.errors import WorkflowException
import cwltool.workflow
@@ -450,7 +451,7 @@ The 'jobs' API is no longer supported.
srccollection = sp[0][5:]
try:
reader = self.collection_cache.get(srccollection)
- srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+ srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
except arvados.errors.ArgumentError as e:
logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 4a91a7a83..64fdfa0d0 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -52,7 +52,8 @@ class ArvPathMapper(PathMapper):
"""Convert container-local paths to and from Keep collection ids."""
def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, single_collection=False):
+ collection_pattern, file_pattern, name=None, single_collection=False,
+ optional_deps=None):
self.arvrunner = arvrunner
self.input_basedir = input_basedir
self.collection_pattern = collection_pattern
@@ -61,6 +62,7 @@ class ArvPathMapper(PathMapper):
self.referenced_files = [r["location"] for r in referenced_files]
self.single_collection = single_collection
self.pdh_to_uuid = {}
+ self.optional_deps = optional_deps or []
super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
def visit(self, srcobj, uploadfiles):
@@ -73,6 +75,7 @@ class ArvPathMapper(PathMapper):
if isinstance(src, basestring) and src.startswith("keep:"):
if collection_pdh_pattern.match(src):
self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.parse.unquote(src[5:]), srcobj["class"], True)
+
if arvados_cwl.util.collectionUUID in srcobj:
self.pdh_to_uuid[src.split("/", 1)[0][5:]] = srcobj[arvados_cwl.util.collectionUUID]
elif not collection_uuid_pattern.match(src):
@@ -135,6 +138,9 @@ class ArvPathMapper(PathMapper):
f.write(obj["contents"])
remap.append((obj["location"], path + "/" + obj["basename"]))
else:
+ for opt in self.optional_deps:
+ if obj["location"] == opt["location"]:
+ return
raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
def needs_new_collection(self, srcobj, prefix=""):
@@ -149,22 +155,33 @@ class ArvPathMapper(PathMapper):
loc = srcobj["location"]
if loc.startswith("_:"):
return True
- if prefix:
- if loc != prefix+srcobj["basename"]:
- return True
+
+ i = loc.rfind("/")
+ if i > -1:
+ loc_prefix = loc[:i+1]
+ if not prefix:
+ prefix = loc_prefix
+ # quote/unquote to ensure consistent quoting
+ suffix = urllib.parse.quote(urllib.parse.unquote(loc[i+1:]), "/+@")
else:
- i = loc.rfind("/")
- if i > -1:
- prefix = loc[:i+1]
- else:
- prefix = loc+"/"
+ # no '/' found
+ loc_prefix = loc+"/"
+ prefix = loc+"/"
+ suffix = ""
+
+ if prefix != loc_prefix:
+ return True
+
+ if "basename" in srcobj and suffix != urllib.parse.quote(srcobj["basename"], "/+@"):
+ return True
+
if srcobj["class"] == "File" and loc not in self._pathmap:
return True
for s in srcobj.get("secondaryFiles", []):
if self.needs_new_collection(s, prefix):
return True
if srcobj.get("listing"):
- prefix = "%s%s/" % (prefix, srcobj["basename"])
+ prefix = "%s%s/" % (prefix, urllib.parse.quote(srcobj.get("basename", suffix), "/+@"))
for l in srcobj["listing"]:
if self.needs_new_collection(l, prefix):
return True
@@ -194,7 +211,7 @@ class ArvPathMapper(PathMapper):
packed=False)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+ self._pathmap[src] = MapperEnt(urllib.parse.quote(st.fn, "/:+@"), urllib.parse.quote(self.collection_pattern % st.fn[5:], "/:+@"),
"Directory" if os.path.isdir(ab) else "File", True)
for srcobj in referenced_files:
@@ -217,19 +234,10 @@ class ArvPathMapper(PathMapper):
ab = self.collection_pattern % c.portable_data_hash()
self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
- elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
- (srcobj["location"].startswith("_:") and "contents" in srcobj)):
-
- # If all secondary files/directories are located in
- # the same collection as the primary file and the
- # paths and names that are consistent with staging,
- # don't create a new collection.
- if not self.needs_new_collection(srcobj):
- continue
-
+ elif srcobj["class"] == "File" and self.needs_new_collection(srcobj):
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries )
+ num_retries=self.arvrunner.num_retries)
self.addentry(srcobj, c, ".", remap)
container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 38e2c4d80..e5a81cdc7 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -285,10 +285,18 @@ def upload_dependencies(arvrunner, name, document_loader,
sc_result = scandeps(uri, scanobj,
loadref_fields,
- set(("$include", "$schemas", "location")),
+ set(("$include", "location")),
loadref, urljoin=document_loader.fetcher.urljoin,
nestdirs=False)
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
+
+ sc_result.extend(optional_deps)
+
sc = []
uuids = {}
@@ -345,24 +353,13 @@ def upload_dependencies(arvrunner, name, document_loader,
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
- if "$schemas" in workflowobj:
- for s in workflowobj["$schemas"]:
- sc.append({"class": "File", "location": s})
-
def visit_default(obj):
- remove = [False]
- def ensure_default_location(f):
+ def defaults_are_optional(f):
if "location" not in f and "path" in f:
f["location"] = f["path"]
del f["path"]
- if "location" in f and not arvrunner.fs_access.exists(f["location"]):
- # Doesn't exist, remove from list of dependencies to upload
- sc[:] = [x for x in sc if x["location"] != f["location"]]
- # Delete "default" from workflowobj
- remove[0] = True
- visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
- if remove[0]:
- del obj["default"]
+ optional_deps.append(f)
+ visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
find_defaults(workflowobj, visit_default)
@@ -398,7 +395,8 @@ def upload_dependencies(arvrunner, name, document_loader,
"keep:%s",
"keep:%s/%s",
name=name,
- single_collection=True)
+ single_collection=True,
+ optional_deps=optional_deps)
def setloc(p):
loc = p.get("location")
diff --git a/sdk/cwl/tests/18994-basename/check.cwl b/sdk/cwl/tests/18994-basename/check.cwl
new file mode 100644
index 000000000..0046ce66c
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/check.cwl
@@ -0,0 +1,22 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: CommandLineTool
+cwlVersion: v1.2
+inputs:
+ p: File
+ checkname: string
+outputs: []
+arguments:
+ - sh
+ - "-c"
+ - |
+ name=`basename $(inputs.p.path)`
+ ls -l $(inputs.p.path)
+ if test $name = $(inputs.checkname) ; then
+ echo success
+ else
+ echo expected basename to be $(inputs.checkname) but was $name
+ exit 1
+ fi
diff --git a/sdk/cwl/tests/18994-basename/rename.cwl b/sdk/cwl/tests/18994-basename/rename.cwl
new file mode 100644
index 000000000..026555973
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/rename.cwl
@@ -0,0 +1,16 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: ExpressionTool
+cwlVersion: v1.2
+inputs:
+ f1: File
+ newname: string
+outputs:
+ out: File
+expression: |
+ ${
+ inputs.f1.basename = inputs.newname;
+ return {"out": inputs.f1};
+ }
diff --git a/sdk/cwl/tests/18994-basename/wf_ren.cwl b/sdk/cwl/tests/18994-basename/wf_ren.cwl
new file mode 100644
index 000000000..b0177494e
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/wf_ren.cwl
@@ -0,0 +1,33 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.2
+inputs:
+ f1:
+ type: File
+ default:
+ class: File
+ location: whale.txt
+ newname:
+ type: string
+ default: "badger.txt"
+outputs: []
+requirements:
+ StepInputExpressionRequirement: {}
+ InlineJavascriptRequirement: {}
+steps:
+ rename:
+ in:
+ f1: f1
+ newname: newname
+ run: rename.cwl
+ out: [out]
+
+ echo:
+ in:
+ p: rename/out
+ checkname: newname
+ out: []
+ run: check.cwl
diff --git a/sdk/cwl/tests/18994-basename/whale.txt b/sdk/cwl/tests/18994-basename/whale.txt
new file mode 100644
index 000000000..9dfd0a6ab
--- /dev/null
+++ b/sdk/cwl/tests/18994-basename/whale.txt
@@ -0,0 +1,5 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+whale
diff --git a/sdk/cwl/tests/arvados-tests.yml b/sdk/cwl/tests/arvados-tests.yml
index 5282e9392..9e691bdba 100644
--- a/sdk/cwl/tests/arvados-tests.yml
+++ b/sdk/cwl/tests/arvados-tests.yml
@@ -444,3 +444,8 @@
output: {}
tool: chipseq/cwl-packed.json
doc: "Test issue 18723 - correctly upload two directories with the same basename"
+
+- job: null
+ output: {}
+ tool: 18994-basename/wf_ren.cwl
+ doc: "Test issue 18994 - correctly stage file with modified basename"
commit 88b0b3bf854380dd349003fdcb75a52623218028
Author: Tom Clegg <tom at curii.com>
Date: Fri Apr 15 15:28:21 2022 -0400
Merge branch '18992-hpc-local-keepstore'
closes #18992
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 651238981..e60880c21 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -969,15 +969,25 @@ Clusters:
# A zero value disables this feature.
#
# In order for this feature to be activated, no volume may use
- # AccessViaHosts, and each volume must have Replication higher
- # than Collections.DefaultReplication. If these requirements are
- # not satisfied, the feature is disabled automatically
- # regardless of the value given here.
+ # AccessViaHosts, and no writable volume may have Replication
+ # lower than Collections.DefaultReplication. If these
+ # requirements are not satisfied, the feature is disabled
+ # automatically regardless of the value given here.
#
- # Note that when this configuration is enabled, the entire
- # cluster configuration file, including the system root token,
- # is copied to the worker node and held in memory for the
- # duration of the container.
+ # When an HPC dispatcher is in use (see SLURM and LSF sections),
+ # this feature depends on the operator to ensure an up-to-date
+ # cluster configuration file (/etc/arvados/config.yml) is
+ # available on all compute nodes. If it is missing or not
+ # readable by the crunch-run user, the feature will be disabled
+ # automatically. To read it from a different location, add a
+ # "-config=/path/to/config.yml" argument to
+ # CrunchRunArgumentsList above.
+ #
+ # When the cloud dispatcher is in use (see CloudVMs section) and
+ # this configuration is enabled, the entire cluster
+ # configuration file, including the system root token, is copied
+ # to the worker node and held in memory for the duration of the
+ # container.
LocalKeepBlobBuffersPerVCPU: 1
# When running a dedicated keepstore process for a container
diff --git a/lib/crunchrun/cgroup.go b/lib/crunchrun/cgroup.go
index 0b254f5bd..48ec93b87 100644
--- a/lib/crunchrun/cgroup.go
+++ b/lib/crunchrun/cgroup.go
@@ -6,16 +6,16 @@ package crunchrun
import (
"bytes"
+ "fmt"
"io/ioutil"
- "log"
)
// Return the current process's cgroup for the given subsystem.
-func findCgroup(subsystem string) string {
+func findCgroup(subsystem string) (string, error) {
subsys := []byte(subsystem)
cgroups, err := ioutil.ReadFile("/proc/self/cgroup")
if err != nil {
- log.Fatal(err)
+ return "", err
}
for _, line := range bytes.Split(cgroups, []byte("\n")) {
toks := bytes.SplitN(line, []byte(":"), 4)
@@ -24,10 +24,9 @@ func findCgroup(subsystem string) string {
}
for _, s := range bytes.Split(toks[1], []byte(",")) {
if bytes.Compare(s, subsys) == 0 {
- return string(toks[2])
+ return string(toks[2]), nil
}
}
}
- log.Fatalf("subsystem %q not found in /proc/self/cgroup", subsystem)
- return ""
+ return "", fmt.Errorf("subsystem %q not found in /proc/self/cgroup", subsystem)
}
diff --git a/lib/crunchrun/cgroup_test.go b/lib/crunchrun/cgroup_test.go
index b43479a3b..eb87456d1 100644
--- a/lib/crunchrun/cgroup_test.go
+++ b/lib/crunchrun/cgroup_test.go
@@ -14,8 +14,10 @@ var _ = Suite(&CgroupSuite{})
func (s *CgroupSuite) TestFindCgroup(c *C) {
for _, s := range []string{"devices", "cpu", "cpuset"} {
- g := findCgroup(s)
- c.Check(g, Not(Equals), "")
+ g, err := findCgroup(s)
+ if c.Check(err, IsNil) {
+ c.Check(g, Not(Equals), "", Commentf("subsys %q", s))
+ }
c.Logf("cgroup(%q) == %q", s, g)
}
}
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 65f43e964..474fbf4ad 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -32,9 +32,11 @@ import (
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/crunchstat"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
"golang.org/x/sys/unix"
@@ -167,6 +169,7 @@ type ContainerRunner struct {
enableMemoryLimit bool
enableNetwork string // one of "default" or "always"
networkMode string // "none", "host", or "" -- passed through to executor
+ brokenNodeHook string // script to run if node appears to be broken
arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
@@ -210,10 +213,9 @@ var errorBlacklist = []string{
"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
"(?ms).*grpc: the connection is unavailable.*",
}
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
func (runner *ContainerRunner) runBrokenNodeHook() {
- if *brokenNodeHook == "" {
+ if runner.brokenNodeHook == "" {
path := filepath.Join(lockdir, brokenfile)
runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
@@ -223,9 +225,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() {
}
f.Close()
} else {
- runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
// run killme script
- c := exec.Command(*brokenNodeHook)
+ c := exec.Command(runner.brokenNodeHook)
c.Stdout = runner.CrunchLog
c.Stderr = runner.CrunchLog
err := c.Run()
@@ -1722,6 +1724,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+ configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1730,6 +1733,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+ brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
ignoreDetachFlag := false
@@ -1767,6 +1771,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
return 1
}
+ var keepstoreLogbuf bufThenWrite
var conf ConfigData
if *stdinConfig {
err := json.NewDecoder(stdin).Decode(&conf)
@@ -1788,6 +1793,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
// fill it using the container UUID prefix.
conf.Cluster.ClusterID = containerUUID[:5]
}
+ } else {
+ conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
}
log.Printf("crunch-run %s started", cmd.Version.String())
@@ -1797,7 +1804,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
arvadosclient.CertFiles = []string{*caCertsPath}
}
- var keepstoreLogbuf bufThenWrite
keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
if err != nil {
log.Print(err)
@@ -1883,6 +1889,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
}
defer cr.executor.Close()
+ cr.brokenNodeHook = *brokenNodeHook
+
gwAuthSecret := os.Getenv("GatewayAuthSecret")
os.Unsetenv("GatewayAuthSecret")
if gwAuthSecret == "" {
@@ -1923,7 +1931,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
- p := findCgroup(*cgroupParentSubsystem)
+ p, err := findCgroup(*cgroupParentSubsystem)
+ if err != nil {
+ log.Printf("fatal: cgroup parent subsystem: %s", err)
+ return 1
+ }
cr.setCgroupParent = p
cr.expectCgroupParent = p
}
@@ -1952,8 +1964,62 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
return 0
}
+// Try to load ConfigData in hpc (slurm/lsf) environment. This means
+// loading the cluster config from the specified file and (if that
+// works) getting the runtime_constraints container field from
+// controller to determine # VCPUs so we can calculate KeepBuffers.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+ var conf ConfigData
+ conf.Cluster = loadClusterConfigFile(configFile, stderr)
+ if conf.Cluster == nil {
+ // skip loading the container record -- we won't be
+ // able to start local keepstore anyway.
+ return conf
+ }
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+ return conf
+ }
+ arv.Retries = 8
+ var ctr arvados.Container
+ err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+ if err != nil {
+ fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+ return conf
+ }
+ if ctr.RuntimeConstraints.VCPUs > 0 {
+ conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+ }
+ return conf
+}
+
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+ ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+ ldr.Path = path
+ cfg, err := ldr.Load()
+ if err != nil {
+ fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+ return nil
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+ return nil
+ }
+ fmt.Fprintf(stderr, "loaded config file %s\n", path)
+ return cluster
+}
+
func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
- if configData.Cluster == nil || configData.KeepBuffers < 1 {
+ if configData.KeepBuffers < 1 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+ return nil, nil
+ }
+ if configData.Cluster == nil {
+ fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
return nil, nil
}
for uuid, vol := range configData.Cluster.Volumes {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 62df0032b..1d2c7b09f 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -50,7 +50,6 @@ type TestSuite struct {
}
func (s *TestSuite) SetUpTest(c *C) {
- *brokenNodeHook = ""
s.client = arvados.NewClientFromEnv()
s.executor = &stubExecutor{}
var err error
@@ -1914,8 +1913,8 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
func() {
c.Log("// loadErr = cannot connect")
s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
- *brokenNodeHook = c.MkDir() + "/broken-node-hook"
- err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+ s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+ err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
c.Assert(err, IsNil)
nextState = "Queued"
},
@@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
}`, nil, 0, func() {})
c.Check(s.api.CalledWith("container.state", nextState), NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
- if *brokenNodeHook != "" {
+ if s.runner.brokenNodeHook != "" {
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index 9b797fd86..0b139dd97 100644
--- a/lib/crunchrun/integration_test.go
+++ b/lib/crunchrun/integration_test.go
@@ -32,6 +32,7 @@ type integrationSuite struct {
stdin bytes.Buffer
stdout bytes.Buffer
stderr bytes.Buffer
+ args []string
cr arvados.ContainerRequest
client *arvados.Client
ac *arvadosclient.ArvadosClient
@@ -39,6 +40,7 @@ type integrationSuite struct {
logCollection arvados.Collection
outputCollection arvados.Collection
+ logFiles map[string]string // filename => contents
}
func (s *integrationSuite) SetUpSuite(c *C) {
@@ -102,11 +104,13 @@ func (s *integrationSuite) TearDownSuite(c *C) {
func (s *integrationSuite) SetUpTest(c *C) {
os.Unsetenv("ARVADOS_KEEP_SERVICES")
s.engine = "docker"
+ s.args = nil
s.stdin = bytes.Buffer{}
s.stdout = bytes.Buffer{}
s.stderr = bytes.Buffer{}
s.logCollection = arvados.Collection{}
s.outputCollection = arvados.Collection{}
+ s.logFiles = map[string]string{}
s.cr = arvados.ContainerRequest{
Priority: 1,
State: "Committed",
@@ -201,20 +205,42 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
s.engine = "docker"
s.testRunTrivialContainer(c)
- fs, err := s.logCollection.FileSystem(s.client, s.kc)
- c.Assert(err, IsNil)
- f, err := fs.Open("keepstore.txt")
+ log, logExists := s.logFiles["keepstore.txt"]
if trial.logConfig == "none" {
- c.Check(err, NotNil)
- c.Check(os.IsNotExist(err), Equals, true)
+ c.Check(logExists, Equals, false)
} else {
- c.Assert(err, IsNil)
- buf, err := ioutil.ReadAll(f)
- c.Assert(err, IsNil)
- c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
- c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+ c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+ c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
}
}
+
+ // Check that (1) config is loaded from $ARVADOS_CONFIG when
+ // not provided on stdin and (2) if a local keepstore is not
+ // started, crunch-run.txt explains why not.
+ s.SetUpTest(c)
+ s.stdin.Reset()
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-000000000000000\) uses AccessViaHosts\n.*`)
+
+ // Check that config read errors are logged
+ s.SetUpTest(c)
+ s.args = []string{"-config", c.MkDir() + "/config-error.yaml"}
+ s.stdin.Reset()
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* no such file or directory\n.*`)
+
+ s.SetUpTest(c)
+ s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
+ s.stdin.Reset()
+ err := ioutil.WriteFile(s.args[1], []byte{}, 0)
+ c.Check(err, IsNil)
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
+
+ s.SetUpTest(c)
+ s.stdin.Reset()
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*loaded config file \Q`+os.Getenv("ARVADOS_CONFIG")+`\E\n.*`)
}
func (s *integrationSuite) testRunTrivialContainer(c *C) {
@@ -227,11 +253,12 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
args := []string{
"-runtime-engine=" + s.engine,
"-enable-memory-limit=false",
- s.cr.ContainerUUID,
}
if s.stdin.Len() > 0 {
- args = append([]string{"-stdin-config=true"}, args...)
+ args = append(args, "-stdin-config=true")
}
+ args = append(args, s.args...)
+ args = append(args, s.cr.ContainerUUID)
code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
c.Logf("\n===== stdout =====\n%s", s.stdout.String())
c.Logf("\n===== stderr =====\n%s", s.stderr.String())
@@ -257,6 +284,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
buf, err := ioutil.ReadAll(f)
c.Assert(err, IsNil)
c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
+ s.logFiles[fi.Name()] = string(buf)
}
}
s.logCollection = log
diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go
index acbb11a36..e888f3151 100644
--- a/sdk/go/ctxlog/log.go
+++ b/sdk/go/ctxlog/log.go
@@ -93,6 +93,11 @@ func setFormat(logger *logrus.Logger, format string) {
FullTimestamp: true,
TimestampFormat: rfc3339NanoFixed,
}
+ case "plain":
+ logger.Formatter = &logrus.TextFormatter{
+ DisableColors: true,
+ DisableTimestamp: true,
+ }
case "json", "":
logger.Formatter = &logrus.JSONFormatter{
TimestampFormat: rfc3339NanoFixed,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list