[arvados] updated: 2.6.2-14-gbe529d229
git repository hosting
git at public.arvados.org
Mon Jun 5 16:44:31 UTC 2023
Summary of changes:
doc/_config.yml | 1 +
doc/admin/inspect.html.textile.liquid | 69 ++++++++++++++
doc/admin/metrics.html.textile.liquid | 2 +-
.../install-compute-node.html.textile.liquid | 4 +-
lib/cloud/azure/azure.go | 19 ++--
lib/cloud/azure/azure_test.go | 28 ++++--
lib/cloud/cloudtest/cmd.go | 33 +++----
lib/cloud/cloudtest/tester.go | 37 ++++----
lib/cloud/ec2/ec2.go | 77 +++++++++-------
lib/cloud/ec2/ec2_test.go | 22 ++++-
lib/config/config.default.yml | 14 +++
lib/controller/proxy.go | 5 +
lib/crunchrun/copier.go | 21 ++++-
lib/crunchrun/copier_test.go | 4 -
lib/crunchrun/crunchrun.go | 46 ++++++++--
lib/crunchrun/crunchrun_test.go | 71 +++++++++++++-
lib/crunchrun/git_mount.go | 15 ++-
lib/crunchrun/git_mount_test.go | 35 ++-----
lib/crunchrun/logging_test.go | 4 +
lib/dispatchcloud/dispatcher.go | 6 +-
lib/dispatchcloud/test/stub_driver.go | 17 +++-
lib/dispatchcloud/worker/pool.go | 4 +-
lib/dispatchcloud/worker/pool_test.go | 4 +
sdk/cwl/arvados_cwl/__init__.py | 7 ++
sdk/cwl/arvados_cwl/arvcontainer.py | 2 +-
sdk/cwl/arvados_cwl/done.py | 66 ++++++-------
sdk/cwl/arvados_cwl/pathmapper.py | 4 +-
sdk/cwl/arvados_cwl/runner.py | 3 +-
sdk/cwl/setup.py | 6 +-
sdk/cwl/test_with_arvbox.sh | 19 +++-
sdk/go/arvados/client.go | 44 +++++++++
sdk/go/arvados/client_test.go | 102 +++++++++++++++++++++
sdk/go/arvados/config.go | 2 +
sdk/go/arvadosclient/arvadosclient.go | 99 +++++++-------------
sdk/go/arvadosclient/arvadosclient_test.go | 50 ++++++----
35 files changed, 661 insertions(+), 281 deletions(-)
create mode 100644 doc/admin/inspect.html.textile.liquid
via be529d229fbc486e56a71628dd514e5e74455120 (commit)
via 6114eb9ddbfa218bab9a6cf937097bc77f946db1 (commit)
via b0671eae0f78e5cb7d87c3f5ebc3b951563813e5 (commit)
via 4a1942d74e0cd720c1a160dd4c6daa90afd2a6d5 (commit)
via be1e2a5b0ec1ccb72053e43dbeb8a1f0cad86fa5 (commit)
via 4a5db0148d7f02775ef3e867b3fe50450a7ec24f (commit)
via a3ce4d1f8cc231404e13b96894cd2afecb95192d (commit)
via df650530536403974839b8c706295abe80c11955 (commit)
via 41968dfc81774a07342fb1b777e885cbd82a03bc (commit)
from 97aca635adc2fc448e47984fe1c8974b44f7a656 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit be529d229fbc486e56a71628dd514e5e74455120
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Jun 2 13:59:52 2023 -0400
Merge branch '20531-cwl-log-tail' refs #20531
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 472243397..6a8e320c6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -377,10 +377,17 @@ def main(args=sys.argv[1:],
logger.setLevel(logging.INFO)
logging.getLogger('arvados').setLevel(logging.INFO)
logging.getLogger('arvados.keep').setLevel(logging.WARNING)
+ # API retries are at WARNING level and can be noisy, but as long as
+ # they succeed we don't need to see warnings about it.
+ logging.getLogger('googleapiclient').setLevel(logging.ERROR)
if arvargs.debug:
logger.setLevel(logging.DEBUG)
logging.getLogger('arvados').setLevel(logging.DEBUG)
+ # In debug mode show logs about retries, but we aren't
+ # debugging the google client so we don't need to see
+ # everything.
+ logging.getLogger('googleapiclient').setLevel(logging.WARNING)
if arvargs.quiet:
logger.setLevel(logging.WARN)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index be8e557bd..6180a4c62 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -499,7 +499,7 @@ class ArvadosContainer(JobBase):
label = self.arvrunner.label(self)
done.logtail(
logc, logger.error,
- "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
+ "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
index e12fe185a..5c1241976 100644
--- a/sdk/cwl/arvados_cwl/done.py
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -57,43 +57,45 @@ def done_outputs(self, record, tmpdir, outdir, keepdir):
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
-def logtail(logcollection, logfunc, header, maxlen=25):
+def logtail(logcollection, logfunc, header, maxlen=25, include_crunchrun=True):
if len(logcollection) == 0:
logfunc("%s\n%s", header, " ** log is empty **")
return
- containersapi = ("crunch-run.txt" in logcollection)
mergelogs = {}
+ logfiles = ["stdout.txt", "stderr.txt"]
- for log in list(logcollection):
- if not containersapi or log in ("crunch-run.txt", "stdout.txt", "stderr.txt"):
- logname = log[:-4]
- logt = deque([], maxlen)
- mergelogs[logname] = logt
- with logcollection.open(log, encoding="utf-8") as f:
- for l in f:
- if containersapi:
- g = timestamp_re.match(l)
- logt.append((g.group(1), g.group(2)))
- elif not crunchstat_re.match(l):
- logt.append(l)
-
- if containersapi:
- keys = list(mergelogs)
- loglines = []
- while True:
- earliest = None
- for k in keys:
- if mergelogs[k]:
- if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
- earliest = k
- if earliest is None:
- break
- ts, msg = mergelogs[earliest].popleft()
- loglines.append("%s %s %s" % (ts, earliest, msg))
- loglines = loglines[-maxlen:]
- else:
- loglines = mergelogs[list(mergelogs)[0]]
+ if include_crunchrun:
+ logfiles.append("crunch-run.txt")
+
+ for log in logfiles:
+ if log not in logcollection:
+ continue
+ logname = log[:-4] # trim off the .txt
+ logt = deque([], maxlen)
+ mergelogs[logname] = logt
+ with logcollection.open(log, encoding="utf-8") as f:
+ for l in f:
+ g = timestamp_re.match(l)
+ logt.append((g.group(1), g.group(2)))
+
+ keys = list(mergelogs)
+ loglines = []
+
+ # we assume the log lines are all in order so this is a
+ # straight linear merge where we look at the next timestamp of
+ # each log and take whichever one is earliest.
+ while True:
+ earliest = None
+ for k in keys:
+ if mergelogs[k]:
+ if earliest is None or mergelogs[k][0][0] < mergelogs[earliest][0][0]:
+ earliest = k
+ if earliest is None:
+ break
+ ts, msg = mergelogs[earliest].popleft()
+ loglines.append("%s %s %s" % (ts, earliest, msg))
+ loglines = loglines[-maxlen:]
logtxt = "\n ".join(l.strip() for l in loglines)
- logfunc("%s\n\n %s", header, logtxt)
+ logfunc("%s\n\n %s\n", header, logtxt)
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 3f54a396b..539188fdd 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -115,7 +115,7 @@ class ArvPathMapper(PathMapper):
logger.info("%s is %s", src, keepref)
self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
except Exception as e:
- logger.warning(str(e))
+ logger.warning("Download error: %s", e)
else:
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
@@ -147,7 +147,7 @@ class ArvPathMapper(PathMapper):
for opt in self.optional_deps:
if obj["location"] == opt["location"]:
return
- raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
+ raise SourceLine(obj, "location", WorkflowException).makeError("Can't handle '%s'" % obj["location"])
def needs_new_collection(self, srcobj, prefix=""):
"""Check if files need to be staged into a new collection.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 54af2be51..4432813f6 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -923,7 +923,8 @@ class Runner(Process):
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40,
+ include_crunchrun=(record.get("exit_code") is None or record.get("exit_code") > 127))
self.final_output = record["output"]
outc = arvados.collection.CollectionReader(self.final_output,
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index f124651c5..92f0952af 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,14 +36,14 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20230127121939',
- 'schema-salad==8.4.20230127112827',
+ 'cwltool==3.1.20230601100705',
+ 'schema-salad==8.4.20230601112322',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
'networkx < 2.6',
'msgpack==1.0.3',
'importlib-metadata<5',
- 'setuptools>=40.3.0',
+ 'setuptools>=40.3.0'
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
diff --git a/sdk/cwl/test_with_arvbox.sh b/sdk/cwl/test_with_arvbox.sh
index 6823a8e2a..55099afdf 100755
--- a/sdk/cwl/test_with_arvbox.sh
+++ b/sdk/cwl/test_with_arvbox.sh
@@ -17,6 +17,7 @@ tag="latest"
pythoncmd=python3
suite=conformance
runapi=containers
+reinstall=0
while test -n "$1" ; do
arg="$1"
@@ -45,6 +46,10 @@ while test -n "$1" ; do
devcwl=1
shift
;;
+ --reinstall)
+ reinstall=1
+ shift
+ ;;
--pythoncmd)
pythoncmd=$2
shift ; shift
@@ -96,7 +101,11 @@ set -eu -o pipefail
export PYCMD=$pythoncmd
-if test $config = dev ; then
+if test $config = dev -o $reinstall = 1; then
+ cd /usr/src/arvados/sdk/python
+ \$PYCMD setup.py sdist
+ pip_install \$(ls -r dist/arvados-python-client-*.tar.gz | head -n1)
+
cd /usr/src/arvados/sdk/cwl
\$PYCMD setup.py sdist
pip_install \$(ls -r dist/arvados-cwl-runner-*.tar.gz | head -n1)
@@ -104,10 +113,14 @@ fi
set -x
+# The 2.3.20230527113600 release of cwltest confirms that files exist on disk;
+# since our files are in Keep, all the tests fail.
+# We should add [optional] Arvados support to cwltest so it can access
+# Keep, but for the time being just install the last working version.
if [ "\$PYCMD" = "python3" ]; then
- pip3 install cwltest
+ pip3 install 'cwltest<2.3.20230527113600'
else
- pip install cwltest
+ pip install 'cwltest<2.3.20230527113600'
fi
mkdir -p /tmp/cwltest
commit 6114eb9ddbfa218bab9a6cf937097bc77f946db1
Author: Tom Clegg <tom at curii.com>
Date: Fri Jun 2 12:11:02 2023 -0400
Merge branch '20541-less-fetching-mounts'
fixes #20541
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 9d9d6d400..972b8f3d5 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -1124,6 +1124,7 @@ func (runner *ContainerRunner) WaitFinish() error {
}
runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
"container": arvadosclient.Dict{"exit_code": exitcode},
}, nil)
if err != nil {
@@ -1200,7 +1201,10 @@ func (runner *ContainerRunner) updateLogs() {
}
err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
- "container": arvadosclient.Dict{"log": saved.PortableDataHash},
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{
+ "log": saved.PortableDataHash,
+ },
}, nil)
if err != nil {
runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
@@ -1316,6 +1320,7 @@ func (runner *ContainerRunner) checkSpotInterruptionNotices() {
func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
"container": arvadosclient.Dict{
"runtime_status": status,
},
@@ -1332,7 +1337,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
// Output may have been set directly by the container, so
// refresh the container record to check.
err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
- nil, &runner.Container)
+ arvadosclient.Dict{
+ "select": []string{"output"},
+ }, &runner.Container)
if err != nil {
return err
}
@@ -1370,6 +1377,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
var resp arvados.Collection
err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
"ensure_unique_name": true,
+ "select": []string{"portable_data_hash"},
"collection": arvadosclient.Dict{
"is_trashed": true,
"name": "output for " + runner.Container.UUID,
@@ -1496,6 +1504,8 @@ func (runner *ContainerRunner) CommitLogs() error {
return nil
}
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
runner.logMtx.Lock()
defer runner.logMtx.Unlock()
@@ -1524,7 +1534,10 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
updates["trash_at"] = exp
updates["delete_at"] = exp
}
- reqBody := arvadosclient.Dict{"collection": updates}
+ reqBody := arvadosclient.Dict{
+ "select": []string{"uuid", "portable_data_hash"},
+ "collection": updates,
+ }
var err2 error
if runner.logUUID == "" {
reqBody["ensure_unique_name"] = true
@@ -1559,7 +1572,10 @@ func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
return runner.DispatcherArvClient.Update(
"containers",
runner.Container.UUID,
- arvadosclient.Dict{"container": updates},
+ arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": updates,
+ },
nil,
)
}
@@ -1597,7 +1613,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
update["output"] = *runner.OutputPDH
}
update["cost"] = runner.calculateCost(time.Now())
- return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+ return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": update,
+ }, nil)
}
// IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -2089,7 +2108,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
cr.gateway.UpdateTunnelURL = func(url string) {
cr.gateway.Address = "tunnel " + url
cr.DispatcherArvClient.Update("containers", containerUUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+ arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+ }, nil)
}
}
err = cr.gateway.Start()
@@ -2458,6 +2480,7 @@ func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
for range sigchan {
runner.loadPrices()
update := arvadosclient.Dict{
+ "select": []string{"uuid"},
"container": arvadosclient.Dict{
"cost": runner.calculateCost(time.Now()),
},
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 636592b5a..d71ade8a8 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -540,6 +540,12 @@ func (c *Client) RequestAndDecodeContext(ctx context.Context, dst interface{}, m
if err != nil {
return err
}
+ if dst == nil {
+ if urlValues == nil {
+ urlValues = url.Values{}
+ }
+ urlValues["select"] = []string{`["uuid"]`}
+ }
if urlValues == nil {
// Nothing to send
} else if body != nil || ((method == "GET" || method == "HEAD") && len(urlValues.Encode()) < 1000) {
diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go
index b7c60fbf8..a196003b8 100644
--- a/sdk/go/arvados/client_test.go
+++ b/sdk/go/arvados/client_test.go
@@ -170,6 +170,44 @@ func (*clientSuite) TestAnythingToValues(c *check.C) {
}
}
+// select=["uuid"] is added automatically when RequestAndDecode's
+// destination argument is nil.
+func (*clientSuite) TestAutoSelectUUID(c *check.C) {
+ var req *http.Request
+ var err error
+ server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ c.Check(r.ParseForm(), check.IsNil)
+ req = r
+ w.Write([]byte("{}"))
+ }))
+ client := Client{
+ APIHost: strings.TrimPrefix(server.URL, "https://"),
+ AuthToken: "zzz",
+ Insecure: true,
+ Timeout: 2 * time.Second,
+ }
+
+ req = nil
+ err = client.RequestAndDecode(nil, http.MethodPost, "test", nil, nil)
+ c.Check(err, check.IsNil)
+ c.Check(req.FormValue("select"), check.Equals, `["uuid"]`)
+
+ req = nil
+ err = client.RequestAndDecode(nil, http.MethodGet, "test", nil, nil)
+ c.Check(err, check.IsNil)
+ c.Check(req.FormValue("select"), check.Equals, `["uuid"]`)
+
+ req = nil
+ err = client.RequestAndDecode(nil, http.MethodGet, "test", nil, map[string]interface{}{"select": []string{"blergh"}})
+ c.Check(err, check.IsNil)
+ c.Check(req.FormValue("select"), check.Equals, `["uuid"]`)
+
+ req = nil
+ err = client.RequestAndDecode(&struct{}{}, http.MethodGet, "test", nil, map[string]interface{}{"select": []string{"blergh"}})
+ c.Check(err, check.IsNil)
+ c.Check(req.FormValue("select"), check.Equals, `["blergh"]`)
+}
+
func (*clientSuite) TestLoadConfig(c *check.C) {
oldenv := os.Environ()
defer func() {
commit b0671eae0f78e5cb7d87c3f5ebc3b951563813e5
Author: Tom Clegg <tom at curii.com>
Date: Thu Jun 1 15:13:57 2023 -0400
Merge branch '20540-crunch-run-retry'
fixes #20540
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/crunchrun/copier.go b/lib/crunchrun/copier.go
index 734b2c282..a081c5d32 100644
--- a/lib/crunchrun/copier.go
+++ b/lib/crunchrun/copier.go
@@ -51,7 +51,6 @@ type filetodo struct {
// manifest, err := (&copier{...}).Copy()
type copier struct {
client *arvados.Client
- arvClient IArvadosClient
keepClient IKeepClient
hostOutputDir string
ctrOutputDir string
@@ -372,7 +371,7 @@ func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
return mft, nil
}
var coll arvados.Collection
- err := cp.arvClient.Get("collections", pdh, nil, &coll)
+ err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
if err != nil {
return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
}
diff --git a/lib/crunchrun/copier_test.go b/lib/crunchrun/copier_test.go
index 5e9249016..c8936d1a9 100644
--- a/lib/crunchrun/copier_test.go
+++ b/lib/crunchrun/copier_test.go
@@ -12,7 +12,6 @@ import (
"syscall"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
@@ -27,12 +26,9 @@ type copierSuite struct {
func (s *copierSuite) SetUpTest(c *check.C) {
tmpdir := c.MkDir()
- api, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.IsNil)
s.log = bytes.Buffer{}
s.cp = copier{
client: arvados.NewClientFromEnv(),
- arvClient: api,
hostOutputDir: tmpdir,
ctrOutputDir: "/ctr/outdir",
mounts: map[string]arvados.Mount{
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index bfd852257..9d9d6d400 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -638,7 +638,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
if err != nil {
return nil, fmt.Errorf("creating temp dir: %v", err)
}
- err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
+ err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token)
if err != nil {
return nil, err
}
@@ -1345,7 +1345,6 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
txt, err := (&copier{
client: runner.containerClient,
- arvClient: runner.ContainerArvClient,
keepClient: runner.ContainerKeepClient,
hostOutputDir: runner.HostOutputDir,
ctrOutputDir: runner.Container.OutputPath,
@@ -1989,7 +1988,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
log.Printf("%s: %v", containerUUID, err)
return 1
}
- api.Retries = 8
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ api.Retries = 10
kc, err := keepclient.MakeKeepClient(api)
if err != nil {
@@ -2166,7 +2167,9 @@ func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
return conf
}
- arv.Retries = 8
+ // arvadosclient now interprets Retries=10 to mean
+ // Timeout=10m, retrying with exponential backoff + jitter.
+ arv.Retries = 10
var ctr arvados.Container
err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
if err != nil {
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 9c4fe20bc..aa20104f3 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -17,6 +17,8 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
+ "net/http/httputil"
+ "net/url"
"os"
"os/exec"
"path"
@@ -38,6 +40,8 @@ import (
"git.arvados.org/arvados.git/sdk/go/manifest"
. "gopkg.in/check.v1"
+ git_client "gopkg.in/src-d/go-git.v4/plumbing/transport/client"
+ git_http "gopkg.in/src-d/go-git.v4/plumbing/transport/http"
)
// Gocheck boilerplate
@@ -416,6 +420,67 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
return nil, nil
}
+type apiStubServer struct {
+ server *httptest.Server
+ proxy *httputil.ReverseProxy
+ intercept func(http.ResponseWriter, *http.Request) bool
+
+ container arvados.Container
+ logs map[string]string
+}
+
+func apiStub() (*arvados.Client, *apiStubServer) {
+ client := arvados.NewClientFromEnv()
+ apistub := &apiStubServer{}
+ apistub.server = httptest.NewTLSServer(apistub)
+ apistub.proxy = httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "https", Host: client.APIHost})
+ if client.Insecure {
+ apistub.proxy.Transport = arvados.InsecureHTTPClient.Transport
+ }
+ client.APIHost = apistub.server.Listener.Addr().String()
+ return client, apistub
+}
+
+func (apistub *apiStubServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if apistub.intercept != nil && apistub.intercept(w, r) {
+ return
+ }
+ if r.Method == "POST" && r.URL.Path == "/arvados/v1/logs" {
+ var body struct {
+ Log struct {
+ EventType string `json:"event_type"`
+ Properties struct {
+ Text string
+ }
+ }
+ }
+ json.NewDecoder(r.Body).Decode(&body)
+ apistub.logs[body.Log.EventType] += body.Log.Properties.Text
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+hwPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: hwManifest})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+otherPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: otherManifest})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+normalizedWithSubdirsPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: normalizedManifestWithSubdirs})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+denormalizedWithSubdirsPDH {
+ json.NewEncoder(w).Encode(arvados.Collection{ManifestText: denormalizedManifestWithSubdirs})
+ return
+ }
+ if r.Method == "GET" && r.URL.Path == "/arvados/v1/containers/"+apistub.container.UUID {
+ json.NewEncoder(w).Encode(apistub.container)
+ return
+ }
+ apistub.proxy.ServeHTTP(w, r)
+}
+
func (s *TestSuite) TestLoadImage(c *C) {
s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
s.runner.Container.Mounts = map[string]arvados.Mount{
@@ -687,8 +752,9 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn
}
return d, err
}
+ client, _ := apiStub()
s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
- return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, nil, nil
+ return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, client, nil
}
if extraMounts != nil && len(extraMounts) > 0 {
@@ -1352,6 +1418,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
cr := s.runner
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
+ cr.containerClient, _ = apiStub()
cr.ContainerArvClient = &ArvTestClient{}
cr.ContainerKeepClient = &KeepTestClient{}
cr.Container.OutputStorageClasses = []string{"default"}
@@ -1674,7 +1741,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
{
i = 0
cr.ArvMountPoint = ""
- (*GitMountSuite)(nil).useTestGitServer(c)
+ git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
cr.token = arvadostest.ActiveToken
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
diff --git a/lib/crunchrun/git_mount.go b/lib/crunchrun/git_mount.go
index 92bb6d11d..561ea18de 100644
--- a/lib/crunchrun/git_mount.go
+++ b/lib/crunchrun/git_mount.go
@@ -48,25 +48,22 @@ func (gm gitMount) validate() error {
// ExtractTree extracts the specified tree into dir, which is an
// existing empty local directory.
-func (gm gitMount) extractTree(ac IArvadosClient, dir string, token string) error {
+func (gm gitMount) extractTree(ac *arvados.Client, dir string, token string) error {
err := gm.validate()
if err != nil {
return err
}
- baseURL, err := ac.Discovery("gitUrl")
+ dd, err := ac.DiscoveryDocument()
if err != nil {
- return fmt.Errorf("discover gitUrl from API: %s", err)
- } else if _, ok := baseURL.(string); !ok {
- return fmt.Errorf("discover gitUrl from API: expected string, found %T", baseURL)
+ return fmt.Errorf("error getting discovery document: %w", err)
}
-
- u, err := url.Parse(baseURL.(string))
+ u, err := url.Parse(dd.GitURL)
if err != nil {
- return fmt.Errorf("parse gitUrl %q: %s", baseURL, err)
+ return fmt.Errorf("parse gitUrl %q: %s", dd.GitURL, err)
}
u, err = u.Parse("/" + gm.UUID + ".git")
if err != nil {
- return fmt.Errorf("build git url from %q, %q: %s", baseURL, gm.UUID, err)
+ return fmt.Errorf("build git url from %q, %q: %s", dd.GitURL, gm.UUID, err)
}
store := memory.NewStorage()
repo, err := git.Init(store, osfs.New(dir))
diff --git a/lib/crunchrun/git_mount_test.go b/lib/crunchrun/git_mount_test.go
index e39beaa94..ac98dcc48 100644
--- a/lib/crunchrun/git_mount_test.go
+++ b/lib/crunchrun/git_mount_test.go
@@ -6,14 +6,11 @@ package crunchrun
import (
"io/ioutil"
- "net/url"
"os"
"path/filepath"
- "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
git_client "gopkg.in/src-d/go-git.v4/plumbing/transport/client"
git_http "gopkg.in/src-d/go-git.v4/plumbing/transport/http"
@@ -26,11 +23,10 @@ type GitMountSuite struct {
var _ = check.Suite(&GitMountSuite{})
func (s *GitMountSuite) SetUpTest(c *check.C) {
- s.useTestGitServer(c)
-
var err error
s.tmpdir, err = ioutil.TempDir("", "")
c.Assert(err, check.IsNil)
+ git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
}
func (s *GitMountSuite) TearDownTest(c *check.C) {
@@ -39,13 +35,14 @@ func (s *GitMountSuite) TearDownTest(c *check.C) {
}
// Commit fd3531f is crunch-run-tree-test
-func (s *GitMountSuite) TestextractTree(c *check.C) {
+func (s *GitMountSuite) TestExtractTree(c *check.C) {
gm := gitMount{
Path: "/",
UUID: arvadostest.Repository2UUID,
Commit: "fd3531f42995344f36c30b79f55f27b502f3d344",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ ac := arvados.NewClientFromEnv()
+ err := gm.extractTree(ac, s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.IsNil)
fnm := filepath.Join(s.tmpdir, "dir1/dir2/file with mode 0644")
@@ -85,7 +82,7 @@ func (s *GitMountSuite) TestExtractNonTipCommit(c *check.C) {
UUID: arvadostest.Repository2UUID,
Commit: "5ebfab0522851df01fec11ec55a6d0f4877b542e",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.IsNil)
fnm := filepath.Join(s.tmpdir, "file only on testbranch")
@@ -100,7 +97,7 @@ func (s *GitMountSuite) TestNonexistentRepository(c *check.C) {
UUID: "zzzzz-s0uqq-nonexistentrepo",
Commit: "5ebfab0522851df01fec11ec55a6d0f4877b542e",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.NotNil)
c.Check(err, check.ErrorMatches, ".*repository not found.*")
@@ -113,7 +110,7 @@ func (s *GitMountSuite) TestNonexistentCommit(c *check.C) {
UUID: arvadostest.Repository2UUID,
Commit: "bb66b6bb6b6bbb6b6b6b66b6b6b6b6b6b6b6b66b",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.NotNil)
c.Check(err, check.ErrorMatches, ".*object not found.*")
@@ -127,8 +124,8 @@ func (s *GitMountSuite) TestGitUrlDiscoveryFails(c *check.C) {
UUID: arvadostest.Repository2UUID,
Commit: "5ebfab0522851df01fec11ec55a6d0f4877b542e",
}
- err := gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
- c.Check(err, check.ErrorMatches, ".*gitUrl.*")
+ err := gm.extractTree(&arvados.Client{}, s.tmpdir, arvadostest.ActiveToken)
+ c.Check(err, check.ErrorMatches, ".*error getting discovery doc.*")
}
func (s *GitMountSuite) TestInvalid(c *check.C) {
@@ -186,7 +183,7 @@ func (s *GitMountSuite) TestInvalid(c *check.C) {
matcher: ".*writable.*",
},
} {
- err := trial.gm.extractTree(&ArvTestClient{}, s.tmpdir, arvadostest.ActiveToken)
+ err := trial.gm.extractTree(arvados.NewClientFromEnv(), s.tmpdir, arvadostest.ActiveToken)
c.Check(err, check.NotNil)
s.checkTmpdirContents(c, []string{})
@@ -202,15 +199,3 @@ func (s *GitMountSuite) checkTmpdirContents(c *check.C, expect []string) {
c.Check(err, check.IsNil)
c.Check(names, check.DeepEquals, expect)
}
-
-func (*GitMountSuite) useTestGitServer(c *check.C) {
- git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
-
- loader := config.NewLoader(nil, ctxlog.TestLogger(c))
- cfg, err := loader.Load()
- c.Assert(err, check.IsNil)
- cluster, err := cfg.GetCluster("")
- c.Assert(err, check.IsNil)
-
- discoveryMap["gitUrl"] = (*url.URL)(&cluster.Services.GitHTTP.ExternalURL).String()
-}
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go
index 6316d1bed..636592b5a 100644
--- a/sdk/go/arvados/client.go
+++ b/sdk/go/arvados/client.go
@@ -16,12 +16,15 @@ import (
"io/fs"
"io/ioutil"
"log"
+ "math"
"math/big"
+ mathrand "math/rand"
"net"
"net/http"
"net/url"
"os"
"regexp"
+ "strconv"
"strings"
"sync/atomic"
"time"
@@ -274,6 +277,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
rclient := retryablehttp.NewClient()
rclient.HTTPClient = c.httpClient()
+ rclient.Backoff = exponentialBackoff
if c.Timeout > 0 {
rclient.RetryWaitMax = c.Timeout / 10
rclient.RetryMax = 32
@@ -370,6 +374,40 @@ func isRedirectStatus(code int) bool {
}
}
+const minExponentialBackoffBase = time.Second
+
+// Implements retryablehttp.Backoff using the server-provided
+// Retry-After header if available, otherwise nearly-full jitter
+// exponential backoff (similar to
+// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/),
+// in all cases respecting the provided min and max.
+func exponentialBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration {
+ if attemptNum > 0 && min < minExponentialBackoffBase {
+ min = minExponentialBackoffBase
+ }
+ var t time.Duration
+ if resp != nil && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable) {
+ if s := resp.Header.Get("Retry-After"); s != "" {
+ if sleep, err := strconv.ParseInt(s, 10, 64); err == nil {
+ t = time.Second * time.Duration(sleep)
+ } else if stamp, err := time.Parse(time.RFC1123, s); err == nil {
+ t = stamp.Sub(time.Now())
+ }
+ }
+ }
+ if t == 0 {
+ jitter := mathrand.New(mathrand.NewSource(int64(time.Now().Nanosecond()))).Float64()
+ t = min + time.Duration((math.Pow(2, float64(attemptNum))*float64(min)-float64(min))*jitter)
+ }
+ if t < min {
+ return min
+ } else if t > max {
+ return max
+ } else {
+ return t
+ }
+}
+
// DoAndDecode performs req and unmarshals the response (which must be
// JSON) into dst. Use this instead of RequestAndDecode if you need
// more control of the http.Request object.
diff --git a/sdk/go/arvados/client_test.go b/sdk/go/arvados/client_test.go
index 422aca9f6..b7c60fbf8 100644
--- a/sdk/go/arvados/client_test.go
+++ b/sdk/go/arvados/client_test.go
@@ -9,6 +9,7 @@ import (
"context"
"fmt"
"io/ioutil"
+ "math"
"math/rand"
"net/http"
"net/http/httptest"
@@ -339,3 +340,66 @@ func (s *clientRetrySuite) TestContextAlreadyCanceled(c *check.C) {
err := s.client.RequestAndDecodeContext(ctx, &struct{}{}, http.MethodGet, "test", nil, nil)
c.Check(err, check.Equals, context.Canceled)
}
+
+func (s *clientRetrySuite) TestExponentialBackoff(c *check.C) {
+ var min, max time.Duration
+ min, max = time.Second, 64*time.Second
+
+ t := exponentialBackoff(min, max, 0, nil)
+ c.Check(t, check.Equals, min)
+
+ for e := float64(1); e < 5; e += 1 {
+ ok := false
+ for i := 0; i < 20; i++ {
+ t = exponentialBackoff(min, max, int(e), nil)
+ // Every returned value must be between min and min(2^e, max)
+ c.Check(t >= min, check.Equals, true)
+ c.Check(t <= min*time.Duration(math.Pow(2, e)), check.Equals, true)
+ c.Check(t <= max, check.Equals, true)
+ // Check that jitter is actually happening by
+ // checking that at least one in 20 trials is
+ // between min*2^(e-.75) and min*2^(e-.25)
+ jittermin := time.Duration(float64(min) * math.Pow(2, e-0.75))
+ jittermax := time.Duration(float64(min) * math.Pow(2, e-0.25))
+ c.Logf("min %v max %v e %v jittermin %v jittermax %v t %v", min, max, e, jittermin, jittermax, t)
+ if t > jittermin && t < jittermax {
+ ok = true
+ break
+ }
+ }
+ c.Check(ok, check.Equals, true)
+ }
+
+ for i := 0; i < 20; i++ {
+ t := exponentialBackoff(min, max, 100, nil)
+ c.Check(t < max, check.Equals, true)
+ }
+
+ for _, trial := range []struct {
+ retryAfter string
+ expect time.Duration
+ }{
+ {"1", time.Second * 4}, // minimum enforced
+ {"5", time.Second * 5}, // header used
+ {"55", time.Second * 10}, // maximum enforced
+ {"eleventy-nine", time.Second * 4}, // invalid header, exponential backoff used
+ {time.Now().UTC().Add(time.Second).Format(time.RFC1123), time.Second * 4}, // minimum enforced
+ {time.Now().UTC().Add(time.Minute).Format(time.RFC1123), time.Second * 10}, // maximum enforced
+ {time.Now().UTC().Add(-time.Minute).Format(time.RFC1123), time.Second * 4}, // minimum enforced
+ } {
+ c.Logf("trial %+v", trial)
+ t := exponentialBackoff(time.Second*4, time.Second*10, 0, &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ Header: http.Header{"Retry-After": {trial.retryAfter}}})
+ c.Check(t, check.Equals, trial.expect)
+ }
+ t = exponentialBackoff(time.Second*4, time.Second*10, 0, &http.Response{
+ StatusCode: http.StatusTooManyRequests,
+ })
+ c.Check(t, check.Equals, time.Second*4)
+
+ t = exponentialBackoff(0, max, 0, nil)
+ c.Check(t, check.Equals, time.Duration(0))
+ t = exponentialBackoff(0, max, 1, nil)
+ c.Check(t, check.Not(check.Equals), time.Duration(0))
+}
diff --git a/sdk/go/arvadosclient/arvadosclient.go b/sdk/go/arvadosclient/arvadosclient.go
index 13b3a30ac..516187c0e 100644
--- a/sdk/go/arvadosclient/arvadosclient.go
+++ b/sdk/go/arvadosclient/arvadosclient.go
@@ -231,74 +231,37 @@ func (c *ArvadosClient) CallRaw(method string, resourceType string, uuid string,
vals.Set(k, string(m))
}
}
-
- retryable := false
- switch method {
- case "GET", "HEAD", "PUT", "OPTIONS", "DELETE":
- retryable = true
- }
-
- // Non-retryable methods such as POST are not safe to retry automatically,
- // so we minimize such failures by always using a new or recently active socket
- if !retryable {
- if time.Since(c.lastClosedIdlesAt) > MaxIdleConnectionDuration {
- c.lastClosedIdlesAt = time.Now()
- c.Client.Transport.(*http.Transport).CloseIdleConnections()
- }
- }
-
- // Make the request
var req *http.Request
- var resp *http.Response
-
- for attempt := 0; attempt <= c.Retries; attempt++ {
- if method == "GET" || method == "HEAD" {
- u.RawQuery = vals.Encode()
- if req, err = http.NewRequest(method, u.String(), nil); err != nil {
- return nil, err
- }
- } else {
- if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
- return nil, err
- }
- req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
- }
-
- // Add api token header
- req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", c.ApiToken))
- if c.RequestID != "" {
- req.Header.Add("X-Request-Id", c.RequestID)
- }
-
- resp, err = c.Client.Do(req)
- if err != nil {
- if retryable {
- time.Sleep(RetryDelay)
- continue
- } else {
- return nil, err
- }
- }
-
- if resp.StatusCode == http.StatusOK {
- return resp.Body, nil
+ if method == "GET" || method == "HEAD" {
+ u.RawQuery = vals.Encode()
+ if req, err = http.NewRequest(method, u.String(), nil); err != nil {
+ return nil, err
}
-
- defer resp.Body.Close()
-
- switch resp.StatusCode {
- case 408, 409, 422, 423, 500, 502, 503, 504:
- time.Sleep(RetryDelay)
- continue
- default:
- return nil, newAPIServerError(c.ApiServer, resp)
+ } else {
+ if req, err = http.NewRequest(method, u.String(), bytes.NewBufferString(vals.Encode())); err != nil {
+ return nil, err
}
+ req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
}
-
- if resp != nil {
+ if c.RequestID != "" {
+ req.Header.Add("X-Request-Id", c.RequestID)
+ }
+ client := arvados.Client{
+ Client: c.Client,
+ APIHost: c.ApiServer,
+ AuthToken: c.ApiToken,
+ Insecure: c.ApiInsecure,
+ Timeout: 30 * RetryDelay * time.Duration(c.Retries),
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent {
+ defer resp.Body.Close()
return nil, newAPIServerError(c.ApiServer, resp)
}
- return nil, err
+ return resp.Body, nil
}
func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError {
@@ -332,12 +295,12 @@ func newAPIServerError(ServerAddress string, resp *http.Response) APIServerError
// Call an API endpoint and parse the JSON response into an object.
//
-// method - HTTP method: GET, HEAD, PUT, POST, PATCH or DELETE.
-// resourceType - the type of arvados resource to act on (e.g., "collections", "pipeline_instances").
-// uuid - the uuid of the specific item to access. May be empty.
-// action - API method name (e.g., "lock"). This is often empty if implied by method and uuid.
-// parameters - method parameters.
-// output - a map or annotated struct which is a legal target for encoding/json/Decoder.
+// method - HTTP method: GET, HEAD, PUT, POST, PATCH or DELETE.
+// resourceType - the type of arvados resource to act on (e.g., "collections", "pipeline_instances").
+// uuid - the uuid of the specific item to access. May be empty.
+// action - API method name (e.g., "lock"). This is often empty if implied by method and uuid.
+// parameters - method parameters.
+// output - a map or annotated struct which is a legal target for encoding/json/Decoder.
//
// Returns a non-nil error if an error occurs making the API call, the
// API responds with a non-successful HTTP status, or an error occurs
diff --git a/sdk/go/arvadosclient/arvadosclient_test.go b/sdk/go/arvadosclient/arvadosclient_test.go
index 27e23c1ae..b074e21e8 100644
--- a/sdk/go/arvadosclient/arvadosclient_test.go
+++ b/sdk/go/arvadosclient/arvadosclient_test.go
@@ -31,7 +31,7 @@ type ServerRequiredSuite struct{}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
arvadostest.StartKeep(2, false)
- RetryDelay = 0
+ RetryDelay = 2 * time.Second
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
@@ -248,7 +248,7 @@ func (s *UnitSuite) TestPDHMatch(c *C) {
type MockArvadosServerSuite struct{}
func (s *MockArvadosServerSuite) SetUpSuite(c *C) {
- RetryDelay = 0
+ RetryDelay = 100 * time.Millisecond
}
func (s *MockArvadosServerSuite) SetUpTest(c *C) {
@@ -279,15 +279,17 @@ type APIStub struct {
}
func (h *APIStub) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if req.URL.Path == "/redirect-loop" {
- http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
- return
- }
- if h.respStatus[h.retryAttempts] < 0 {
- // Fail the client's Do() by starting a redirect loop
- http.Redirect(resp, req, "/redirect-loop", http.StatusFound)
+ if status := h.respStatus[h.retryAttempts]; status < 0 {
+ // Fail the client's Do() by hanging up without
+ // sending an HTTP response header.
+ conn, _, err := resp.(http.Hijacker).Hijack()
+ if err != nil {
+ panic(err)
+ }
+ conn.Write([]byte("zzzzzzzzzz"))
+ conn.Close()
} else {
- resp.WriteHeader(h.respStatus[h.retryAttempts])
+ resp.WriteHeader(status)
resp.Write([]byte(h.responseBody[h.retryAttempts]))
}
h.retryAttempts++
@@ -302,22 +304,22 @@ func (s *MockArvadosServerSuite) TestWithRetries(c *C) {
"create", 0, 200, []int{200, 500}, []string{`{"ok":"ok"}`, ``},
},
{
- "get", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "get", 0, 423, []int{500, 500, 423, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "create", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "create", 0, 423, []int{500, 500, 423, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "update", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "update", 0, 422, []int{500, 500, 422, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "delete", 0, 500, []int{500, 500, 500, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "delete", 0, 422, []int{500, 500, 422, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "get", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "get", 0, 401, []int{500, 502, 401, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
- "create", 0, 502, []int{500, 500, 502, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
+ "create", 0, 422, []int{500, 502, 422, 200}, []string{``, ``, ``, `{"ok":"ok"}`},
},
{
"get", 0, 200, []int{500, 500, 200}, []string{``, ``, `{"ok":"ok"}`},
@@ -337,6 +339,12 @@ func (s *MockArvadosServerSuite) TestWithRetries(c *C) {
{
"create", 0, 401, []int{401, 200}, []string{``, `{"ok":"ok"}`},
},
+ {
+ "create", 0, 403, []int{403, 200}, []string{``, `{"ok":"ok"}`},
+ },
+ {
+ "create", 0, 422, []int{422, 200}, []string{``, `{"ok":"ok"}`},
+ },
{
"get", 0, 404, []int{404, 200}, []string{``, `{"ok":"ok"}`},
},
@@ -352,11 +360,13 @@ func (s *MockArvadosServerSuite) TestWithRetries(c *C) {
{
"get", 0, 200, []int{-1, -1, 200}, []string{``, ``, `{"ok":"ok"}`},
},
- // "POST" is not safe to retry: fail after one error
+ // "POST" protocol error is safe to retry
{
- "create", 0, -1, []int{-1, 200}, []string{``, `{"ok":"ok"}`},
+ "create", 0, 200, []int{-1, 200}, []string{``, `{"ok":"ok"}`},
},
} {
+ c.Logf("stub: %#v", stub)
+
api, err := RunFakeArvadosServer(&stub)
c.Check(err, IsNil)
@@ -396,7 +406,9 @@ func (s *MockArvadosServerSuite) TestWithRetries(c *C) {
default:
c.Check(err, NotNil)
c.Check(err, ErrorMatches, fmt.Sprintf("arvados API server error: %d.*", stub.expected))
- c.Check(err.(APIServerError).HttpStatusCode, Equals, stub.expected)
+ if c.Check(err, FitsTypeOf, APIServerError{}) {
+ c.Check(err.(APIServerError).HttpStatusCode, Equals, stub.expected)
+ }
}
}
}
commit 4a1942d74e0cd720c1a160dd4c6daa90afd2a6d5
Author: Brett Smith <brett.smith at curii.com>
Date: Thu Jun 1 11:53:27 2023 -0400
Merge branch '20545-drop-if-none-match'
Closes #20545.
Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith at curii.com>
diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go
index 47b8cb471..26d1859ec 100644
--- a/lib/controller/proxy.go
+++ b/lib/controller/proxy.go
@@ -45,6 +45,11 @@ var dropHeaders = map[string]bool{
// Content-Length depends on encoding.
"Content-Length": true,
+
+ // Defend against Rails vulnerability CVE-2023-22795 -
+ // we don't use this functionality anyway, so it costs us nothing.
+ // <https://discuss.rubyonrails.org/t/cve-2023-22795-possible-redos-based-dos-vulnerability-in-action-dispatch/82118>
+ "If-None-Match": true,
}
type ResponseFilter func(*http.Response, error) (*http.Response, error)
commit be1e2a5b0ec1ccb72053e43dbeb8a1f0cad86fa5
Author: Brett Smith <brett.smith at curii.com>
Date: Thu Jun 1 11:53:23 2023 -0400
Merge branch '20433-crunch-log-zero-bytes-job'
Closes #20433.
Arvados-DCO-1.1-Signed-off-by: Brett Smith <brett.smith at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 93c0fbd34..9dc1d3e47 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1154,6 +1154,8 @@ Clusters:
# Maximum bytes that may be logged by a single job. Log bytes that are
# silenced by throttling are not counted against this total.
+ # If you set this to zero, each container will only create a single
+ # log on the API server, noting for users that logging is throttled.
LimitLogBytesPerJob: 67108864
LogPartialLineThrottlePeriod: 5s
diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go
index fdd4f27b7..42f165fd7 100644
--- a/lib/crunchrun/logging_test.go
+++ b/lib/crunchrun/logging_test.go
@@ -191,6 +191,10 @@ func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C)
s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
}
+func (s *LoggingTestSuite) TestWriteLogsWithZeroBytesPerJob(c *C) {
+ s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 0, 67108864, "Exceeded log limit 0 bytes (crunch_limit_log_bytes_per_job)")
+}
+
func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
discoveryMap[throttleParam] = float64(throttleValue)
defer func() {
commit 4a5db0148d7f02775ef3e867b3fe50450a7ec24f
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Jun 1 10:23:41 2023 -0400
Merge branch '20561-file-copy-logging' refs #20561
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/lib/crunchrun/copier.go b/lib/crunchrun/copier.go
index 72c714dfa..734b2c282 100644
--- a/lib/crunchrun/copier.go
+++ b/lib/crunchrun/copier.go
@@ -109,7 +109,7 @@ func (cp *copier) Copy() (string, error) {
}
func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
- cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
+ cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return 0, err
@@ -162,6 +162,20 @@ func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow b
// copy, relative to its mount point -- ".", "./foo.txt", ...
srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
+ // outputRelPath is the path relative in the output directory
+ // that corresponds to the path in the output collection where
+ // the file will go, for logging
+ var outputRelPath = ""
+ if strings.HasPrefix(src, cp.ctrOutputDir) {
+ outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
+ }
+ if outputRelPath == "" {
+ // blank means copy a whole directory, so replace it
+ // with a wildcard to make it a little clearer what's
+ // going on since outputRelPath is only used for logging
+ outputRelPath = "*"
+ }
+
switch {
case srcMount.ExcludeFromOutput:
case srcMount.Kind == "tmp":
@@ -170,12 +184,14 @@ func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow b
case srcMount.Kind != "collection":
return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
case !srcMount.Writable:
+ cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
mft, err := cp.getManifest(srcMount.PortableDataHash)
if err != nil {
return err
}
cp.manifest += mft.Extract(srcRelPath, dest).Text
default:
+ cp.logger.Printf("copying %q", outputRelPath)
hostRoot, err := cp.hostRoot(srcRoot)
if err != nil {
return err
commit a3ce4d1f8cc231404e13b96894cd2afecb95192d
Author: Tom Clegg <tom at curii.com>
Date: Thu Jun 1 09:33:48 2023 -0400
Merge branch '20485-optional-deploy-ssh-key'
closes #20485
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/cloud/azure/azure.go b/lib/cloud/azure/azure.go
index 7b170958b..494db854e 100644
--- a/lib/cloud/azure/azure.go
+++ b/lib/cloud/azure/azure.go
@@ -514,20 +514,23 @@ func (az *azureInstanceSet) Create(
AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
LinuxConfiguration: &compute.LinuxConfiguration{
DisablePasswordAuthentication: to.BoolPtr(true),
- SSH: &compute.SSHConfiguration{
- PublicKeys: &[]compute.SSHPublicKey{
- {
- Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
- KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
- },
- },
- },
},
CustomData: &customData,
},
},
}
+ if publicKey != nil {
+ vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH = &compute.SSHConfiguration{
+ PublicKeys: &[]compute.SSHPublicKey{
+ {
+ Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
+ KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
+ },
+ },
+ }
+ }
+
if instanceType.Preemptible {
// Setting maxPrice to -1 is the equivalent of paying spot price, up to the
// normal price. This means the node will not be pre-empted for price
diff --git a/lib/cloud/azure/azure_test.go b/lib/cloud/azure/azure_test.go
index b6aa9a16b..2f88f7344 100644
--- a/lib/cloud/azure/azure_test.go
+++ b/lib/cloud/azure/azure_test.go
@@ -69,14 +69,17 @@ var _ = check.Suite(&AzureInstanceSetSuite{})
const testNamePrefix = "compute-test123-"
-type VirtualMachinesClientStub struct{}
+type VirtualMachinesClientStub struct {
+ vmParameters compute.VirtualMachine
+}
-func (*VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
+func (stub *VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
resourceGroupName string,
VMName string,
parameters compute.VirtualMachine) (result compute.VirtualMachine, err error) {
parameters.ID = &VMName
parameters.Name = &VMName
+ stub.vmParameters = parameters
return parameters, nil
}
@@ -124,7 +127,7 @@ type testConfig struct {
var live = flag.String("live-azure-cfg", "", "Test with real azure API, provide config file")
-func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error) {
+func GetInstanceSet() (*azureInstanceSet, cloud.ImageID, arvados.Cluster, error) {
cluster := arvados.Cluster{
InstanceTypes: arvados.InstanceTypeMap(map[string]arvados.InstanceType{
"tiny": {
@@ -154,7 +157,7 @@ func GetInstanceSet() (cloud.InstanceSet, cloud.ImageID, arvados.Cluster, error)
}
ap, err := newAzureInstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
- return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
+ return ap.(*azureInstanceSet), cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
}
ap := azureInstanceSet{
azconfig: azureInstanceSetConfig{
@@ -193,18 +196,25 @@ func (*AzureInstanceSetSuite) TestCreate(c *check.C) {
tags := inst.Tags()
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ if *live == "" {
+ c.Check(ap.vmClient.(*VirtualMachinesClientStub).vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH, check.NotNil)
+ }
instPreemptable, err := ap.Create(cluster.InstanceTypes["tinyp"],
img, map[string]string{
"TestTagName": "test tag value",
- }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", nil)
c.Assert(err, check.IsNil)
tags = instPreemptable.Tags()
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("instPreemptable.String()=%v Address()=%v Tags()=%v", instPreemptable.String(), instPreemptable.Address(), tags)
-
+ if *live == "" {
+ // Should not have set SSH option, because publickey
+ // arg was nil
+ c.Check(ap.vmClient.(*VirtualMachinesClientStub).vmParameters.VirtualMachineProperties.OsProfile.LinuxConfiguration.SSH, check.IsNil)
+ }
}
func (*AzureInstanceSetSuite) TestListInstances(c *check.C) {
@@ -229,7 +239,7 @@ func (*AzureInstanceSetSuite) TestManageNics(c *check.C) {
c.Fatal("Error making provider", err)
}
- ap.(*azureInstanceSet).manageNics()
+ ap.manageNics()
ap.Stop()
}
@@ -239,7 +249,7 @@ func (*AzureInstanceSetSuite) TestManageBlobs(c *check.C) {
c.Fatal("Error making provider", err)
}
- ap.(*azureInstanceSet).manageBlobs()
+ ap.manageBlobs()
ap.Stop()
}
@@ -263,7 +273,7 @@ func (*AzureInstanceSetSuite) TestDeleteFake(c *check.C) {
c.Fatal("Error making provider", err)
}
- _, err = ap.(*azureInstanceSet).netClient.delete(context.Background(), "fakefakefake", "fakefakefake")
+ _, err = ap.netClient.delete(context.Background(), "fakefakefake", "fakefakefake")
de, ok := err.(autorest.DetailedError)
if ok {
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index 81e1f8b00..e2cf5e0f1 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -149,11 +149,6 @@ func (instanceSet *ec2InstanceSet) Create(
initCommand cloud.InitCommand,
publicKey ssh.PublicKey) (cloud.Instance, error) {
- keyname, err := instanceSet.getKeyName(publicKey)
- if err != nil {
- return nil, err
- }
-
ec2tags := []*ec2.Tag{}
for k, v := range newTags {
ec2tags = append(ec2tags, &ec2.Tag{
@@ -172,7 +167,6 @@ func (instanceSet *ec2InstanceSet) Create(
InstanceType: &instanceType.ProviderType,
MaxCount: aws.Int64(1),
MinCount: aws.Int64(1),
- KeyName: &keyname,
NetworkInterfaces: []*ec2.InstanceNetworkInterfaceSpecification{
{
@@ -192,6 +186,14 @@ func (instanceSet *ec2InstanceSet) Create(
UserData: aws.String(base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))),
}
+ if publicKey != nil {
+ keyname, err := instanceSet.getKeyName(publicKey)
+ if err != nil {
+ return nil, err
+ }
+ rii.KeyName = &keyname
+ }
+
if instanceType.AddedScratch > 0 {
rii.BlockDeviceMappings = []*ec2.BlockDeviceMapping{{
DeviceName: aws.String("/dev/xvdt"),
diff --git a/lib/cloud/ec2/ec2_test.go b/lib/cloud/ec2/ec2_test.go
index 38ada13ed..ede7f9de5 100644
--- a/lib/cloud/ec2/ec2_test.go
+++ b/lib/cloud/ec2/ec2_test.go
@@ -57,15 +57,19 @@ type testConfig struct {
}
type ec2stub struct {
- c *check.C
- reftime time.Time
+ c *check.C
+ reftime time.Time
+ importKeyPairCalls []*ec2.ImportKeyPairInput
+ describeKeyPairsCalls []*ec2.DescribeKeyPairsInput
}
func (e *ec2stub) ImportKeyPair(input *ec2.ImportKeyPairInput) (*ec2.ImportKeyPairOutput, error) {
+ e.importKeyPairCalls = append(e.importKeyPairCalls, input)
return nil, nil
}
func (e *ec2stub) DescribeKeyPairs(input *ec2.DescribeKeyPairsInput) (*ec2.DescribeKeyPairsOutput, error) {
+ e.describeKeyPairsCalls = append(e.describeKeyPairsCalls, input)
return &ec2.DescribeKeyPairsOutput{}, nil
}
@@ -213,16 +217,18 @@ func (*EC2InstanceSetSuite) TestCreate(c *check.C) {
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ if *live == "" {
+ c.Check(ap.client.(*ec2stub).describeKeyPairsCalls, check.HasLen, 1)
+ c.Check(ap.client.(*ec2stub).importKeyPairCalls, check.HasLen, 1)
+ }
}
func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
ap, img, cluster := GetInstanceSet(c)
- pk, _ := test.LoadTestKey(c, "../../dispatchcloud/test/sshkey_dispatch")
-
inst, err := ap.Create(cluster.InstanceTypes["tiny-with-extra-scratch"],
img, map[string]string{
"TestTagName": "test tag value",
- }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", nil)
c.Assert(err, check.IsNil)
@@ -230,6 +236,12 @@ func (*EC2InstanceSetSuite) TestCreateWithExtraScratch(c *check.C) {
c.Check(tags["TestTagName"], check.Equals, "test tag value")
c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
+ if *live == "" {
+ // Should not have called key pair APIs, because
+ // publickey arg was nil
+ c.Check(ap.client.(*ec2stub).describeKeyPairsCalls, check.HasLen, 0)
+ c.Check(ap.client.(*ec2stub).importKeyPairCalls, check.HasLen, 0)
+ }
}
func (*EC2InstanceSetSuite) TestCreatePreemptible(c *check.C) {
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 150c30bc1..93c0fbd34 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1412,6 +1412,12 @@ Clusters:
# version of crunch-run installed; see CrunchRunCommand above.
DeployRunnerBinary: "/proc/self/exe"
+ # Install the Dispatcher's SSH public key (derived from
+ # DispatchPrivateKey) when creating new cloud
+ # instances. Change this to false if you are using a different
+ # mechanism to pre-install the public key on new instances.
+ DeployPublicKey: true
+
# Tags to add on all resources (VMs, NICs, disks) created by
# the container dispatcher. (Arvados's own tags --
# InstanceType, IdleBehavior, and InstanceSecret -- will also
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index 06a558d5f..e982736ff 100644
--- a/lib/dispatchcloud/dispatcher.go
+++ b/lib/dispatchcloud/dispatcher.go
@@ -142,6 +142,10 @@ func (disp *dispatcher) initialize() {
} else {
disp.sshKey = key
}
+ installPublicKey := disp.sshKey.PublicKey()
+ if !disp.Cluster.Containers.CloudVMs.DeployPublicKey {
+ installPublicKey = nil
+ }
instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
if err != nil {
@@ -149,7 +153,7 @@ func (disp *dispatcher) initialize() {
}
dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
disp.instanceSet = instanceSet
- disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 1221678cc..d6a055146 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -560,6 +560,7 @@ type CloudVMsConfig struct {
BootProbeCommand string
InstanceInitCommand string
DeployRunnerBinary string
+ DeployPublicKey bool
ImageID string
MaxCloudOpsPerSecond int
MaxProbesPerSecond int
commit df650530536403974839b8c706295abe80c11955
Author: Tom Clegg <tom at curii.com>
Date: Mon May 29 15:43:17 2023 -0400
Merge branch '20229-doc-inspect-requests'
closes #20229
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/doc/_config.yml b/doc/_config.yml
index 4f8625301..2a6a581e8 100644
--- a/doc/_config.yml
+++ b/doc/_config.yml
@@ -184,6 +184,7 @@ navbar:
- admin/logging.html.textile.liquid
- admin/metrics.html.textile.liquid
- admin/health-checks.html.textile.liquid
+ - admin/inspect.html.textile.liquid
- admin/diagnostics.html.textile.liquid
- admin/management-token.html.textile.liquid
- admin/user-activity.html.textile.liquid
diff --git a/doc/admin/inspect.html.textile.liquid b/doc/admin/inspect.html.textile.liquid
new file mode 100644
index 000000000..2ce528b8d
--- /dev/null
+++ b/doc/admin/inspect.html.textile.liquid
@@ -0,0 +1,69 @@
+---
+layout: default
+navsection: admin
+title: Inspecting active requests
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Most Arvados services publish a snapshot of HTTP requests currently being serviced at @/_inspect/requests@. This can be useful for troubleshooting slow requests and understanding high server load conditions.
+
+To access snapshots, services must be configured with a "management token":management-token.html. When accessing this endpoint, prefix the management token with @"Bearer "@ and supply it in the @Authorization@ request header.
+
+In an interactive setting, use the @jq@ tool to format the JSON response.
+
+<notextile><pre><code>curl -sfH "Authorization: Bearer <span class="userinput">your_management_token_goes_here</span>" "https://<span class="userinput">0.0.0.0:25107</span>/_inspect/requests" | jq .
+</code></pre></notextile>
+
+table(table table-bordered table-condensed table-hover){width:40em}.
+|_. Component|_. Provides @/_inspect/requests@ endpoint|
+|arvados-api-server||
+|arvados-controller|✓|
+|arvados-dispatch-cloud|✓|
+|arvados-dispatch-lsf|✓|
+|arvados-git-httpd||
+|arvados-ws|✓|
+|composer||
+|keepproxy|✓|
+|keepstore|✓|
+|keep-balance|✓|
+|keep-web|✓|
+|workbench1||
+|workbench2||
+
+h2. Report fields
+
+Most fields are self-explanatory.
+
+The @Host@ field reports the virtual host specified in the incoming HTTP request.
+
+The @RemoteAddr@ field reports the source of the incoming TCP connection, which is typically a local address associated with the Nginx proxy service.
+
+The @Elapsed@ field reports the number of seconds since the incoming HTTP request headers were received.
+
+h2. Example response
+
+<pre>
+[
+ {
+ "RequestID": "req-1vzzj6nwrki0rd2hj08a",
+ "Method": "GET",
+ "Host": "tordo.arvadosapi.com",
+ "URL": "/arvados/v1/groups?order=name+asc&filters=[[%22owner_uuid%22,%22%3D%22,%22zzzzz-tpzed-aaaaaaaaaaaaaaa%22],[%22group_class%22,%22in%22,[%22project%22,%22filter%22]]]",
+ "RemoteAddr": "127.0.0.1:55822",
+ "Elapsed": 0.006363228
+ },
+ {
+ "RequestID": "req-1wrof2b2wlj5s1rao4u3",
+ "Method": "GET",
+ "Host": "tordo.arvadosapi.com",
+ "URL": "/arvados/v1/users/current",
+ "RemoteAddr": "127.0.0.1:55814",
+ "Elapsed": 0.04796585
+ }
+]
+</pre>
diff --git a/doc/admin/metrics.html.textile.liquid b/doc/admin/metrics.html.textile.liquid
index b140bcc1b..660b3b8f3 100644
--- a/doc/admin/metrics.html.textile.liquid
+++ b/doc/admin/metrics.html.textile.liquid
@@ -38,7 +38,7 @@ table(table table-bordered table-condensed table-hover).
|arvados-git-httpd||
|arvados-ws|✓|
|composer||
-|keepproxy||
+|keepproxy|✓|
|keepstore|✓|
|keep-balance|✓|
|keep-web|✓|
commit 41968dfc81774a07342fb1b777e885cbd82a03bc
Author: Tom Clegg <tom at curii.com>
Date: Fri May 26 15:30:25 2023 -0400
Merge branch '20520-instance-init-command'
refs #20520
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/doc/install/crunch2-cloud/install-compute-node.html.textile.liquid b/doc/install/crunch2-cloud/install-compute-node.html.textile.liquid
index fb69a0df3..c20e4855a 100644
--- a/doc/install/crunch2-cloud/install-compute-node.html.textile.liquid
+++ b/doc/install/crunch2-cloud/install-compute-node.html.textile.liquid
@@ -195,7 +195,7 @@ The @VPC@ and @Subnet@ should be configured for where you want the compute image
h3(#aws-ebs-autoscaler). Autoscaling compute node scratch space
-Arvados supports "AWS EBS autoscaler":https://github.com/awslabs/amazon-ebs-autoscale . This feature automatically expands the scratch space on the compute node on demand by 200 GB at a time, up to 5 TB.
+Arvados supports "AWS EBS autoscaler":https://github.com/awslabs/amazon-ebs-autoscale. This feature automatically expands the scratch space on the compute node on demand by 200 GB at a time, up to 5 TB.
If you want to add the daemon in your images, add the @--aws-ebs-autoscale@ flag to "the build script":#building.
@@ -228,7 +228,7 @@ The AWS EBS autoscaler daemon will be installed with this configuration:
Changing the ebs-autoscale configuration is left as an exercise for the reader.
-This feature also requires a few Arvados configuration changes, described in "EBS-Autoscale configuration"#aws-ebs-autoscaler .
+This feature also requires a few Arvados configuration changes, described in "EBS Autoscale configuration":install-dispatch-cloud.html#aws-ebs-autoscaler.
h2(#azure). Build an Azure image
diff --git a/lib/cloud/cloudtest/cmd.go b/lib/cloud/cloudtest/cmd.go
index 0ec79e117..b3a262c7e 100644
--- a/lib/cloud/cloudtest/cmd.go
+++ b/lib/cloud/cloudtest/cmd.go
@@ -86,22 +86,23 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
tagKeyPrefix := cluster.Containers.CloudVMs.TagKeyPrefix
tags[tagKeyPrefix+"CloudTestPID"] = fmt.Sprintf("%d", os.Getpid())
if !(&tester{
- Logger: logger,
- Tags: tags,
- TagKeyPrefix: tagKeyPrefix,
- SetID: cloud.InstanceSetID(*instanceSetID),
- DestroyExisting: *destroyExisting,
- ProbeInterval: cluster.Containers.CloudVMs.ProbeInterval.Duration(),
- SyncInterval: cluster.Containers.CloudVMs.SyncInterval.Duration(),
- TimeoutBooting: cluster.Containers.CloudVMs.TimeoutBooting.Duration(),
- Driver: driver,
- DriverParameters: cluster.Containers.CloudVMs.DriverParameters,
- ImageID: cloud.ImageID(*imageID),
- InstanceType: it,
- SSHKey: key,
- SSHPort: cluster.Containers.CloudVMs.SSHPort,
- BootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
- ShellCommand: *shellCommand,
+ Logger: logger,
+ Tags: tags,
+ TagKeyPrefix: tagKeyPrefix,
+ SetID: cloud.InstanceSetID(*instanceSetID),
+ DestroyExisting: *destroyExisting,
+ ProbeInterval: cluster.Containers.CloudVMs.ProbeInterval.Duration(),
+ SyncInterval: cluster.Containers.CloudVMs.SyncInterval.Duration(),
+ TimeoutBooting: cluster.Containers.CloudVMs.TimeoutBooting.Duration(),
+ Driver: driver,
+ DriverParameters: cluster.Containers.CloudVMs.DriverParameters,
+ ImageID: cloud.ImageID(*imageID),
+ InstanceType: it,
+ SSHKey: key,
+ SSHPort: cluster.Containers.CloudVMs.SSHPort,
+ BootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ InstanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
+ ShellCommand: *shellCommand,
PauseBeforeDestroy: func() {
if *pauseBeforeDestroy {
logger.Info("waiting for operator to press Enter")
diff --git a/lib/cloud/cloudtest/tester.go b/lib/cloud/cloudtest/tester.go
index 9fd7c9e74..41e8f658a 100644
--- a/lib/cloud/cloudtest/tester.go
+++ b/lib/cloud/cloudtest/tester.go
@@ -27,23 +27,24 @@ var (
// configuration. Run() should be called only once, after assigning
// suitable values to public fields.
type tester struct {
- Logger logrus.FieldLogger
- Tags cloud.SharedResourceTags
- TagKeyPrefix string
- SetID cloud.InstanceSetID
- DestroyExisting bool
- ProbeInterval time.Duration
- SyncInterval time.Duration
- TimeoutBooting time.Duration
- Driver cloud.Driver
- DriverParameters json.RawMessage
- InstanceType arvados.InstanceType
- ImageID cloud.ImageID
- SSHKey ssh.Signer
- SSHPort string
- BootProbeCommand string
- ShellCommand string
- PauseBeforeDestroy func()
+ Logger logrus.FieldLogger
+ Tags cloud.SharedResourceTags
+ TagKeyPrefix string
+ SetID cloud.InstanceSetID
+ DestroyExisting bool
+ ProbeInterval time.Duration
+ SyncInterval time.Duration
+ TimeoutBooting time.Duration
+ Driver cloud.Driver
+ DriverParameters json.RawMessage
+ InstanceType arvados.InstanceType
+ ImageID cloud.ImageID
+ SSHKey ssh.Signer
+ SSHPort string
+ BootProbeCommand string
+ InstanceInitCommand cloud.InitCommand
+ ShellCommand string
+ PauseBeforeDestroy func()
is cloud.InstanceSet
testInstance *worker.TagVerifier
@@ -127,7 +128,7 @@ func (t *tester) Run() bool {
defer t.destroyTestInstance()
bootDeadline := time.Now().Add(t.TimeoutBooting)
- initCommand := worker.TagVerifier{Instance: nil, Secret: t.secret, ReportVerified: nil}.InitCommand()
+ initCommand := worker.TagVerifier{Instance: nil, Secret: t.secret, ReportVerified: nil}.InitCommand() + "\n" + t.InstanceInitCommand
t.Logger.WithFields(logrus.Fields{
"InstanceType": t.InstanceType.Name,
diff --git a/lib/cloud/ec2/ec2.go b/lib/cloud/ec2/ec2.go
index a74f12561..81e1f8b00 100644
--- a/lib/cloud/ec2/ec2.go
+++ b/lib/cloud/ec2/ec2.go
@@ -149,39 +149,10 @@ func (instanceSet *ec2InstanceSet) Create(
initCommand cloud.InitCommand,
publicKey ssh.PublicKey) (cloud.Instance, error) {
- md5keyFingerprint, sha1keyFingerprint, err := awsKeyFingerprint(publicKey)
+ keyname, err := instanceSet.getKeyName(publicKey)
if err != nil {
- return nil, fmt.Errorf("Could not make key fingerprint: %v", err)
- }
- instanceSet.keysMtx.Lock()
- var keyname string
- var ok bool
- if keyname, ok = instanceSet.keys[md5keyFingerprint]; !ok {
- keyout, err := instanceSet.client.DescribeKeyPairs(&ec2.DescribeKeyPairsInput{
- Filters: []*ec2.Filter{{
- Name: aws.String("fingerprint"),
- Values: []*string{&md5keyFingerprint, &sha1keyFingerprint},
- }},
- })
- if err != nil {
- return nil, fmt.Errorf("Could not search for keypair: %v", err)
- }
-
- if len(keyout.KeyPairs) > 0 {
- keyname = *(keyout.KeyPairs[0].KeyName)
- } else {
- keyname = "arvados-dispatch-keypair-" + md5keyFingerprint
- _, err := instanceSet.client.ImportKeyPair(&ec2.ImportKeyPairInput{
- KeyName: &keyname,
- PublicKeyMaterial: ssh.MarshalAuthorizedKey(publicKey),
- })
- if err != nil {
- return nil, fmt.Errorf("Could not import keypair: %v", err)
- }
- }
- instanceSet.keys[md5keyFingerprint] = keyname
+ return nil, err
}
- instanceSet.keysMtx.Unlock()
ec2tags := []*ec2.Tag{}
for k, v := range newTags {
@@ -257,6 +228,40 @@ func (instanceSet *ec2InstanceSet) Create(
}, nil
}
+func (instanceSet *ec2InstanceSet) getKeyName(publicKey ssh.PublicKey) (string, error) {
+ instanceSet.keysMtx.Lock()
+ defer instanceSet.keysMtx.Unlock()
+ md5keyFingerprint, sha1keyFingerprint, err := awsKeyFingerprint(publicKey)
+ if err != nil {
+ return "", fmt.Errorf("Could not make key fingerprint: %v", err)
+ }
+ if keyname, ok := instanceSet.keys[md5keyFingerprint]; ok {
+ return keyname, nil
+ }
+ keyout, err := instanceSet.client.DescribeKeyPairs(&ec2.DescribeKeyPairsInput{
+ Filters: []*ec2.Filter{{
+ Name: aws.String("fingerprint"),
+ Values: []*string{&md5keyFingerprint, &sha1keyFingerprint},
+ }},
+ })
+ if err != nil {
+ return "", fmt.Errorf("Could not search for keypair: %v", err)
+ }
+ if len(keyout.KeyPairs) > 0 {
+ return *(keyout.KeyPairs[0].KeyName), nil
+ }
+ keyname := "arvados-dispatch-keypair-" + md5keyFingerprint
+ _, err = instanceSet.client.ImportKeyPair(&ec2.ImportKeyPairInput{
+ KeyName: &keyname,
+ PublicKeyMaterial: ssh.MarshalAuthorizedKey(publicKey),
+ })
+ if err != nil {
+ return "", fmt.Errorf("Could not import keypair: %v", err)
+ }
+ instanceSet.keys[md5keyFingerprint] = keyname
+ return keyname, nil
+}
+
func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances []cloud.Instance, err error) {
var filters []*ec2.Filter
for k, v := range tags {
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index ad9061942..150c30bc1 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -1396,6 +1396,12 @@ Clusters:
# https://xxxxx.blob.core.windows.net/system/Microsoft.Compute/Images/images/xxxxx.vhd
ImageID: ""
+ # Shell script to run on new instances using the cloud
+ # provider's UserData (EC2) or CustomData (Azure) feature.
+ #
+ # It is not necessary to include a #!/bin/sh line.
+ InstanceInitCommand: ""
+
# An executable file (located on the dispatcher host) to be
# copied to cloud instances at runtime and used as the
# container runner/supervisor. The default value is the
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 01af8e6d5..e91878527 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -108,7 +108,7 @@ type StubInstanceSet struct {
lastInstanceID int
}
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, initCommand cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
if sis.driver.HoldCloudOps {
sis.driver.holdCloudOps <- true
}
@@ -127,11 +127,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
}
sis.lastInstanceID++
svm := &StubVM{
+ InitCommand: initCommand,
sis: sis,
id: cloud.InstanceID(fmt.Sprintf("inst%d,%s", sis.lastInstanceID, it.ProviderType)),
tags: copyTags(tags),
providerType: it.ProviderType,
- initCommand: cmd,
running: map[string]stubProcess{},
killing: map[string]bool{},
}
@@ -171,6 +171,15 @@ func (sis *StubInstanceSet) Stop() {
sis.stopped = true
}
+func (sis *StubInstanceSet) StubVMs() (svms []*StubVM) {
+ sis.mtx.Lock()
+ defer sis.mtx.Unlock()
+ for _, vm := range sis.servers {
+ svms = append(svms, vm)
+ }
+ return
+}
+
type RateLimitError struct{ Retry time.Time }
func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) }
@@ -196,10 +205,12 @@ type StubVM struct {
CrashRunningContainer func(arvados.Container)
ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
+ // Populated by (*StubInstanceSet).Create()
+ InitCommand cloud.InitCommand
+
sis *StubInstanceSet
id cloud.InstanceID
tags cloud.InstanceTags
- initCommand cloud.InitCommand
providerType string
SSHService SSHService
running map[string]stubProcess
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 4bf969358..3de207ffa 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -106,6 +106,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
newExecutor: newExecutor,
cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
@@ -149,6 +150,7 @@ type Pool struct {
newExecutor func(cloud.Instance) Executor
cluster *arvados.Cluster
bootProbeCommand string
+ instanceInitCommand cloud.InitCommand
runnerSource string
imageID cloud.ImageID
instanceTypes map[string]arvados.InstanceType
@@ -347,7 +349,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
- initCmd := TagVerifier{nil, secret, nil}.InitCommand()
+ initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 7b5634605..7f3a1531e 100644
--- a/lib/dispatchcloud/worker/pool_test.go
+++ b/lib/dispatchcloud/worker/pool_test.go
@@ -266,6 +266,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
type2.Name: type2,
type3.Name: type3,
},
+ instanceInitCommand: "echo 'instance init command goes here'",
}
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
@@ -294,6 +295,9 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
return len(pool.workers) == 4
})
+ vms := instanceSet.(*test.StubInstanceSet).StubVMs()
+ c.Check(string(vms[0].InitCommand), check.Matches, `umask 0177 && echo -n "[0-9a-f]+" >/var/run/arvados-instance-secret\necho 'instance init command goes here'`)
+
// Place type3 node on admin-hold
ivs := suite.instancesByType(pool, type3)
c.Assert(ivs, check.HasLen, 1)
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 4673f14bb..1221678cc 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -558,6 +558,7 @@ type CloudVMsConfig struct {
Enable bool
BootProbeCommand string
+ InstanceInitCommand string
DeployRunnerBinary string
ImageID string
MaxCloudOpsPerSecond int
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list