[ARVADOS] updated: 1.1.2-135-g2b41877
Git user git at public.curoverse.com
Sun Jan 28 19:33:39 EST 2018
Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |  2 +-
 .../crunch-dispatch-slurm_test.go                  |  2 +-
 services/dispatchcloud/node_size.go                | 22 +++++----
 .../arvnodeman/computenode/dispatch/slurm.py       | 51 +++++++++++++-------
 services/nodemanager/arvnodeman/jobqueue.py        | 56 +++++++++++++-------
 .../nodemanager/tests/test_computenode_dispatch.py |  4 +-
 .../tests/test_computenode_dispatch_slurm.py       | 17 ++++++-
 services/nodemanager/tests/test_jobqueue.py        | 20 ++++++--
8 files changed, 122 insertions(+), 52 deletions(-)
       via  2b41877b842bbf47a0fdea713c9d2fc2fb80664a (commit)
       via  428e8f9663ba05eb28667e02d01675e271c38fdf (commit)
       via  d2369c0fb8742d54f0cdfc88ef485beff62ea277 (commit)
      from  bad6499910388c17c4a54b3128f361aa36a670e1 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 2b41877b842bbf47a0fdea713c9d2fc2fb80664a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 17:22:28 2018 -0500
12199: Update slurm features when nodes come up.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index c8883c3..d022892 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -8,8 +8,8 @@ from __future__ import absolute_import, print_function
import subprocess
import time
-from . import \
- ComputeNodeSetupActor, ComputeNodeMonitorActor
+from . import ComputeNodeMonitorActor
+from . import ComputeNodeSetupActor as SetupActorBase
from . import ComputeNodeShutdownActor as ShutdownActorBase
from . import ComputeNodeUpdateActor as UpdateActorBase
from .. import RetryMixin
@@ -20,16 +20,29 @@ class SlurmMixin(object):
'fail\n', 'fail*\n'])
SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
- def _set_node_state(self, nodename, state, *args):
- cmd = ['scontrol', 'update', 'NodeName=' + nodename,
- 'State=' + state]
- cmd.extend(args)
- subprocess.check_output(cmd)
+ def _update_slurm_node(self, nodename, updates):
+ cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
+ try:
+ subprocess.check_output(cmd)
+ except:
+ self._logger.error(
+ "SLURM update %r failed", cmd, exc_info=True)
def _get_slurm_state(self, nodename):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
+ def create_cloud_node(self):
+ hostname = self.arvados_node.get("hostname")
+ if hostname:
+ self._update_slurm_node(self.arvados_node['hostname'], [
+ 'Weight=%i' % int(self.cloud_size.price * 1000),
+ 'Features=instancetype='+self.cloud_size.name,
+ ])
+ return super(ComputeNodeSetupActor, self).create_cloud_node()
+
+
class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
def on_start(self):
arv_node = self._arvados_node()
@@ -47,7 +60,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
if self._nodename:
if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
# Resume from "drng" or "drain"
- self._set_node_state(self._nodename, 'RESUME')
+ self._update_slurm_node(self._nodename, ['State=RESUME'])
else:
# Node is in a state such as 'idle' or 'alloc' so don't
# try to resume it because that will just raise an error.
@@ -59,7 +72,8 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
if self.cancel_reason is not None:
return
if self._nodename:
- self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DRAIN', 'Reason=Node Manager shutdown'])
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
else:
@@ -82,15 +96,18 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
def _destroy_node(self):
if self._nodename:
- self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DOWN', 'Reason=Node Manager shutdown'])
super(ComputeNodeShutdownActor, self)._destroy_node()
-class ComputeNodeUpdateActor(UpdateActorBase):
+class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
def sync_node(self, cloud_node, arvados_node):
- if arvados_node.get("hostname"):
- try:
- subprocess.check_output(['scontrol', 'update', 'NodeName=' + arvados_node["hostname"], 'Weight=%i' % int(cloud_node.size.price * 1000)])
- except:
- self._logger.error("Unable to set slurm node weight.", exc_info=True)
- return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
+ hostname = arvados_node.get("hostname")
+ if hostname:
+ self._update_slurm_node(hostname, [
+ 'Weight=%i' % int(cloud_node.size.price * 1000),
+ 'Features=instancetype=' + cloud_node.size.name,
+ ])
+ return super(ComputeNodeUpdateActor, self).sync_node(
+ cloud_node, arvados_node)
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 4b35205..b62ce56 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -21,6 +21,8 @@ from arvnodeman.computenode.driver import BaseComputeNodeDriver
from . import testutil
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ ACTOR_CLASS = dispatch.ComputeNodeSetupActor
+
def make_mocks(self, arvados_effect=None):
if arvados_effect is None:
arvados_effect = [testutil.arvados_node_mock()]
@@ -35,7 +37,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
self.make_mocks(arvados_effect=[arv_node] if arv_node else None)
- self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+ self.setup_actor = self.ACTOR_CLASS.start(
self.timer, self.api_client, self.cloud_client,
testutil.MockSize(1), arv_node).proxy()
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index 0b6162d..f896684 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -13,7 +13,10 @@ import mock
import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
from . import testutil
-from .test_computenode_dispatch import ComputeNodeShutdownActorMixin, ComputeNodeUpdateActorTestCase
+from .test_computenode_dispatch import \
+ ComputeNodeShutdownActorMixin, \
+ ComputeNodeSetupActorTestCase, \
+ ComputeNodeUpdateActorTestCase
@mock.patch('subprocess.check_output')
class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
@@ -123,4 +126,14 @@ class SLURMComputeNodeUpdateActorTestCase(ComputeNodeUpdateActorTestCase):
cloud_node = testutil.cloud_node_mock()
arv_node = testutil.arvados_node_mock()
self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
- check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000'])
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000', 'Features=instancetype=z99.test'])
+
+class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
+ ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
+
+ @mock.patch('subprocess.check_output')
+ def test_update_node_features(self, check_output):
+ self.make_mocks()
+ self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
commit 428e8f9663ba05eb28667e02d01675e271c38fdf
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 16:52:13 2018 -0500
12199: Rename instance type features: a1.xl => instancetype=a1.xl
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 44123f8..9eebbab 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -234,7 +234,7 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error
} else if err != nil {
return nil, err
} else {
- sbatchArgs = append(sbatchArgs, "--constraint="+it.Name)
+ sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
}
return sbatchArgs, nil
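In sketch form, the constraint renaming above amounts to the following
(Python here for consistency with the nodemanager examples; the helper
name is made up for illustration):

    def sbatch_instancetype_constraint(instance_type_name):
        # Before this commit: '--constraint=a1.small'
        # After this commit:  '--constraint=instancetype=a1.small'
        return '--constraint=instancetype=' + instance_type_name

    assert sbatch_instancetype_constraint('a1.small') == \
        '--constraint=instancetype=a1.small'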
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index b822232..583e0d8 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -388,7 +388,7 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
{Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
{Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
},
- sbatchArgs: []string{"--constraint=a1.small"},
+ sbatchArgs: []string{"--constraint=instancetype=a1.small"},
},
// No node types configured => no slurm constraint
{
diff --git a/services/dispatchcloud/node_size.go b/services/dispatchcloud/node_size.go
index 9cb6d3f..29832da 100644
--- a/services/dispatchcloud/node_size.go
+++ b/services/dispatchcloud/node_size.go
@@ -66,15 +66,19 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
//
// SlurmNodeTypeFeatureKludge does a test-and-fix operation
// immediately, and then periodically, in case slurm restarts and
-// forgets the list of valid features. It never returns, so it should
-// generally be invoked with "go".
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
- var types []string
+ if len(cc.InstanceTypes) == 0 {
+ return
+ }
+ var features []string
for _, it := range cc.InstanceTypes {
- types = append(types, it.Name)
+ features = append(features, "instancetype="+it.Name)
}
for {
- slurmKludge(types)
+ slurmKludge(features)
time.Sleep(time.Minute)
}
}
@@ -85,8 +89,8 @@ var (
slurmErrBadGres = "Invalid generic resource"
)
-func slurmKludge(types []string) {
- cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(types, "&"), "true")
+func slurmKludge(features []string) {
+ cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
out, err := cmd.CombinedOutput()
switch {
case err == nil:
@@ -95,8 +99,8 @@ func slurmKludge(types []string) {
case bytes.Contains(out, []byte(slurmErrBadFeature)):
log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
- for _, features := range []string{strings.Join(types, ","), ""} {
- cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+features)
+ for _, nodeFeatures := range []string{strings.Join(features, ","), ""} {
+ cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+nodeFeatures)
log.Printf("running: %q %q", cmd.Path, cmd.Args)
out, err := cmd.CombinedOutput()
if err != nil {
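A sketch of the test-and-fix cycle, transcribed into Python for consistency
with the other examples here. The dummy node name and the exact
slurmErrBadFeature error text are assumptions (only slurmErrBadGres's value
is visible in this diff):

    import subprocess
    import time

    DUMMY_NODE = 'compute0'   # stand-in for the package's slurmDummyNode

    def slurm_kludge(features):
        # Probe with an invalid gres: the job can never run either way,
        # but the error text reveals whether slurm knows the features.
        probe = subprocess.Popen(
            ['srun', '--gres=invalid-gres-specification',
             '--constraint=' + '&'.join(features), 'true'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out, _ = probe.communicate()
        if 'Invalid feature specification' in out.decode('utf-8', 'replace'):
            # Teach slurm the features by setting, then clearing, them
            # on a dummy node.
            for node_features in [','.join(features), '']:
                subprocess.check_call(
                    ['scontrol', 'update', 'NodeName=' + DUMMY_NODE,
                     'Features=' + node_features])

    def feature_kludge_loop(instance_type_names):
        # Returns immediately when no node types are configured,
        # otherwise re-checks once a minute, like the Go version.
        if not instance_type_names:
            return
        features = ['instancetype=' + n for n in instance_type_names]
        while True:
            slurm_kludge(features)
            time.sleep(60)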
commit d2369c0fb8742d54f0cdfc88ef485beff62ea277
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 16:32:22 2018 -0500
12199: Use node type X to run jobs with constraint instancetype=X.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 4d2d3e0..ebc90e4 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -6,6 +6,7 @@
from __future__ import absolute_import, print_function
import logging
+import re
import subprocess
import arvados.util
@@ -74,13 +75,15 @@ class ServerCalculator(object):
return fallback
def cloud_size_for_constraints(self, constraints):
+ specified_name = constraints.get('instance_type')
want_value = lambda key: self.coerce_int(constraints.get(key), 0)
wants = {'cores': want_value('min_cores_per_node'),
'ram': want_value('min_ram_mb_per_node'),
'scratch': want_value('min_scratch_mb_per_node')}
for size in self.cloud_sizes:
- if size.meets_constraints(**wants):
- return size
+ if (size.meets_constraints(**wants) and
+ (specified_name is None or size.name == specified_name)):
+ return size
return None
def servers_for_queue(self, queue):
@@ -92,8 +95,7 @@ class ServerCalculator(object):
cloud_size = self.cloud_size_for_constraints(constraints)
if cloud_size is None:
unsatisfiable_jobs[job['uuid']] = (
- 'Requirements for a single node exceed the available '
- 'cloud node size')
+ "Constraints cannot be satisfied by any node type")
elif (want_count > self.max_nodes):
unsatisfiable_jobs[job['uuid']] = (
"Job's min_nodes constraint is greater than the configured "
@@ -152,21 +154,43 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
queuelist = []
if self.slurm_queue:
# cpus, memory, temporary disk space, reason, job name, features
- squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+ squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
for out in squeue_out.splitlines():
try:
- cpu, ram, disk, reason, jobname = out.split("|", 4)
- if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
- queuelist.append({
- "uuid": jobname,
- "runtime_constraints": {
- "min_cores_per_node": cpu,
- "min_ram_mb_per_node": self.coerce_to_mb(ram),
- "min_scratch_mb_per_node": self.coerce_to_mb(disk)
- }
- })
+ cpu, ram, disk, reason, jobname, features = out.split("|", 5)
except ValueError:
- pass
+ self._logger.warning("ignored malformed line in squeue output: %r", out)
+ continue
+ if '-dz642-' not in jobname:
+ continue
+ if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+ continue
+
+ for feature in features.split(','):
+ m = re.match(r'instancetype=(.*)', feature)
+ if not m:
+ continue
+ instance_type = m.group(1)
+ # Ignore cpu/ram/scratch requirements, bring up
+ # the requested node type.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "instance_type": instance_type,
+ }
+ })
+ break
+ else:
+ # No instance type specified. Choose a node type
+ # to suit cpu/ram/scratch requirements.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "min_cores_per_node": cpu,
+ "min_ram_mb_per_node": self.coerce_to_mb(ram),
+ "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+ }
+ })
if self.jobs_queue:
queuelist.extend(self._client.jobs().queue().execute()['items'])
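Pulled out of the actor for illustration, the new squeue-line handling
reduces to the following (the function itself is hypothetical, and the MB
coercion of ram/disk done by coerce_to_mb in the diff is elided):

    import re

    # squeue --format=%c|%m|%d|%r|%j|%f
    # -> cpus|memory|tmp disk|reason|job name|feature constraints

    def constraints_for_squeue_line(line):
        # Returns a runtime_constraints dict for a pending container
        # job, or None if the line should be ignored.
        try:
            cpu, ram, disk, reason, jobname, features = line.split('|', 5)
        except ValueError:
            return None   # malformed line: logged and skipped above
        if '-dz642-' not in jobname:
            return None   # not a container job
        if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
            return None
        for feature in features.split(','):
            m = re.match(r'instancetype=(.*)', feature)
            if m:
                # An explicit node type wins; cpu/ram/disk are ignored.
                return {'instance_type': m.group(1)}
        return {'min_cores_per_node': cpu,
                'min_ram_mb_per_node': ram,
                'min_scratch_mb_per_node': disk}

    print(constraints_for_squeue_line(
        '1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test'))
    # -> {'instance_type': 'z2.test'}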
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index b1d5e00..a64fb23 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -159,7 +159,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
- mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+ mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
@@ -181,8 +181,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -195,8 +195,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
@mock.patch("subprocess.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
- mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -207,6 +207,16 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
+ @mock.patch("subprocess.check_output")
+ def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+ super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+ True, True)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
+ self.subscriber.assert_called_with([testutil.MockSize(2)])
+
def test_coerce_to_mb(self):
self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list