[ARVADOS] updated: 1.1.2-135-g2b41877

Git user git at public.curoverse.com
Sun Jan 28 19:33:39 EST 2018


Summary of changes:
 .../crunch-dispatch-slurm/crunch-dispatch-slurm.go |  2 +-
 .../crunch-dispatch-slurm_test.go                  |  2 +-
 services/dispatchcloud/node_size.go                | 22 +++++----
 .../arvnodeman/computenode/dispatch/slurm.py       | 51 +++++++++++++-------
 services/nodemanager/arvnodeman/jobqueue.py        | 56 +++++++++++++++-------
 .../nodemanager/tests/test_computenode_dispatch.py |  4 +-
 .../tests/test_computenode_dispatch_slurm.py       | 17 ++++++-
 services/nodemanager/tests/test_jobqueue.py        | 20 ++++++--
 8 files changed, 122 insertions(+), 52 deletions(-)

       via  2b41877b842bbf47a0fdea713c9d2fc2fb80664a (commit)
       via  428e8f9663ba05eb28667e02d01675e271c38fdf (commit)
       via  d2369c0fb8742d54f0cdfc88ef485beff62ea277 (commit)
      from  bad6499910388c17c4a54b3128f361aa36a670e1 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit 2b41877b842bbf47a0fdea713c9d2fc2fb80664a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sun Jan 28 17:22:28 2018 -0500

    12199: Update slurm features when nodes come up.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index c8883c3..d022892 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -8,8 +8,8 @@ from __future__ import absolute_import, print_function
 import subprocess
 import time
 
-from . import \
-    ComputeNodeSetupActor, ComputeNodeMonitorActor
+from . import ComputeNodeMonitorActor
+from . import ComputeNodeSetupActor as SetupActorBase
 from . import ComputeNodeShutdownActor as ShutdownActorBase
 from . import ComputeNodeUpdateActor as UpdateActorBase
 from .. import RetryMixin
@@ -20,16 +20,29 @@ class SlurmMixin(object):
                                   'fail\n', 'fail*\n'])
     SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
 
-    def _set_node_state(self, nodename, state, *args):
-        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
-               'State=' + state]
-        cmd.extend(args)
-        subprocess.check_output(cmd)
+    def _update_slurm_node(self, nodename, updates):
+        cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
+        try:
+            subprocess.check_output(cmd)
+        except:
+            self._logger.error(
+                "SLURM update %r failed", cmd, exc_info=True)
 
     def _get_slurm_state(self, nodename):
         return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
 
 
+class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
+    def create_cloud_node(self):
+        hostname = self.arvados_node.get("hostname")
+        if hostname:
+            self._update_slurm_node(self.arvados_node['hostname'], [
+                'Weight=%i' % int(self.cloud_size.price * 1000),
+                'Features=instancetype='+self.cloud_size.name,
+            ])
+        return super(ComputeNodeSetupActor, self).create_cloud_node()
+
+
 class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
     def on_start(self):
         arv_node = self._arvados_node()
@@ -47,7 +60,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if self._nodename:
             if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
                 # Resume from "drng" or "drain"
-                self._set_node_state(self._nodename, 'RESUME')
+                self._update_slurm_node(self._nodename, ['State=RESUME'])
             else:
                 # Node is in a state such as 'idle' or 'alloc' so don't
                 # try to resume it because that will just raise an error.
@@ -59,7 +72,8 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if self.cancel_reason is not None:
             return
         if self._nodename:
-            self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+            self._update_slurm_node(self._nodename, [
+                'State=DRAIN', 'Reason=Node Manager shutdown'])
             self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
             self._later.await_slurm_drain()
         else:
@@ -82,15 +96,18 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
 
     def _destroy_node(self):
         if self._nodename:
-            self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+            self._update_slurm_node(self._nodename, [
+                'State=DOWN', 'Reason=Node Manager shutdown'])
         super(ComputeNodeShutdownActor, self)._destroy_node()
 
 
-class ComputeNodeUpdateActor(UpdateActorBase):
+class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
     def sync_node(self, cloud_node, arvados_node):
-        if arvados_node.get("hostname"):
-            try:
-                subprocess.check_output(['scontrol', 'update', 'NodeName=' + arvados_node["hostname"], 'Weight=%i' % int(cloud_node.size.price * 1000)])
-            except:
-                self._logger.error("Unable to set slurm node weight.", exc_info=True)
-        return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
+        hostname = arvados_node.get("hostname")
+        if hostname:
+            self._update_slurm_node(hostname, [
+                'Weight=%i' % int(cloud_node.size.price * 1000),
+                'Features=instancetype=' + cloud_node.size.name,
+            ])
+        return super(ComputeNodeUpdateActor, self).sync_node(
+            cloud_node, arvados_node)
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 4b35205..b62ce56 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -21,6 +21,8 @@ from arvnodeman.computenode.driver import BaseComputeNodeDriver
 from . import testutil
 
 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+    ACTOR_CLASS = dispatch.ComputeNodeSetupActor
+
     def make_mocks(self, arvados_effect=None):
         if arvados_effect is None:
             arvados_effect = [testutil.arvados_node_mock()]
@@ -35,7 +37,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
             self.make_mocks(arvados_effect=[arv_node] if arv_node else None)
-        self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+        self.setup_actor = self.ACTOR_CLASS.start(
             self.timer, self.api_client, self.cloud_client,
             testutil.MockSize(1), arv_node).proxy()
 
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index 0b6162d..f896684 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -13,7 +13,10 @@ import mock
 
 import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
 from . import testutil
-from .test_computenode_dispatch import ComputeNodeShutdownActorMixin, ComputeNodeUpdateActorTestCase
+from .test_computenode_dispatch import \
+    ComputeNodeShutdownActorMixin, \
+    ComputeNodeSetupActorTestCase, \
+    ComputeNodeUpdateActorTestCase
 
 @mock.patch('subprocess.check_output')
 class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
@@ -123,4 +126,14 @@ class SLURMComputeNodeUpdateActorTestCase(ComputeNodeUpdateActorTestCase):
         cloud_node = testutil.cloud_node_mock()
         arv_node = testutil.arvados_node_mock()
         self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
-        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000'])
+        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000', 'Features=instancetype=z99.test'])
+
+class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
+    ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
+
+    @mock.patch('subprocess.check_output')
+    def test_update_node_features(self, check_output):
+        self.make_mocks()
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])

commit 428e8f9663ba05eb28667e02d01675e271c38fdf
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sun Jan 28 16:52:13 2018 -0500

    12199: Rename instance type features: a1.xl => instancetype=a1.xl
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 44123f8..9eebbab 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -234,7 +234,7 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error
 	} else if err != nil {
 		return nil, err
 	} else {
-		sbatchArgs = append(sbatchArgs, "--constraint="+it.Name)
+		sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
 	}
 
 	return sbatchArgs, nil
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index b822232..583e0d8 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -388,7 +388,7 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
 				{Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
 				{Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
 			},
-			sbatchArgs: []string{"--constraint=a1.small"},
+			sbatchArgs: []string{"--constraint=instancetype=a1.small"},
 		},
 		// No node types configured => no slurm constraint
 		{
diff --git a/services/dispatchcloud/node_size.go b/services/dispatchcloud/node_size.go
index 9cb6d3f..29832da 100644
--- a/services/dispatchcloud/node_size.go
+++ b/services/dispatchcloud/node_size.go
@@ -66,15 +66,19 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
 //
 // SlurmNodeTypeFeatureKludge does a test-and-fix operation
 // immediately, and then periodically, in case slurm restarts and
-// forgets the list of valid features. It never returns, so it should
-// generally be invoked with "go".
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
 func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
-	var types []string
+	if len(cc.InstanceTypes) == 0 {
+		return
+	}
+	var features []string
 	for _, it := range cc.InstanceTypes {
-		types = append(types, it.Name)
+		features = append(features, "instancetype="+it.Name)
 	}
 	for {
-		slurmKludge(types)
+		slurmKludge(features)
 		time.Sleep(time.Minute)
 	}
 }
@@ -85,8 +89,8 @@ var (
 	slurmErrBadGres    = "Invalid generic resource"
 )
 
-func slurmKludge(types []string) {
-	cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(types, "&"), "true")
+func slurmKludge(features []string) {
+	cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
 	out, err := cmd.CombinedOutput()
 	switch {
 	case err == nil:
@@ -95,8 +99,8 @@ func slurmKludge(types []string) {
 
 	case bytes.Contains(out, []byte(slurmErrBadFeature)):
 		log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
-		for _, features := range []string{strings.Join(types, ","), ""} {
-			cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+features)
+		for _, nodeFeatures := range []string{strings.Join(features, ","), ""} {
+			cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+nodeFeatures)
 			log.Printf("running: %q %q", cmd.Path, cmd.Args)
 			out, err := cmd.CombinedOutput()
 			if err != nil {

commit d2369c0fb8742d54f0cdfc88ef485beff62ea277
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Sun Jan 28 16:32:22 2018 -0500

    12199: Use node type X to run jobs with constraint instancetype=X.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 4d2d3e0..ebc90e4 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -6,6 +6,7 @@
 from __future__ import absolute_import, print_function
 
 import logging
+import re
 import subprocess
 
 import arvados.util
@@ -74,13 +75,15 @@ class ServerCalculator(object):
             return fallback
 
     def cloud_size_for_constraints(self, constraints):
+        specified_name = constraints.get('instance_type')
         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
         for size in self.cloud_sizes:
-            if size.meets_constraints(**wants):
-                return size
+            if (size.meets_constraints(**wants) and
+                (specified_name is None or size.name == specified_name)):
+                    return size
         return None
 
     def servers_for_queue(self, queue):
@@ -92,8 +95,7 @@ class ServerCalculator(object):
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
                 unsatisfiable_jobs[job['uuid']] = (
-                    'Requirements for a single node exceed the available '
-                    'cloud node size')
+                    "Constraints cannot be satisfied by any node type")
             elif (want_count > self.max_nodes):
                 unsatisfiable_jobs[job['uuid']] = (
                     "Job's min_nodes constraint is greater than the configured "
@@ -152,21 +154,43 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
         queuelist = []
         if self.slurm_queue:
             # cpus, memory, tempory disk space, reason, job name
-            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
             for out in squeue_out.splitlines():
                 try:
-                    cpu, ram, disk, reason, jobname = out.split("|", 4)
-                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
-                        queuelist.append({
-                            "uuid": jobname,
-                            "runtime_constraints": {
-                                "min_cores_per_node": cpu,
-                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                            }
-                        })
+                    cpu, ram, disk, reason, jobname, features = out.split("|", 5)
                 except ValueError:
-                    pass
+                    self._logger.warning("ignored malformed line in squeue output: %r", out)
+                    continue
+                if '-dz642-' not in jobname:
+                    continue
+                if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+                    continue
+
+                for feature in features.split(','):
+                    m = re.match(r'instancetype=(.*)', feature)
+                    if not m:
+                        continue
+                    instance_type = m.group(1)
+                    # Ignore cpu/ram/scratch requirements, bring up
+                    # the requested node type.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "instance_type": instance_type,
+                        }
+                    })
+                    break
+                else:
+                    # No instance type specified. Choose a node type
+                    # to suit cpu/ram/scratch requirements.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "min_cores_per_node": cpu,
+                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                        }
+                    })
 
         if self.jobs_queue:
             queuelist.extend(self._client.jobs().queue().execute()['items'])
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index b1d5e00..a64fb23 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -159,7 +159,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
         job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
         container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
-        mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+        mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
 
         self.build_monitor([{'items': [{'uuid': job_uuid}]}],
                            self.MockCalculatorUnsatisfiableJobs(), True, True)
@@ -181,8 +181,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list(self, mock_squeue):
-        mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -195,8 +195,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list_suffix(self, mock_squeue):
-        mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -207,6 +207,16 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
         self.subscriber.assert_called_with([testutil.MockSize(1),
                                             testutil.MockSize(2)])
 
+    @mock.patch("subprocess.check_output")
+    def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
+        mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+        super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+            [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+                                                                True, True)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
+        self.subscriber.assert_called_with([testutil.MockSize(2)])
+
     def test_coerce_to_mb(self):
         self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
         self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list