[ARVADOS] created: 81aa936c50bda0257d62dd59e45d5afba73a4bac

Git user git at public.curoverse.com
Mon Jun 5 15:41:33 EDT 2017


        at  81aa936c50bda0257d62dd59e45d5afba73a4bac (commit)


commit 81aa936c50bda0257d62dd59e45d5afba73a4bac
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 2 17:35:15 2017 -0400

    10312: Tests pass for booting single node, multiple nodes, hitting quota, quota
    probe.  Add node manager integration to run-tests.sh.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/build/run-tests.sh b/build/run-tests.sh
index b6a93d4..1111ab8 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -854,6 +854,12 @@ test_login-sync() {
 }
 do_test services/login-sync login-sync
 
+test_nodemanager-integration() {
+    cd "$WORKSPACE/services/nodemanager" \
+        && tests/integration_test.py ${testargs[services/nodemanager-integration]}
+}
+do_test services/nodemanager-integration nodemanager-integration
+
 for p in "${pythonstuff[@]}"
 do
     dir=${p%:py3}
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index ac0d0bd..f4ffff5 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -36,7 +36,10 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
         ConfigParser.SafeConfigParser.__init__(self, *args, **kwargs)
         for sec_name, settings in {
             'Arvados': {'insecure': 'no',
-                        'timeout': '15'},
+                        'timeout': '15',
+                        'jobs_queue': 'yes',
+                        'slurm_queue': 'yes'
+                    },
             'Daemon': {'min_nodes': '0',
                        'max_nodes': '1',
                        'poll_time': '60',
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 029d818..c0413f6 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -407,14 +407,13 @@ class NodeManagerDaemonActor(actor_class):
             setup_proxy, 'cloud_node', 'arvados_node', 'error')
         setup_proxy.stop()
 
-        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
         if cloud_node is None:
             # If cloud_node is None then the node create wasn't successful.
             if error == dispatch.QuotaExceeded:
                 # We've hit a quota limit, so adjust node_quota to stop trying to
                 # boot new nodes until the node count goes down.
-                self.node_quota = min(total_node_count-1, self.max_nodes)
-                self._logger.warning("Setting node quota to %s", self.node_quota)
+                self.node_quota = len(self.cloud_nodes)
+                self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
         else:
             # Node creation succeeded.  Update cloud node list.
             cloud_node._nodemanager_recently_booted = True
@@ -432,8 +431,11 @@ class NodeManagerDaemonActor(actor_class):
             # now booting single core machines (actual quota 20), we want to
             # allow the quota to expand so we don't get stuck at 10 machines
             # forever.
-            if total_node_count == self.node_quota and self.node_quota < self.max_nodes:
-                self.node_quota += 1
+            if len(self.cloud_nodes) >= self.node_quota:
+                self.node_quota = len(self.cloud_nodes)+1
+                self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
+
+        self.node_quota = min(self.node_quota, self.max_nodes)
         del self.booting[setup_proxy.actor_ref.actor_urn]
         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
 
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index c0de691..1716a57 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -112,9 +112,12 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     CLIENT_ERRORS = ARVADOS_ERRORS
 
-    def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
+    def __init__(self, client, timer_actor, server_calc,
+                 jobs_queue, slurm_queue, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
             client, timer_actor, *args, **kwargs)
+        self.jobs_queue = jobs_queue
+        self.slurm_queue = slurm_queue
         self._calculator = server_calc
 
     @staticmethod
@@ -132,25 +135,27 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
             return int(x)
 
     def _send_request(self):
-        # cpus, memory, tempory disk space, reason, job name
-        squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
         queuelist = []
-        for out in squeue_out.splitlines():
-            try:
-                cpu, ram, disk, reason, jobname = out.split("|", 4)
-                if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
-                    queuelist.append({
-                        "uuid": jobname,
-                        "runtime_constraints": {
-                            "min_cores_per_node": cpu,
-                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                        }
-                    })
-            except ValueError:
-                pass
-
-        queuelist.extend(self._client.jobs().queue().execute()['items'])
+        if self.slurm_queue:
+            # cpus, memory, temporary disk space, reason, job name
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            for out in squeue_out.splitlines():
+                try:
+                    cpu, ram, disk, reason, jobname = out.split("|", 4)
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                        queuelist.append({
+                            "uuid": jobname,
+                            "runtime_constraints": {
+                                "min_cores_per_node": cpu,
+                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                            }
+                        })
+                except ValueError:
+                    pass
+
+        if self.jobs_queue:
+            queuelist.extend(self._client.jobs().queue().execute()['items'])
 
         return queuelist
 
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index cb80bbf..3cd097a 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -87,7 +87,10 @@ def launch_pollers(config, server_calculator):
         config.new_arvados_client(), timer, poll_time, max_poll_time).tell_proxy()
     job_queue_poller = JobQueueMonitorActor.start(
         config.new_arvados_client(), timer, server_calculator,
-        poll_time, max_poll_time).tell_proxy()
+        config.getboolean('Arvados', 'jobs_queue'),
+        config.getboolean('Arvados', 'slurm_queue'),
+        poll_time, max_poll_time
+    ).tell_proxy()
     return timer, cloud_node_poller, arvados_node_poller, job_queue_poller
 
 _caught_signals = {}
diff --git a/services/nodemanager/arvnodeman/test/fake_driver.py b/services/nodemanager/arvnodeman/test/fake_driver.py
index be0789e..ee49305 100644
--- a/services/nodemanager/arvnodeman/test/fake_driver.py
+++ b/services/nodemanager/arvnodeman/test/fake_driver.py
@@ -3,8 +3,11 @@ import urllib
 import ssl
 
 from libcloud.compute.base import NodeSize, Node, NodeDriver, NodeState
+from libcloud.common.exceptions import BaseHTTPError
 
 all_nodes = []
+create_calls = 0
+quota = 2
 
 class FakeDriver(NodeDriver):
     def __init__(self, *args, **kwargs):
@@ -27,14 +30,16 @@ class FakeDriver(NodeDriver):
                     ex_user_name=None,
                     ex_tags=None,
                     ex_network=None):
-        global all_nodes
-        all_nodes.append(Node(name, name, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags}))
+        global all_nodes, create_calls
+        create_calls += 1
+        n = Node(name, name, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags})
+        all_nodes.append(n)
         ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).groups(1)[0] + "&instance_id=" + name
         ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
         ctx.verify_mode = ssl.CERT_NONE
         f = urllib.urlopen(ping_url, "", context=ctx)
         f.close()
-        return all_nodes[-1]
+        return n
 
     def destroy_node(self, cloud_node):
         global all_nodes
@@ -46,3 +51,36 @@ class FakeDriver(NodeDriver):
 
     def ex_create_tags(self, cloud_node, tags):
         pass
+
+class QuotaDriver(FakeDriver):
+    def create_node(self, name=None,
+                    size=None,
+                    image=None,
+                    auth=None,
+                    ex_storage_account=None,
+                    ex_customdata=None,
+                    ex_resource_group=None,
+                    ex_user_name=None,
+                    ex_tags=None,
+                    ex_network=None):
+        global all_nodes, create_calls, quota
+        if len(all_nodes) >= quota:
+            raise BaseHTTPError(503, "Quota exceeded")
+        else:
+            return super(QuotaDriver, self).create_node(name=name,
+                    size=size,
+                    image=image,
+                    auth=auth,
+                    ex_storage_account=ex_storage_account,
+                    ex_customdata=ex_customdata,
+                    ex_resource_group=ex_resource_group,
+                    ex_user_name=ex_user_name,
+                    ex_tags=ex_tags,
+                    ex_network=ex_network)
+
+    def destroy_node(self, cloud_node):
+        global all_nodes, quota
+        all_nodes = [n for n in all_nodes if n.id != cloud_node.id]
+        if len(all_nodes) == 0:
+            quota = 4
+        return True
diff --git a/services/nodemanager/tests/fake.cfg.template b/services/nodemanager/tests/fake.cfg.template
index 631745a..eacd53f 100644
--- a/services/nodemanager/tests/fake.cfg.template
+++ b/services/nodemanager/tests/fake.cfg.template
@@ -49,7 +49,7 @@ poll_stale_after = 600
 # node before this long, assume that there was a cloud bootstrap failure and
 # shut it down.  Note that normal shutdown windows apply (see the Cloud
 # section), so this should be shorter than the first shutdown window value.
-boot_fail_after = 20
+boot_fail_after = 45
 
 # "Node stale time" affects two related behaviors.
 # 1. If a compute node has been running for at least this long, but it
@@ -93,6 +93,7 @@ apiclient = WARNING
 host = {host}
 token = {token}
 timeout = 15
+jobs_queue = no
 
 # Accept an untrusted SSL certificate from the API server?
 insecure = yes
diff --git a/services/nodemanager/tests/integration_test.py b/services/nodemanager/tests/integration_test.py
index 90bf237..53cbc10 100755
--- a/services/nodemanager/tests/integration_test.py
+++ b/services/nodemanager/tests/integration_test.py
@@ -8,54 +8,96 @@ import logging
 import stat
 import tempfile
 import shutil
+from functools import partial
+import arvados
 
 logging.basicConfig(level=logging.INFO)
 
 fake_slurm = None
 compute_nodes = None
+all_jobs = None
 
 def update_script(path, val):
     with open(path+"_", "w") as f:
         f.write(val)
     os.chmod(path+"_", stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
     os.rename(path+"_", path)
+    logging.info("Update script %s: %s", path, val)
 
+def set_squeue(g):
+    global all_jobs
+    update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
+                  "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+    return 0
+
+
+def node_paired(g):
+    global compute_nodes
+    compute_nodes[g.group(1)] = g.group(3)
+
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+
+    for k,v in all_jobs.items():
+        if v == "ReqNodeNotAvail":
+            all_jobs[k] = "Running"
+            break
+
+    set_squeue(g)
 
-def set_squeue(actions, checks, k, g):
-    update_script(os.path.join(fake_slurm, "squeue"), """#!/bin/sh
-echo '1|100|100|ReqNodeNotAvail|34t0i-dz642-h42bg3hq4bdfpf9'
-""")
     return 0
 
-def set_sinfo_alloc(actions, checks, k, g):
-    update_script(os.path.join(fake_slurm, "sinfo"), """#!/bin/sh
-echo '%s alloc'
-""" % (g.group(3)))
+def remaining_jobs(g):
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
 
-    update_script(os.path.join(fake_slurm, "squeue"), """#!/bin/sh
-echo '1|100|100|Running|34t0i-dz642-h42bg3hq4bdfpf9'
-""")
+    for k,v in all_jobs.items():
+        all_jobs[k] = "Running"
 
+    set_squeue(g)
+
+    return 0
+
+
+def node_busy(g):
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
+                  "\n".join("echo '%s idle'" % (v) for k,v in compute_nodes.items()))
+    return 0
+
+def node_shutdown(g):
     global compute_nodes
-    compute_nodes[g.group(1)] = g.group(3)
+    del compute_nodes[g.group(1)]
     return 0
 
-def set_sinfo_idle(actions, checks, k, g):
-    update_script(os.path.join(fake_slurm, "sinfo"), """#!/bin/sh
-echo '%s idle'
-""" % (compute_nodes[g.group(1)]))
+def jobs_req(g):
+    global all_jobs
+    for k,v in all_jobs.items():
+        all_jobs[k] = "ReqNodeNotAvail"
+    set_squeue(g)
     return 0
 
-def noop(actions, checks, k, g):
+def noop(g):
     return 0
 
-def down_fail(actions, checks, k, g):
+def fail(checks, pattern, g):
     return 1
 
+def expect_count(count, checks, pattern, g):
+    if count == 0:
+        return 1
+    else:
+        checks[pattern] = partial(expect_count, count-1)
+        return 0
 
-def run_test(actions, checks, driver_class):
+def run_test(name, actions, checks, driver_class, jobs):
     code = 0
 
+    api = arvados.api('v1')
+    for n in api.nodes().list().execute()['items']:
+        api.nodes().delete(uuid=n["uuid"]).execute()
+
+    logging.info("Start %s", name)
+
     global fake_slurm
     fake_slurm = tempfile.mkdtemp()
     logging.info("fake_slurm is %s", fake_slurm)
@@ -63,6 +105,9 @@ def run_test(actions, checks, driver_class):
     global compute_nodes
     compute_nodes = {}
 
+    global all_jobs
+    all_jobs = jobs
+
     env = os.environ.copy()
     env["PATH"] = fake_slurm + ":" + env["PATH"]
 
@@ -78,56 +123,181 @@ def run_test(actions, checks, driver_class):
                                       driver_class=driver_class,
                                       ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
 
-    timeout = time.time() + 300
+    timeout = time.time() + 180
+    terminated = False
 
     p = subprocess.Popen(["bin/arvados-node-manager", "--foreground", "--config", os.path.join(fake_slurm, "fake.cfg")],
-                         bufsize=1, stderr=subprocess.PIPE, env=env)
-    for line in p.stderr:
-        sys.stdout.write(line)
+                         bufsize=0, stderr=subprocess.PIPE, env=env)
 
-        if time.time() > timeout:
-            logging.error("Exceeded timeout")
-            code = 1
-            p.terminate()
+    try:
+        # naive line iteration over pipes gets buffered, which isn't what we want,
+        # see https://bugs.python.org/issue3907
+        for line in iter(p.stderr.readline, ""):
+            sys.stdout.write(line)
 
-        for k,v in actions.items():
-            g = re.match(k, line)
-            if g:
-                logging.info("Triggered action %s", k)
-                del actions[k]
-                code = v(actions, checks, k, g)
-                if code != 0:
-                    logging.error("Action failed")
+            for k,v in checks.items():
+                g = re.match(k, line)
+                if g:
+                    logging.info("Matched check %s", k)
+                    code += v(checks, k, g)
+                    if code != 0:
+                        logging.error("Check failed")
+                        if not terminated:
+                            p.terminate()
+                            terminated = True
+
+            if terminated:
+                continue
+
+            if time.time() > timeout:
+                logging.error("Exceeded timeout with actions remaining: %s", actions)
+                code += 1
+                if not terminated:
                     p.terminate()
+                    terminated = True
 
-        for k,v in checks.items():
+            k, v = actions[0]
             g = re.match(k, line)
             if g:
-                logging.info("Triggered check %s", k)
-                code = v(actions, checks, k, g)
+                logging.info("Matched action %s", k)
+                actions.pop(0)
+                code += v(g)
                 if code != 0:
-                    logging.error("Check failed")
+                    logging.error("Action failed")
                     p.terminate()
+                    terminated = True
+
+            if not actions:
+                p.terminate()
+                terminated = True
+    except KeyboardInterrupt:
+        p.kill()
 
-        if not actions:
-            p.terminate()
+    if actions:
+        logging.error("Ended with remaining actions: %s", actions)
+        code = 1
 
-    #shutil.rmtree(fake_slurm)
+    shutil.rmtree(fake_slurm)
+
+    if code == 0:
+        logging.info("%s passed", name)
+    else:
+        logging.info("%s failed", name)
 
     return code
 
 
 def main():
-    code = run_test({
-        r".*Daemon started": set_squeue,
-        r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": set_sinfo_alloc,
-        r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)": set_sinfo_idle,
-        r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)": noop,
-        r".*Shutdown success": noop,
-    }, {
-        r".*Suggesting shutdown because node state is \('down', .*\)": down_fail
-    },
-    "arvnodeman.test.fake_driver.FakeDriver")
+    # Test lifecycle.
+
+    tests = {
+        "test1": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
+                r".*Setting node quota.*": fail,
+            },
+            "arvnodeman.test.fake_driver.FakeDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"}),
+        "test2": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
+                r".*Setting node quota.*": fail,
+            },
+            "arvnodeman.test.fake_driver.FakeDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         }),
+        "test3": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*setting node quota to 3", noop),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
+                r".*Sending create_node request.*": partial(expect_count, 5)
+            },
+            "arvnodeman.test.fake_driver.QuotaDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         }),
+        "test4": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*setting node quota to 3", noop),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*sending request", jobs_req),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
+                (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+                (r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
+            ], {
+                r".*Suggesting shutdown because node state is \('down', .*\)": fail,
+                r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
+                r".*Sending create_node request.*": partial(expect_count, 9)
+            },
+            "arvnodeman.test.fake_driver.QuotaDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
+             "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         })
+    }
+
+    code = 0
+    if len(sys.argv) > 1:
+        code = run_test(sys.argv[1], *tests[sys.argv[1]])
+    else:
+        for t in sorted(tests.keys()):
+            code += run_test(t, *tests[t])
+
+    if code == 0:
+        logging.info("Tests passed")
+    else:
+        logging.info("Tests failed")
+
     exit(code)
 
 main()

commit 09794d996eca79b85d3ac0c21a4a43c65a51d0d7
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Jun 2 11:58:55 2017 -0400

    10312: Integration test framework for node manager, runs full node manager with
    fake cloud driver and monitors logging output.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 8f9207e..029d818 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -146,8 +146,8 @@ class NodeManagerDaemonActor(actor_class):
         self.last_polls[poll_key] = time.time()
 
     def _pair_nodes(self, node_record, arvados_node):
-        self._logger.info("Cloud node %s is now paired with Arvados node %s",
-                          node_record.cloud_node.name, arvados_node['uuid'])
+        self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
+                          node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
         self._arvados_nodes_actor.subscribe_to(
             arvados_node['uuid'], node_record.actor.update_arvados_node)
         node_record.arvados_node = arvados_node
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index 11d38ec..cb80bbf 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -22,6 +22,7 @@ from .timedcallback import TimedCallBackActor
 from ._version import __version__
 
 node_daemon = None
+watchdog = None
 
 def abort(msg, code=1):
     print("arvados-node-manager: " + msg)
@@ -97,6 +98,7 @@ def shutdown_signal(signal_code, frame):
         pykka.ActorRegistry.stop_all()
         sys.exit(-signal_code)
     elif current_count == 0:
+        watchdog.stop()
         node_daemon.shutdown()
     elif current_count == 1:
         pykka.ActorRegistry.stop_all()
@@ -104,7 +106,7 @@ def shutdown_signal(signal_code, frame):
         sys.exit(-signal_code)
 
 def main(args=None):
-    global node_daemon
+    global node_daemon, watchdog
     args = parse_cli(args)
     config = load_config(args.config)
 
@@ -138,7 +140,7 @@ def main(args=None):
             node_setup, node_shutdown, node_monitor,
             max_total_price=config.getfloat('Daemon', 'max_total_price')).tell_proxy()
 
-        WatchdogActor.start(config.getint('Daemon', 'watchdog'),
+        watchdog = WatchdogActor.start(config.getint('Daemon', 'watchdog'),
                             cloud_node_poller.actor_ref,
                             arvados_node_poller.actor_ref,
                             job_queue_poller.actor_ref,
diff --git a/services/nodemanager/arvnodeman/test/__init__.py b/services/nodemanager/arvnodeman/test/__init__.py
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/services/nodemanager/arvnodeman/test/__init__.py
@@ -0,0 +1 @@
+
diff --git a/services/nodemanager/arvnodeman/fake_driver.py b/services/nodemanager/arvnodeman/test/fake_driver.py
similarity index 91%
rename from services/nodemanager/arvnodeman/fake_driver.py
rename to services/nodemanager/arvnodeman/test/fake_driver.py
index 89a3dbb..be0789e 100644
--- a/services/nodemanager/arvnodeman/fake_driver.py
+++ b/services/nodemanager/arvnodeman/test/fake_driver.py
@@ -27,18 +27,19 @@ class FakeDriver(NodeDriver):
                     ex_user_name=None,
                     ex_tags=None,
                     ex_network=None):
+        global all_nodes
         all_nodes.append(Node(name, name, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags}))
         ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).groups(1)[0] + "&instance_id=" + name
-        print(ping_url)
         ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
         ctx.verify_mode = ssl.CERT_NONE
         f = urllib.urlopen(ping_url, "", context=ctx)
-        print(f.read())
         f.close()
         return all_nodes[-1]
 
     def destroy_node(self, cloud_node):
-        return None
+        global all_nodes
+        all_nodes = [n for n in all_nodes if n.id != cloud_node.id]
+        return True
 
     def get_image(self, img):
         pass
diff --git a/services/nodemanager/fake_slurm/sinfo b/services/nodemanager/fake_slurm/sinfo
deleted file mode 100755
index e57d0d3..0000000
--- a/services/nodemanager/fake_slurm/sinfo
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/sh
-echo
\ No newline at end of file
diff --git a/services/nodemanager/fake_slurm/squeue b/services/nodemanager/fake_slurm/squeue
deleted file mode 100755
index dd114a0..0000000
--- a/services/nodemanager/fake_slurm/squeue
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/sh
-echo '1|100|100|ReqNodeNotAvail|34t0i-dz642-h42bg3hq4bdfpf9'
diff --git a/services/nodemanager/tests/fake.azure.cfg b/services/nodemanager/tests/fake.cfg.template
similarity index 95%
rename from services/nodemanager/tests/fake.azure.cfg
rename to services/nodemanager/tests/fake.cfg.template
index 7f7629f..631745a 100644
--- a/services/nodemanager/tests/fake.azure.cfg
+++ b/services/nodemanager/tests/fake.cfg.template
@@ -34,7 +34,7 @@ max_nodes = 8
 max_total_price = 0
 
 # Poll Azure nodes and Arvados for new information every N seconds.
-poll_time = 15
+poll_time = 5
 
 # Polls have exponential backoff when services fail to respond.
 # This is the longest time to wait between polls.
@@ -49,7 +49,7 @@ poll_stale_after = 600
 # node before this long, assume that there was a cloud bootstrap failure and
 # shut it down.  Note that normal shutdown windows apply (see the Cloud
 # section), so this should be shorter than the first shutdown window value.
-boot_fail_after = 1800
+boot_fail_after = 20
 
 # "Node stale time" affects two related behaviors.
 # 1. If a compute node has been running for at least this long, but it
@@ -90,8 +90,8 @@ pykka = WARNING
 apiclient = WARNING
 
 [Arvados]
-host = 192.168.5.2:8000
-token = 2tnmn9ou33o3vk3bynzyzrc7aedhijo7ufa11j9kyv7509cygx
+host = {host}
+token = {token}
 timeout = 15
 
 # Accept an untrusted SSL certificate from the API server?
@@ -99,7 +99,7 @@ insecure = yes
 
 [Cloud]
 provider = azure
-driver_class = arvnodeman.fake_driver.FakeDriver
+driver_class = {driver_class}
 
 # Shutdown windows define periods of time when a node may and may not be shut
 # down.  These are windows in full minutes, separated by commas.  Counting from
@@ -110,7 +110,7 @@ driver_class = arvnodeman.fake_driver.FakeDriver
 # Azure bills by the minute, so it makes sense to agressively shut down idle
 # nodes.  Specify at least two windows.  You can add as many as you need beyond
 # that.
-shutdown_windows = 5, 999999
+shutdown_windows = 1, 999999
 
 [Cloud Credentials]
 # Use "azure account list" with the azure CLI to get these values.
@@ -123,7 +123,7 @@ subscription_id = 00000000-0000-0000-0000-000000000000
 # azure config mode arm
 # azure ad app create --name "<Your Application Display Name>" --home-page "<https://YourApplicationHomePage>" --identifier-uris "<https://YouApplicationUri>" --password <Your_Password>
 # azure ad sp create "<Application_Id>"
-# azure role assignment create --objectId "<Object_Id>" -o Owner -c /subscriptions/{subscriptionId}/
+# azure role assignment create --objectId "<Object_Id>" -o Owner -c /subscriptions/<subscriptionId>/
 #
 # Use <Application_Id> for "key" and the <Your_Password> for "secret"
 #
@@ -142,7 +142,7 @@ ex_resource_group = ArvadosResourceGroup
 image = Canonical:UbuntuServer:14.04.3-LTS:14.04.201508050
 
 # Path to a local ssh key file that will be used to provision new nodes.
-ssh_key = /home/peter/.ssh/id_rsa.pub
+ssh_key = {ssh_key}
 
 # The account name for the admin user that will be provisioned on new nodes.
 ex_user_name = arvadosuser
@@ -161,7 +161,7 @@ tag_arvados-class = dynamic-compute
 tag_cluster = zyxwv
 
 # the API server to ping
-ping_host = 192.168.5.2:8000
+ping_host = {host}
 
 # You can define any number of Size sections to list Azure sizes you're willing
 # to use.  The Node Manager should boot the cheapest size(s) that can run jobs
diff --git a/services/nodemanager/tests/integration_test.py b/services/nodemanager/tests/integration_test.py
new file mode 100755
index 0000000..90bf237
--- /dev/null
+++ b/services/nodemanager/tests/integration_test.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+import subprocess
+import os
+import sys
+import re
+import time
+import logging
+import stat
+import tempfile
+import shutil
+
+logging.basicConfig(level=logging.INFO)
+
+fake_slurm = None
+compute_nodes = None
+
+def update_script(path, val):
+    with open(path+"_", "w") as f:
+        f.write(val)
+    os.chmod(path+"_", stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
+    os.rename(path+"_", path)
+
+
+def set_squeue(actions, checks, k, g):
+    update_script(os.path.join(fake_slurm, "squeue"), """#!/bin/sh
+echo '1|100|100|ReqNodeNotAvail|34t0i-dz642-h42bg3hq4bdfpf9'
+""")
+    return 0
+
+def set_sinfo_alloc(actions, checks, k, g):
+    update_script(os.path.join(fake_slurm, "sinfo"), """#!/bin/sh
+echo '%s alloc'
+""" % (g.group(3)))
+
+    update_script(os.path.join(fake_slurm, "squeue"), """#!/bin/sh
+echo '1|100|100|Running|34t0i-dz642-h42bg3hq4bdfpf9'
+""")
+
+    global compute_nodes
+    compute_nodes[g.group(1)] = g.group(3)
+    return 0
+
+def set_sinfo_idle(actions, checks, k, g):
+    update_script(os.path.join(fake_slurm, "sinfo"), """#!/bin/sh
+echo '%s idle'
+""" % (compute_nodes[g.group(1)]))
+    return 0
+
+def noop(actions, checks, k, g):
+    return 0
+
+def down_fail(actions, checks, k, g):
+    return 1
+
+
+def run_test(actions, checks, driver_class):
+    code = 0
+
+    global fake_slurm
+    fake_slurm = tempfile.mkdtemp()
+    logging.info("fake_slurm is %s", fake_slurm)
+
+    global compute_nodes
+    compute_nodes = {}
+
+    env = os.environ.copy()
+    env["PATH"] = fake_slurm + ":" + env["PATH"]
+
+    update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n")
+    update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n")
+
+    with open("tests/fake.cfg.template") as f:
+        with open(os.path.join(fake_slurm, "id_rsa.pub"), "w") as ssh:
+            pass
+        with open(os.path.join(fake_slurm, "fake.cfg"), "w") as cfg:
+            cfg.write(f.read().format(host=os.environ["ARVADOS_API_HOST"],
+                                      token=os.environ["ARVADOS_API_TOKEN"],
+                                      driver_class=driver_class,
+                                      ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
+
+    timeout = time.time() + 300
+
+    p = subprocess.Popen(["bin/arvados-node-manager", "--foreground", "--config", os.path.join(fake_slurm, "fake.cfg")],
+                         bufsize=1, stderr=subprocess.PIPE, env=env)
+    for line in p.stderr:
+        sys.stdout.write(line)
+
+        if time.time() > timeout:
+            logging.error("Exceeded timeout")
+            code = 1
+            p.terminate()
+
+        for k,v in actions.items():
+            g = re.match(k, line)
+            if g:
+                logging.info("Triggered action %s", k)
+                del actions[k]
+                code = v(actions, checks, k, g)
+                if code != 0:
+                    logging.error("Action failed")
+                    p.terminate()
+
+        for k,v in checks.items():
+            g = re.match(k, line)
+            if g:
+                logging.info("Triggered check %s", k)
+                code = v(actions, checks, k, g)
+                if code != 0:
+                    logging.error("Check failed")
+                    p.terminate()
+
+        if not actions:
+            p.terminate()
+
+    #shutil.rmtree(fake_slurm)
+
+    return code
+
+
+def main():
+    code = run_test({
+        r".*Daemon started": set_squeue,
+        r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": set_sinfo_alloc,
+        r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)": set_sinfo_idle,
+        r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)": noop,
+        r".*Shutdown success": noop,
+    }, {
+        r".*Suggesting shutdown because node state is \('down', .*\)": down_fail
+    },
+    "arvnodeman.test.fake_driver.FakeDriver")
+    exit(code)
+
+main()

commit ea47e67af0e7528f0bcb23f3b34019b308eaa68a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 1 17:37:09 2017 -0400

    10312: Add the ability to substitute a fake libcloud driver while running the full node manager, for integration testing.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index ec8b1d1..63dac3f 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -5,6 +5,7 @@ from __future__ import absolute_import, print_function
 import functools
 import logging
 import time
+import re
 
 import libcloud.common.types as cloud_types
 import pykka
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 30e8995..ac0d0bd 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -103,12 +103,19 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
     def new_cloud_client(self):
         module = importlib.import_module('arvnodeman.computenode.driver.' +
                                          self.get('Cloud', 'provider'))
+        driver_class = module.ComputeNodeDriver.DEFAULT_DRIVER
+        if self.get('Cloud', 'driver_class'):
+            d = self.get('Cloud', 'driver_class').split('.')
+            mod = '.'.join(d[:-1])
+            cls = d[-1]
+            driver_class = importlib.import_module(mod).__dict__[cls]
         auth_kwargs = self.get_section('Cloud Credentials')
         if 'timeout' in auth_kwargs:
             auth_kwargs['timeout'] = int(auth_kwargs['timeout'])
         return module.ComputeNodeDriver(auth_kwargs,
                                         self.get_section('Cloud List'),
-                                        self.get_section('Cloud Create'))
+                                        self.get_section('Cloud Create'),
+                                        driver_class=driver_class)
 
     def node_sizes(self, all_sizes):
         """Finds all acceptable NodeSizes for our installation.
diff --git a/services/nodemanager/arvnodeman/fake_driver.py b/services/nodemanager/arvnodeman/fake_driver.py
new file mode 100644
index 0000000..89a3dbb
--- /dev/null
+++ b/services/nodemanager/arvnodeman/fake_driver.py
@@ -0,0 +1,47 @@
+import re
+import urllib
+import ssl
+
+from libcloud.compute.base import NodeSize, Node, NodeDriver, NodeState
+
+all_nodes = []
+
+class FakeDriver(NodeDriver):
+    def __init__(self, *args, **kwargs):
+        self.name = "FakeDriver"
+
+    def list_sizes(self, **kwargs):
+        return [NodeSize("Standard_D3", "Standard_D3", 3500, 200, 0, 0, self),
+                NodeSize("Standard_D4", "Standard_D4", 7000, 400, 0, 0, self)]
+
+    def list_nodes(self, **kwargs):
+        return all_nodes
+
+    def create_node(self, name=None,
+                    size=None,
+                    image=None,
+                    auth=None,
+                    ex_storage_account=None,
+                    ex_customdata=None,
+                    ex_resource_group=None,
+                    ex_user_name=None,
+                    ex_tags=None,
+                    ex_network=None):
+        all_nodes.append(Node(name, name, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags}))
+        ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).groups(1)[0] + "&instance_id=" + name
+        print(ping_url)
+        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+        ctx.verify_mode = ssl.CERT_NONE
+        f = urllib.urlopen(ping_url, "", context=ctx)
+        print(f.read())
+        f.close()
+        return all_nodes[-1]
+
+    def destroy_node(self, cloud_node):
+        return None
+
+    def get_image(self, img):
+        pass
+
+    def ex_create_tags(self, cloud_node, tags):
+        pass
diff --git a/services/nodemanager/arvnodeman/jobqueue.py b/services/nodemanager/arvnodeman/jobqueue.py
index 0340918..c0de691 100644
--- a/services/nodemanager/arvnodeman/jobqueue.py
+++ b/services/nodemanager/arvnodeman/jobqueue.py
@@ -136,16 +136,19 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
         squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
         queuelist = []
         for out in squeue_out.splitlines():
-            cpu, ram, disk, reason, jobname = out.split("|", 4)
-            if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
-                queuelist.append({
-                    "uuid": jobname,
-                    "runtime_constraints": {
-                        "min_cores_per_node": cpu,
-                        "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                        "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                    }
-                })
+            try:
+                cpu, ram, disk, reason, jobname = out.split("|", 4)
+                if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "min_cores_per_node": cpu,
+                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                        }
+                    })
+            except ValueError:
+                pass
 
         queuelist.extend(self._client.jobs().queue().execute()['items'])
 
diff --git a/services/nodemanager/arvnodeman/nodelist.py b/services/nodemanager/arvnodeman/nodelist.py
index 6bf1a8b..7bc3a5e 100644
--- a/services/nodemanager/arvnodeman/nodelist.py
+++ b/services/nodemanager/arvnodeman/nodelist.py
@@ -29,16 +29,19 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
         sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
         nodestates = {}
         for out in sinfo_out.splitlines():
-            nodename, state = out.split(" ", 2)
-            if state in ('alloc', 'alloc*',
-                         'comp',  'comp*',
-                         'mix',   'mix*',
-                         'drng',  'drng*'):
-                nodestates[nodename] = 'busy'
-            elif state == 'idle':
-                nodestates[nodename] = 'idle'
-            else:
-                nodestates[nodename] = 'down'
+            try:
+                nodename, state = out.split(" ", 2)
+                if state in ('alloc', 'alloc*',
+                             'comp',  'comp*',
+                             'mix',   'mix*',
+                             'drng',  'drng*'):
+                    nodestates[nodename] = 'busy'
+                elif state == 'idle':
+                    nodestates[nodename] = 'idle'
+                else:
+                    nodestates[nodename] = 'down'
+            except ValueError:
+                pass
 
         for n in nodelist:
             if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
diff --git a/services/nodemanager/fake_slurm/sinfo b/services/nodemanager/fake_slurm/sinfo
new file mode 100755
index 0000000..e57d0d3
--- /dev/null
+++ b/services/nodemanager/fake_slurm/sinfo
@@ -0,0 +1,2 @@
+#!/bin/sh
+echo
\ No newline at end of file
diff --git a/services/nodemanager/fake_slurm/squeue b/services/nodemanager/fake_slurm/squeue
new file mode 100755
index 0000000..dd114a0
--- /dev/null
+++ b/services/nodemanager/fake_slurm/squeue
@@ -0,0 +1,2 @@
+#!/bin/sh
+echo '1|100|100|ReqNodeNotAvail|34t0i-dz642-h42bg3hq4bdfpf9'
diff --git a/services/nodemanager/tests/fake.azure.cfg b/services/nodemanager/tests/fake.azure.cfg
new file mode 100644
index 0000000..7f7629f
--- /dev/null
+++ b/services/nodemanager/tests/fake.azure.cfg
@@ -0,0 +1,187 @@
+# Azure configuration for Arvados Node Manager.
+# All times are in seconds unless specified otherwise.
+
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
+[Daemon]
+# The dispatcher can customize the start and stop procedure for
+# cloud nodes.  For example, the SLURM dispatcher drains nodes
+# through SLURM before shutting them down.
+#dispatcher = slurm
+
+# Node Manager will ensure that there are at least this many nodes running at
+# all times.  If node manager needs to start new idle nodes for the purpose of
+# satisfying min_nodes, it will use the cheapest node type.  However, depending
+# on usage patterns, it may also satisfy min_nodes by keeping alive some
+# more-expensive nodes
+min_nodes = 0
+
+# Node Manager will not start any compute nodes when at least this
+# many are running.
+max_nodes = 8
+
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
+# Poll Azure nodes and Arvados for new information every N seconds.
+poll_time = 15
+
+# Polls have exponential backoff when services fail to respond.
+# This is the longest time to wait between polls.
+max_poll_time = 300
+
+# If Node Manager can't succesfully poll a service for this long,
+# it will never start or stop compute nodes, on the assumption that its
+# information is too outdated.
+poll_stale_after = 600
+
+# If Node Manager boots a cloud node, and it does not pair with an Arvados
+# node before this long, assume that there was a cloud bootstrap failure and
+# shut it down.  Note that normal shutdown windows apply (see the Cloud
+# section), so this should be shorter than the first shutdown window value.
+boot_fail_after = 1800
+
+# "Node stale time" affects two related behaviors.
+# 1. If a compute node has been running for at least this long, but it
+# isn't paired with an Arvados node, do not shut it down, but leave it alone.
+# This prevents the node manager from shutting down a node that might
+# actually be doing work, but is having temporary trouble contacting the
+# API server.
+# 2. When the Node Manager starts a new compute node, it will try to reuse
+# an Arvados node that hasn't been updated for this long.
+node_stale_after = 14400
+
+# Scaling factor to be applied to nodes' available RAM size. Usually there's a
+# variable discrepancy between the advertised RAM value on cloud nodes and the
+# actual amount available.
+# If not set, this value will be set to 0.95
+node_mem_scaling = 0.95
+
+# File path for Certificate Authorities
+certs_file = /etc/ssl/certs/ca-certificates.crt
+
+[Logging]
+# Log file path
+#file = node-manager.log
+
+# Log level for most Node Manager messages.
+# Choose one of DEBUG, INFO, WARNING, ERROR, or CRITICAL.
+# WARNING lets you know when polling a service fails.
+# INFO additionally lets you know when a compute node is started or stopped.
+level = DEBUG
+
+# You can also set different log levels for specific libraries.
+# Pykka is the Node Manager's actor library.
+# Setting this to DEBUG will display tracebacks for uncaught
+# exceptions in the actors, but it's also very chatty.
+pykka = WARNING
+
+# Setting apiclient to INFO will log the URL of every Arvados API request.
+apiclient = WARNING
+
+[Arvados]
+host = 192.168.5.2:8000
+token = 2tnmn9ou33o3vk3bynzyzrc7aedhijo7ufa11j9kyv7509cygx
+timeout = 15
+
+# Accept an untrusted SSL certificate from the API server?
+insecure = yes
+
+[Cloud]
+provider = azure
+driver_class = arvnodeman.fake_driver.FakeDriver
+
+# Shutdown windows define periods of time when a node may and may not be shut
+# down.  These are windows in full minutes, separated by commas.  Counting from
+# the time the node is booted, the node WILL NOT shut down for N1 minutes; then
+# it MAY shut down for N2 minutes; then it WILL NOT shut down for N3 minutes;
+# and so on.  For example, "20, 999999" means the node may shut down between
+# the 20th and 999999th minutes of uptime.
+# Azure bills by the minute, so it makes sense to agressively shut down idle
+# nodes.  Specify at least two windows.  You can add as many as you need beyond
+# that.
+shutdown_windows = 5, 999999
+
+[Cloud Credentials]
+# Use "azure account list" with the azure CLI to get these values.
+tenant_id = 00000000-0000-0000-0000-000000000000
+subscription_id = 00000000-0000-0000-0000-000000000000
+
+# The following directions are based on
+# https://azure.microsoft.com/en-us/documentation/articles/resource-group-authenticate-service-principal/
+#
+# azure config mode arm
+# azure ad app create --name "<Your Application Display Name>" --home-page "<https://YourApplicationHomePage>" --identifier-uris "<https://YouApplicationUri>" --password <Your_Password>
+# azure ad sp create "<Application_Id>"
+# azure role assignment create --objectId "<Object_Id>" -o Owner -c /subscriptions/{subscriptionId}/
+#
+# Use <Application_Id> for "key" and the <Your_Password> for "secret"
+#
+key = 00000000-0000-0000-0000-000000000000
+secret = PASSWORD
+timeout = 60
+region = East US
+
+[Cloud List]
+# The resource group in which the compute node virtual machines will be created
+# and listed.
+ex_resource_group = ArvadosResourceGroup
+
+[Cloud Create]
+# The image id, in the form "Publisher:Offer:SKU:Version"
+image = Canonical:UbuntuServer:14.04.3-LTS:14.04.201508050
+
+# Path to a local ssh key file that will be used to provision new nodes.
+ssh_key = /home/peter/.ssh/id_rsa.pub
+
+# The account name for the admin user that will be provisioned on new nodes.
+ex_user_name = arvadosuser
+
+# The Azure storage account that will be used to store the node OS disk images.
+ex_storage_account = arvadosstorage
+
+# The virtual network the VMs will be associated with.
+ex_network = ArvadosNetwork
+
+# Optional subnet of the virtual network.
+#ex_subnet = default
+
+# Node tags
+tag_arvados-class = dynamic-compute
+tag_cluster = zyxwv
+
+# the API server to ping
+ping_host = 192.168.5.2:8000
+
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use.  The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue.  You must also provide price per hour as the Azure driver
+# compute currently does not report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
+# Each size section MUST define the number of cores are available in this
+# size class (since libcloud does not provide any consistent API for exposing
+# this setting).
+# You may also want to define the amount of scratch space (expressed
+# in GB) for Crunch jobs.  You can also override Microsoft's provided
+# data fields by setting them here.
+
+[Size Standard_D3]
+cores = 4
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12

commit e4c7e6b368cf6d922db341580a2402a07c6cb079
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Jun 1 10:07:49 2017 -0400

    10312: Identify error messages that look like we are hitting a quota or account limit.  Set a soft node quota in order to stop trying to boot new nodes until the total node count goes down.  Probe the node quota upward when at the soft limit and able to boot nodes successfully.
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curoverse.com>

diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 9ee26e3..ec8b1d1 100644
--- a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -16,6 +16,8 @@ from ...clientactor import _notify_subscribers
 from ... import config
 from .transitions import transitions
 
+QuotaExceeded = "QuotaExceeded"
+
 class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
 
@@ -96,6 +98,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self.cloud_size = cloud_size
         self.arvados_node = None
         self.cloud_node = None
+        self.error = None
         if arvados_node is None:
             self._later.create_arvados_node()
         else:
@@ -119,11 +122,23 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     def create_cloud_node(self):
         self._logger.info("Sending create_node request for node size %s.",
                           self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
+        try:
+            self.cloud_node = self._cloud.create_node(self.cloud_size,
+                                                      self.arvados_node)
+        except Exception as e:
+            # The set of possible error codes / messages isn't documented for
+            # all clouds, so use a keyword heuristic to determine if the
+            # failure is likely due to a quota.
+            if re.search(r'(exceed|quota|limit)', e.message, re.I):
+                self.error = QuotaExceeded
+                self._logger.warning("Quota exceeded: %s", e)
+                self._finished()
+                return
+            else:
+                raise
 
         # The information included in the node size object we get from libcloud
-        # is inconsistent between cloud providers.  Replace libcloud NodeSize
+        # is inconsistent between cloud drivers.  Replace libcloud NodeSize
         # object with compatible CloudSizeWrapper object which merges the size
         # info reported from the cloud with size information from the
         # configuration file.
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 9bfee79..8f9207e 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -122,6 +122,7 @@ class NodeManagerDaemonActor(actor_class):
         self.min_cloud_size = self.server_calculator.cheapest_size()
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
+        self.node_quota = max_nodes
         self.max_total_price = max_total_price
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
@@ -298,16 +299,17 @@ class NodeManagerDaemonActor(actor_class):
     def _nodes_wanted(self, size):
         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
         under_min = self.min_nodes - total_node_count
-        over_max = total_node_count - self.max_nodes
+        over_max = total_node_count - self.node_quota
         total_price = self._total_price()
 
         counts = self._state_counts(size)
 
         up_count = self._nodes_up(counts)
         busy_count = counts["busy"]
+        wishlist_count = self._size_wishlist(size)
 
         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
-                          self._size_wishlist(size),
+                          wishlist_count,
                           up_count,
                           counts["booting"],
                           counts["unpaired"],
@@ -321,7 +323,7 @@ class NodeManagerDaemonActor(actor_class):
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
 
-        wanted = self._size_wishlist(size) - (up_count - busy_count)
+        wanted = wishlist_count - (up_count - busy_count)
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
             can_boot = int((self.max_total_price - total_price) / size.price)
             if can_boot == 0:
@@ -392,25 +394,46 @@ class NodeManagerDaemonActor(actor_class):
         if arvados_node is not None:
             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                 time.time())
-        new_setup.subscribe(self._later.node_up)
+        new_setup.subscribe(self._later.node_setup_finished)
         if nodes_wanted > 1:
             self._later.start_node(cloud_size)
 
     def _get_actor_attrs(self, actor, *attr_names):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
 
-    def node_up(self, setup_proxy):
+    def node_setup_finished(self, setup_proxy):
         # Called when a SetupActor has completed.
-        cloud_node, arvados_node = self._get_actor_attrs(
-            setup_proxy, 'cloud_node', 'arvados_node')
+        cloud_node, arvados_node, error = self._get_actor_attrs(
+            setup_proxy, 'cloud_node', 'arvados_node', 'error')
         setup_proxy.stop()
 
-        # If cloud_node is None then the node create wasn't
-        # successful and so there isn't anything to do.
-        if cloud_node is not None:
+        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
+        if cloud_node is None:
+            # If cloud_node is None then the node create wasn't successful.
+            if error == dispatch.QuotaExceeded:
+                # We've hit a quota limit, so adjust node_quota to stop trying to
+                # boot new nodes until the node count goes down.
+                self.node_quota = min(total_node_count-1, self.max_nodes)
+                self._logger.warning("Setting node quota to %s", self.node_quota)
+        else:
             # Node creation succeeded.  Update cloud node list.
             cloud_node._nodemanager_recently_booted = True
             self._register_cloud_node(cloud_node)
+
+            # Different quota policies may in force depending on the cloud
+            # provider, account limits, and the specific mix of nodes sizes
+            # that are already created.  If we are right at the quota limit,
+            # we want to probe to see if the last quota still applies or if we
+            # are allowed to create more nodes.
+            #
+            # For example, if the quota is actually based on core count, the
+            # quota might be 20 single-core machines or 10 dual-core machines.
+            # If we previously set node_quota to 10 dual core machines, but are
+            # now booting single core machines (actual quota 20), we want to
+            # allow the quota to expand so we don't get stuck at 10 machines
+            # forever.
+            if total_node_count == self.node_quota and self.node_quota < self.max_nodes:
+                self.node_quota += 1
         del self.booting[setup_proxy.actor_ref.actor_urn]
         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list