[ARVADOS] created: 1.1.2-142-g3c5378e
Git user
git at public.curoverse.com
Mon Feb 5 09:49:23 EST 2018
at 3c5378e79de03e312268e31a10fc93650cafbb10 (commit)
commit 3c5378e79de03e312268e31a10fc93650cafbb10
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Feb 2 16:46:32 2018 -0500
12199: Track slurm node features, avoid redundant updates.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index 13a2a6a..e948c51 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -108,12 +108,11 @@ class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
def sync_node(self, cloud_node, arvados_node):
"""Keep SLURM's node properties up to date."""
hostname = arvados_node.get("hostname")
- if hostname:
- # This is only needed when slurm has restarted and lost
- # the dynamically configured node properties. So it's
- # usually redundant, but detecting when it's necessary
- # would be about the same amount of work as doing it
- # repetitively.
+ features = arvados_node.get("slurm_node_features", "").split(",")
+ sizefeature = "instancetype=" + cloud_node.size.name
+ if hostname and sizefeature not in features:
+ # This probably means SLURM has restarted and lost our
+ # dynamically configured node weights and features.
self._update_slurm_size_attrs(hostname, cloud_node.size)
return super(ComputeNodeUpdateActor, self).sync_node(
cloud_node, arvados_node)
diff --git a/services/nodemanager/arvnodeman/nodelist.py b/services/nodemanager/arvnodeman/nodelist.py
index 70ad54d..4b9d5b6 100644
--- a/services/nodemanager/arvnodeman/nodelist.py
+++ b/services/nodemanager/arvnodeman/nodelist.py
@@ -15,8 +15,9 @@ import arvados.util
class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
"""Actor to poll the Arvados node list.
- This actor regularly polls the list of Arvados node records, and
- sends it to subscribers.
+ This actor regularly polls the list of Arvados node records,
+ augments it with the latest SLURM node info (`sinfo`), and sends
+ it to subscribers.
"""
def is_common_error(self, exception):
@@ -29,28 +30,32 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
nodelist = arvados.util.list_all(self._client.nodes().list)
# node hostname, state
- sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
+ sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n|%t|%f"])
nodestates = {}
+ nodefeatures = {}
for out in sinfo_out.splitlines():
try:
- nodename, state = out.split(" ", 2)
- if state in ('alloc', 'alloc*',
- 'comp', 'comp*',
- 'mix', 'mix*',
- 'drng', 'drng*'):
- nodestates[nodename] = 'busy'
- elif state in ('idle', 'fail'):
- nodestates[nodename] = state
- else:
- nodestates[nodename] = 'down'
+ nodename, state, features = out.split("|", 3)
except ValueError:
- pass
+ continue
+ if state in ('alloc', 'alloc*',
+ 'comp', 'comp*',
+ 'mix', 'mix*',
+ 'drng', 'drng*'):
+ nodestates[nodename] = 'busy'
+ elif state in ('idle', 'fail'):
+ nodestates[nodename] = state
+ else:
+ nodestates[nodename] = 'down'
+ if features != "(null)":
+ nodefeatures[nodename] = features
for n in nodelist:
if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
n["crunch_worker_state"] = nodestates[n["hostname"]]
else:
n["crunch_worker_state"] = 'down'
+ n["slurm_node_features"] = nodefeatures.get(n["hostname"], "")
return nodelist
diff --git a/services/nodemanager/tests/test_nodelist.py b/services/nodemanager/tests/test_nodelist.py
index 11f41b8..5becd0c 100644
--- a/services/nodemanager/tests/test_nodelist.py
+++ b/services/nodemanager/tests/test_nodelist.py
@@ -42,10 +42,15 @@ class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
@mock.patch("subprocess.check_output")
def test_update_from_sinfo(self, sinfo_mock):
- sinfo_mock.return_value = "compute99 alloc"
- node = testutil.arvados_node_mock()
+ sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
+compute2|alloc|(null)
+notarvados12345|idle|(null)
+"""
+ nodeIdle = testutil.arvados_node_mock(node_num=1)
+ nodeBusy = testutil.arvados_node_mock(node_num=2)
+ nodeMissing = testutil.arvados_node_mock(node_num=99)
self.build_monitor([{
- 'items': [node],
+ 'items': [nodeIdle, nodeBusy, nodeMissing],
'items_available': 1,
'offset': 0
}, {
@@ -53,11 +58,18 @@ class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
'items_available': 1,
'offset': 1
}])
- self.monitor.subscribe_to(node['uuid'],
+ self.monitor.subscribe_to(nodeMissing['uuid'],
self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
- self.subscriber.assert_called_with(node)
- self.assertEqual("busy", node["crunch_worker_state"])
+ self.subscriber.assert_called_with(nodeMissing)
+
+ self.assertEqual("idle", nodeIdle["crunch_worker_state"])
+ self.assertEqual("busy", nodeBusy["crunch_worker_state"])
+ self.assertEqual("down", nodeMissing["crunch_worker_state"])
+
+ self.assertEqual("instancetype=a1.test", nodeIdle["slurm_node_features"])
+ self.assertEqual("", nodeBusy["slurm_node_features"])
+ self.assertEqual("", nodeMissing["slurm_node_features"])
class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
commit 3ccc70f4bb06bab6c0b3c71f555cba24cc5c6a47
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Jan 30 13:33:12 2018 -0500
12199: Keep SLURM's node properties up to date.
Change the semantics of ComputeNodeUpdateActor.sync_node so it gets
called every time a new Arvados node record appears, even if hostnames
match. The base actor's implementation now compares hostnames itself
before calling the cloud driver.
This allows the slurm update actor's sync_node method to sync SLURM
state periodically, even if hostnames don't go out of sync.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 67ea8a2..597a011 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -317,7 +317,8 @@ class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
@RetryMixin._retry()
def sync_node(self, cloud_node, arvados_node):
- return self._cloud.sync_node(cloud_node, arvados_node)
+ if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+ return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
@@ -328,14 +329,13 @@ class ComputeNodeMonitorActor(config.actor_class):
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+ timer_actor, update_actor, cloud_client,
arvados_node=None, poll_stale_after=600, node_stale_after=3600,
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.tell_proxy()
self._shutdowns = shutdown_timer
- self._cloud_node_fqdn = cloud_fqdn_func
self._timer = timer_actor
self._update = update_actor
self._cloud = cloud_client
@@ -488,8 +488,11 @@ class ComputeNodeMonitorActor(config.actor_class):
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
- # If the cloud node's FQDN doesn't match what's in the Arvados node
- # record, make them match.
+ """Called when the latest Arvados node record is retrieved.
+
+ Calls the updater's sync_node() method.
+
+ """
# This method is a little unusual in the way it just fires off the
# request without checking the result or retrying errors. That's
# because this update happens every time we reload the Arvados node
@@ -498,7 +501,5 @@ class ComputeNodeMonitorActor(config.actor_class):
# the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
- if (self._cloud_node_fqdn(self.cloud_node) !=
- arvados_node_fqdn(self.arvados_node)):
- self._update.sync_node(self.cloud_node, self.arvados_node)
+ self._update.sync_node(self.cloud_node, self.arvados_node)
self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index d022892..13a2a6a 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -28,6 +28,12 @@ class SlurmMixin(object):
self._logger.error(
"SLURM update %r failed", cmd, exc_info=True)
+ def _update_slurm_size_attrs(self, nodename, size):
+ self._update_slurm_node(nodename, [
+ 'Weight=%i' % int(size.price * 1000),
+ 'Features=instancetype=' + size.name,
+ ])
+
def _get_slurm_state(self, nodename):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
@@ -36,10 +42,7 @@ class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
def create_cloud_node(self):
hostname = self.arvados_node.get("hostname")
if hostname:
- self._update_slurm_node(self.arvados_node['hostname'], [
- 'Weight=%i' % int(self.cloud_size.price * 1000),
- 'Features=instancetype='+self.cloud_size.name,
- ])
+ self._update_slurm_size_attrs(hostname, self.cloud_size)
return super(ComputeNodeSetupActor, self).create_cloud_node()
@@ -103,11 +106,14 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
def sync_node(self, cloud_node, arvados_node):
+ """Keep SLURM's node properties up to date."""
hostname = arvados_node.get("hostname")
if hostname:
- self._update_slurm_node(hostname, [
- 'Weight=%i' % int(cloud_node.size.price * 1000),
- 'Features=instancetype=' + cloud_node.size.name,
- ])
+ # This is only needed when slurm has restarted and lost
+ # the dynamically configured node properties. So it's
+ # usually redundant, but detecting when it's necessary
+ # would be about the same amount of work as doing it
+ # repetitively.
+ self._update_slurm_size_attrs(hostname, cloud_node.size)
return super(ComputeNodeUpdateActor, self).sync_node(
cloud_node, arvados_node)
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index dd441ed..73b58bf 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -167,7 +167,6 @@ class NodeManagerDaemonActor(actor_class):
cloud_node=cloud_node,
cloud_node_start_time=start_time,
shutdown_timer=shutdown_timer,
- cloud_fqdn_func=self._cloud_driver.node_fqdn,
update_actor=self._cloud_updater,
timer_actor=self._timer,
arvados_node=None,
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 1102bf7..0a2deb8 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -63,6 +63,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
+ self.api_client.nodes().create.called_with(body={}, assign_slot=True)
self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
self.assert_node_properties_updated()
@@ -78,7 +79,8 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
self.assert_node_properties_updated()
- self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
+ self.api_client.nodes().create.called_with(body={}, assign_slot=True)
+ self.assertEqual(3, self.api_client.nodes().update().execute.call_count)
self.assertEqual(self.cloud_client.create_node(),
self.setup_actor.cloud_node.get(self.TIMEOUT))
@@ -195,7 +197,7 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
start_time = time.time()
monitor_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_node, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
self.arvados_node)
self.shutdown_actor = self.ACTOR_CLASS.start(
self.timer, self.cloud_client, self.arvados_client, monitor_actor,
@@ -333,7 +335,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
start_time = time.time()
self.node_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_mock, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
arv_node, boot_fail_after=300).proxy()
self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
@@ -518,19 +520,10 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
self.assertEqual(testutil.ip_address_mock(4),
current_arvados['ip_address'])
- def test_update_arvados_node_syncs_when_fqdn_mismatch(self):
+ def test_update_arvados_node_calls_sync_node(self):
self.make_mocks(5)
self.cloud_mock.extra['testname'] = 'cloudfqdn.zzzzz.arvadosapi.com'
self.make_actor()
arv_node = testutil.arvados_node_mock(5)
self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
self.assertEqual(1, self.updates.sync_node.call_count)
-
- def test_update_arvados_node_skips_sync_when_fqdn_match(self):
- self.make_mocks(6)
- arv_node = testutil.arvados_node_mock(6)
- self.cloud_mock.extra['testname'] ='{n[hostname]}.{n[domain]}'.format(
- n=arv_node)
- self.make_actor()
- self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
- self.assertEqual(0, self.updates.sync_node.call_count)
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index f896684..b61db5c 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -133,7 +133,11 @@ class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
@mock.patch('subprocess.check_output')
def test_update_node_features(self, check_output):
- self.make_mocks()
+ # `scontrol update` happens only if the Arvados node record
+ # has a hostname. ComputeNodeSetupActorTestCase.make_mocks
+ # uses mocks with scrubbed hostnames, so we override with the
+ # default testutil.arvados_node_mock.
+ self.make_mocks(arvados_effect=[testutil.arvados_node_mock()])
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
diff --git a/services/nodemanager/tests/testutil.py b/services/nodemanager/tests/testutil.py
index 6e13437..d13475b 100644
--- a/services/nodemanager/tests/testutil.py
+++ b/services/nodemanager/tests/testutil.py
@@ -55,7 +55,7 @@ def cloud_object_mock(name_id, **extra):
def cloud_node_fqdn(node):
# We intentionally put the FQDN somewhere goofy to make sure tested code is
# using this function for lookups.
- return node.extra.get('testname', 'NoTestName')
+ return node.extra.get('testname', node.name+'.NoTestName.invalid')
def ip_address_mock(last_octet):
return '10.20.30.{}'.format(last_octet)
commit 5120666e073018f6821bab07f0bbb788098f97b1
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Jan 29 20:03:23 2018 -0500
12199: Assign slot and hostname when creating/reusing a node record.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/api/app/controllers/application_controller.rb b/services/api/app/controllers/application_controller.rb
index c94ce89..c4f64f6 100644
--- a/services/api/app/controllers/application_controller.rb
+++ b/services/api/app/controllers/application_controller.rb
@@ -554,6 +554,10 @@ class ApplicationController < ActionController::Base
}
end
+ def self._update_requires_parameters
+ {}
+ end
+
def self._index_requires_parameters
{
filters: { type: 'array', required: false },
diff --git a/services/api/app/controllers/arvados/v1/nodes_controller.rb b/services/api/app/controllers/arvados/v1/nodes_controller.rb
index 7ee8c2f..73f1dee 100644
--- a/services/api/app/controllers/arvados/v1/nodes_controller.rb
+++ b/services/api/app/controllers/arvados/v1/nodes_controller.rb
@@ -20,6 +20,32 @@ class Arvados::V1::NodesController < ApplicationController
{ ping_secret: {required: true} }
end
+ def self._create_requires_parameters
+ super.merge(
+ { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+ end
+
+ def self._update_requires_parameters
+ super.merge(
+ { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+ end
+
+ def create
+ @object = model_class.new(resource_attrs)
+ @object.assign_slot if params[:assign_slot]
+ @object.save!
+ show
+ end
+
+ def update
+ attrs_to_update = resource_attrs.reject { |k,v|
+ [:kind, :etag, :href].index k
+ }
+ @object.update_attributes!(attrs_to_update)
+ @object.assign_slot if params[:assign_slot]
+ show
+ end
+
def ping
act_as_system_user do
@object = Node.where(uuid: (params[:id] || params[:uuid])).first
diff --git a/services/api/app/models/node.rb b/services/api/app/models/node.rb
index bf1b636..3d8b91b 100644
--- a/services/api/app/models/node.rb
+++ b/services/api/app/models/node.rb
@@ -106,27 +106,7 @@ class Node < ArvadosModel
end
end
- # Assign slot_number
- if self.slot_number.nil?
- while true
- n = self.class.available_slot_number
- if n.nil?
- raise "No available node slots"
- end
- self.slot_number = n
- begin
- self.save!
- break
- rescue ActiveRecord::RecordNotUnique
- # try again
- end
- end
- end
-
- # Assign hostname
- if self.hostname.nil? and Rails.configuration.assign_node_hostname
- self.hostname = self.class.hostname_for_slot(self.slot_number)
- end
+ assign_slot
# Record other basic stats
['total_cpu_cores', 'total_ram_mb', 'total_scratch_mb'].each do |key|
@@ -140,8 +120,30 @@ class Node < ArvadosModel
save!
end
+ def assign_slot
+ return if self.slot_number.andand > 0
+ while true
+ self.slot_number = self.class.available_slot_number
+ if self.slot_number.nil?
+ raise "No available node slots"
+ end
+ begin
+ save!
+ return assign_hostname
+ rescue ActiveRecord::RecordNotUnique
+ # try again
+ end
+ end
+ end
+
protected
+ def assign_hostname
+ if self.hostname.nil? and Rails.configuration.assign_node_hostname
+ self.hostname = self.class.hostname_for_slot(self.slot_number)
+ end
+ end
+
def self.available_slot_number
# Join the sequence 1..max with the nodes table. Return the first
# (i.e., smallest) value that doesn't match the slot_number of any
diff --git a/services/api/test/functional/arvados/v1/nodes_controller_test.rb b/services/api/test/functional/arvados/v1/nodes_controller_test.rb
index f9e5be4..c198c4c 100644
--- a/services/api/test/functional/arvados/v1/nodes_controller_test.rb
+++ b/services/api/test/functional/arvados/v1/nodes_controller_test.rb
@@ -78,6 +78,40 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
assert_not_nil json_response['uuid']
assert_not_nil json_response['info'].is_a? Hash
assert_not_nil json_response['info']['ping_secret']
+ assert_nil json_response['slot_number']
+ assert_nil json_response['hostname']
+ end
+
+ test "create node and assign slot" do
+ authorize_with :admin
+ post :create, {node: {}, assign_slot: true}
+ assert_response :success
+ assert_not_nil json_response['uuid']
+ assert_not_nil json_response['info'].is_a? Hash
+ assert_not_nil json_response['info']['ping_secret']
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "compute#{n}", json_response['hostname']
+ end
+
+ test "update node and assign slot" do
+ authorize_with :admin
+ node = nodes(:new_with_no_hostname)
+ post :update, {id: node.uuid, node: {}, assign_slot: true}
+ assert_response :success
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "compute#{n}", json_response['hostname']
+ end
+
+ test "update node and assign slot, don't clobber hostname" do
+ authorize_with :admin
+ node = nodes(:new_with_custom_hostname)
+ post :update, {id: node.uuid, node: {}, assign_slot: true}
+ assert_response :success
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "custom1", json_response['hostname']
end
test "ping adds node stats to info" do
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 6c61e32..67ea8a2 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -113,14 +113,16 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
- self.arvados_node = self._arvados.nodes().create(body={}).execute()
+ self.arvados_node = self._arvados.nodes().create(
+ body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
- self.arvados_node = self._clean_arvados_node(
- node, "Prepared by Node Manager")
+ self._clean_arvados_node(node, "Prepared by Node Manager")
+ self.arvados_node = self._arvados.nodes().update(
+ body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index b62ce56..1102bf7 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -25,7 +25,12 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
def make_mocks(self, arvados_effect=None):
if arvados_effect is None:
- arvados_effect = [testutil.arvados_node_mock()]
+ arvados_effect = [testutil.arvados_node_mock(
+ slot_number=None,
+ hostname=None,
+ first_ping_at=None,
+ last_ping_at=None,
+ )]
self.arvados_effect = arvados_effect
self.timer = testutil.MockTimer()
self.api_client = mock.MagicMock(name='api_client')
commit 80da19707253af74bd78c374bfcab64b04d2dbde
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Jan 29 11:48:43 2018 -0500
12199: Derate advertised node sizes by 5%.
This matches nodemanager's default behavior. It aims to account for
the difference between advertised and actual memory capacity, and to
leave some room for system processes on the host.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 583e0d8..73d9c16 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -387,8 +387,9 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
{Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
{Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
{Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
+ {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
},
- sbatchArgs: []string{"--constraint=instancetype=a1.small"},
+ sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
},
// No node types configured => no slurm constraint
{
diff --git a/services/dispatchcloud/node_size.go b/services/dispatchcloud/node_size.go
index 29832da..eeb3b81 100644
--- a/services/dispatchcloud/node_size.go
+++ b/services/dispatchcloud/node_size.go
@@ -18,6 +18,7 @@ import (
var (
ErrConstraintsNotSatisfiable = errors.New("constraints not satisfiable by any configured instance type")
ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+ discountConfiguredRAMPercent = 5
)
// ChooseInstanceType returns the cheapest available
@@ -26,6 +27,8 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
needVCPUs := ctr.RuntimeConstraints.VCPUs
needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
+ needRAM = needRAM * 100 / int64(100-discountConfiguredRAMPercent)
+
if len(cc.InstanceTypes) == 0 {
err = ErrInstanceTypesNotConfigured
return
commit ba01bc2ca5bb7351442f58709b7f85aa84a20342
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Mon Jan 29 10:22:26 2018 -0500
12199: Log timestamps in slow integration tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/tests/integration_test.py b/services/nodemanager/tests/integration_test.py
index 24ae701..7b129f5 100755
--- a/services/nodemanager/tests/integration_test.py
+++ b/services/nodemanager/tests/integration_test.py
@@ -25,9 +25,13 @@ from functools import partial
import arvados
import StringIO
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
+handler = logging.StreamHandler(sys.stderr)
+handler.setFormatter(formatter)
logger = logging.getLogger("logger")
logger.setLevel(logging.INFO)
-logger.addHandler(logging.StreamHandler(sys.stderr))
+logger.addHandler(handler)
detail = logging.getLogger("detail")
detail.setLevel(logging.INFO)
@@ -35,7 +39,9 @@ if os.environ.get("ANMTEST_LOGLEVEL"):
detail_content = sys.stderr
else:
detail_content = StringIO.StringIO()
-detail.addHandler(logging.StreamHandler(detail_content))
+handler = logging.StreamHandler(detail_content)
+handler.setFormatter(formatter)
+detail.addHandler(handler)
fake_slurm = None
compute_nodes = None
commit a88a3f5710af6eb159299231779a1e4554aa772a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 23:04:22 2018 -0500
12199: Update integration tests.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/tests/integration_test.py b/services/nodemanager/tests/integration_test.py
index d5b5554..24ae701 100755
--- a/services/nodemanager/tests/integration_test.py
+++ b/services/nodemanager/tests/integration_test.py
@@ -52,14 +52,14 @@ def update_script(path, val):
def set_squeue(g):
global all_jobs
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '1|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
return 0
def set_queue_unsatisfiable(g):
global all_jobs, unsatisfiable_job_scancelled
# Simulate a job requesting a 99 core node.
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '99|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
"\ntouch %s" % unsatisfiable_job_scancelled)
return 0
commit 13288c69988380210150124d7991af6ca6e3d62b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 22:32:08 2018 -0500
12199: Make mocks match real slurm output.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index a64fb23..5223245 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -181,8 +181,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|
-2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -195,8 +195,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
@mock.patch("subprocess.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
- mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|
-1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|
+ mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
commit 2b41877b842bbf47a0fdea713c9d2fc2fb80664a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 17:22:28 2018 -0500
12199: Update slurm features when nodes come up.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index c8883c3..d022892 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
@@ -8,8 +8,8 @@ from __future__ import absolute_import, print_function
import subprocess
import time
-from . import \
- ComputeNodeSetupActor, ComputeNodeMonitorActor
+from . import ComputeNodeMonitorActor
+from . import ComputeNodeSetupActor as SetupActorBase
from . import ComputeNodeShutdownActor as ShutdownActorBase
from . import ComputeNodeUpdateActor as UpdateActorBase
from .. import RetryMixin
@@ -20,16 +20,29 @@ class SlurmMixin(object):
'fail\n', 'fail*\n'])
SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
- def _set_node_state(self, nodename, state, *args):
- cmd = ['scontrol', 'update', 'NodeName=' + nodename,
- 'State=' + state]
- cmd.extend(args)
- subprocess.check_output(cmd)
+ def _update_slurm_node(self, nodename, updates):
+ cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
+ try:
+ subprocess.check_output(cmd)
+ except:
+ self._logger.error(
+ "SLURM update %r failed", cmd, exc_info=True)
def _get_slurm_state(self, nodename):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
+ def create_cloud_node(self):
+ hostname = self.arvados_node.get("hostname")
+ if hostname:
+ self._update_slurm_node(self.arvados_node['hostname'], [
+ 'Weight=%i' % int(self.cloud_size.price * 1000),
+ 'Features=instancetype='+self.cloud_size.name,
+ ])
+ return super(ComputeNodeSetupActor, self).create_cloud_node()
+
+
class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
def on_start(self):
arv_node = self._arvados_node()
@@ -47,7 +60,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
if self._nodename:
if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
# Resume from "drng" or "drain"
- self._set_node_state(self._nodename, 'RESUME')
+ self._update_slurm_node(self._nodename, ['State=RESUME'])
else:
# Node is in a state such as 'idle' or 'alloc' so don't
# try to resume it because that will just raise an error.
@@ -59,7 +72,8 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
if self.cancel_reason is not None:
return
if self._nodename:
- self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DRAIN', 'Reason=Node Manager shutdown'])
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
else:
@@ -82,15 +96,18 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
def _destroy_node(self):
if self._nodename:
- self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DOWN', 'Reason=Node Manager shutdown'])
super(ComputeNodeShutdownActor, self)._destroy_node()
-class ComputeNodeUpdateActor(UpdateActorBase):
+class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
def sync_node(self, cloud_node, arvados_node):
- if arvados_node.get("hostname"):
- try:
- subprocess.check_output(['scontrol', 'update', 'NodeName=' + arvados_node["hostname"], 'Weight=%i' % int(cloud_node.size.price * 1000)])
- except:
- self._logger.error("Unable to set slurm node weight.", exc_info=True)
- return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
+ hostname = arvados_node.get("hostname")
+ if hostname:
+ self._update_slurm_node(hostname, [
+ 'Weight=%i' % int(cloud_node.size.price * 1000),
+ 'Features=instancetype=' + cloud_node.size.name,
+ ])
+ return super(ComputeNodeUpdateActor, self).sync_node(
+ cloud_node, arvados_node)
diff --git a/services/nodemanager/tests/test_computenode_dispatch.py b/services/nodemanager/tests/test_computenode_dispatch.py
index 4b35205..b62ce56 100644
--- a/services/nodemanager/tests/test_computenode_dispatch.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -21,6 +21,8 @@ from arvnodeman.computenode.driver import BaseComputeNodeDriver
from . import testutil
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ ACTOR_CLASS = dispatch.ComputeNodeSetupActor
+
def make_mocks(self, arvados_effect=None):
if arvados_effect is None:
arvados_effect = [testutil.arvados_node_mock()]
@@ -35,7 +37,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
self.make_mocks(arvados_effect=[arv_node] if arv_node else None)
- self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+ self.setup_actor = self.ACTOR_CLASS.start(
self.timer, self.api_client, self.cloud_client,
testutil.MockSize(1), arv_node).proxy()
diff --git a/services/nodemanager/tests/test_computenode_dispatch_slurm.py b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
index 0b6162d..f896684 100644
--- a/services/nodemanager/tests/test_computenode_dispatch_slurm.py
+++ b/services/nodemanager/tests/test_computenode_dispatch_slurm.py
@@ -13,7 +13,10 @@ import mock
import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
from . import testutil
-from .test_computenode_dispatch import ComputeNodeShutdownActorMixin, ComputeNodeUpdateActorTestCase
+from .test_computenode_dispatch import \
+ ComputeNodeShutdownActorMixin, \
+ ComputeNodeSetupActorTestCase, \
+ ComputeNodeUpdateActorTestCase
@mock.patch('subprocess.check_output')
class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
@@ -123,4 +126,14 @@ class SLURMComputeNodeUpdateActorTestCase(ComputeNodeUpdateActorTestCase):
cloud_node = testutil.cloud_node_mock()
arv_node = testutil.arvados_node_mock()
self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
- check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000'])
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000', 'Features=instancetype=z99.test'])
+
+class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
+ ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
+
+ @mock.patch('subprocess.check_output')
+ def test_update_node_features(self, check_output):
+ self.make_mocks()
+ self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
commit 428e8f9663ba05eb28667e02d01675e271c38fdf
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 16:52:13 2018 -0500
12199: Rename instance type features: a1.xl => instancetype=a1.xl
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 44123f8..9eebbab 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -234,7 +234,7 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error
} else if err != nil {
return nil, err
} else {
- sbatchArgs = append(sbatchArgs, "--constraint="+it.Name)
+ sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
}
return sbatchArgs, nil
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index b822232..583e0d8 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -388,7 +388,7 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
{Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
{Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
},
- sbatchArgs: []string{"--constraint=a1.small"},
+ sbatchArgs: []string{"--constraint=instancetype=a1.small"},
},
// No node types configured => no slurm constraint
{
diff --git a/services/dispatchcloud/node_size.go b/services/dispatchcloud/node_size.go
index 9cb6d3f..29832da 100644
--- a/services/dispatchcloud/node_size.go
+++ b/services/dispatchcloud/node_size.go
@@ -66,15 +66,19 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
//
// SlurmNodeTypeFeatureKludge does a test-and-fix operation
// immediately, and then periodically, in case slurm restarts and
-// forgets the list of valid features. It never returns, so it should
-// generally be invoked with "go".
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
- var types []string
+ if len(cc.InstanceTypes) == 0 {
+ return
+ }
+ var features []string
for _, it := range cc.InstanceTypes {
- types = append(types, it.Name)
+ features = append(features, "instancetype="+it.Name)
}
for {
- slurmKludge(types)
+ slurmKludge(features)
time.Sleep(time.Minute)
}
}
@@ -85,8 +89,8 @@ var (
slurmErrBadGres = "Invalid generic resource"
)
-func slurmKludge(types []string) {
- cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(types, "&"), "true")
+func slurmKludge(features []string) {
+ cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
out, err := cmd.CombinedOutput()
switch {
case err == nil:
@@ -95,8 +99,8 @@ func slurmKludge(types []string) {
case bytes.Contains(out, []byte(slurmErrBadFeature)):
log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
- for _, features := range []string{strings.Join(types, ","), ""} {
- cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+features)
+ for _, nodeFeatures := range []string{strings.Join(features, ","), ""} {
+ cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+nodeFeatures)
log.Printf("running: %q %q", cmd.Path, cmd.Args)
out, err := cmd.CombinedOutput()
if err != nil {
commit d2369c0fb8742d54f0cdfc88ef485beff62ea277
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Sun Jan 28 16:32:22 2018 -0500
12199: Use node type X to run jobs with constraint instancetype=X.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 4d2d3e0..ebc90e4 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -6,6 +6,7 @@
from __future__ import absolute_import, print_function
import logging
+import re
import subprocess
import arvados.util
@@ -74,13 +75,15 @@ class ServerCalculator(object):
return fallback
def cloud_size_for_constraints(self, constraints):
+ specified_name = constraints.get('instance_type')
want_value = lambda key: self.coerce_int(constraints.get(key), 0)
wants = {'cores': want_value('min_cores_per_node'),
'ram': want_value('min_ram_mb_per_node'),
'scratch': want_value('min_scratch_mb_per_node')}
for size in self.cloud_sizes:
- if size.meets_constraints(**wants):
- return size
+ if (size.meets_constraints(**wants) and
+ (specified_name is None or size.name == specified_name)):
+ return size
return None
def servers_for_queue(self, queue):
@@ -92,8 +95,7 @@ class ServerCalculator(object):
cloud_size = self.cloud_size_for_constraints(constraints)
if cloud_size is None:
unsatisfiable_jobs[job['uuid']] = (
- 'Requirements for a single node exceed the available '
- 'cloud node size')
+ "Constraints cannot be satisfied by any node type")
elif (want_count > self.max_nodes):
unsatisfiable_jobs[job['uuid']] = (
"Job's min_nodes constraint is greater than the configured "
@@ -152,21 +154,43 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
queuelist = []
if self.slurm_queue:
# cpus, memory, temporary disk space, reason, job name
- squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+ squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
for out in squeue_out.splitlines():
try:
- cpu, ram, disk, reason, jobname = out.split("|", 4)
- if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
- queuelist.append({
- "uuid": jobname,
- "runtime_constraints": {
- "min_cores_per_node": cpu,
- "min_ram_mb_per_node": self.coerce_to_mb(ram),
- "min_scratch_mb_per_node": self.coerce_to_mb(disk)
- }
- })
+ cpu, ram, disk, reason, jobname, features = out.split("|", 5)
except ValueError:
- pass
+ self._logger.warning("ignored malformed line in squeue output: %r", out)
+ continue
+ if '-dz642-' not in jobname:
+ continue
+ if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+ continue
+
+ for feature in features.split(','):
+ m = re.match(r'instancetype=(.*)', feature)
+ if not m:
+ continue
+ instance_type = m.group(1)
+ # Ignore cpu/ram/scratch requirements, bring up
+ # the requested node type.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "instance_type": instance_type,
+ }
+ })
+ break
+ else:
+ # No instance type specified. Choose a node type
+ # to suit cpu/ram/scratch requirements.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "min_cores_per_node": cpu,
+ "min_ram_mb_per_node": self.coerce_to_mb(ram),
+ "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+ }
+ })
if self.jobs_queue:
queuelist.extend(self._client.jobs().queue().execute()['items'])
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index b1d5e00..a64fb23 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -159,7 +159,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
- mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+ mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
@@ -181,8 +181,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -195,8 +195,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
@mock.patch("subprocess.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
- mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -207,6 +207,16 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
+ @mock.patch("subprocess.check_output")
+ def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+ super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+ True, True)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
+ self.subscriber.assert_called_with([testutil.MockSize(2)])
+
def test_coerce_to_mb(self):
self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))
commit bad6499910388c17c4a54b3128f361aa36a670e1
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Jan 26 14:33:48 2018 -0500
12199: Refactor for more test-friendly initialization.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 879cb78..44123f8 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -25,13 +25,16 @@ import (
"github.com/coreos/go-systemd/daemon"
)
-var version = "dev"
+var (
+ version = "dev"
+ defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+)
-type command struct {
- dispatcher *dispatch.Dispatcher
- cluster *arvados.Cluster
- sqCheck *SqueueChecker
- slurm Slurm
+type Dispatcher struct {
+ *dispatch.Dispatcher
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
Client arvados.Client
@@ -49,15 +52,23 @@ type command struct {
}
func main() {
- err := (&command{}).Run(os.Args[0], os.Args[1:])
+ disp := &Dispatcher{}
+ err := disp.Run(os.Args[0], os.Args[1:])
if err != nil {
log.Fatal(err)
}
}
-const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+func (disp *Dispatcher) Run(prog string, args []string) error {
+ if err := disp.configure(prog, args); err != nil {
+ return err
+ }
+ disp.setup()
+ return disp.run()
+}
-func (cmd *command) Run(prog string, args []string) error {
+// configure() loads config files. Tests skip this.
+func (disp *Dispatcher) configure(prog string, args []string) error {
flags := flag.NewFlagSet(prog, flag.ExitOnError)
flags.Usage = func() { usage(flags) }
@@ -84,81 +95,84 @@ func (cmd *command) Run(prog string, args []string) error {
log.Printf("crunch-dispatch-slurm %s started", version)
- err := cmd.readConfig(*configPath)
+ err := disp.readConfig(*configPath)
if err != nil {
return err
}
- if cmd.CrunchRunCommand == nil {
- cmd.CrunchRunCommand = []string{"crunch-run"}
+ if disp.CrunchRunCommand == nil {
+ disp.CrunchRunCommand = []string{"crunch-run"}
}
- if cmd.PollPeriod == 0 {
- cmd.PollPeriod = arvados.Duration(10 * time.Second)
+ if disp.PollPeriod == 0 {
+ disp.PollPeriod = arvados.Duration(10 * time.Second)
}
- if cmd.Client.APIHost != "" || cmd.Client.AuthToken != "" {
+ if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
// MakeArvadosClient() uses them, and [b] they get
// propagated to crunch-run via SLURM.
- os.Setenv("ARVADOS_API_HOST", cmd.Client.APIHost)
- os.Setenv("ARVADOS_API_TOKEN", cmd.Client.AuthToken)
+ os.Setenv("ARVADOS_API_HOST", disp.Client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", disp.Client.AuthToken)
os.Setenv("ARVADOS_API_HOST_INSECURE", "")
- if cmd.Client.Insecure {
+ if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(cmd.Client.KeepServiceURIs, " "))
+ os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
}
if *dumpConfig {
- log.Fatal(config.DumpAndExit(cmd))
+ return config.DumpAndExit(disp)
}
- arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- log.Printf("Error making Arvados client: %v", err)
- return err
- }
- arv.Retries = 25
-
siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
if os.IsNotExist(err) {
- log.Printf("warning: no cluster config file %q (%s), proceeding with no node types defined", arvados.DefaultConfigFile, err)
+ log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
} else if err != nil {
- log.Fatalf("error loading config: %s", err)
- } else if cmd.cluster, err = siteConfig.GetCluster(""); err != nil {
- log.Fatalf("config error: %s", err)
- } else if len(cmd.cluster.InstanceTypes) > 0 {
- go dispatchcloud.SlurmNodeTypeFeatureKludge(cmd.cluster)
+ return fmt.Errorf("error loading config: %s", err)
+ } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
+ return fmt.Errorf("config error: %s", err)
}
- if cmd.slurm == nil {
- cmd.slurm = &slurmCLI{}
- }
+ return nil
+}
- cmd.sqCheck = &SqueueChecker{
- Period: time.Duration(cmd.PollPeriod),
- Slurm: cmd.slurm,
+// setup() initializes private fields after configure().
+func (disp *Dispatcher) setup() {
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatalf("Error making Arvados client: %v", err)
}
- defer cmd.sqCheck.Stop()
+ arv.Retries = 25
- cmd.dispatcher = &dispatch.Dispatcher{
+ disp.slurm = &slurmCLI{}
+ disp.sqCheck = &SqueueChecker{
+ Period: time.Duration(disp.PollPeriod),
+ Slurm: disp.slurm,
+ }
+ disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
- RunContainer: cmd.run,
- PollPeriod: time.Duration(cmd.PollPeriod),
- MinRetryPeriod: time.Duration(cmd.MinRetryPeriod),
+ RunContainer: disp.runContainer,
+ PollPeriod: time.Duration(disp.PollPeriod),
+ MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+ }
+}
+
+func (disp *Dispatcher) run() error {
+ defer disp.sqCheck.Stop()
+
+ if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
+ go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
-
- go cmd.checkSqueueForOrphans()
-
- return cmd.dispatcher.Run(context.Background())
+ go disp.checkSqueueForOrphans()
+ return disp.Dispatcher.Run(context.Background())
}
var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
@@ -168,19 +182,19 @@ var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`
// jobs started by a previous dispatch process that never released
// their slurm allocations even though their container states are
// Cancelled or Complete. See https://dev.arvados.org/issues/10979
-func (cmd *command) checkSqueueForOrphans() {
- for _, uuid := range cmd.sqCheck.All() {
+func (disp *Dispatcher) checkSqueueForOrphans() {
+ for _, uuid := range disp.sqCheck.All() {
if !containerUuidPattern.MatchString(uuid) {
continue
}
- err := cmd.dispatcher.TrackContainer(uuid)
+ err := disp.TrackContainer(uuid)
if err != nil {
log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
}
}
}
-func (cmd *command) niceness(priority int) int {
+func (disp *Dispatcher) niceness(priority int) int {
if priority > 1000 {
priority = 1000
}
@@ -191,7 +205,7 @@ func (cmd *command) niceness(priority int) int {
return (1000 - priority) * 10
}
-func (cmd *command) sbatchArgs(container arvados.Container) ([]string, error) {
+func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
var disk int64
@@ -203,19 +217,19 @@ func (cmd *command) sbatchArgs(container arvados.Container) ([]string, error) {
disk = int64(math.Ceil(float64(disk) / float64(1048576)))
var sbatchArgs []string
- sbatchArgs = append(sbatchArgs, cmd.SbatchArguments...)
+ sbatchArgs = append(sbatchArgs, disp.SbatchArguments...)
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", cmd.niceness(container.Priority)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- if cmd.cluster == nil {
+ if disp.cluster == nil {
// no instance types configured
- } else if it, err := dispatchcloud.ChooseInstanceType(cmd.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+ } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
// ditto
} else if err != nil {
return nil, err
@@ -226,39 +240,39 @@ func (cmd *command) sbatchArgs(container arvados.Container) ([]string, error) {
return sbatchArgs, nil
}
-func (cmd *command) submit(container arvados.Container, crunchRunCommand []string) error {
+func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
crArgs := append([]string(nil), crunchRunCommand...)
crArgs = append(crArgs, container.UUID)
crScript := strings.NewReader(execScript(crArgs))
- cmd.sqCheck.L.Lock()
- defer cmd.sqCheck.L.Unlock()
+ disp.sqCheck.L.Lock()
+ defer disp.sqCheck.L.Unlock()
- sbArgs, err := cmd.sbatchArgs(container)
+ sbArgs, err := disp.sbatchArgs(container)
if err != nil {
return err
}
log.Printf("running sbatch %+q", sbArgs)
- return cmd.slurm.Batch(crScript, sbArgs)
+ return disp.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
// already in the queue). Cancel the slurm job if the container's
// priority changes to zero or its state indicates it's no longer
// running.
-func (cmd *command) run(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- if ctr.State == dispatch.Locked && !cmd.sqCheck.HasUUID(ctr.UUID) {
+ if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
- if err := cmd.submit(ctr, cmd.CrunchRunCommand); err != nil {
+ if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
var text string
if err == dispatchcloud.ErrConstraintsNotSatisfiable {
text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
- cmd.dispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+ disp.UpdateState(ctr.UUID, dispatch.Cancelled)
} else {
text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
}
@@ -268,9 +282,9 @@ func (cmd *command) run(_ *dispatch.Dispatcher, ctr arvados.Container, status <-
"object_uuid": ctr.UUID,
"event_type": "dispatch",
"properties": map[string]string{"text": text}}}
- cmd.dispatcher.Arv.Create("logs", lr, nil)
+ disp.Arv.Create("logs", lr, nil)
- cmd.dispatcher.Unlock(ctr.UUID)
+ disp.Unlock(ctr.UUID)
return
}
}
@@ -282,7 +296,7 @@ func (cmd *command) run(_ *dispatch.Dispatcher, ctr arvados.Container, status <-
// no point in waiting for further dispatch updates: just
// clean up and return.
go func(uuid string) {
- for ctx.Err() == nil && cmd.sqCheck.HasUUID(uuid) {
+ for ctx.Err() == nil && disp.sqCheck.HasUUID(uuid) {
}
cancel()
}(ctr.UUID)
@@ -291,68 +305,68 @@ func (cmd *command) run(_ *dispatch.Dispatcher, ctr arvados.Container, status <-
select {
case <-ctx.Done():
// Disappeared from squeue
- if err := cmd.dispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+ if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
}
switch ctr.State {
case dispatch.Running:
- cmd.dispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+ disp.UpdateState(ctr.UUID, dispatch.Cancelled)
case dispatch.Locked:
- cmd.dispatcher.Unlock(ctr.UUID)
+ disp.Unlock(ctr.UUID)
}
return
case updated, ok := <-status:
if !ok {
- log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
- cmd.scancel(ctr)
+ log.Printf("container %s is done: cancel slurm job", ctr.UUID)
+ disp.scancel(ctr)
} else if updated.Priority == 0 {
- log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
- cmd.scancel(ctr)
+ log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+ disp.scancel(ctr)
} else {
- cmd.renice(updated)
+ disp.renice(updated)
}
}
}
}
-func (cmd *command) scancel(ctr arvados.Container) {
- cmd.sqCheck.L.Lock()
- err := cmd.slurm.Cancel(ctr.UUID)
- cmd.sqCheck.L.Unlock()
+func (disp *Dispatcher) scancel(ctr arvados.Container) {
+ disp.sqCheck.L.Lock()
+ err := disp.slurm.Cancel(ctr.UUID)
+ disp.sqCheck.L.Unlock()
if err != nil {
log.Printf("scancel: %s", err)
time.Sleep(time.Second)
- } else if cmd.sqCheck.HasUUID(ctr.UUID) {
+ } else if disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s is still in squeue after scancel", ctr.UUID)
time.Sleep(time.Second)
}
}
-func (cmd *command) renice(ctr arvados.Container) {
- nice := cmd.niceness(ctr.Priority)
- oldnice := cmd.sqCheck.GetNiceness(ctr.UUID)
+func (disp *Dispatcher) renice(ctr arvados.Container) {
+ nice := disp.niceness(ctr.Priority)
+ oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
if nice == oldnice || oldnice == -1 {
return
}
log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
- cmd.sqCheck.L.Lock()
- err := cmd.slurm.Renice(ctr.UUID, nice)
- cmd.sqCheck.L.Unlock()
+ disp.sqCheck.L.Lock()
+ err := disp.slurm.Renice(ctr.UUID, nice)
+ disp.sqCheck.L.Unlock()
if err != nil {
log.Printf("renice: %s", err)
time.Sleep(time.Second)
return
}
- if cmd.sqCheck.HasUUID(ctr.UUID) {
+ if disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s has arvados priority %d, slurm nice %d",
- ctr.UUID, ctr.Priority, cmd.sqCheck.GetNiceness(ctr.UUID))
+ ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
}
}
-func (cmd *command) readConfig(path string) error {
- err := config.LoadFile(cmd, path)
+func (disp *Dispatcher) readConfig(path string) error {
+ err := config.LoadFile(disp, path)
if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
log.Printf("Config not specified. Continue with default configuration.")
err = nil
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 459b7c6..b822232 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -37,12 +37,16 @@ var _ = Suite(&IntegrationSuite{})
var _ = Suite(&StubbedSuite{})
type IntegrationSuite struct {
- cmd command
+ disp Dispatcher
+ slurm slurmFake
}
func (s *IntegrationSuite) SetUpTest(c *C) {
arvadostest.StartAPI()
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
+ s.disp = Dispatcher{}
+ s.disp.setup()
+ s.slurm = slurmFake{}
}
func (s *IntegrationSuite) TearDownTest(c *C) {
@@ -87,7 +91,7 @@ func (sf *slurmFake) Cancel(name string) error {
return nil
}
-func (s *IntegrationSuite) integrationTest(c *C, slurm *slurmFake,
+func (s *IntegrationSuite) integrationTest(c *C,
expectBatch [][]string,
runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
arvadostest.ResetEnv()
@@ -95,8 +99,6 @@ func (s *IntegrationSuite) integrationTest(c *C, slurm *slurmFake,
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, IsNil)
- s.cmd.slurm = slurm
-
// There should be one queued container
params := arvadosclient.Dict{
"filters": [][]string{{"state", "=", "Queued"}},
@@ -106,34 +108,35 @@ func (s *IntegrationSuite) integrationTest(c *C, slurm *slurmFake,
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
- s.cmd.CrunchRunCommand = []string{"echo"}
+ s.disp.CrunchRunCommand = []string{"echo"}
ctx, cancel := context.WithCancel(context.Background())
doneRun := make(chan struct{})
- s.cmd.dispatcher = &dispatch.Dispatcher{
+ s.disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
runContainer(disp, ctr)
- slurm.queue = ""
+ s.slurm.queue = ""
doneRun <- struct{}{}
}()
- s.cmd.run(disp, ctr, status)
+ s.disp.runContainer(disp, ctr, status)
cancel()
},
}
- s.cmd.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: slurm}
+ s.disp.slurm = &s.slurm
+ s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
- err = s.cmd.dispatcher.Run(ctx)
+ err = s.disp.Dispatcher.Run(ctx)
<-doneRun
c.Assert(err, Equals, context.Canceled)
- s.cmd.sqCheck.Stop()
+ s.disp.sqCheck.Stop()
- c.Check(slurm.didBatch, DeepEquals, expectBatch)
+ c.Check(s.slurm.didBatch, DeepEquals, expectBatch)
// There should be no queued containers now
err = arv.List("containers", params, &containers)
@@ -148,8 +151,8 @@ func (s *IntegrationSuite) integrationTest(c *C, slurm *slurmFake,
}
func (s *IntegrationSuite) TestNormal(c *C) {
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
container := s.integrationTest(c,
- &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -160,11 +163,10 @@ func (s *IntegrationSuite) TestNormal(c *C) {
}
func (s *IntegrationSuite) TestCancel(c *C) {
- slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
readyToCancel := make(chan bool)
- slurm.onCancel = func() { <-readyToCancel }
+ s.slurm.onCancel = func() { <-readyToCancel }
container := s.integrationTest(c,
- slurm,
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -177,12 +179,12 @@ func (s *IntegrationSuite) TestCancel(c *C) {
close(readyToCancel)
})
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
- c.Check(len(slurm.didCancel) > 1, Equals, true)
- c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
+ c.Check(len(s.slurm.didCancel) > 1, Equals, true)
+ c.Check(s.slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
}
func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
- container := s.integrationTest(c, &slurmFake{},
+ container := s.integrationTest(c,
[][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--mem=%d", 11445),
@@ -198,8 +200,8 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
}
func (s *IntegrationSuite) TestSbatchFail(c *C) {
+ s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
container := s.integrationTest(c,
- &slurmFake{errBatch: errors.New("something terrible happened")},
[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -219,8 +221,8 @@ func (s *IntegrationSuite) TestSbatchFail(c *C) {
}
func (s *IntegrationSuite) TestChangePriority(c *C) {
- slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
- container := s.integrationTest(c, slurm, nil,
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ container := s.integrationTest(c, nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(time.Second)
@@ -232,16 +234,17 @@ func (s *IntegrationSuite) TestChangePriority(c *C) {
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Assert(len(slurm.didRenice), Not(Equals), 0)
- c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+ c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
+ c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
}
type StubbedSuite struct {
- cmd command
+ disp Dispatcher
}
func (s *StubbedSuite) SetUpTest(c *C) {
- s.cmd = command{}
+ s.disp = Dispatcher{}
+ s.disp.setup()
}
func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
@@ -270,7 +273,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
- s.cmd.CrunchRunCommand = []string{crunchCmd}
+ s.disp.CrunchRunCommand = []string{crunchCmd}
ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
@@ -282,7 +285,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
disp.UpdateState(ctr.UUID, dispatch.Running)
disp.UpdateState(ctr.UUID, dispatch.Complete)
}()
- s.cmd.run(disp, ctr, status)
+ s.disp.runContainer(disp, ctr, status)
cancel()
},
}
@@ -301,7 +304,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
}
func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
- err := s.cmd.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
+ err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
c.Assert(err, NotNil)
}
@@ -313,7 +316,7 @@ func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
_, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
c.Check(err, IsNil)
- err = s.cmd.readConfig(tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, NotNil)
}
@@ -325,9 +328,9 @@ func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
_, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
c.Check(err, IsNil)
- err = s.cmd.readConfig(tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(0, Equals, len(s.cmd.SbatchArguments))
+ c.Check(0, Equals, len(s.disp.SbatchArguments))
}
func (s *StubbedSuite) TestReadConfig(c *C) {
@@ -340,9 +343,9 @@ func (s *StubbedSuite) TestReadConfig(c *C) {
_, err = tmpfile.Write([]byte(argsS))
c.Check(err, IsNil)
- err = s.cmd.readConfig(tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(args, DeepEquals, s.cmd.SbatchArguments)
+ c.Check(args, DeepEquals, s.disp.SbatchArguments)
}
func (s *StubbedSuite) TestSbatchArgs(c *C) {
@@ -358,9 +361,9 @@ func (s *StubbedSuite) TestSbatchArgs(c *C) {
{"--arg1=v1", "--arg2"},
} {
c.Logf("%#v", defaults)
- s.cmd.SbatchArguments = defaults
+ s.disp.SbatchArguments = defaults
- args, err := s.cmd.sbatchArgs(container)
+ args, err := s.disp.sbatchArgs(container)
c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
c.Check(err, IsNil)
}
@@ -401,9 +404,9 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
},
} {
c.Logf("%#v", trial)
- s.cmd.cluster = &arvados.Cluster{InstanceTypes: trial.types}
+ s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
- args, err := s.cmd.sbatchArgs(container)
+ args, err := s.disp.sbatchArgs(container)
c.Check(err, Equals, trial.err)
if trial.err == nil {
c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
@@ -419,7 +422,7 @@ func (s *StubbedSuite) TestSbatchPartition(c *C) {
Priority: 1,
}
- args, err := s.cmd.sbatchArgs(container)
+ args, err := s.disp.sbatchArgs(container)
c.Check(args, DeepEquals, []string{
"--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
"--partition=blurb,b2",
commit 10f08c358c12468119dc2621c48b68d6d33417da
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Jan 25 19:13:18 2018 -0500
12199: Pass node type to sbatch --constraint argument if configured.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index bcc8197..879cb78 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -131,6 +131,8 @@ func (cmd *command) Run(prog string, args []string) error {
log.Fatalf("error loading config: %s", err)
} else if cmd.cluster, err = siteConfig.GetCluster(""); err != nil {
log.Fatalf("config error: %s", err)
+ } else if len(cmd.cluster.InstanceTypes) > 0 {
+ go dispatchcloud.SlurmNodeTypeFeatureKludge(cmd.cluster)
}
if cmd.slurm == nil {
@@ -211,6 +213,16 @@ func (cmd *command) sbatchArgs(container arvados.Container) ([]string, error) {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
+ if cmd.cluster == nil {
+ // no instance types configured
+ } else if it, err := dispatchcloud.ChooseInstanceType(cmd.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+ // ditto
+ } else if err != nil {
+ return nil, err
+ } else {
+ sbatchArgs = append(sbatchArgs, "--constraint="+it.Name)
+ }
+
return sbatchArgs, nil
}
@@ -243,7 +255,13 @@ func (cmd *command) run(_ *dispatch.Dispatcher, ctr arvados.Container, status <-
if ctr.State == dispatch.Locked && !cmd.sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
if err := cmd.submit(ctr, cmd.CrunchRunCommand); err != nil {
- text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ var text string
+ if err == dispatchcloud.ErrConstraintsNotSatisfiable {
+ text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
+ cmd.dispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+ } else {
+ text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ }
log.Print(text)
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 63f56f3..459b7c6 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -24,6 +24,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "git.curoverse.com/arvados.git/services/dispatchcloud"
. "gopkg.in/check.v1"
)
@@ -32,32 +33,19 @@ func Test(t *testing.T) {
TestingT(t)
}
-var _ = Suite(&TestSuite{})
-var _ = Suite(&MockArvadosServerSuite{})
+var _ = Suite(&IntegrationSuite{})
+var _ = Suite(&StubbedSuite{})
-type TestSuite struct {
+type IntegrationSuite struct {
cmd command
}
-var initialArgs []string
-
-func (s *TestSuite) SetUpSuite(c *C) {
- initialArgs = os.Args
-}
-
-func (s *TestSuite) TearDownSuite(c *C) {
-}
-
-func (s *TestSuite) SetUpTest(c *C) {
- args := []string{"crunch-dispatch-slurm"}
- os.Args = args
-
+func (s *IntegrationSuite) SetUpTest(c *C) {
arvadostest.StartAPI()
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
}
-func (s *TestSuite) TearDownTest(c *C) {
- os.Args = initialArgs
+func (s *IntegrationSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
arvadostest.StopAPI()
}
@@ -99,7 +87,7 @@ func (sf *slurmFake) Cancel(name string) error {
return nil
}
-func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+func (s *IntegrationSuite) integrationTest(c *C, slurm *slurmFake,
expectBatch [][]string,
runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
arvadostest.ResetEnv()
@@ -159,7 +147,7 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
return container
}
-func (s *TestSuite) TestIntegrationNormal(c *C) {
+func (s *IntegrationSuite) TestNormal(c *C) {
container := s.integrationTest(c,
&slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
nil,
@@ -171,7 +159,7 @@ func (s *TestSuite) TestIntegrationNormal(c *C) {
c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
-func (s *TestSuite) TestIntegrationCancel(c *C) {
+func (s *IntegrationSuite) TestCancel(c *C) {
slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
readyToCancel := make(chan bool)
slurm.onCancel = func() { <-readyToCancel }
@@ -193,7 +181,7 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
}
-func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
+func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
container := s.integrationTest(c, &slurmFake{},
[][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
@@ -209,7 +197,7 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
}
-func (s *TestSuite) TestSbatchFail(c *C) {
+func (s *IntegrationSuite) TestSbatchFail(c *C) {
container := s.integrationTest(c,
&slurmFake{errBatch: errors.New("something terrible happened")},
[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
@@ -230,7 +218,7 @@ func (s *TestSuite) TestSbatchFail(c *C) {
c.Assert(len(ll.Items), Equals, 1)
}
-func (s *TestSuite) TestIntegrationChangePriority(c *C) {
+func (s *IntegrationSuite) TestChangePriority(c *C) {
slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
container := s.integrationTest(c, slurm, nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
@@ -248,15 +236,15 @@ func (s *TestSuite) TestIntegrationChangePriority(c *C) {
c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
}
-type MockArvadosServerSuite struct {
+type StubbedSuite struct {
cmd command
}
-func (s *MockArvadosServerSuite) TearDownTest(c *C) {
- arvadostest.ResetEnv()
+func (s *StubbedSuite) SetUpTest(c *C) {
+ s.cmd = command{}
}
-func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
+func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
@@ -264,7 +252,7 @@ func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
-func (s *MockArvadosServerSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
apiStub := arvadostest.ServerStub{apiStubResponses}
api := httptest.NewServer(&apiStub)
@@ -312,12 +300,12 @@ func (s *MockArvadosServerSuite) testWithServerStub(c *C, apiStubResponses map[s
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
-func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
+func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
err := s.cmd.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
c.Assert(err, NotNil)
}
-func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
+func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
@@ -329,7 +317,7 @@ func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
c.Assert(err, NotNil)
}
-func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
+func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
@@ -342,7 +330,7 @@ func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
c.Check(0, Equals, len(s.cmd.SbatchArguments))
}
-func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
+func (s *StubbedSuite) TestReadConfig(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
@@ -357,40 +345,79 @@ func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
c.Check(args, DeepEquals, s.cmd.SbatchArguments)
}
-func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
- s.testSbatchFuncWithArgs(c, nil)
-}
-
-func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
- s.testSbatchFuncWithArgs(c, []string{})
-}
+func (s *StubbedSuite) TestSbatchArgs(c *C) {
+ container := arvados.Container{
+ UUID: "123",
+ RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
+ Priority: 1,
+ }
-func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
- s.testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
+ for _, defaults := range [][]string{
+ nil,
+ {},
+ {"--arg1=v1", "--arg2"},
+ } {
+ c.Logf("%#v", defaults)
+ s.cmd.SbatchArguments = defaults
+
+ args, err := s.cmd.sbatchArgs(container)
+ c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+ c.Check(err, IsNil)
+ }
}
-func (s *MockArvadosServerSuite) testSbatchFuncWithArgs(c *C, args []string) {
- s.cmd.SbatchArguments = append([]string(nil), args...)
-
+func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
- Priority: 1}
+ Priority: 1,
+ }
- var expected []string
- expected = append(expected, s.cmd.SbatchArguments...)
- expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
- args, err := s.cmd.sbatchArgs(container)
- c.Check(args, DeepEquals, expected)
- c.Check(err, IsNil)
+ for _, trial := range []struct {
+ types []arvados.InstanceType
+ sbatchArgs []string
+ err error
+ }{
+ // Choose node type => use --constraint arg
+ {
+ types: []arvados.InstanceType{
+ {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
+ {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
+ },
+ sbatchArgs: []string{"--constraint=a1.small"},
+ },
+ // No node types configured => no slurm constraint
+ {
+ types: nil,
+ sbatchArgs: nil,
+ },
+ // No node type is big enough => error
+ {
+ types: []arvados.InstanceType{
+ {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ },
+ err: dispatchcloud.ErrConstraintsNotSatisfiable,
+ },
+ } {
+ c.Logf("%#v", trial)
+ s.cmd.cluster = &arvados.Cluster{InstanceTypes: trial.types}
+
+ args, err := s.cmd.sbatchArgs(container)
+ c.Check(err, Equals, trial.err)
+ if trial.err == nil {
+ c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+ }
+ }
}
-func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
+func (s *StubbedSuite) TestSbatchPartition(c *C) {
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
- Priority: 1}
+ Priority: 1,
+ }
args, err := s.cmd.sbatchArgs(container)
c.Check(args, DeepEquals, []string{
commit 0c9586ca048805b404dd762f5cd7cabe5d1ed227
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Jan 25 19:12:49 2018 -0500
12199: Refactor to eliminate evil globals.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index ae2ca58..bcc8197 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -21,13 +21,18 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "git.curoverse.com/arvados.git/services/dispatchcloud"
"github.com/coreos/go-systemd/daemon"
)
var version = "dev"
-// Config used by crunch-dispatch-slurm
-type Config struct {
+type command struct {
+ dispatcher *dispatch.Dispatcher
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
+
Client arvados.Client
SbatchArguments []string
@@ -41,27 +46,19 @@ type Config struct {
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
-
- slurm Slurm
}
func main() {
- theConfig.slurm = &slurmCLI{}
- err := doMain()
+ err := (&command{}).Run(os.Args[0], os.Args[1:])
if err != nil {
log.Fatal(err)
}
}
-var (
- theConfig Config
- sqCheck = &SqueueChecker{}
-)
-
const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
-func doMain() error {
- flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+func (cmd *command) Run(prog string, args []string) error {
+ flags := flag.NewFlagSet(prog, flag.ExitOnError)
flags.Usage = func() { usage(flags) }
configPath := flags.String(
@@ -77,7 +74,7 @@ func doMain() error {
false,
"Print version information and exit.")
// Parse args; omit the first arg which is the command name
- flags.Parse(os.Args[1:])
+ flags.Parse(args)
// Print version information if requested
if *getVersion {
@@ -87,37 +84,37 @@ func doMain() error {
log.Printf("crunch-dispatch-slurm %s started", version)
- err := readConfig(&theConfig, *configPath)
+ err := cmd.readConfig(*configPath)
if err != nil {
return err
}
- if theConfig.CrunchRunCommand == nil {
- theConfig.CrunchRunCommand = []string{"crunch-run"}
+ if cmd.CrunchRunCommand == nil {
+ cmd.CrunchRunCommand = []string{"crunch-run"}
}
- if theConfig.PollPeriod == 0 {
- theConfig.PollPeriod = arvados.Duration(10 * time.Second)
+ if cmd.PollPeriod == 0 {
+ cmd.PollPeriod = arvados.Duration(10 * time.Second)
}
- if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
+ if cmd.Client.APIHost != "" || cmd.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
// MakeArvadosClient() uses them, and [b] they get
// propagated to crunch-run via SLURM.
- os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
- os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
+ os.Setenv("ARVADOS_API_HOST", cmd.Client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", cmd.Client.AuthToken)
os.Setenv("ARVADOS_API_HOST_INSECURE", "")
- if theConfig.Client.Insecure {
+ if cmd.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
+ os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(cmd.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
}
if *dumpConfig {
- log.Fatal(config.DumpAndExit(theConfig))
+ log.Fatal(config.DumpAndExit(cmd))
}
arv, err := arvadosclient.MakeArvadosClient()
@@ -127,23 +124,39 @@ func doMain() error {
}
arv.Retries = 25
- sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
- defer sqCheck.Stop()
+ siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
+ if os.IsNotExist(err) {
+ log.Printf("warning: no cluster config file %q (%s), proceeding with no node types defined", arvados.DefaultConfigFile, err)
+ } else if err != nil {
+ log.Fatalf("error loading config: %s", err)
+ } else if cmd.cluster, err = siteConfig.GetCluster(""); err != nil {
+ log.Fatalf("config error: %s", err)
+ }
+
+ if cmd.slurm == nil {
+ cmd.slurm = &slurmCLI{}
+ }
+
+ cmd.sqCheck = &SqueueChecker{
+ Period: time.Duration(cmd.PollPeriod),
+ Slurm: cmd.slurm,
+ }
+ defer cmd.sqCheck.Stop()
- dispatcher := &dispatch.Dispatcher{
+ cmd.dispatcher = &dispatch.Dispatcher{
Arv: arv,
- RunContainer: run,
- PollPeriod: time.Duration(theConfig.PollPeriod),
- MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
+ RunContainer: cmd.run,
+ PollPeriod: time.Duration(cmd.PollPeriod),
+ MinRetryPeriod: time.Duration(cmd.MinRetryPeriod),
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
- go checkSqueueForOrphans(dispatcher, sqCheck)
+ go cmd.checkSqueueForOrphans()
- return dispatcher.Run(context.Background())
+ return cmd.dispatcher.Run(context.Background())
}
var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
@@ -153,19 +166,19 @@ var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`
// jobs started by a previous dispatch process that never released
// their slurm allocations even though their container states are
// Cancelled or Complete. See https://dev.arvados.org/issues/10979
-func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
- for _, uuid := range sqCheck.All() {
+func (cmd *command) checkSqueueForOrphans() {
+ for _, uuid := range cmd.sqCheck.All() {
if !containerUuidPattern.MatchString(uuid) {
continue
}
- err := dispatcher.TrackContainer(uuid)
+ err := cmd.dispatcher.TrackContainer(uuid)
if err != nil {
log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
}
}
}
-func niceness(priority int) int {
+func (cmd *command) niceness(priority int) int {
if priority > 1000 {
priority = 1000
}
@@ -176,7 +189,7 @@ func niceness(priority int) int {
return (1000 - priority) * 10
}
-func sbatchArgs(container arvados.Container) []string {
+func (cmd *command) sbatchArgs(container arvados.Container) ([]string, error) {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
var disk int64
@@ -188,45 +201,48 @@ func sbatchArgs(container arvados.Container) []string {
disk = int64(math.Ceil(float64(disk) / float64(1048576)))
var sbatchArgs []string
- sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
+ sbatchArgs = append(sbatchArgs, cmd.SbatchArguments...)
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", cmd.niceness(container.Priority)))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- return sbatchArgs
+ return sbatchArgs, nil
}
-func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
+func (cmd *command) submit(container arvados.Container, crunchRunCommand []string) error {
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
crArgs := append([]string(nil), crunchRunCommand...)
crArgs = append(crArgs, container.UUID)
crScript := strings.NewReader(execScript(crArgs))
- sqCheck.L.Lock()
- defer sqCheck.L.Unlock()
+ cmd.sqCheck.L.Lock()
+ defer cmd.sqCheck.L.Unlock()
- sbArgs := sbatchArgs(container)
+ sbArgs, err := cmd.sbatchArgs(container)
+ if err != nil {
+ return err
+ }
log.Printf("running sbatch %+q", sbArgs)
- return theConfig.slurm.Batch(crScript, sbArgs)
+ return cmd.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
// already in the queue). Cancel the slurm job if the container's
// priority changes to zero or its state indicates it's no longer
// running.
-func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (cmd *command) run(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+ if ctr.State == dispatch.Locked && !cmd.sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
- if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
+ if err := cmd.submit(ctr, cmd.CrunchRunCommand); err != nil {
text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
log.Print(text)
@@ -234,9 +250,9 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
"object_uuid": ctr.UUID,
"event_type": "dispatch",
"properties": map[string]string{"text": text}}}
- disp.Arv.Create("logs", lr, nil)
+ cmd.dispatcher.Arv.Create("logs", lr, nil)
- disp.Unlock(ctr.UUID)
+ cmd.dispatcher.Unlock(ctr.UUID)
return
}
}
@@ -248,7 +264,7 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
// no point in waiting for further dispatch updates: just
// clean up and return.
go func(uuid string) {
- for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+ for ctx.Err() == nil && cmd.sqCheck.HasUUID(uuid) {
}
cancel()
}(ctr.UUID)
@@ -257,68 +273,68 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
select {
case <-ctx.Done():
// Disappeared from squeue
- if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+ if err := cmd.dispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
}
switch ctr.State {
case dispatch.Running:
- disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+ cmd.dispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
case dispatch.Locked:
- disp.Unlock(ctr.UUID)
+ cmd.dispatcher.Unlock(ctr.UUID)
}
return
case updated, ok := <-status:
if !ok {
log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
- scancel(ctr)
+ cmd.scancel(ctr)
} else if updated.Priority == 0 {
log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
- scancel(ctr)
+ cmd.scancel(ctr)
} else {
- renice(updated)
+ cmd.renice(updated)
}
}
}
}
-func scancel(ctr arvados.Container) {
- sqCheck.L.Lock()
- err := theConfig.slurm.Cancel(ctr.UUID)
- sqCheck.L.Unlock()
+func (cmd *command) scancel(ctr arvados.Container) {
+ cmd.sqCheck.L.Lock()
+ err := cmd.slurm.Cancel(ctr.UUID)
+ cmd.sqCheck.L.Unlock()
if err != nil {
log.Printf("scancel: %s", err)
time.Sleep(time.Second)
- } else if sqCheck.HasUUID(ctr.UUID) {
+ } else if cmd.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s is still in squeue after scancel", ctr.UUID)
time.Sleep(time.Second)
}
}
-func renice(ctr arvados.Container) {
- nice := niceness(ctr.Priority)
- oldnice := sqCheck.GetNiceness(ctr.UUID)
+func (cmd *command) renice(ctr arvados.Container) {
+ nice := cmd.niceness(ctr.Priority)
+ oldnice := cmd.sqCheck.GetNiceness(ctr.UUID)
if nice == oldnice || oldnice == -1 {
return
}
log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
- sqCheck.L.Lock()
- err := theConfig.slurm.Renice(ctr.UUID, nice)
- sqCheck.L.Unlock()
+ cmd.sqCheck.L.Lock()
+ err := cmd.slurm.Renice(ctr.UUID, nice)
+ cmd.sqCheck.L.Unlock()
if err != nil {
log.Printf("renice: %s", err)
time.Sleep(time.Second)
return
}
- if sqCheck.HasUUID(ctr.UUID) {
+ if cmd.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s has arvados priority %d, slurm nice %d",
- ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+ ctr.UUID, ctr.Priority, cmd.sqCheck.GetNiceness(ctr.UUID))
}
}
-func readConfig(dst interface{}, path string) error {
- err := config.LoadFile(dst, path)
+func (cmd *command) readConfig(path string) error {
+ err := config.LoadFile(cmd, path)
if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
log.Printf("Config not specified. Continue with default configuration.")
err = nil
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 830976d..63f56f3 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -35,8 +35,9 @@ func Test(t *testing.T) {
var _ = Suite(&TestSuite{})
var _ = Suite(&MockArvadosServerSuite{})
-type TestSuite struct{}
-type MockArvadosServerSuite struct{}
+type TestSuite struct {
+ cmd command
+}
var initialArgs []string
@@ -61,10 +62,6 @@ func (s *TestSuite) TearDownTest(c *C) {
arvadostest.StopAPI()
}
-func (s *MockArvadosServerSuite) TearDownTest(c *C) {
- arvadostest.ResetEnv()
-}
-
type slurmFake struct {
didBatch [][]string
didCancel []string
@@ -110,10 +107,7 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, IsNil)
- defer func(orig Slurm) {
- theConfig.slurm = orig
- }(theConfig.slurm)
- theConfig.slurm = slurm
+ s.cmd.slurm = slurm
// There should be one queued container
params := arvadosclient.Dict{
@@ -124,12 +118,12 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
- theConfig.CrunchRunCommand = []string{"echo"}
+ s.cmd.CrunchRunCommand = []string{"echo"}
ctx, cancel := context.WithCancel(context.Background())
doneRun := make(chan struct{})
- dispatcher := dispatch.Dispatcher{
+ s.cmd.dispatcher = &dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
@@ -138,18 +132,18 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
slurm.queue = ""
doneRun <- struct{}{}
}()
- run(disp, ctr, status)
+ s.cmd.run(disp, ctr, status)
cancel()
},
}
- sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
+ s.cmd.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: slurm}
- err = dispatcher.Run(ctx)
+ err = s.cmd.dispatcher.Run(ctx)
<-doneRun
c.Assert(err, Equals, context.Canceled)
- sqCheck.Stop()
+ s.cmd.sqCheck.Stop()
c.Check(slurm.didBatch, DeepEquals, expectBatch)
@@ -236,15 +230,41 @@ func (s *TestSuite) TestSbatchFail(c *C) {
c.Assert(len(ll.Items), Equals, 1)
}
+func (s *TestSuite) TestIntegrationChangePriority(c *C) {
+ slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ container := s.integrationTest(c, slurm, nil,
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ time.Sleep(time.Second)
+ dispatcher.Arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"priority": 600}},
+ nil)
+ time.Sleep(time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, arvados.ContainerStateComplete)
+ c.Assert(len(slurm.didRenice), Not(Equals), 0)
+ c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+}
+
+type MockArvadosServerSuite struct {
+ cmd command
+}
+
+func (s *MockArvadosServerSuite) TearDownTest(c *C) {
+ arvadostest.ResetEnv()
+}
+
func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+ s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
-func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+func (s *MockArvadosServerSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
apiStub := arvadostest.ServerStub{apiStubResponses}
api := httptest.NewServer(&apiStub)
@@ -262,7 +282,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
- theConfig.CrunchRunCommand = []string{crunchCmd}
+ s.cmd.CrunchRunCommand = []string{crunchCmd}
ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
@@ -274,7 +294,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
disp.UpdateState(ctr.UUID, dispatch.Running)
disp.UpdateState(ctr.UUID, dispatch.Complete)
}()
- run(disp, ctr, status)
+ s.cmd.run(disp, ctr, status)
cancel()
},
}
@@ -293,14 +313,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
}
func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
- var config Config
- err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
+ err := s.cmd.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
c.Assert(err, NotNil)
}
func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
- var config Config
-
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
@@ -308,13 +325,11 @@ func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
_, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.cmd.readConfig(tmpfile.Name())
c.Assert(err, NotNil)
}
func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
- var config Config
-
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
@@ -322,14 +337,12 @@ func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
_, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.cmd.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(0, Equals, len(config.SbatchArguments))
+ c.Check(0, Equals, len(s.cmd.SbatchArguments))
}
func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
- var config Config
-
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
@@ -339,27 +352,25 @@ func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
_, err = tmpfile.Write([]byte(argsS))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.cmd.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(3, Equals, len(config.SbatchArguments))
- c.Check(args, DeepEquals, config.SbatchArguments)
+ c.Check(args, DeepEquals, s.cmd.SbatchArguments)
}
func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, nil)
+ s.testSbatchFuncWithArgs(c, nil)
}
func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, []string{})
+ s.testSbatchFuncWithArgs(c, []string{})
}
func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
+ s.testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
}
-func testSbatchFuncWithArgs(c *C, args []string) {
- defer func() { theConfig.SbatchArguments = nil }()
- theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
+func (s *MockArvadosServerSuite) testSbatchFuncWithArgs(c *C, args []string) {
+ s.cmd.SbatchArguments = append([]string(nil), args...)
container := arvados.Container{
UUID: "123",
@@ -367,9 +378,11 @@ func testSbatchFuncWithArgs(c *C, args []string) {
Priority: 1}
var expected []string
- expected = append(expected, theConfig.SbatchArguments...)
+ expected = append(expected, s.cmd.SbatchArguments...)
expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
- c.Check(sbatchArgs(container), DeepEquals, expected)
+ args, err := s.cmd.sbatchArgs(container)
+ c.Check(args, DeepEquals, expected)
+ c.Check(err, IsNil)
}
func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
@@ -379,26 +392,10 @@ func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
Priority: 1}
- c.Check(sbatchArgs(container), DeepEquals, []string{
+ args, err := s.cmd.sbatchArgs(container)
+ c.Check(args, DeepEquals, []string{
"--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
"--partition=blurb,b2",
})
-}
-
-func (s *TestSuite) TestIntegrationChangePriority(c *C) {
- slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
- container := s.integrationTest(c, slurm, nil,
- func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(time.Second)
- dispatcher.Arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"priority": 600}},
- nil)
- time.Sleep(time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
- })
- c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Assert(len(slurm.didRenice), Not(Equals), 0)
- c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+ c.Check(err, IsNil)
}
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 5ecfe8f..adb620e 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -22,6 +22,7 @@ type jobPriority struct {
// command 'squeue'.
type SqueueChecker struct {
Period time.Duration
+ Slurm Slurm
uuids map[string]jobPriority
startOnce sync.Once
done chan struct{}
@@ -77,7 +78,7 @@ func (sqc *SqueueChecker) check() {
sqc.L.Lock()
defer sqc.L.Unlock()
- cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
+ cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
commit bfa436c97990c1a0cf39907acc0235a8535b6a43
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Jan 25 19:11:32 2018 -0500
12199: Add SlurmNodeTypeFeatureKludge.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/dispatchcloud/node_size.go b/services/dispatchcloud/node_size.go
index 1abccc5..9cb6d3f 100644
--- a/services/dispatchcloud/node_size.go
+++ b/services/dispatchcloud/node_size.go
@@ -5,7 +5,12 @@
package dispatchcloud
import (
+ "bytes"
"errors"
+ "log"
+ "os/exec"
+ "strings"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
@@ -42,3 +47,67 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
}
return
}
+
+// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
+// type name as a valid feature name, even if no instances of that
+// type have appeared yet.
+//
+// It takes advantage of some SLURM peculiarities:
+//
+// (1) A feature is valid after it has been offered by a node, even if
+// it is no longer offered by any node. So, to make a feature name
+// valid, we can add it to a dummy node ("compute0"), then remove it.
+//
+// (2) when srun is given an invalid --gres argument and an invalid
+// --constraint argument, the error message mentions "Invalid feature
+// specification". So, to test whether a feature name is valid without
+// actually submitting a job, we can call srun with the feature name
+// and an invalid --gres argument.
+//
+// SlurmNodeTypeFeatureKludge does a test-and-fix operation
+// immediately, and then periodically, in case slurm restarts and
+// forgets the list of valid features. It never returns, so it should
+// generally be invoked with "go".
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+ var types []string
+ for _, it := range cc.InstanceTypes {
+ types = append(types, it.Name)
+ }
+ for {
+ slurmKludge(types)
+ time.Sleep(time.Minute)
+ }
+}
+
+var (
+ slurmDummyNode = "compute0"
+ slurmErrBadFeature = "Invalid feature"
+ slurmErrBadGres = "Invalid generic resource"
+)
+
+func slurmKludge(types []string) {
+ cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(types, "&"), "true")
+ out, err := cmd.CombinedOutput()
+ switch {
+ case err == nil:
+ log.Printf("warning: guaranteed-to-fail srun command did not fail: %q %q", cmd.Path, cmd.Args)
+ log.Printf("output was: %q", out)
+
+ case bytes.Contains(out, []byte(slurmErrBadFeature)):
+ log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
+ for _, features := range []string{strings.Join(types, ","), ""} {
+ cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+features)
+ log.Printf("running: %q %q", cmd.Path, cmd.Args)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Printf("error: scontrol: %s (output was %q)", err, out)
+ }
+ }
+
+ case bytes.Contains(out, []byte(slurmErrBadGres)):
+ // Evidently our node-type feature names are all valid.
+
+ default:
+ log.Printf("warning: expected srun error %q or %q, but output was %q", slurmErrBadFeature, slurmErrBadGres, out)
+ }
+}
commit f9e1447e5ab21dbc0fdb08328c31811fca2fd327
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Jan 23 14:25:21 2018 -0500
12199: Add node size calculator.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/build/run-tests.sh b/build/run-tests.sh
index a02f732..57ce41e 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -77,6 +77,7 @@ lib/crunchstat
services/api
services/arv-git-httpd
services/crunchstat
+services/dispatchcloud
services/dockercleaner
services/fuse
services/health
@@ -869,6 +870,7 @@ gostuff=(
sdk/go/stats
services/arv-git-httpd
services/crunchstat
+ services/dispatchcloud
services/health
services/keep-web
services/keepstore
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index ca0df1f..9ed0eac 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package arvados
import (
@@ -48,6 +52,16 @@ type Cluster struct {
ClusterID string `json:"-"`
ManagementToken string
SystemNodes map[string]SystemNode
+ InstanceTypes []InstanceType
+}
+
+type InstanceType struct {
+ Name string
+ ProviderType string
+ VCPUs int
+ RAM int64
+ Scratch int64
+ Price float64
}
// GetThisSystemNode returns a SystemNode for the node we're running
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index a541a8d..20d007c 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -41,9 +41,9 @@ type Mount struct {
// CPU) and network connectivity.
type RuntimeConstraints struct {
API *bool
- RAM int `json:"ram"`
- VCPUs int `json:"vcpus"`
- KeepCacheRAM int `json:"keep_cache_ram"`
+ RAM int64 `json:"ram"`
+ VCPUs int `json:"vcpus"`
+ KeepCacheRAM int64 `json:"keep_cache_ram"`
}
// SchedulingParameters specify a container's scheduling parameters
diff --git a/services/dispatchcloud/gocheck_test.go b/services/dispatchcloud/gocheck_test.go
new file mode 100644
index 0000000..22f89f0
--- /dev/null
+++ b/services/dispatchcloud/gocheck_test.go
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "testing"
+
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
diff --git a/services/dispatchcloud/node_size.go b/services/dispatchcloud/node_size.go
new file mode 100644
index 0000000..1abccc5
--- /dev/null
+++ b/services/dispatchcloud/node_size.go
@@ -0,0 +1,44 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "errors"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+ ErrConstraintsNotSatisfiable = errors.New("constraints not satisfiable by any configured instance type")
+ ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+)
+
+// ChooseInstanceType returns the cheapest available
+// arvados.InstanceType big enough to run ctr.
+func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
+ needVCPUs := ctr.RuntimeConstraints.VCPUs
+ needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
+
+ if len(cc.InstanceTypes) == 0 {
+ err = ErrInstanceTypesNotConfigured
+ return
+ }
+
+ err = ErrConstraintsNotSatisfiable
+ for _, it := range cc.InstanceTypes {
+ switch {
+ case err == nil && it.Price > best.Price:
+ case it.RAM < needRAM:
+ case it.VCPUs < needVCPUs:
+ case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
+ // Equal price, but worse specs
+ default:
+ // Lower price || (same price && better specs)
+ best = it
+ err = nil
+ }
+ }
+ return
+}
diff --git a/services/dispatchcloud/node_size_test.go b/services/dispatchcloud/node_size_test.go
new file mode 100644
index 0000000..bc628b5
--- /dev/null
+++ b/services/dispatchcloud/node_size_test.go
@@ -0,0 +1,73 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&NodeSizeSuite{})
+
+type NodeSizeSuite struct{}
+
+func (*NodeSizeSuite) TestChooseNotConfigured(c *check.C) {
+ _, err := ChooseInstanceType(&arvados.Cluster{}, &arvados.Container{
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ RAM: 1234567890,
+ VCPUs: 2,
+ },
+ })
+ c.Check(err, check.Equals, ErrInstanceTypesNotConfigured)
+}
+
+func (*NodeSizeSuite) TestChooseUnsatisfiable(c *check.C) {
+ for _, rc := range []arvados.RuntimeConstraints{
+ {RAM: 9876543210, VCPUs: 2},
+ {RAM: 1234567890, VCPUs: 20},
+ {RAM: 1234567890, VCPUs: 2, KeepCacheRAM: 9876543210},
+ } {
+ _, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: []arvados.InstanceType{
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small1"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4"},
+ }}, &arvados.Container{RuntimeConstraints: rc})
+ c.Check(err, check.Equals, ErrConstraintsNotSatisfiable)
+ }
+}
+
+func (*NodeSizeSuite) TestChoose(c *check.C) {
+ for _, menu := range [][]arvados.InstanceType{
+ {
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "best"},
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+ },
+ {
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "goodenough"},
+ {Price: 2.2, RAM: 4000000000, VCPUs: 4, Name: "best"},
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+ },
+ {
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "goodenough"},
+ {Price: 2.2, RAM: 4000000000, VCPUs: 4, Name: "best"},
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+ },
+ } {
+ best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 987654321,
+ KeepCacheRAM: 123456789,
+ },
+ })
+ c.Check(err, check.IsNil)
+ c.Check(best.Name, check.Equals, "best")
+ c.Check(best.RAM >= 1234567890, check.Equals, true)
+ c.Check(best.VCPUs >= 2, check.Equals, true)
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list