[ARVADOS] created: 4dc84fdc1f380b9d796308972648e6e36299684a

git at public.curoverse.com git at public.curoverse.com
Thu Apr 30 09:25:05 EDT 2015


        at  4dc84fdc1f380b9d796308972648e6e36299684a (commit)


commit 4dc84fdc1f380b9d796308972648e6e36299684a
Author: Brett Smith <brett at curoverse.com>
Date:   Thu Apr 30 09:19:15 2015 -0400

    3793: Add Docker image cleaner service for compute nodes.
    
    This service monitors Docker events.  When no containers are active,
    it arranges to keep the most recently used images up to a configured
    size limit, then deletes the rest.  This will prevent Docker images
    from growing indefinitely on physical compute nodes.

diff --git a/services/dockercleaner/.gitignore b/services/dockercleaner/.gitignore
new file mode 120000
index 0000000..ed3b362
--- /dev/null
+++ b/services/dockercleaner/.gitignore
@@ -0,0 +1 @@
+../../sdk/python/.gitignore
\ No newline at end of file
diff --git a/services/dockercleaner/arvados_docker/__init__.py b/services/dockercleaner/arvados_docker/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/dockercleaner/arvados_docker/cleaner.py b/services/dockercleaner/arvados_docker/cleaner.py
new file mode 100644
index 0000000..a87741d
--- /dev/null
+++ b/services/dockercleaner/arvados_docker/cleaner.py
@@ -0,0 +1,242 @@
+#!/usr/bin/env python3
+"""arvados_docker.cleaner - Remove unused Docker images from compute nodes
+
+Usage:
+  python3 -m arvados_docker.cleaner --keep 50G
+"""
+
+import argparse
+import collections
+import copy
+import functools
+import logging
+import json
+import time
+import sys
+
+import docker
+
+SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
+
+logger = logging.getLogger('arvados_docker.cleaner')
+
+def return_when_docker_not_found(result=None):
+    # Decorator factory.  The decorated function runs normally, except that
+    # a docker.errors.APIError whose response status code is 404 is
+    # swallowed and `result` is returned in its place.  Any other APIError
+    # is re-raised unchanged.
+    def decorate(func):
+        @functools.wraps(func)
+        def guarded(*args, **kwargs):
+            try:
+                outcome = func(*args, **kwargs)
+            except docker.errors.APIError as api_error:
+                if api_error.response.status_code == 404:
+                    return result
+                raise
+            return outcome
+        return guarded
+    return decorate
+
+class DockerImage:
+    # Bookkeeping record for one Docker image: its ID, its virtual size, and
+    # the latest time a container was seen using it (-1 until a use is
+    # recorded).
+    def __init__(self, image_hash):
+        self.docker_id, self.size = image_hash['Id'], image_hash['VirtualSize']
+        self.last_used = -1
+
+    def used_at(self, use_time):
+        # last_used only ever moves forward.
+        if use_time > self.last_used:
+            self.last_used = use_time
+
+
+class DockerImages:
+    # Tracks the known Docker images and the containers recorded as using
+    # them, and picks which unused images to remove so that what remains
+    # fits within target_size.
+    def __init__(self, target_size):
+        self.target_size = target_size
+        self.images = {}
+        # Maps container ID -> ID of the image that container uses.
+        self.container_image_map = {}
+
+    @classmethod
+    def from_daemon(cls, target_size, docker_client):
+        # Alternate constructor: register every image reported by
+        # docker_client.images().
+        tracker = cls(target_size)
+        for image_hash in docker_client.images():
+            tracker.add_image(image_hash)
+        return tracker
+
+    def add_image(self, image_hash):
+        new_image = DockerImage(image_hash)
+        docker_id = new_image.docker_id
+        self.images[docker_id] = new_image
+        logger.debug("Registered image %s", docker_id)
+
+    def del_image(self, image_id):
+        # Forget an image along with every container recorded as using it.
+        # Unknown image IDs are ignored.
+        if image_id not in self.images:
+            return
+        del self.images[image_id]
+        remaining = {}
+        for cid, cid_image in self.container_image_map.items():
+            if cid_image != image_id:
+                remaining[cid] = cid_image
+        self.container_image_map = remaining
+        logger.debug("Unregistered image %s", image_id)
+
+    def has_image(self, image_id):
+        return image_id in self.images
+
+    def add_user(self, container_hash, use_time):
+        # Record a container as a user of its image.  Containers whose image
+        # is not registered are ignored.
+        image_id = container_hash['Image']
+        if image_id not in self.images:
+            return
+        container_id = container_hash['Id']
+        self.container_image_map[container_id] = image_id
+        self.images[image_id].used_at(use_time)
+        logger.debug("Registered container %s using image %s",
+                     container_id, image_id)
+
+    def end_user(self, cid):
+        self.container_image_map.pop(cid, None)
+        logger.debug("Unregistered container %s", cid)
+
+    def any_users(self):
+        return len(self.container_image_map) > 0
+
+    def should_delete(self):
+        # Generator of image IDs to remove, least recently used first.
+        # Images in use are never candidates, and their sizes are charged
+        # against target_size before anything else.
+        in_use = set(self.container_image_map.values())
+        budget = self.target_size - sum(self.images[image_id].size
+                                        for image_id in in_use)
+        candidates = sorted(
+            (image for image in self.images.values()
+             if image.docker_id not in in_use),
+            key=lambda image: image.last_used)
+        # Walk from most to least recently used, keeping every image that
+        # still fits in the remaining budget.
+        kept = set()
+        for image in reversed(candidates):
+            if image.size > budget:
+                continue
+            kept.add(image.docker_id)
+            budget -= image.size
+        for image in candidates:
+            if image.docker_id in kept:
+                continue
+            yield image.docker_id
+
+
+class DockerEventHandlers:
+    # This class maps Docker event types to the names of methods that should
+    # receive those events.
+    def __init__(self):
+        self.handler_names = collections.defaultdict(list)
+
+    def on(self, *status_names):
+        # Decorator factory: register the decorated method's name as a
+        # handler for each status in status_names.  The method itself is
+        # returned unchanged.
+        def register_handler(handler_method):
+            for status in status_names:
+                self.handler_names[status].append(handler_method.__name__)
+            return handler_method
+        return register_handler
+
+    def for_event(self, status):
+        # Return an iterator over the handler method names registered for
+        # status (empty when there are none).  Use .get() rather than
+        # indexing: indexing the defaultdict would insert a new empty list
+        # for every unknown status that is looked up.
+        return iter(self.handler_names.get(status, ()))
+
+    def copy(self):
+        # Return an independent registry with the same handlers, so a
+        # subclass can register more without changing the original.
+        result = self.__class__()
+        result.handler_names = copy.deepcopy(self.handler_names)
+        return result
+
+
+class DockerEventListener:
+    # To use this class, define event_handlers as an instance of
+    # DockerEventHandlers.  Call run() to iterate over events and call the
+    # handler methods as they come in.
+    ENCODING = 'utf-8'
+
+    def __init__(self, events):
+        # events: an iterable of encoded JSON documents, one per event.
+        self.events = events
+
+    def run(self):
+        # Decode each event and pass it to every handler method registered
+        # for its status, in registration order.  Returns when the event
+        # iterable is exhausted.
+        for raw_event in self.events:
+            event = json.loads(raw_event.decode(self.ENCODING))
+            # An event without a 'status' field simply has no handlers;
+            # it must not stop the loop with a KeyError.
+            status = event.get('status')
+            for method_name in self.event_handlers.for_event(status):
+                getattr(self, method_name)(event)
+
+
+class DockerImageUseRecorder(DockerEventListener):
+    # Event listener that keeps `images` informed of which containers exist:
+    # a 'create' event registers the container as a user of its image, and
+    # a 'destroy' event unregisters it.
+    event_handlers = DockerEventHandlers()
+
+    def __init__(self, images, docker_client, events):
+        # images: the tracker whose add_user()/end_user() methods are called.
+        # docker_client: used to inspect newly created containers.
+        # events: iterable of encoded events, passed to DockerEventListener.
+        self.images = images
+        self.docker_client = docker_client
+        super().__init__(events)
+
+    # A 404 from Docker while handling a 'create' event (raised anywhere
+    # inside this method, including new_container) is ignored.
+    @event_handlers.on('create')
+    @return_when_docker_not_found()
+    def load_container(self, event):
+        container_hash = self.docker_client.inspect_container(event['id'])
+        self.new_container(event, container_hash)
+
+    def new_container(self, event, container_hash):
+        # Record the container as a user of its image at the event's time.
+        # Kept separate from load_container so subclasses can extend it.
+        self.images.add_user(container_hash, event['time'])
+
+    @event_handlers.on('destroy')
+    def container_stopped(self, event):
+        self.images.end_user(event['id'])
+
+
+class DockerImageCleaner(DockerImageUseRecorder):
+    # Use recorder that also registers images it has not seen before and
+    # removes surplus images once no containers are registered.
+
+    # Work on a copy so that registering clean_images below does not add it
+    # to DockerImageUseRecorder's own handler table.
+    event_handlers = DockerImageUseRecorder.event_handlers.copy()
+
+    def new_container(self, event, container_hash):
+        # Register the container's image first if it is not yet tracked, so
+        # the use can be recorded against it.  A 404 raised by inspect_image
+        # propagates to load_container, whose decorator ignores it.
+        container_image_id = container_hash['Image']
+        if not self.images.has_image(container_image_id):
+            image_hash = self.docker_client.inspect_image(container_image_id)
+            self.images.add_image(image_hash)
+        return super().new_container(event, container_hash)
+
+    # Registered for 'destroy' after the inherited container_stopped, so the
+    # destroyed container has already been unregistered when this runs.
+    @event_handlers.on('destroy')
+    def clean_images(self, event=None):
+        # Do nothing while any container is registered.  Otherwise try to
+        # remove every image the tracker nominates; an image whose removal
+        # fails is logged and stays tracked.
+        if self.images.any_users():
+            return
+        for image_id in self.images.should_delete():
+            try:
+                self.docker_client.remove_image(image_id)
+            except docker.errors.APIError as error:
+                logger.warning("Failed to remove image %s: %s", image_id, error)
+            else:
+                logger.info("Removed image %s", image_id)
+                self.images.del_image(image_id)
+
+
+def human_size(size_str):
+    # Convert a size string such as '50G', '512m', '10kb' or '2048' into an
+    # integer number of bytes.  Suffixes are case-insensitive powers of
+    # 1024 (K/M/G/T); trailing 'b' characters are ignored.
+    #
+    # Raises ValueError when there is nothing left to parse or the numeric
+    # part is not an integer.  argparse reports a ValueError from a type=
+    # callable as a usage error, whereas the IndexError that an empty
+    # string would otherwise trigger below escapes as a traceback.
+    size_str = size_str.lower().rstrip('b')
+    if not size_str:
+        raise ValueError("size must include a number")
+    multiplier = SUFFIX_SIZES.get(size_str[-1])
+    if multiplier is None:
+        multiplier = 1
+    else:
+        size_str = size_str[:-1]
+    return int(size_str) * multiplier
+
+def parse_arguments(arguments):
+    # Parse the given list of command-line arguments and return the
+    # resulting argparse namespace (attributes: keep, verbose).
+    cli = argparse.ArgumentParser(
+        description="clean old Docker images from Arvados compute nodes",
+        prog="dockerclean")
+    keep_help = "size of Docker images to keep, suffixed with K/M/G/T"
+    cli.add_argument('--keep', help=keep_help, required=True,
+                     type=human_size, action='store')
+    verbose_help = "log more information"
+    cli.add_argument('--verbose', '-v', help=verbose_help,
+                     default=0, action='count')
+    parsed = cli.parse_args(arguments)
+    return parsed
+
+def setup_logging(args):
+    # Send this module's log messages to a stream handler with a
+    # timestamped format, at a level chosen by args.verbose: ERROR by
+    # default, one level lower for each -v.
+    log_handler = logging.StreamHandler()
+    log_handler.setFormatter(logging.Formatter(
+            '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
+            '%Y-%m-%d %H:%M:%S'))
+    logger.addHandler(log_handler)
+    # Clamp at DEBUG.  Four -v flags would otherwise compute level 0, which
+    # is NOTSET: the logger would then defer to its parent's level and log
+    # less than it does with three.
+    log_level = max(logging.DEBUG, logging.ERROR - (10 * args.verbose))
+    logger.setLevel(log_level)
+
+def run(args, docker_client):
+    # Replay Docker's event history up to now to learn when each image was
+    # last used, do one cleanup pass, then keep handling events from now on.
+    start_time = int(time.time())
+    logger.debug("Loading Docker activity through present")
+    images = DockerImages.from_daemon(args.keep, docker_client)
+    past_events = docker_client.events(since=1, until=start_time)
+    DockerImageUseRecorder(images, docker_client, past_events).run()
+    # Resume from the same timestamp the replay ended at.
+    new_events = docker_client.events(since=start_time)
+    image_cleaner = DockerImageCleaner(images, docker_client, new_events)
+    logger.info("Starting cleanup loop")
+    image_cleaner.clean_images()
+    image_cleaner.run()
+
+def main(arguments):
+    # Command-line entry point: parse arguments, configure logging, then
+    # run the cleaner against a new Docker client.
+    parsed_args = parse_arguments(arguments)
+    setup_logging(parsed_args)
+    client = docker.Client()
+    run(parsed_args, client)
+
+if __name__ == '__main__':
+    main(sys.argv[1:])
diff --git a/services/dockercleaner/gittaggers.py b/services/dockercleaner/gittaggers.py
new file mode 120000
index 0000000..a9ad861
--- /dev/null
+++ b/services/dockercleaner/gittaggers.py
@@ -0,0 +1 @@
+../../sdk/python/gittaggers.py
\ No newline at end of file
diff --git a/services/dockercleaner/setup.py b/services/dockercleaner/setup.py
new file mode 100644
index 0000000..a799ffe
--- /dev/null
+++ b/services/dockercleaner/setup.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+
+# Packaging script for the arvados-docker-cleaner distribution.
+
+import os
+import sys
+import setuptools.command.egg_info as egg_info_cmd
+
+from setuptools import setup, find_packages
+
+# Use the egg_info command from the gittaggers module when it can be
+# imported; otherwise fall back to setuptools' stock egg_info command.
+try:
+    import gittaggers
+    tagger = gittaggers.EggInfoFromGit
+except ImportError:
+    tagger = egg_info_cmd.egg_info
+
+setup(name="arvados-docker-cleaner",
+      version="0.1",
+      description="Arvados Docker cleaner",
+      author="Arvados",
+      author_email="info at arvados.org",
+      url="https://arvados.org",
+      download_url="https://github.com/curoverse/arvados.git",
+      license="GNU Affero General Public License version 3.0",
+      packages=find_packages(),
+      install_requires=[
+        'docker-py',
+        ],
+      tests_require=[
+        'mock',
+        ],
+      test_suite='tests',
+      zip_safe=False,
+      cmdclass={'egg_info': tagger},
+      )
diff --git a/services/dockercleaner/tests/__init__.py b/services/dockercleaner/tests/__init__.py
new file mode 100644
index 0000000..ab92cab
--- /dev/null
+++ b/services/dockercleaner/tests/__init__.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python3
+
+# Raise the root logger's threshold to CRITICAL when the tests package is
+# imported.
+import logging
+logging.getLogger('').setLevel(logging.CRITICAL)
diff --git a/services/dockercleaner/tests/test_cleaner.py b/services/dockercleaner/tests/test_cleaner.py
new file mode 100644
index 0000000..818cc9d
--- /dev/null
+++ b/services/dockercleaner/tests/test_cleaner.py
@@ -0,0 +1,362 @@
+#!/usr/bin/env python3
+
+import collections
+import itertools
+import json
+import random
+import time
+import unittest
+
+import docker
+import mock
+
+from arvados_docker import cleaner
+
+MAX_DOCKER_ID = (16 ** 64) - 1
+
+def MockDockerId():
+    # Return a random 64-digit lowercase hexadecimal string to stand in for
+    # a Docker object ID.
+    random_value = random.randint(0, MAX_DOCKER_ID)
+    return format(random_value, '064x')
+
+def MockContainer(image_hash):
+    # Build a container description with a fresh random ID that refers to
+    # the given image by its 'Id'.
+    container = {'Id': MockDockerId()}
+    container['Image'] = image_hash['Id']
+    return container
+
+def MockImage(*, size=0, vsize=None, tags=()):
+    # Build an image description with random 'Id' and 'ParentId' values.
+    # When vsize is None a random virtual size between 100 and 2000000 is
+    # chosen.  tags defaults to an immutable empty tuple rather than a
+    # shared mutable list; it is copied into a new list either way.
+    if vsize is None:
+        vsize = random.randint(100, 2000000)
+    return {'Id': MockDockerId(),
+            'ParentId': MockDockerId(),
+            'RepoTags': list(tags),
+            'Size': size,
+            'VirtualSize': vsize}
+
+class MockEvent(dict):
+    # A dict shaped like a decoded Docker event, with 'status', 'id' and
+    # 'time' keys plus any extra fields supplied by the caller.
+    ENCODING = 'utf-8'
+    # Shared counter that supplies increasing default 'time' values.
+    event_seq = itertools.count(1)
+
+    def __init__(self, status, docker_id=None, **event_data):
+        # docker_id defaults to a fresh random ID.  A 'time' passed in
+        # event_data is kept; otherwise the next counter value is used.
+        if docker_id is None:
+            docker_id = MockDockerId()
+        # Initialize from the keyword data only; the instance must not be
+        # passed to dict.__init__ as its own initializer.
+        super().__init__(**event_data)
+        self['status'] = status
+        self['id'] = docker_id
+        self.setdefault('time', next(self.event_seq))
+
+    def encoded(self):
+        # Return the event as encoded JSON, the form the listener reads.
+        return json.dumps(self).encode(self.ENCODING)
+
+
+class MockException(docker.errors.APIError):
+    # APIError whose mocked response reports the given HTTP status code.
+    def __init__(self, status_code):
+        fake_response = mock.Mock(name='response', status_code=status_code)
+        super().__init__("mock exception", fake_response)
+
+
+class DockerImageTestCase(unittest.TestCase):
+    # Checks that DockerImage.used_at() records the latest use time and
+    # never moves last_used backward.
+    def test_used_at_sets_last_used(self):
+        image = cleaner.DockerImage(MockImage())
+        image.used_at(5)
+        self.assertEqual(5, image.last_used)
+
+    def test_used_at_moves_forward(self):
+        image = cleaner.DockerImage(MockImage())
+        image.used_at(6)
+        image.used_at(8)
+        self.assertEqual(8, image.last_used)
+
+    def test_used_at_does_not_go_backward(self):
+        image = cleaner.DockerImage(MockImage())
+        image.used_at(4)
+        image.used_at(2)
+        self.assertEqual(4, image.last_used)
+
+
+class DockerImagesTestCase(unittest.TestCase):
+    # Tests for cleaner.DockerImages: image registration, container (user)
+    # tracking, and the choice of images to delete.
+    def setUp(self):
+        self.mock_images = []
+
+    def setup_mock_images(self, *vsizes):
+        # Append one mock image per requested virtual size.  A size of None
+        # lets MockImage pick a random one.
+        self.mock_images.extend(MockImage(vsize=vsize) for vsize in vsizes)
+
+    def setup_images(self, *vsizes, target_size=1000000):
+        # Return a DockerImages holding every mock image created so far,
+        # including the ones added for vsizes.
+        self.setup_mock_images(*vsizes)
+        images = cleaner.DockerImages(target_size)
+        for image in self.mock_images:
+            images.add_image(image)
+        return images
+
+    def test_has_image(self):
+        images = self.setup_images(None)
+        self.assertTrue(images.has_image(self.mock_images[0]['Id']))
+        self.assertFalse(images.has_image(MockDockerId()))
+
+    def test_del_image(self):
+        images = self.setup_images(None)
+        images.del_image(self.mock_images[0]['Id'])
+        self.assertFalse(images.has_image(self.mock_images[0]['Id']))
+
+    def test_del_nonexistent_image(self):
+        images = self.setup_images(None)
+        images.del_image(MockDockerId())
+        self.assertTrue(images.has_image(self.mock_images[0]['Id']))
+
+    def test_no_users_at_start(self):
+        images = self.setup_images(None)
+        self.assertFalse(images.any_users())
+
+    def test_users_recorded(self):
+        images = self.setup_images(None)
+        images.add_user(MockContainer(self.mock_images[-1]), 1)
+        self.assertTrue(images.any_users())
+
+    def test_users_unrecorded(self):
+        images = self.setup_images(None)
+        user = MockContainer(self.mock_images[-1])
+        images.add_user(user, 1)
+        images.end_user(user['Id'])
+        self.assertFalse(images.any_users())
+
+    def test_users_can_restart(self):
+        images = self.setup_images(None)
+        user = MockContainer(self.mock_images[-1])
+        images.add_user(user, 1)
+        images.end_user(user['Id'])
+        images.add_user(user, 2)
+        self.assertTrue(images.any_users())
+
+    def test_multiple_users(self):
+        images = self.setup_images(None, None)
+        users = [MockContainer(image) for image in self.mock_images]
+        images.add_user(users[0], 1)
+        images.add_user(users[1], 2)
+        images.end_user(users[0]['Id'])
+        self.assertTrue(images.any_users())
+        images.end_user(users[1]['Id'])
+        self.assertFalse(images.any_users())
+
+    def test_one_image_multiple_users(self):
+        images = self.setup_images(None)
+        users = [MockContainer(self.mock_images[0]) for ii in range(2)]
+        images.add_user(users[0], 1)
+        images.add_user(users[1], 2)
+        images.end_user(users[0]['Id'])
+        self.assertTrue(images.any_users())
+        images.end_user(users[1]['Id'])
+        self.assertFalse(images.any_users())
+
+    def test_nonexistent_user_added(self):
+        # A container whose image was never registered is not tracked.
+        images = self.setup_images()
+        images.add_user(MockContainer(MockImage()), 1)
+        self.assertFalse(images.any_users())
+
+    def test_nonexistent_user_removed(self):
+        images = self.setup_images()
+        images.end_user('nonexistent')
+        self.assertFalse(images.any_users())
+
+    def test_nonexistent_user_removed_amongst_real_users(self):
+        images = self.setup_images(None)
+        user = MockContainer(self.mock_images[-1])
+        images.add_user(user, 1)
+        images.add_user(MockContainer(MockImage()), 2)
+        self.assertTrue(images.any_users())
+        images.end_user(user['Id'])
+        self.assertFalse(images.any_users())
+
+    def test_del_image_removes_users(self):
+        images = self.setup_images(None)
+        user = MockContainer(self.mock_images[0])
+        images.add_user(user, 1)
+        images.del_image(self.mock_images[0]['Id'])
+        self.assertFalse(images.any_users())
+
+    def test_images_under_target_not_deletable(self):
+        # Only the 100-byte image fits under the 150-byte target.
+        images = self.setup_images(200, 100, 300, target_size=150)
+        self.assertEqual({self.mock_images[ii]['Id'] for ii in (0, 2)},
+                         set(images.should_delete()))
+
+    def test_all_images_deletable(self):
+        images = self.setup_images(None, None, target_size=1)
+        self.assertEqual({image['Id'] for image in self.mock_images},
+                         set(images.should_delete()))
+
+    def test_images_in_use_not_deletable(self):
+        images = self.setup_images(None, None, target_size=1)
+        users = [MockContainer(image) for image in self.mock_images]
+        images.add_user(users[0], 1)
+        images.add_user(users[1], 2)
+        images.end_user(users[1]['Id'])
+        self.assertEqual([self.mock_images[1]['Id']],
+                         list(images.should_delete()))
+
+    def test_images_suggested_for_deletion_by_lru(self):
+        # Deletion candidates come out least recently used first.
+        images = self.setup_images(10, 10, 10, target_size=1)
+        users = [MockContainer(image) for image in self.mock_images]
+        images.add_user(users[0], 3)
+        images.add_user(users[1], 1)
+        images.add_user(users[2], 2)
+        for user in users:
+            images.end_user(user['Id'])
+        self.assertEqual([self.mock_images[ii]['Id'] for ii in (1, 2, 0)],
+                         list(images.should_delete()))
+
+    def setup_from_daemon(self, *vsizes, target_size=1500000):
+        # Like setup_images, but builds the DockerImages with from_daemon()
+        # and a mock client whose images() yields the mock images.
+        self.setup_mock_images(*vsizes)
+        docker_client = mock.MagicMock(name='docker_client')
+        docker_client.images.return_value = iter(self.mock_images)
+        return cleaner.DockerImages.from_daemon(target_size, docker_client)
+
+    def test_images_loaded_from_daemon(self):
+        images = self.setup_from_daemon(None, None)
+        for image in self.mock_images:
+            self.assertTrue(images.has_image(image['Id']))
+
+    def test_target_size_set_from_daemon(self):
+        images = self.setup_from_daemon(20, 10, target_size=15)
+        self.assertEqual([self.mock_images[0]['Id']],
+                         list(images.should_delete()))
+
+
+class DockerImageUseRecorderTestCase(unittest.TestCase):
+    # Drives the listener class named by TEST_CLASS with mock events and
+    # checks the calls it makes on a mocked image tracker and Docker client.
+    TEST_CLASS = cleaner.DockerImageUseRecorder
+
+    def setUp(self):
+        self.images = mock.MagicMock(name='images')
+        self.docker_client = mock.MagicMock(name='docker_client')
+        self.events = []
+        self.recorder = self.TEST_CLASS(self.images, self.docker_client,
+                                        self.encoded_events)
+
+    @property
+    def encoded_events(self):
+        # The generator walks self.events lazily, so events that a test
+        # appends after setUp are still delivered when run() is called.
+        return (event.encoded() for event in self.events)
+
+    def test_unknown_events_ignored(self):
+        self.events.append(MockEvent('mock!event'))
+        self.recorder.run()
+        # No exception should be raised.
+
+    def test_fetches_container_on_create(self):
+        self.events.append(MockEvent('create'))
+        self.recorder.run()
+        self.docker_client.inspect_container.assert_called_with(
+            self.events[0]['id'])
+
+    def test_adds_user_on_container_create(self):
+        self.events.append(MockEvent('create'))
+        self.recorder.run()
+        self.images.add_user.assert_called_with(
+            self.docker_client.inspect_container(), self.events[0]['time'])
+
+    def test_unknown_image_handling(self):
+        # The use recorder should not fetch any images.
+        self.events.append(MockEvent('create'))
+        self.recorder.run()
+        self.assertFalse(self.docker_client.inspect_image.called)
+
+    def test_unfetchable_containers_ignored(self):
+        self.events.append(MockEvent('create'))
+        self.docker_client.inspect_container.side_effect = MockException(404)
+        self.recorder.run()
+        self.assertFalse(self.images.add_user.called)
+
+    def test_ends_user_on_container_destroy(self):
+        self.events.append(MockEvent('destroy'))
+        self.recorder.run()
+        self.images.end_user.assert_called_with(self.events[0]['id'])
+
+
+class DockerImageCleanerTestCase(DockerImageUseRecorderTestCase):
+    # Reruns the inherited recorder tests against DockerImageCleaner (via
+    # TEST_CLASS), overrides the unknown-image test, and adds tests for
+    # image deletion.
+    TEST_CLASS = cleaner.DockerImageCleaner
+
+    def test_unknown_image_handling(self):
+        # The image cleaner should fetch and record new images.
+        self.images.has_image.return_value = False
+        self.events.append(MockEvent('create'))
+        self.recorder.run()
+        self.docker_client.inspect_image.assert_called_with(
+            self.docker_client.inspect_container()['Image'])
+        self.images.add_image.assert_called_with(
+            self.docker_client.inspect_image())
+
+    def test_unfetchable_images_ignored(self):
+        self.images.has_image.return_value = False
+        self.docker_client.inspect_image.side_effect = MockException(404)
+        self.events.append(MockEvent('create'))
+        self.recorder.run()
+        self.docker_client.inspect_image.assert_called_with(
+            self.docker_client.inspect_container()['Image'])
+        self.assertFalse(self.images.add_image.called)
+
+    def test_no_deletions_when_containers_running(self):
+        self.images.any_users.return_value = True
+        self.events.append(MockEvent('destroy'))
+        self.recorder.run()
+        self.assertFalse(self.images.should_delete.called)
+        self.assertFalse(self.docker_client.remove_image.called)
+
+    def test_deletions_after_destroy(self):
+        delete_id = MockDockerId()
+        self.images.any_users.return_value = False
+        self.images.should_delete.return_value = [delete_id]
+        self.events.append(MockEvent('destroy'))
+        self.recorder.run()
+        self.docker_client.remove_image.assert_called_with(delete_id)
+        self.images.del_image.assert_called_with(delete_id)
+
+    def test_failed_deletion_handling(self):
+        # An image whose removal fails must stay tracked.
+        delete_id = MockDockerId()
+        self.images.any_users.return_value = False
+        self.images.should_delete.return_value = [delete_id]
+        self.docker_client.remove_image.side_effect = MockException(500)
+        self.events.append(MockEvent('destroy'))
+        self.recorder.run()
+        self.docker_client.remove_image.assert_called_with(delete_id)
+        self.assertFalse(self.images.del_image.called)
+
+
+class HumanSizeTestCase(unittest.TestCase):
+    # Checks cleaner.human_size() for plain byte counts and for each size
+    # suffix in upper and lower case.
+    def check(self, human_str, count, exp):
+        # human_str should parse to count * 1024 ** exp.
+        self.assertEqual(count * (1024 ** exp),
+                         cleaner.human_size(human_str))
+
+    def test_bytes(self):
+        self.check('1', 1, 0)
+        self.check('82', 82, 0)
+
+    def test_kibibytes(self):
+        self.check('2K', 2, 1)
+        self.check('3k', 3, 1)
+
+    def test_mebibytes(self):
+        self.check('4M', 4, 2)
+        self.check('5m', 5, 2)
+
+    def test_gibibytes(self):
+        self.check('6G', 6, 3)
+        self.check('7g', 7, 3)
+
+    def test_tebibytes(self):
+        self.check('8T', 8, 4)
+        self.check('9t', 9, 4)
+
+
+class RunTestCase(unittest.TestCase):
+    # Checks how cleaner.run() requests events from the Docker client.
+    def setUp(self):
+        self.args = mock.MagicMock(name='args')
+        self.args.keep = 1000000
+        self.docker_client = mock.MagicMock(name='docker_client')
+
+    def test_run(self):
+        # run() should request events twice: first a bounded replay of
+        # history, then a stream that starts where the replay ended.
+        test_start_time = int(time.time())
+        self.docker_client.events.return_value = []
+        cleaner.run(self.args, self.docker_client)
+        self.assertEqual(2, self.docker_client.events.call_count)
+        event_kwargs = [args[1] for args in
+                        self.docker_client.events.call_args_list]
+        self.assertIn('since', event_kwargs[0])
+        self.assertIn('until', event_kwargs[0])
+        self.assertLessEqual(test_start_time, event_kwargs[0]['until'])
+        self.assertIn('since', event_kwargs[1])
+        self.assertEqual(event_kwargs[0]['until'], event_kwargs[1]['since'])

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list