[ARVADOS] created: 744ed51008c0bffb9e88c0eb2f57af429c3ebed6
Git user
git at public.curoverse.com
Fri Apr 7 17:25:49 EDT 2017
at 744ed51008c0bffb9e88c0eb2f57af429c3ebed6 (commit)
commit 744ed51008c0bffb9e88c0eb2f57af429c3ebed6
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Apr 7 15:20:04 2017 -0400
11349: Add management server with /status.json
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index aa9b3e2..30e8995 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -47,6 +47,8 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
'node_stale_after': str(60 * 60 * 2),
'watchdog': '600',
'node_mem_scaling': '0.95'},
+ 'Manage': {'address': '127.0.0.1',
+ 'port': '-1'},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'},
}.iteritems():
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 9441546..5522877 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -9,6 +9,7 @@ import time
import pykka
from . import computenode as cnode
+from . import status
from .computenode import dispatch
from .config import actor_class
@@ -253,6 +254,17 @@ class NodeManagerDaemonActor(actor_class):
states.append("shutdown")
return states + pykka.get_all(proxy_states)
+    def _update_tracker(self):
+        """Publish per-state node counts (nodes_<state>) to status.tracker."""
+        # Reset every nodes_* key already published to 0, so a state that
+        # no longer has any nodes reports 0 instead of its last count.
+        counts = dict((key, 0) for key in status.tracker.keys()
+                      if key.startswith('nodes_'))
+        for state in self._node_states(size=None):
+            key = 'nodes_' + state
+            counts[key] = counts.get(key, 0) + 1
+        status.tracker.update(counts)
+
def _state_counts(self, size):
states = self._node_states(size)
counts = {
@@ -337,6 +349,10 @@ class NodeManagerDaemonActor(actor_class):
self._later.stop_booting_node(size)
except Exception as e:
self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+ try:
+ self._update_tracker()
+ except:
+ self._logger.exception("while updating tracker")
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 93f6cbd..11d38ec 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -13,6 +13,7 @@ import pykka
import libcloud
from . import config as nmconfig
+from . import status
from .baseactor import WatchdogActor
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
@@ -112,6 +113,8 @@ def main(args=None):
for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
signal.signal(sigcode, shutdown_signal)
+ status.Server(config).start()
+
try:
root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
diff --git a/services/nodemanager/arvnodeman/status.py b/services/nodemanager/arvnodeman/status.py
new file mode 100644
index 0000000..cda91bb
--- /dev/null
+++ b/services/nodemanager/arvnodeman/status.py
@@ -0,0 +1,96 @@
+"""Management HTTP server exposing a snapshot of Node Manager state.
+
+The server answers ``GET /status.json`` with the JSON-encoded contents of the
+module-level ``tracker``; it is enabled only when the ``[Manage]`` ``port``
+setting is >= 0.
+"""
+
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import http.server
+import json
+import logging
+import socketserver
+import threading
+
+_logger = logging.getLogger('status.Handler')
+
+
+class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
+    """Threaded HTTP server for /status.json, run on a daemon thread.
+
+    If ``[Manage] port`` is negative the server is disabled: ``enabled`` is
+    False, no socket is bound and ``start()`` is a no-op.
+    """
+
+    def __init__(self, config):
+        """Bind to ``[Manage] address``:``port`` unless port < 0.
+
+        :param config: a NodeManagerConfig providing the ``Manage`` section.
+        """
+        port = config.getint('Manage', 'port')
+        self.enabled = port >= 0
+        if not self.enabled:
+            return
+        self._config = config
+        super(Server, self).__init__(
+            (config.get('Manage', 'address'), port), Handler)
+        # Daemon thread so a running server never blocks process exit.
+        self._thread = threading.Thread(target=self.serve_forever)
+        self._thread.daemon = True
+
+    def start(self):
+        """Start serving in the background; does nothing when disabled."""
+        if self.enabled:
+            self._thread.start()
+
+
+class Handler(http.server.BaseHTTPRequestHandler, object):
+    """GET /status.json returns the tracker state; other GET paths get 404."""
+
+    def do_GET(self):
+        if self.path == '/status.json':
+            # wfile is a binary stream; json.dumps() returns text on Python 3.
+            body = tracker.get_json().encode('utf-8')
+            self.send_response(200)
+            self.send_header('Content-type', 'application/json')
+            self.send_header('Content-Length', str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+        else:
+            # send_error() also calls end_headers(); a bare send_response()
+            # only buffers the status line, so the client would get nothing.
+            self.send_error(404)
+
+    def log_message(self, fmt, *args, **kwargs):
+        """Route the default access log through the logging module."""
+        _logger.info(fmt, *args, **kwargs)
+
+
+class Tracker(object):
+    """Lock-protected dict of status values (daemon writes, Handler reads)."""
+
+    def __init__(self):
+        self._mtx = threading.Lock()
+        self._latest = {}
+
+    def get_json(self):
+        """Return the current state serialized as a JSON string."""
+        with self._mtx:
+            return json.dumps(self._latest)
+
+    def keys(self):
+        """Return a snapshot list of the current keys."""
+        with self._mtx:
+            # Copy under the lock; a py3 dict view would be iterated unlocked.
+            return list(self._latest.keys())
+
+    def update(self, updates):
+        """Merge the mapping *updates* into the current state."""
+        with self._mtx:
+            self._latest.update(updates)
+
+
+# Process-wide tracker shared by the daemon (writer) and Handler (reader).
+tracker = Tracker()
diff --git a/services/nodemanager/doc/azure.example.cfg b/services/nodemanager/doc/azure.example.cfg
index f253621..e0d806e 100644
--- a/services/nodemanager/doc/azure.example.cfg
+++ b/services/nodemanager/doc/azure.example.cfg
@@ -1,6 +1,16 @@
# Azure configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8900
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
diff --git a/services/nodemanager/doc/ec2.example.cfg b/services/nodemanager/doc/ec2.example.cfg
index b25bf94..f722a71 100644
--- a/services/nodemanager/doc/ec2.example.cfg
+++ b/services/nodemanager/doc/ec2.example.cfg
@@ -1,6 +1,16 @@
# EC2 configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8900
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
diff --git a/services/nodemanager/doc/gce.example.cfg b/services/nodemanager/doc/gce.example.cfg
index ed7bdc3..ea740c1 100644
--- a/services/nodemanager/doc/gce.example.cfg
+++ b/services/nodemanager/doc/gce.example.cfg
@@ -1,6 +1,16 @@
# Google Compute Engine configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8900
+
[Daemon]
# Node Manager will ensure that there are at least this many nodes running at
# all times. If node manager needs to start new idle nodes for the purpose of
diff --git a/services/nodemanager/doc/local.example.cfg b/services/nodemanager/doc/local.example.cfg
index 314750e..00f4a9b 100644
--- a/services/nodemanager/doc/local.example.cfg
+++ b/services/nodemanager/doc/local.example.cfg
@@ -5,6 +5,10 @@
# is through the API server Rails console: load the Node object, set its
# IP address to 10.10.0.N (where N is the cloud node's ID), and save.
+[Manage]
+address = 0.0.0.0
+port = 8900
+
[Daemon]
min_nodes = 0
max_nodes = 8
diff --git a/services/nodemanager/setup.py b/services/nodemanager/setup.py
index c30108f..5eb923e 100644
--- a/services/nodemanager/setup.py
+++ b/services/nodemanager/setup.py
@@ -29,17 +29,23 @@ setup(name='arvados-node-manager',
('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
],
install_requires=[
- 'apache-libcloud>=0.16',
- 'arvados-python-client>=0.1.20150206225333',
- 'pykka',
- 'python-daemon',
- 'setuptools'
- ],
- dependency_links = [
+ 'apache-libcloud>=0.16',
+ 'arvados-python-client>=0.1.20150206225333',
+ 'future',
+ 'pykka',
+ 'python-daemon',
+ 'setuptools'
+ ],
+ dependency_links=[
"https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
],
test_suite='tests',
- tests_require=['pbr<1.7.0', 'mock>=1.0', "apache-libcloud==0.18.1.dev4"],
+ tests_require=[
+ 'requests',
+ 'pbr<1.7.0',
+ 'mock>=1.0',
+ 'apache-libcloud==0.18.1.dev4',
+ ],
zip_safe=False,
cmdclass={'egg_info': tagger},
)
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index e49fc39..84ede2a 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -9,9 +9,11 @@ import mock
import pykka
import arvnodeman.daemon as nmdaemon
+import arvnodeman.status as status
from arvnodeman.jobqueue import ServerCalculator
from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
+from . import test_status
import logging
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -355,10 +357,15 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
monitor = self.monitor_list()[0].proxy()
self.daemon.update_server_wishlist([])
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.assertTrue(self.node_shutdown.start.called,
"daemon did not shut down booted node on offer")
+ with test_status.TestServer() as srv:
+ self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
+ self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
+
def test_booted_node_lifecycle(self):
cloud_node = testutil.cloud_node_mock(6)
setup = self.start_node_boot(cloud_node, id_num=6)
diff --git a/services/nodemanager/tests/test_status.py b/services/nodemanager/tests/test_status.py
new file mode 100644
index 0000000..df7b2ba
--- /dev/null
+++ b/services/nodemanager/tests/test_status.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import requests
+import unittest
+
+import arvnodeman.status as status
+import arvnodeman.config as config
+
+
+class TestServer(object):
+    """Context manager running a status.Server on an ephemeral local port."""
+
+    def __enter__(self):
+        cfg = config.NodeManagerConfig()
+        # Port 0 lets the OS pick a free port; it is read back below.
+        cfg.set('Manage', 'port', '0')
+        cfg.set('Manage', 'address', '127.0.0.1')
+        self.srv = status.Server(cfg)
+        self.srv.start()
+        addr, port = self.srv.server_address
+        self.srv_base = 'http://127.0.0.1:'+str(port)
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        # shutdown() only stops serve_forever(); server_close() releases
+        # the listening socket so repeated tests do not leak descriptors.
+        self.srv.shutdown()
+        self.srv.server_close()
+
+    def get_status_response(self):
+        """Return the raw requests.Response for GET /status.json."""
+        return requests.get(self.srv_base+'/status.json')
+
+    def get_status(self):
+        """Return the decoded JSON body of GET /status.json."""
+        return self.get_status_response().json()
+
+
+class StatusServerUpdates(unittest.TestCase):
+    def test_updates(self):
+        with TestServer() as srv:
+            for n in [1, 2, 3]:
+                status.tracker.update({'nodes_'+str(n): n})
+                r = srv.get_status_response()
+                self.assertEqual(200, r.status_code)
+                self.assertEqual('application/json', r.headers['content-type'])
+                resp = r.json()
+                self.assertEqual(1, resp['nodes_1'])
+                self.assertEqual(n, resp['nodes_'+str(n)])
+
+
+class StatusServerDisabled(unittest.TestCase):
+    def test_config_disabled(self):
+        cfg = config.NodeManagerConfig()
+        cfg.set('Manage', 'port', '-1')
+        cfg.set('Manage', 'address', '127.0.0.1')
+        self.srv = status.Server(cfg)
+        self.srv.start()
+        self.assertFalse(self.srv.enabled)
+        self.assertFalse(getattr(self.srv, '_thread', False))
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list