[ARVADOS] updated: f16505d89d60d3bd5abc04762712f4ddf78e39fe

git at public.curoverse.com
Mon Nov 16 16:22:03 EST 2015


Summary of changes:
 services/nodemanager/arvnodeman/config.py   |  1 +
 services/nodemanager/arvnodeman/daemon.py   | 29 +++++++++++---
 services/nodemanager/arvnodeman/jobqueue.py |  6 ---
 services/nodemanager/arvnodeman/launcher.py |  3 +-
 services/nodemanager/doc/azure.example.cfg  | 28 +++++++++----
 services/nodemanager/doc/ec2.example.cfg    | 21 ++++++++--
 services/nodemanager/doc/gce.example.cfg    | 21 ++++++++--
 services/nodemanager/tests/test_daemon.py   | 62 ++++++++++++++++++++++++++---
 services/nodemanager/tests/test_jobqueue.py | 40 ++++++++++++++++++-
 9 files changed, 176 insertions(+), 35 deletions(-)

       via  f16505d89d60d3bd5abc04762712f4ddf78e39fe (commit)
      from  f684cda0d2a332f1d2054e7ac8e9de3cb67e23a9 (commit)

Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.


commit f16505d89d60d3bd5abc04762712f4ddf78e39fe
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Nov 16 16:21:34 2015 -0500

    5353: Added max_total_price.  Added more tests for multiple node sizes.
    Updated config file examples.
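
The cap itself is simple bookkeeping. A minimal sketch of the logic,
matching the daemon.py hunk below (the Size class here is a hypothetical
stand-in; the real daemon sums libcloud NodeSize prices over its
booting, booted, and running node records):

    # Hypothetical stand-in for a libcloud NodeSize.
    class Size(object):
        def __init__(self, price):
            self.price = price

    def total_price(booting_sizes, running_sizes):
        # Sum the hourly price of nodes that are booting or already up.
        return (sum(s.price for s in booting_sizes) +
                sum(s.price for s in running_sizes))

    def may_boot(size, booting_sizes, running_sizes, max_total_price):
        # A max_total_price of 0 (the default) means no limit.
        if not max_total_price:
            return True
        return (total_price(booting_sizes, running_sizes) + size.price
                <= max_total_price)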

diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 315df1c..777e082 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -41,6 +41,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'poll_time': '60',
                        'max_poll_time': '300',
                        'poll_stale_after': '600',
+                       'max_total_price': '0',
                        'boot_fail_after': str(sys.maxint),
                        'node_stale_after': str(60 * 60 * 2)},
             'Logging': {'file': '/dev/stderr',
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index f4cd456..cd967a4 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -110,7 +110,8 @@ class NodeManagerDaemonActor(actor_class):
                  node_stale_after=7200,
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
-                 node_actor_class=dispatch.ComputeNodeMonitorActor):
+                 node_actor_class=dispatch.ComputeNodeMonitorActor,
+                 max_total_price=0):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
@@ -127,6 +128,7 @@ class NodeManagerDaemonActor(actor_class):
         self.min_cloud_size = self.server_calculator.cheapest_size()
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
+        self.max_total_price = max_total_price
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
@@ -223,6 +225,15 @@ class NodeManagerDaemonActor(actor_class):
                   if size is None or c.cloud_node.size.id == size.id)
         return up
 
+    def _total_price(self):
+        cost = 0
+        cost += sum(c.cloud_size.get().price
+                  for c in self.booting.itervalues())
+        cost += sum(c.cloud_node.size.price
+                  for i in (self.booted, self.cloud_nodes.nodes)
+                  for c in i.itervalues())
+        return cost
+
     def _nodes_busy(self, size):
         return sum(1 for busy in
                    pykka.get_all(rec.actor.in_state('busy') for rec in
@@ -248,15 +259,21 @@ class NodeManagerDaemonActor(actor_class):
         total_up_count = self._nodes_up(None)
         under_min = self.min_nodes - total_up_count
         over_max = total_up_count - self.max_nodes
+        total_price = self._total_price()
+
         if over_max >= 0:
             return -over_max
+        elif self.max_total_price and ((total_price + size.price) > self.max_total_price):
+            self._logger.info("Not booting new %s (price %s) because with current total_price of %s it would exceed max_total_price of %s",
+                              size.name, size.price, total_price, self.max_total_price)
+            return 0
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
-        else:
-            up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
-                                               self._nodes_busy(size) +
-                                               self._nodes_missing(size))
-            return self._size_wishlist(size) - up_count
+
+        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
+                                           self._nodes_busy(size) +
+                                           self._nodes_missing(size))
+        return self._size_wishlist(size) - up_count
 
     def _nodes_excess(self, size):
         up_count = self._nodes_up(size) - self._size_shutdowns(size)
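
To make the new branch in _nodes_wanted concrete, a worked example
(the cap of 0.50 is hypothetical; the prices match the EC2 example
config further below):

    # Two m4.large nodes already up; can we afford one more node?
    running = [0.126, 0.126]
    cap = 0.50
    for name, price in [('m4.xlarge', 0.252), ('m4.large', 0.126)]:
        ok = sum(running) + price <= cap
        print('%s: %s' % (name, 'boot' if ok else 'refuse'))
    # m4.xlarge: refuse  (0.252 + 0.252 = 0.504 > 0.50)
    # m4.large:  boot    (0.252 + 0.126 = 0.378 <= 0.50)
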
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index ebe79fd..06f66b7 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -83,12 +83,6 @@ class ServerCalculator(object):
     def cheapest_size(self):
         return self.cloud_sizes[0]
 
-    def find_size(self, sz):
-        for s in self.cloud_sizes:
-            if s.id == sz.id:
-                return s
-        return None
-
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 5dfdb1d..592d217 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -120,7 +120,8 @@ def main(args=None):
         config.getint('Daemon', 'poll_stale_after'),
         config.getint('Daemon', 'boot_fail_after'),
         config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor).proxy()
+        node_setup, node_shutdown, node_monitor,
+        max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
 
     signal.pause()
     daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
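
launcher.py reads the new option with getfloat, so both integer and
fractional values work in the INI file. A standalone sketch (stdlib
only, Python 2 as in the rest of the codebase; the [Daemon] section
and option name match the hunk above):

    import ConfigParser
    from StringIO import StringIO

    cfg = ConfigParser.SafeConfigParser()
    cfg.readfp(StringIO("[Daemon]\nmax_total_price = 2.5\n"))
    print(cfg.getfloat('Daemon', 'max_total_price'))  # 2.5
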
diff --git a/services/nodemanager/doc/azure.example.cfg b/services/nodemanager/doc/azure.example.cfg
index 50161c2..ef56ce1 100644
--- a/services/nodemanager/doc/azure.example.cfg
+++ b/services/nodemanager/doc/azure.example.cfg
@@ -15,6 +15,11 @@ min_nodes = 0
 # many are running.
 max_nodes = 8
 
+# Upper limit on the rate of spending (in $/hr); Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold.  The default of 0 means no limit.
+max_total_price = 0
+
 # Poll Azure nodes and Arvados for new information every N seconds.
 poll_time = 60
 
@@ -138,16 +143,25 @@ tag_cluster = zyxwv
 # the API server to ping
 ping_host = hostname:port
 
-[Size Standard_D3]
-# You can define any number of Size sections to list Azure sizes you're
-# willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use.  The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue.  You must also provide the price per hour, as the Azure
+# compute driver currently does not report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
 # Each size section MUST define the number of cores available in this
 # size class (since libcloud does not provide any consistent API for exposing
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.  You can also override Microsoft's provided
-# data fields by setting the same names here.
+# data fields by setting them here.
+
+[Size Standard_D3]
 cores = 4
-scratch = 200
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
index 9b41ca1..5c882e5 100644
--- a/services/nodemanager/doc/ec2.example.cfg
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -15,6 +15,11 @@ min_nodes = 0
 # many are running.
 max_nodes = 8
 
+# Upper limit on the rate of spending (in $/hr); Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold.  The default of 0 means no limit.
+max_total_price = 0
+
 # Poll EC2 nodes and Arvados for new information every N seconds.
 poll_time = 60
 
@@ -123,16 +128,24 @@ subnet_id = idstring
 # compute node.
 security_groups = idstring1, idstring2
 
-[Size t2.medium]
+
 # You can define any number of Size sections to list EC2 sizes you're
 # willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
+#
 # Each size section MUST define the number of cores available in this
 # size class (since libcloud does not provide any consistent API for exposing
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.  You can also override Amazon's provided
-# data fields by setting the same names here.
+# data fields (such as price per hour) by setting them here.
+
+[Size m4.large]
 cores = 2
+price = 0.126
+scratch = 100
+
+[Size m4.xlarge]
+cores = 4
+price = 0.252
 scratch = 100
diff --git a/services/nodemanager/doc/gce.example.cfg b/services/nodemanager/doc/gce.example.cfg
index 6770370..2da9c5f 100644
--- a/services/nodemanager/doc/gce.example.cfg
+++ b/services/nodemanager/doc/gce.example.cfg
@@ -17,6 +17,11 @@ poll_time = 60
 # This is the longest time to wait between polls.
 max_poll_time = 300
 
+# Upper limit on the rate of spending (in $/hr); Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold.  The default of 0 means no limit.
+max_total_price = 0
+
 # If Node Manager can't successfully poll a service for this long,
 # it will never start or stop compute nodes, on the assumption that its
 # information is too outdated.
@@ -118,11 +123,10 @@ image = debian-7
 # See http://libcloud.readthedocs.org/en/latest/compute/drivers/gce.html#specifying-service-account-scopes
 # service_accounts = [{'email':'account at example.com', 'scopes':['storage-ro']}]
 
-[Size n1-standard-2]
+
 # You can define any number of Size sections to list node sizes you're
 # willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
 #
 # The Size fields are interpreted the same way as with a libcloud NodeSize:
 # http://libcloud.readthedocs.org/en/latest/compute/api.html#libcloud.compute.base.NodeSize
@@ -135,6 +139,15 @@ image = debian-7
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.
+# You can also override Google's provided data fields (such as price per hour)
+# by setting them here.
+
+[Size n1-standard-2]
 cores = 2
+price = 0.076
 scratch = 100
-ram = 512
+
+[Size n1-standard-4]
+cores = 4
+price = 0.152
+scratch = 200
\ No newline at end of file
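
As the comment in the hunk above says, Size fields are interpreted the
same way as a libcloud NodeSize. Roughly, the n1-standard-2 section
corresponds to the following (a sketch: the ram value in MB and the
None driver are illustrative stand-ins, not taken from the config):

    from libcloud.compute.base import NodeSize

    n1_standard_2 = NodeSize(id='n1-standard-2', name='n1-standard-2',
                             ram=7680, disk=100, bandwidth=0,
                             price=0.076, driver=None)
    print(n1_standard_2.price)  # 0.076
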
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 865191e..2510c79 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -49,7 +49,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
                     avail_sizes=[(testutil.MockSize(1), {"cores": 1})],
                     min_nodes=0, max_nodes=8,
-                    shutdown_windows=[54, 5, 1]):
+                    shutdown_windows=[54, 5, 1],
+                    max_total_price=None):
         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
         self.arv_factory = mock.MagicMock(name='arvados_mock')
@@ -71,7 +72,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.arv_factory, self.cloud_factory,
             shutdown_windows, ServerCalculator(avail_sizes),
             min_nodes, max_nodes, 600, 1800, 3600,
-            self.node_setup, self.node_shutdown).proxy()
+            self.node_setup, self.node_shutdown,
+            max_total_price=max_total_price).proxy()
         if cloud_nodes is not None:
             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
         if arvados_nodes is not None:
@@ -597,16 +599,40 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.daemon)
         self.assertEqual(1, self.last_shutdown.stop.call_count)
 
+    def busywait(self, f):
+        n = 0
+        while not f() and n < 10:
+            time.sleep(.1)
+            n += 1
+        self.assertTrue(f())
+
     def test_node_create_two_sizes(self):
         small = testutil.MockSize(1)
         big = testutil.MockSize(2)
         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
                         (testutil.MockSize(2), {"cores":2})]
-        self.make_daemon(want_sizes=[small, small, big],
-                         avail_sizes=avail_sizes)
-        booting = self.daemon.booting.get()
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=4)
+        self.busywait(lambda: self.node_setup.start.call_count == 4)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        logging.info(sizecounts)
+        self.assertEqual(3, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
+
+    def test_node_max_nodes_two_sizes(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+                        (testutil.MockSize(2), {"cores":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=3)
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertEqual(3, self.node_setup.start.call_count)
         sizecounts = {a[0].id: 0 for a in avail_sizes}
         for b in booting.itervalues():
             sizecounts[b.cloud_size.get().id] += 1
@@ -643,14 +669,38 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(1, self.node_setup.start.call_count)
         self.assertEqual(1, self.node_shutdown.start.call_count)
 
+        # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
         for b in booting.itervalues():
             sizecounts[b.cloud_size.get().id] += 1
         self.assertEqual(0, sizecounts[small.id])
         self.assertEqual(1, sizecounts[big.id])
 
+        # shutting down a small node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
         for b in shutdowns.itervalues():
             sizecounts[b.cloud_node.get().size.id] += 1
         self.assertEqual(1, sizecounts[small.id])
         self.assertEqual(0, sizecounts[big.id])
+
+    def test_node_max_price(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
+                        (testutil.MockSize(2), {"cores":2, "price":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes,
+                         max_nodes=4,
+                         max_total_price=4)
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get()
+        self.stop_proxy(self.daemon)
+        self.assertEqual(3, self.node_setup.start.call_count)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        logging.info(sizecounts)
+        # The way update_server_wishlist() works effectively results in a
+        # round-robin creation of one node of each size in the wishlist
+        self.assertEqual(2, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
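
The busywait helper added to the tests is a bounded poll: retry a
predicate a fixed number of times with a short sleep between tries.
The same pattern in isolation (a standalone sketch, assuming nothing
from the test fixtures):

    import time

    def busywait(predicate, tries=10, interval=0.1):
        # Poll up to `tries` times, returning once the predicate holds.
        for _ in range(tries):
            if predicate():
                return True
            time.sleep(interval)
        return predicate()

    state = {'count': 0}
    def ready():
        state['count'] += 1
        return state['count'] >= 3
    print(busywait(ready))  # True after the third poll
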
diff --git a/services/nodemanager/tests/test_jobqueue.py b/services/nodemanager/tests/test_jobqueue.py
index 4c97aed..2ddecd0 100644
--- a/services/nodemanager/tests/test_jobqueue.py
+++ b/services/nodemanager/tests/test_jobqueue.py
@@ -57,6 +57,45 @@ class ServerCalculatorTestCase(unittest.TestCase):
         servcalc = self.make_calculator([2, 4, 1, 3])
         self.assertEqual(testutil.MockSize(1), servcalc.cheapest_size())
 
+    def test_next_biggest(self):
+        servcalc = self.make_calculator([1, 2, 4, 8])
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 3},
+                                  {'min_cores_per_node': 6})
+        self.assertEqual([servcalc.cloud_sizes[2].id,
+                          servcalc.cloud_sizes[3].id],
+                         [s.id for s in servlist])
+
+    def test_multiple_sizes(self):
+        servcalc = self.make_calculator([1, 2])
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 2},
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 1})
+        self.assertEqual([servcalc.cloud_sizes[1].id,
+                          servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[0].id],
+                         [s.id for s in servlist])
+
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 2},
+                                  {'min_cores_per_node': 1})
+        self.assertEqual([servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[1].id,
+                          servcalc.cloud_sizes[0].id],
+                         [s.id for s in servlist])
+
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 2})
+        self.assertEqual([servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[1].id],
+                         [s.id for s in servlist])
+
+
 
 class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                    unittest.TestCase):
@@ -82,4 +121,3 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
 if __name__ == '__main__':
     unittest.main()
-

-----------------------------------------------------------------------


hooks/post-receive