[ARVADOS] updated: 8f9f0dece977ccf5a778f3b3bd2379375e723c18

git at public.curoverse.com git at public.curoverse.com
Wed Nov 12 13:59:50 EST 2014


Summary of changes:
 .../nodemanager/arvnodeman/computenode/__init__.py | 353 ---------------------
 .../computenode/{ => dispatch}/__init__.py         | 123 +------
 .../arvnodeman/computenode/driver/__init__.py      |  64 ++++
 .../arvnodeman/computenode/{ => driver}/dummy.py   |   3 +-
 .../arvnodeman/computenode/{ => driver}/ec2.py     |   3 +-
 services/nodemanager/arvnodeman/config.py          |   2 +-
 services/nodemanager/arvnodeman/daemon.py          |   7 +-
 services/nodemanager/arvnodeman/launcher.py        |   4 +-
 services/nodemanager/tests/test_computenode.py     | 279 ----------------
 ...computenode.py => test_computenode_dispatch.py} |  35 +-
 ...enode_ec2.py => test_computenode_driver_ec2.py} |   2 +-
 services/nodemanager/tests/test_daemon.py          |   4 +-
 12 files changed, 85 insertions(+), 794 deletions(-)
 copy services/nodemanager/arvnodeman/computenode/{ => dispatch}/__init__.py (69%)
 create mode 100644 services/nodemanager/arvnodeman/computenode/driver/__init__.py
 rename services/nodemanager/arvnodeman/computenode/{ => driver}/dummy.py (96%)
 rename services/nodemanager/arvnodeman/computenode/{ => driver}/ec2.py (98%)
 copy services/nodemanager/tests/{test_computenode.py => test_computenode_dispatch.py} (89%)
 rename services/nodemanager/tests/{test_computenode_ec2.py => test_computenode_driver_ec2.py} (98%)

       via  8f9f0dece977ccf5a778f3b3bd2379375e723c18 (commit)
       via  b626a85eb86fd4909712852040cd305c71c37ee5 (commit)
      from  5141c3ee23e89696773e227a93236ef2a51543c2 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 8f9f0dece977ccf5a778f3b3bd2379375e723c18
Merge: 5141c3e b626a85
Author: Brett Smith <brett at curoverse.com>
Date:   Wed Nov 12 13:59:14 2014 -0500

    Merge branch '4380-node-manager-computenode-reorg-wip'
    
    Refs #4380.  Closes #4496.


commit b626a85eb86fd4909712852040cd305c71c37ee5
Author: Brett Smith <brett at curoverse.com>
Date:   Tue Nov 11 17:23:14 2014 -0500

    4380: Reorganize arvnodeman.computenode.
    
    This makes the hierarchy a little richer:
    
    * arvnodeman.computenode.driver has all the cloud driver wrappers.
    * arvnodeman.computenode.dispatch holds the compute node actors and
      is organized like driver, except that its submodules will handle
      local dispatch concerns instead of cloud providers.  For example,
      I'm going to add a SLURM submodule here to take care of draining.
    * arvnodeman.computenode still has utility functions and
      ShutdownTimer.

diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index 63effe9..4955992 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -2,16 +2,9 @@
 
 from __future__ import absolute_import, print_function
 
-import functools
 import itertools
-import logging
 import time
 
-import pykka
-
-from ..clientactor import _notify_subscribers
-from .. import config
-
 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
     hostname = arvados_node.get('hostname') or default_hostname
     return '{}.{}'.format(hostname, arvados_node['domain'])
@@ -23,252 +16,6 @@ def arvados_node_mtime(node):
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
-class BaseComputeNodeDriver(object):
-    """Abstract base class for compute node drivers.
-
-    libcloud abstracts away many of the differences between cloud providers,
-    but managing compute nodes requires some cloud-specific features (e.g.,
-    on EC2 we use tags to identify compute nodes).  Compute node drivers
-    are responsible for translating the node manager's cloud requests to a
-    specific cloud's vocabulary.
-
-    Subclasses must implement arvados_create_kwargs (to update node
-    creation kwargs with information about the specific Arvados node
-    record), sync_node, and node_start_time.
-    """
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
-        self.real = driver_class(**auth_kwargs)
-        self.list_kwargs = list_kwargs
-        self.create_kwargs = create_kwargs
-
-    def __getattr__(self, name):
-        # Proxy non-extension methods to the real driver.
-        if (not name.startswith('_') and not name.startswith('ex_')
-              and hasattr(self.real, name)):
-            return getattr(self.real, name)
-        else:
-            return super(BaseComputeNodeDriver, self).__getattr__(name)
-
-    def search_for(self, term, list_method, key=lambda item: item.id):
-        cache_key = (list_method, term)
-        if cache_key not in self.SEARCH_CACHE:
-            results = [item for item in getattr(self.real, list_method)()
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
-        return self.SEARCH_CACHE[cache_key]
-
-    def list_nodes(self):
-        return self.real.list_nodes(**self.list_kwargs)
-
-    def arvados_create_kwargs(self, arvados_node):
-        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
-    def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
-
-    def sync_node(self, cloud_node, arvados_node):
-        # When a compute node first pings the API server, the API server
-        # will automatically assign some attributes on the corresponding
-        # node record, like hostname.  This method should propagate that
-        # information back to the cloud node appropriately.
-        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
-    @classmethod
-    def node_start_time(cls, node):
-        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
-
-class ComputeNodeStateChangeBase(config.actor_class):
-    """Base class for actors that change a compute node's state.
-
-    This base class takes care of retrying changes and notifying
-    subscribers when the change is finished.
-    """
-    def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
-        super(ComputeNodeStateChangeBase, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._timer = timer_actor
-        self._logger = logging.getLogger(logger_name)
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
-        self.subscribers = set()
-
-    @staticmethod
-    def _retry(errors):
-        """Retry decorator for an actor method that makes remote requests.
-
-        Use this function to decorator an actor method, and pass in a
-        tuple of exceptions to catch.  This decorator will schedule
-        retries of that method with exponential backoff if the
-        original method raises any of the given errors.
-        """
-        def decorator(orig_func):
-            @functools.wraps(orig_func)
-            def wrapper(self, *args, **kwargs):
-                try:
-                    orig_func(self, *args, **kwargs)
-                except errors as error:
-                    self._logger.warning(
-                        "Client error: %s - waiting %s seconds",
-                        error, self.retry_wait)
-                    self._timer.schedule(self.retry_wait,
-                                         getattr(self._later,
-                                                 orig_func.__name__),
-                                         *args, **kwargs)
-                    self.retry_wait = min(self.retry_wait * 2,
-                                          self.max_retry_wait)
-                else:
-                    self.retry_wait = self.min_retry_wait
-            return wrapper
-        return decorator
-
-    def _finished(self):
-        _notify_subscribers(self._later, self.subscribers)
-        self.subscribers = None
-
-    def subscribe(self, subscriber):
-        if self.subscribers is None:
-            try:
-                subscriber(self._later)
-            except pykka.ActorDeadError:
-                pass
-        else:
-            self.subscribers.add(subscriber)
-
-
-class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
-    """Actor to create and set up a cloud compute node.
-
-    This actor prepares an Arvados node record for a new compute node
-    (either creating one or cleaning one passed in), then boots the
-    actual compute node.  It notifies subscribers when the cloud node
-    is successfully created (the last step in the process for Node
-    Manager to handle).
-    """
-    def __init__(self, timer_actor, arvados_client, cloud_client,
-                 cloud_size, arvados_node=None,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
-        self._arvados = arvados_client
-        self._cloud = cloud_client
-        self.cloud_size = cloud_size
-        self.arvados_node = None
-        self.cloud_node = None
-        if arvados_node is None:
-            self._later.create_arvados_node()
-        else:
-            self._later.prepare_arvados_node(arvados_node)
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
-    def prepare_arvados_node(self, node):
-        self.arvados_node = self._arvados.nodes().update(
-            uuid=node['uuid'],
-            body={'hostname': None,
-                  'ip_address': None,
-                  'slot_number': None,
-                  'first_ping_at': None,
-                  'last_ping_at': None,
-                  'info': {'ec2_instance_id': None,
-                           'last_action': "Prepared by Node Manager"}}
-            ).execute()
-        self._later.create_cloud_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
-                          self.cloud_size.name)
-        self.cloud_node = self._cloud.create_node(self.cloud_size,
-                                                  self.arvados_node)
-        self._logger.info("Cloud node %s created.", self.cloud_node.id)
-        self._finished()
-
-    def stop_if_no_cloud_node(self):
-        if self.cloud_node is None:
-            self.stop()
-
-
-class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
-    """Actor to shut down a compute node.
-
-    This actor simply destroys a cloud node, retrying as needed.
-    """
-    def __init__(self, timer_actor, cloud_client, cloud_node,
-                 retry_wait=1, max_retry_wait=180):
-        super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
-        self._cloud = cloud_client
-        self.cloud_node = cloud_node
-        self._later.shutdown_node()
-
-    @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
-    def shutdown_node(self):
-        self._cloud.destroy_node(self.cloud_node)
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
-        self._finished()
-
-
-class ComputeNodeUpdateActor(config.actor_class):
-    """Actor to dispatch one-off cloud management requests.
-
-    This actor receives requests for small cloud updates, and
-    dispatches them to a real driver.  ComputeNodeMonitorActors use
-    this to perform maintenance tasks on themselves.  Having a
-    dedicated actor for this gives us the opportunity to control the
-    flow of requests; e.g., by backing off when errors occur.
-
-    This actor is most like a "traditional" Pykka actor: there's no
-    subscribing, but instead methods return real driver results.  If
-    you're interested in those results, you should get them from the
-    Future that the proxy method returns.  Be prepared to handle exceptions
-    from the cloud driver when you do.
-    """
-    def __init__(self, cloud_factory, max_retry_wait=180):
-        super(ComputeNodeUpdateActor, self).__init__()
-        self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
-
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except config.CLOUD_ERRORS:
-                self.error_streak += 1
-                self.next_request_time += min(2 ** self.error_streak,
-                                              self.max_retry_wait)
-                raise
-            else:
-                self.error_streak = 0
-                return result
-        return wrapper
-
-    @_throttle_errors
-    def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
-
-
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.
 
@@ -309,103 +56,3 @@ class ShutdownTimer(object):
     def window_open(self):
         self._advance_opening()
         return 0 < (time.time() - self._open_start) < self._open_for
-
-
-class ComputeNodeMonitorActor(config.actor_class):
-    """Actor to manage a running compute node.
-
-    This actor gets updates about a compute node's cloud and Arvados records.
-    It uses this information to notify subscribers when the node is eligible
-    for shutdown.
-    """
-    def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 timer_actor, update_actor, arvados_node=None,
-                 poll_stale_after=600, node_stale_after=3600):
-        super(ComputeNodeMonitorActor, self).__init__()
-        self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
-        self._last_log = None
-        self._shutdowns = shutdown_timer
-        self._timer = timer_actor
-        self._update = update_actor
-        self.cloud_node = cloud_node
-        self.cloud_node_start_time = cloud_node_start_time
-        self.poll_stale_after = poll_stale_after
-        self.node_stale_after = node_stale_after
-        self.subscribers = set()
-        self.arvados_node = None
-        self._later.update_arvados_node(arvados_node)
-        self.last_shutdown_opening = None
-        self._later.consider_shutdown()
-
-    def subscribe(self, subscriber):
-        self.subscribers.add(subscriber)
-
-    def _debug(self, msg, *args):
-        if msg == self._last_log:
-            return
-        self._last_log = msg
-        self._logger.debug(msg, *args)
-
-    def in_state(self, *states):
-        # Return a boolean to say whether or not our Arvados node record is in
-        # one of the given states.  If state information is not
-        # available--because this node has no Arvados record, the record is
-        # stale, or the record has no state information--return None.
-        if (self.arvados_node is None) or not timestamp_fresh(
-              arvados_node_mtime(self.arvados_node), self.node_stale_after):
-            return None
-        state = self.arvados_node['info'].get('slurm_state')
-        if not state:
-            return None
-        result = state in states
-        if state == 'idle':
-            result = result and not self.arvados_node['job_uuid']
-        return result
-
-    def _shutdown_eligible(self):
-        if self.arvados_node is None:
-            # If this is a new, unpaired node, it's eligible for
-            # shutdown--we figure there was an error during bootstrap.
-            return timestamp_fresh(self.cloud_node_start_time,
-                                   self.node_stale_after)
-        else:
-            return self.in_state('idle')
-
-    def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self._shutdowns.window_open():
-            if self._shutdown_eligible():
-                self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-                _notify_subscribers(self._later, self.subscribers)
-            else:
-                self._debug("Node %s shutdown window open but node busy.",
-                            self.cloud_node.id)
-        else:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-        if self.last_shutdown_opening != next_opening:
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
-
-    def offer_arvados_pair(self, arvados_node):
-        if self.arvados_node is not None:
-            return None
-        elif arvados_node['ip_address'] in self.cloud_node.private_ips:
-            self._later.update_arvados_node(arvados_node)
-            return self.cloud_node.id
-        else:
-            return None
-
-    def update_cloud_node(self, cloud_node):
-        if cloud_node is not None:
-            self.cloud_node = cloud_node
-            self._later.consider_shutdown()
-
-    def update_arvados_node(self, arvados_node):
-        if arvados_node is not None:
-            self.arvados_node = arvados_node
-            new_hostname = arvados_node_fqdn(self.arvados_node)
-            if new_hostname != self.cloud_node.name:
-                self._update.sync_node(self.cloud_node, self.arvados_node)
-            self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
similarity index 69%
copy from services/nodemanager/arvnodeman/computenode/__init__.py
copy to services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 63effe9..d613ef1 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
@@ -3,89 +3,14 @@
 from __future__ import absolute_import, print_function
 
 import functools
-import itertools
 import logging
 import time
 
 import pykka
 
-from ..clientactor import _notify_subscribers
-from .. import config
-
-def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
-    hostname = arvados_node.get('hostname') or default_hostname
-    return '{}.{}'.format(hostname, arvados_node['domain'])
-
-def arvados_node_mtime(node):
-    return time.mktime(time.strptime(node['modified_at'] + 'UTC',
-                                     '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
-
-def timestamp_fresh(timestamp, fresh_time):
-    return (time.time() - timestamp) < fresh_time
-
-class BaseComputeNodeDriver(object):
-    """Abstract base class for compute node drivers.
-
-    libcloud abstracts away many of the differences between cloud providers,
-    but managing compute nodes requires some cloud-specific features (e.g.,
-    on EC2 we use tags to identify compute nodes).  Compute node drivers
-    are responsible for translating the node manager's cloud requests to a
-    specific cloud's vocabulary.
-
-    Subclasses must implement arvados_create_kwargs (to update node
-    creation kwargs with information about the specific Arvados node
-    record), sync_node, and node_start_time.
-    """
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
-        self.real = driver_class(**auth_kwargs)
-        self.list_kwargs = list_kwargs
-        self.create_kwargs = create_kwargs
-
-    def __getattr__(self, name):
-        # Proxy non-extension methods to the real driver.
-        if (not name.startswith('_') and not name.startswith('ex_')
-              and hasattr(self.real, name)):
-            return getattr(self.real, name)
-        else:
-            return super(BaseComputeNodeDriver, self).__getattr__(name)
-
-    def search_for(self, term, list_method, key=lambda item: item.id):
-        cache_key = (list_method, term)
-        if cache_key not in self.SEARCH_CACHE:
-            results = [item for item in getattr(self.real, list_method)()
-                       if key(item) == term]
-            count = len(results)
-            if count != 1:
-                raise ValueError("{} returned {} results for '{}'".format(
-                        list_method, count, term))
-            self.SEARCH_CACHE[cache_key] = results[0]
-        return self.SEARCH_CACHE[cache_key]
-
-    def list_nodes(self):
-        return self.real.list_nodes(**self.list_kwargs)
-
-    def arvados_create_kwargs(self, arvados_node):
-        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
-
-    def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
-
-    def sync_node(self, cloud_node, arvados_node):
-        # When a compute node first pings the API server, the API server
-        # will automatically assign some attributes on the corresponding
-        # node record, like hostname.  This method should propagate that
-        # information back to the cloud node appropriately.
-        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
-
-    @classmethod
-    def node_start_time(cls, node):
-        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
-
-
-ComputeNodeDriverClass = BaseComputeNodeDriver
+from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
+from ...clientactor import _notify_subscribers
+from ... import config
 
 class ComputeNodeStateChangeBase(config.actor_class):
     """Base class for actors that change a compute node's state.
@@ -269,48 +194,6 @@ class ComputeNodeUpdateActor(config.actor_class):
         return self._cloud.sync_node(cloud_node, arvados_node)
 
 
-class ShutdownTimer(object):
-    """Keep track of a cloud node's shutdown windows.
-
-    Instantiate this class with a timestamp of when a cloud node started,
-    and a list of durations (in minutes) of when the node must not and may
-    be shut down, alternating.  The class will tell you when a shutdown
-    window is open, and when the next open window will start.
-    """
-    def __init__(self, start_time, shutdown_windows):
-        # The implementation is easiest if we have an even number of windows,
-        # because then windows always alternate between open and closed.
-        # Rig that up: calculate the first shutdown window based on what's
-        # passed in.  Then, if we were given an odd number of windows, merge
-        # that first window into the last one, since they both# represent
-        # closed state.
-        first_window = shutdown_windows[0]
-        shutdown_windows = list(shutdown_windows[1:])
-        self._next_opening = start_time + (60 * first_window)
-        if len(shutdown_windows) % 2:
-            shutdown_windows.append(first_window)
-        else:
-            shutdown_windows[-1] += first_window
-        self.shutdown_windows = itertools.cycle([60 * n
-                                                 for n in shutdown_windows])
-        self._open_start = self._next_opening
-        self._open_for = next(self.shutdown_windows)
-
-    def _advance_opening(self):
-        while self._next_opening < time.time():
-            self._open_start = self._next_opening
-            self._next_opening += self._open_for + next(self.shutdown_windows)
-            self._open_for = next(self.shutdown_windows)
-
-    def next_opening(self):
-        self._advance_opening()
-        return self._next_opening
-
-    def window_open(self):
-        self._advance_opening()
-        return 0 < (time.time() - self._open_start) < self._open_for
-
-
 class ComputeNodeMonitorActor(config.actor_class):
     """Actor to manage a running compute node.
 
diff --git a/services/nodemanager/arvnodeman/computenode/driver/__init__.py b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
new file mode 100644
index 0000000..a20cfde
--- /dev/null
+++ b/services/nodemanager/arvnodeman/computenode/driver/__init__.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+
+class BaseComputeNodeDriver(object):
+    """Abstract base class for compute node drivers.
+
+    libcloud abstracts away many of the differences between cloud providers,
+    but managing compute nodes requires some cloud-specific features (e.g.,
+    on EC2 we use tags to identify compute nodes).  Compute node drivers
+    are responsible for translating the node manager's cloud requests to a
+    specific cloud's vocabulary.
+
+    Subclasses must implement arvados_create_kwargs (to update node
+    creation kwargs with information about the specific Arvados node
+    record), sync_node, and node_start_time.
+    """
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+        self.real = driver_class(**auth_kwargs)
+        self.list_kwargs = list_kwargs
+        self.create_kwargs = create_kwargs
+
+    def __getattr__(self, name):
+        # Proxy non-extension methods to the real driver.
+        if (not name.startswith('_') and not name.startswith('ex_')
+              and hasattr(self.real, name)):
+            return getattr(self.real, name)
+        else:
+            return super(BaseComputeNodeDriver, self).__getattr__(name)
+
+    def search_for(self, term, list_method, key=lambda item: item.id):
+        cache_key = (list_method, term)
+        if cache_key not in self.SEARCH_CACHE:
+            results = [item for item in getattr(self.real, list_method)()
+                       if key(item) == term]
+            count = len(results)
+            if count != 1:
+                raise ValueError("{} returned {} results for '{}'".format(
+                        list_method, count, term))
+            self.SEARCH_CACHE[cache_key] = results[0]
+        return self.SEARCH_CACHE[cache_key]
+
+    def list_nodes(self):
+        return self.real.list_nodes(**self.list_kwargs)
+
+    def arvados_create_kwargs(self, arvados_node):
+        raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
+
+    def create_node(self, size, arvados_node):
+        kwargs = self.create_kwargs.copy()
+        kwargs.update(self.arvados_create_kwargs(arvados_node))
+        kwargs['size'] = size
+        return self.real.create_node(**kwargs)
+
+    def sync_node(self, cloud_node, arvados_node):
+        # When a compute node first pings the API server, the API server
+        # will automatically assign some attributes on the corresponding
+        # node record, like hostname.  This method should propagate that
+        # information back to the cloud node appropriately.
+        raise NotImplementedError("BaseComputeNodeDriver.sync_node")
+
+    @classmethod
+    def node_start_time(cls, node):
+        raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
diff --git a/services/nodemanager/arvnodeman/computenode/dummy.py b/services/nodemanager/arvnodeman/computenode/driver/dummy.py
similarity index 96%
rename from services/nodemanager/arvnodeman/computenode/dummy.py
rename to services/nodemanager/arvnodeman/computenode/driver/dummy.py
index 6c39fea..3a286bb 100644
--- a/services/nodemanager/arvnodeman/computenode/dummy.py
+++ b/services/nodemanager/arvnodeman/computenode/driver/dummy.py
@@ -7,7 +7,8 @@ import time
 import libcloud.compute.providers as cloud_provider
 import libcloud.compute.types as cloud_types
 
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
 
 class ComputeNodeDriver(BaseComputeNodeDriver):
     """Compute node driver wrapper for libcloud's dummy driver.
diff --git a/services/nodemanager/arvnodeman/computenode/ec2.py b/services/nodemanager/arvnodeman/computenode/driver/ec2.py
similarity index 98%
rename from services/nodemanager/arvnodeman/computenode/ec2.py
rename to services/nodemanager/arvnodeman/computenode/driver/ec2.py
index 359bed4..c0992f7 100644
--- a/services/nodemanager/arvnodeman/computenode/ec2.py
+++ b/services/nodemanager/arvnodeman/computenode/driver/ec2.py
@@ -9,7 +9,8 @@ import libcloud.compute.providers as cloud_provider
 import libcloud.compute.types as cloud_types
 from libcloud.compute.drivers import ec2 as cloud_ec2
 
-from . import BaseComputeNodeDriver, arvados_node_fqdn
+from . import BaseComputeNodeDriver
+from .. import arvados_node_fqdn
 
 ### Monkeypatch libcloud to support AWS' new SecurityGroup API.
 # These classes can be removed when libcloud support specifying
diff --git a/services/nodemanager/arvnodeman/config.py b/services/nodemanager/arvnodeman/config.py
index 754b931..24fd828 100644
--- a/services/nodemanager/arvnodeman/config.py
+++ b/services/nodemanager/arvnodeman/config.py
@@ -85,7 +85,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                            http=http)
 
     def new_cloud_client(self):
-        module = importlib.import_module('arvnodeman.computenode.' +
+        module = importlib.import_module('arvnodeman.computenode.driver.' +
                                          self.get('Cloud', 'provider'))
         auth_kwargs = self.get_section('Cloud Credentials')
         if 'timeout' in auth_kwargs:
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index eaf10be..7ff736f 100644
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from .computenode import dispatch
 from .config import actor_class
 
 class _ComputeNodeRecord(object):
@@ -96,9 +97,9 @@ class NodeManagerDaemonActor(actor_class):
                  arvados_factory, cloud_factory,
                  shutdown_windows, min_nodes, max_nodes,
                  poll_stale_after=600, node_stale_after=7200,
-                 node_setup_class=cnode.ComputeNodeSetupActor,
-                 node_shutdown_class=cnode.ComputeNodeShutdownActor,
-                 node_actor_class=cnode.ComputeNodeMonitorActor):
+                 node_setup_class=dispatch.ComputeNodeSetupActor,
+                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
+                 node_actor_class=dispatch.ComputeNodeMonitorActor):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py
index f4ad716..d2f4afe 100644
--- a/services/nodemanager/arvnodeman/launcher.py
+++ b/services/nodemanager/arvnodeman/launcher.py
@@ -12,9 +12,7 @@ import daemon
 import pykka
 
 from . import config as nmconfig
-from .computenode import \
-    ComputeNodeSetupActor, ComputeNodeShutdownActor, ComputeNodeUpdateActor, \
-    ShutdownTimer
+from .computenode.dispatch import ComputeNodeUpdateActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode.py
index 5ced5f9..e22cccc 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode.py
@@ -6,137 +6,11 @@ import time
 import unittest
 
 import arvados.errors as arverror
-import httplib2
 import mock
-import pykka
 
 import arvnodeman.computenode as cnode
 from . import testutil
 
-class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
-    def make_mocks(self, arvados_effect=None, cloud_effect=None):
-        if arvados_effect is None:
-            arvados_effect = [testutil.arvados_node_mock()]
-        self.arvados_effect = arvados_effect
-        self.timer = testutil.MockTimer()
-        self.api_client = mock.MagicMock(name='api_client')
-        self.api_client.nodes().create().execute.side_effect = arvados_effect
-        self.api_client.nodes().update().execute.side_effect = arvados_effect
-        self.cloud_client = mock.MagicMock(name='cloud_client')
-        self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
-
-    def make_actor(self, arv_node=None):
-        if not hasattr(self, 'timer'):
-            self.make_mocks(arvados_effect=[arv_node])
-        self.setup_actor = cnode.ComputeNodeSetupActor.start(
-            self.timer, self.api_client, self.cloud_client,
-            testutil.MockSize(1), arv_node).proxy()
-
-    def test_creation_without_arvados_node(self):
-        self.make_actor()
-        self.assertEqual(self.arvados_effect[-1],
-                         self.setup_actor.arvados_node.get(self.TIMEOUT))
-        self.assertTrue(self.api_client.nodes().create().execute.called)
-        self.assertEqual(self.cloud_client.create_node(),
-                         self.setup_actor.cloud_node.get(self.TIMEOUT))
-
-    def test_creation_with_arvados_node(self):
-        self.make_actor(testutil.arvados_node_mock())
-        self.assertEqual(self.arvados_effect[-1],
-                         self.setup_actor.arvados_node.get(self.TIMEOUT))
-        self.assertTrue(self.api_client.nodes().update().execute.called)
-        self.assertEqual(self.cloud_client.create_node(),
-                         self.setup_actor.cloud_node.get(self.TIMEOUT))
-
-    def test_failed_calls_retried(self):
-        self.make_mocks([
-                arverror.ApiError(httplib2.Response({'status': '500'}), ""),
-                testutil.arvados_node_mock(),
-                ])
-        self.make_actor()
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-
-    def test_stop_when_no_cloud_node(self):
-        self.make_mocks(
-            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
-        self.make_actor()
-        self.setup_actor.stop_if_no_cloud_node()
-        self.assertTrue(
-            self.setup_actor.actor_ref.actor_stopped.wait(self.TIMEOUT))
-
-    def test_no_stop_when_cloud_node(self):
-        self.make_actor()
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.setup_actor.stop_if_no_cloud_node().get(self.TIMEOUT)
-        self.assertTrue(self.stop_proxy(self.setup_actor),
-                        "actor was stopped by stop_if_no_cloud_node")
-
-    def test_subscribe(self):
-        self.make_mocks(
-            arverror.ApiError(httplib2.Response({'status': '500'}), ""))
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.setup_actor.subscribe(subscriber)
-        self.api_client.nodes().create().execute.side_effect = [
-            testutil.arvados_node_mock()]
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-    def test_late_subscribe(self):
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.wait_for_assignment(self.setup_actor, 'cloud_node')
-        self.setup_actor.subscribe(subscriber).get(self.TIMEOUT)
-        self.stop_proxy(self.setup_actor)
-        self.assertEqual(self.setup_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
-                                       unittest.TestCase):
-    def make_mocks(self, cloud_node=None):
-        self.timer = testutil.MockTimer()
-        self.cloud_client = mock.MagicMock(name='cloud_client')
-        if cloud_node is None:
-            cloud_node = testutil.cloud_node_mock()
-        self.cloud_node = cloud_node
-
-    def make_actor(self, arv_node=None):
-        if not hasattr(self, 'timer'):
-            self.make_mocks()
-        self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
-            self.timer, self.cloud_client, self.cloud_node).proxy()
-
-    def test_easy_shutdown(self):
-        self.make_actor()
-        self.shutdown_actor.cloud_node.get(self.TIMEOUT)
-        self.stop_proxy(self.shutdown_actor)
-        self.assertTrue(self.cloud_client.destroy_node.called)
-
-    def test_late_subscribe(self):
-        self.make_actor()
-        subscriber = mock.Mock(name='subscriber_mock')
-        self.shutdown_actor.subscribe(subscriber).get(self.TIMEOUT)
-        self.stop_proxy(self.shutdown_actor)
-        self.assertEqual(self.shutdown_actor.actor_ref.actor_urn,
-                         subscriber.call_args[0][0].actor_ref.actor_urn)
-
-
-class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
-                                     unittest.TestCase):
-    def make_actor(self):
-        self.driver = mock.MagicMock(name='driver_mock')
-        self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
-
-    def test_node_sync(self):
-        self.make_actor()
-        cloud_node = testutil.cloud_node_mock()
-        arv_node = testutil.arvados_node_mock()
-        self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
-        self.driver().sync_node.assert_called_with(cloud_node, arv_node)
-
-
 @mock.patch('time.time', return_value=1)
 class ShutdownTimerTestCase(unittest.TestCase):
     def test_two_length_window(self, time_mock):
@@ -160,156 +34,3 @@ class ShutdownTimerTestCase(unittest.TestCase):
         time_mock.return_value += 200
         self.assertEqual(961, timer.next_opening())
         self.assertFalse(timer.window_open())
-
-
-class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
-                                      unittest.TestCase):
-    class MockShutdownTimer(object):
-        def _set_state(self, is_open, next_opening):
-            self.window_open = lambda: is_open
-            self.next_opening = lambda: next_opening
-
-
-    def make_mocks(self, node_num):
-        self.shutdowns = self.MockShutdownTimer()
-        self.shutdowns._set_state(False, 300)
-        self.timer = mock.MagicMock(name='timer_mock')
-        self.updates = mock.MagicMock(name='update_mock')
-        self.cloud_mock = testutil.cloud_node_mock(node_num)
-        self.subscriber = mock.Mock(name='subscriber_mock')
-
-    def make_actor(self, node_num=1, arv_node=None, start_time=None):
-        if not hasattr(self, 'cloud_mock'):
-            self.make_mocks(node_num)
-        if start_time is None:
-            start_time = time.time()
-        self.node_actor = cnode.ComputeNodeMonitorActor.start(
-            self.cloud_mock, start_time, self.shutdowns, self.timer,
-            self.updates, arv_node).proxy()
-        self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
-
-    def node_state(self, *states):
-        return self.node_actor.in_state(*states).get(self.TIMEOUT)
-
-    def test_in_state_when_unpaired(self):
-        self.make_actor()
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_state_when_pairing_stale(self):
-        self.make_actor(arv_node=testutil.arvados_node_mock(
-                job_uuid=None, age=90000))
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_state_when_no_state_available(self):
-        self.make_actor(arv_node=testutil.arvados_node_mock(info={}))
-        self.assertIsNone(self.node_state('idle', 'alloc'))
-
-    def test_in_idle_state(self):
-        self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
-        self.assertTrue(self.node_state('idle'))
-        self.assertFalse(self.node_state('alloc'))
-        self.assertTrue(self.node_state('idle', 'alloc'))
-
-    def test_in_alloc_state(self):
-        self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
-        self.assertFalse(self.node_state('idle'))
-        self.assertTrue(self.node_state('alloc'))
-        self.assertTrue(self.node_state('idle', 'alloc'))
-
-    def test_init_shutdown_scheduling(self):
-        self.make_actor()
-        self.assertTrue(self.timer.schedule.called)
-        self.assertEqual(300, self.timer.schedule.call_args[0][0])
-
-    def test_shutdown_subscription(self):
-        self.make_actor()
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertTrue(self.subscriber.called)
-        self.assertEqual(self.node_actor.actor_ref.actor_urn,
-                         self.subscriber.call_args[0][0].actor_ref.actor_urn)
-
-    def test_shutdown_without_arvados_node(self):
-        self.make_actor()
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertTrue(self.subscriber.called)
-
-    def test_no_shutdown_without_arvados_node_and_old_cloud_node(self):
-        self.make_actor(start_time=0)
-        self.shutdowns._set_state(True, 600)
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.assertFalse(self.subscriber.called)
-
-    def check_shutdown_rescheduled(self, window_open, next_window,
-                                   schedule_time=None):
-        self.shutdowns._set_state(window_open, next_window)
-        self.timer.schedule.reset_mock()
-        self.node_actor.consider_shutdown().get(self.TIMEOUT)
-        self.stop_proxy(self.node_actor)
-        self.assertTrue(self.timer.schedule.called)
-        if schedule_time is not None:
-            self.assertEqual(schedule_time, self.timer.schedule.call_args[0][0])
-        self.assertFalse(self.subscriber.called)
-
-    def test_shutdown_window_close_scheduling(self):
-        self.make_actor()
-        self.check_shutdown_rescheduled(False, 600, 600)
-
-    def test_no_shutdown_when_node_running_job(self):
-        self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_no_shutdown_when_node_state_unknown(self):
-        self.make_actor(5, testutil.arvados_node_mock(5, info={}))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_no_shutdown_when_node_state_stale(self):
-        self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
-        self.check_shutdown_rescheduled(True, 600)
-
-    def test_arvados_node_match(self):
-        self.make_actor(2)
-        arv_node = testutil.arvados_node_mock(
-            2, hostname='compute-two.zzzzz.arvadosapi.com')
-        pair_id = self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT)
-        self.assertEqual(self.cloud_mock.id, pair_id)
-        self.stop_proxy(self.node_actor)
-        self.updates.sync_node.assert_called_with(self.cloud_mock, arv_node)
-
-    def test_arvados_node_mismatch(self):
-        self.make_actor(3)
-        arv_node = testutil.arvados_node_mock(1)
-        self.assertIsNone(
-            self.node_actor.offer_arvados_pair(arv_node).get(self.TIMEOUT))
-
-    def test_update_cloud_node(self):
-        self.make_actor(1)
-        self.make_mocks(2)
-        self.cloud_mock.id = '1'
-        self.node_actor.update_cloud_node(self.cloud_mock)
-        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
-        self.assertEqual([testutil.ip_address_mock(2)],
-                         current_cloud.private_ips)
-
-    def test_missing_cloud_node_update(self):
-        self.make_actor(1)
-        self.node_actor.update_cloud_node(None)
-        current_cloud = self.node_actor.cloud_node.get(self.TIMEOUT)
-        self.assertEqual([testutil.ip_address_mock(1)],
-                         current_cloud.private_ips)
-
-    def test_update_arvados_node(self):
-        self.make_actor(3)
-        job_uuid = 'zzzzz-jjjjj-updatejobnode00'
-        new_arvados = testutil.arvados_node_mock(3, job_uuid)
-        self.node_actor.update_arvados_node(new_arvados)
-        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
-        self.assertEqual(job_uuid, current_arvados['job_uuid'])
-
-    def test_missing_arvados_node_update(self):
-        self.make_actor(4, testutil.arvados_node_mock(4))
-        self.node_actor.update_arvados_node(None)
-        current_arvados = self.node_actor.arvados_node.get(self.TIMEOUT)
-        self.assertEqual(testutil.ip_address_mock(4),
-                         current_arvados['ip_address'])
diff --git a/services/nodemanager/tests/test_computenode.py b/services/nodemanager/tests/test_computenode_dispatch.py
similarity index 89%
copy from services/nodemanager/tests/test_computenode.py
copy to services/nodemanager/tests/test_computenode_dispatch.py
index 5ced5f9..ece186b 100644
--- a/services/nodemanager/tests/test_computenode.py
+++ b/services/nodemanager/tests/test_computenode_dispatch.py
@@ -10,7 +10,7 @@ import httplib2
 import mock
 import pykka
 
-import arvnodeman.computenode as cnode
+import arvnodeman.computenode.dispatch as dispatch
 from . import testutil
 
 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
@@ -28,7 +28,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
             self.make_mocks(arvados_effect=[arv_node])
-        self.setup_actor = cnode.ComputeNodeSetupActor.start(
+        self.setup_actor = dispatch.ComputeNodeSetupActor.start(
             self.timer, self.api_client, self.cloud_client,
             testutil.MockSize(1), arv_node).proxy()
 
@@ -105,7 +105,7 @@ class ComputeNodeShutdownActorTestCase(testutil.ActorTestMixin,
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
             self.make_mocks()
-        self.shutdown_actor = cnode.ComputeNodeShutdownActor.start(
+        self.shutdown_actor = dispatch.ComputeNodeShutdownActor.start(
             self.timer, self.cloud_client, self.cloud_node).proxy()
 
     def test_easy_shutdown(self):
@@ -127,7 +127,7 @@ class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
     def make_actor(self):
         self.driver = mock.MagicMock(name='driver_mock')
-        self.updater = cnode.ComputeNodeUpdateActor.start(self.driver).proxy()
+        self.updater = dispatch.ComputeNodeUpdateActor.start(self.driver).proxy()
 
     def test_node_sync(self):
         self.make_actor()
@@ -137,31 +137,6 @@ class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
         self.driver().sync_node.assert_called_with(cloud_node, arv_node)
 
 
-@mock.patch('time.time', return_value=1)
-class ShutdownTimerTestCase(unittest.TestCase):
-    def test_two_length_window(self, time_mock):
-        timer = cnode.ShutdownTimer(time_mock.return_value, [8, 2])
-        self.assertEqual(481, timer.next_opening())
-        self.assertFalse(timer.window_open())
-        time_mock.return_value += 500
-        self.assertEqual(1081, timer.next_opening())
-        self.assertTrue(timer.window_open())
-        time_mock.return_value += 200
-        self.assertEqual(1081, timer.next_opening())
-        self.assertFalse(timer.window_open())
-
-    def test_three_length_window(self, time_mock):
-        timer = cnode.ShutdownTimer(time_mock.return_value, [6, 3, 1])
-        self.assertEqual(361, timer.next_opening())
-        self.assertFalse(timer.window_open())
-        time_mock.return_value += 400
-        self.assertEqual(961, timer.next_opening())
-        self.assertTrue(timer.window_open())
-        time_mock.return_value += 200
-        self.assertEqual(961, timer.next_opening())
-        self.assertFalse(timer.window_open())
-
-
 class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
                                       unittest.TestCase):
     class MockShutdownTimer(object):
@@ -183,7 +158,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
             self.make_mocks(node_num)
         if start_time is None:
             start_time = time.time()
-        self.node_actor = cnode.ComputeNodeMonitorActor.start(
+        self.node_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_mock, start_time, self.shutdowns, self.timer,
             self.updates, arv_node).proxy()
         self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
diff --git a/services/nodemanager/tests/test_computenode_ec2.py b/services/nodemanager/tests/test_computenode_driver_ec2.py
similarity index 98%
rename from services/nodemanager/tests/test_computenode_ec2.py
rename to services/nodemanager/tests/test_computenode_driver_ec2.py
index d1c9e43..fde103e 100644
--- a/services/nodemanager/tests/test_computenode_ec2.py
+++ b/services/nodemanager/tests/test_computenode_driver_ec2.py
@@ -7,7 +7,7 @@ import unittest
 
 import mock
 
-import arvnodeman.computenode.ec2 as ec2
+import arvnodeman.computenode.driver.ec2 as ec2
 from . import testutil
 
 class EC2ComputeNodeDriverTestCase(unittest.TestCase):
diff --git a/services/nodemanager/tests/test_daemon.py b/services/nodemanager/tests/test_daemon.py
index 4bffd09..394fb88 100644
--- a/services/nodemanager/tests/test_daemon.py
+++ b/services/nodemanager/tests/test_daemon.py
@@ -8,8 +8,8 @@ import unittest
 import mock
 import pykka
 
-import arvnodeman.computenode as nmcnode
 import arvnodeman.daemon as nmdaemon
+from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -39,7 +39,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.daemon.update_server_wishlist(want_sizes).get(self.TIMEOUT)
 
     def monitor_list(self):
-        return pykka.ActorRegistry.get_by_class(nmcnode.ComputeNodeMonitorActor)
+        return pykka.ActorRegistry.get_by_class(ComputeNodeMonitorActor)
 
     def alive_monitor_count(self):
         return sum(1 for actor in self.monitor_list() if actor.is_alive())

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list