[ARVADOS] created: 4dc84fdc1f380b9d796308972648e6e36299684a
git at public.curoverse.com
git at public.curoverse.com
Thu Apr 30 09:25:05 EDT 2015
at 4dc84fdc1f380b9d796308972648e6e36299684a (commit)
commit 4dc84fdc1f380b9d796308972648e6e36299684a
Author: Brett Smith <brett at curoverse.com>
Date: Thu Apr 30 09:19:15 2015 -0400
3793: Add Docker image cleaner service for compute nodes.
This service monitors Docker events. When no containers are active,
it arranges to keep the most recently used images up to a configured
size limit, then deletes the rest. This will prevent Docker images
from growing indefinitely on physical compute nodes.
diff --git a/services/dockercleaner/.gitignore b/services/dockercleaner/.gitignore
new file mode 120000
index 0000000..ed3b362
--- /dev/null
+++ b/services/dockercleaner/.gitignore
@@ -0,0 +1 @@
+../../sdk/python/.gitignore
\ No newline at end of file
diff --git a/services/dockercleaner/arvados_docker/__init__.py b/services/dockercleaner/arvados_docker/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/dockercleaner/arvados_docker/cleaner.py b/services/dockercleaner/arvados_docker/cleaner.py
new file mode 100644
index 0000000..a87741d
--- /dev/null
+++ b/services/dockercleaner/arvados_docker/cleaner.py
@@ -0,0 +1,242 @@
+#!/usr/bin/env python3
+"""arvados_docker.cleaner - Remove unused Docker images from compute nodes
+
+Usage:
+ python3 -m arvados_docker.cleaner --keep 50G
+"""
+
+import argparse
+import collections
+import copy
+import functools
+import logging
+import json
+import time
+import sys
+
+import docker
+
# Byte multipliers for the size suffixes accepted by human_size():
# binary units, so 'k' is 1024 bytes and each later suffix is 1024x larger.
SUFFIX_SIZES = {
    'k': 1024,
    'm': 1024 ** 2,
    'g': 1024 ** 3,
    't': 1024 ** 4,
}

logger = logging.getLogger('arvados_docker.cleaner')
+
def return_when_docker_not_found(result=None):
    """Build a decorator that turns Docker "not found" errors into a value.

    The decorated function runs normally. If it raises
    ``docker.errors.APIError`` whose HTTP response has status 404, the
    wrapper returns ``result`` instead. Any other APIError, including one
    that carries no HTTP response at all, is re-raised unchanged.

    :param result: value returned in place of a 404 error (default None).
    :returns: a decorator that preserves the wrapped function's metadata.
    """
    def docker_not_found_decorator(orig_func):
        @functools.wraps(orig_func)
        def docker_not_found_wrapper(*args, **kwargs):
            try:
                return orig_func(*args, **kwargs)
            except docker.errors.APIError as error:
                # APIError may be constructed without a response object;
                # treat that as "not a 404" instead of crashing with
                # AttributeError while handling the original error.
                status_code = getattr(error.response, 'status_code', None)
                if status_code != 404:
                    raise
                return result
        return docker_not_found_wrapper
    return docker_not_found_decorator
+
class DockerImage:
    """One Docker image known to the cleaner.

    Holds the image ID, its ``VirtualSize`` as reported by Docker, and the
    most recent time a container was seen using it (-1 until first use).
    """

    def __init__(self, image_hash):
        self.docker_id, self.size = image_hash['Id'], image_hash['VirtualSize']
        self.last_used = -1

    def used_at(self, use_time):
        """Note a use at ``use_time``; ``last_used`` never moves backward."""
        if use_time > self.last_used:
            self.last_used = use_time
+
+
class DockerImages:
    """Registry of local Docker images and the containers that use them.

    ``images`` maps image ID -> DockerImage.
    ``container_image_map`` maps container ID -> image ID for containers
    currently registered as users.
    ``target_size`` is the total image size that should_delete() tries to
    keep, in the same units as the images' sizes.
    """

    def __init__(self, target_size):
        self.target_size = target_size
        self.images = {}
        self.container_image_map = {}

    @classmethod
    def from_daemon(cls, target_size, docker_client):
        """Build a registry holding every image ``docker_client.images()`` lists."""
        registry = cls(target_size)
        for image_hash in docker_client.images():
            registry.add_image(image_hash)
        return registry

    def add_image(self, image_hash):
        """Register (or replace) the image described by ``image_hash``."""
        entry = DockerImage(image_hash)
        self.images[entry.docker_id] = entry
        logger.debug("Registered image %s", entry.docker_id)

    def del_image(self, image_id):
        """Forget an image and drop every container recorded as using it."""
        if image_id in self.images:
            del self.images[image_id]
            remaining = {}
            for container_id, used_image in self.container_image_map.items():
                if used_image != image_id:
                    remaining[container_id] = used_image
            self.container_image_map = remaining
            logger.debug("Unregistered image %s", image_id)

    def has_image(self, image_id):
        return image_id in self.images

    def add_user(self, container_hash, use_time):
        """Record a container as using its image at ``use_time``.

        Containers whose image is not registered are ignored.
        """
        image_id = container_hash['Image']
        if image_id not in self.images:
            return
        container_id = container_hash['Id']
        self.container_image_map[container_id] = image_id
        self.images[image_id].used_at(use_time)
        logger.debug("Registered container %s using image %s",
                     container_id, image_id)

    def end_user(self, cid):
        """Stop tracking container ``cid`` (a no-op if it is unknown)."""
        if cid in self.container_image_map:
            del self.container_image_map[cid]
        logger.debug("Unregistered container %s", cid)

    def any_users(self):
        return len(self.container_image_map) > 0

    def should_delete(self):
        """Yield the IDs of images to delete, least recently used first.

        Images used by a registered container are never yielded, and their
        sizes are charged against target_size first. The other images are
        then considered from most to least recently used. Each one that
        still fits in the remaining budget is kept. A larger image that does
        not fit is skipped, and an older, smaller one may still be kept.
        Every image not kept is yielded in ascending ``last_used`` order.
        """
        in_use = set(self.container_image_map.values())
        budget = self.target_size - sum(
            self.images[image_id].size for image_id in in_use)
        candidates = sorted(
            (img for img in self.images.values() if img.docker_id not in in_use),
            key=lambda img: img.last_used)
        keep = set()
        for img in candidates[::-1]:
            if img.size <= budget:
                keep.add(img.docker_id)
                budget -= img.size
        yield from (img.docker_id for img in candidates
                    if img.docker_id not in keep)
+
+
class DockerEventHandlers:
    """Registry mapping Docker event statuses to handler method *names*.

    Names are stored instead of functions, and a listener resolves each name
    on its own instance with getattr(). A subclass can therefore override a
    registered handler method without registering it again.
    """

    def __init__(self):
        self.handler_names = collections.defaultdict(list)

    def on(self, *status_names):
        """Decorator factory: register the decorated method for each status."""
        def register_handler(handler_method):
            for status_name in status_names:
                self.handler_names[status_name].append(handler_method.__name__)
            return handler_method
        return register_handler

    def for_event(self, status):
        """Return an iterator over the handler names registered for ``status``."""
        names = self.handler_names[status]
        return iter(names)

    def copy(self):
        """Return an independent copy whose registrations can diverge."""
        duplicate = type(self)()
        duplicate.handler_names = copy.deepcopy(self.handler_names)
        return duplicate
+
+
class DockerEventListener:
    """Base class that dispatches a stream of Docker events to handler methods.

    To use it, define ``event_handlers`` as a DockerEventHandlers instance
    (a class attribute) and register methods with
    ``@event_handlers.on(status, ...)``. Then call run() to consume the event
    stream and invoke the matching methods as events arrive.
    """
    ENCODING = 'utf-8'

    def __init__(self, events):
        # ``events``: iterable of JSON-encoded event records (bytes or str),
        # e.g. the stream from the Docker client's events() call.
        self.events = events

    def run(self):
        """Consume ``self.events``, calling the handlers registered for each.

        Bytes events are decoded with ENCODING, and text events are parsed
        as-is. Events without a ``status`` key are skipped instead of raising
        KeyError and ending the loop; Docker's event schema varies by object
        type and API version.
        """
        for raw_event in self.events:
            if isinstance(raw_event, bytes):
                raw_event = raw_event.decode(self.ENCODING)
            event = json.loads(raw_event)
            status = event.get('status')
            if status is None:
                continue
            for method_name in self.event_handlers.for_event(status):
                getattr(self, method_name)(event)
+
+
class DockerImageUseRecorder(DockerEventListener):
    """Replay container events to learn when each image was last used.

    On ``create`` the container is inspected and registered as a user of its
    image. On ``destroy`` that registration is dropped. Containers Docker no
    longer knows about (404 on inspect) are ignored.
    """
    event_handlers = DockerEventHandlers()

    def __init__(self, images, docker_client, events):
        self.images = images
        self.docker_client = docker_client
        super().__init__(events)

    @event_handlers.on('create')
    @return_when_docker_not_found()
    def load_container(self, event):
        inspected = self.docker_client.inspect_container(event['id'])
        self.new_container(event, inspected)

    def new_container(self, event, container_hash):
        # Hook for subclasses: record the container as a user of its image,
        # stamped with the event's time.
        self.images.add_user(container_hash, event['time'])

    @event_handlers.on('destroy')
    def container_stopped(self, event):
        self.images.end_user(event['id'])
+
+
class DockerImageCleaner(DockerImageUseRecorder):
    """Use recorder that also deletes surplus images between containers.

    Unlike its parent, it fetches and registers any image it has not seen
    when a container is created. After each ``destroy`` event, if no
    registered containers remain, it removes the images chosen by
    should_delete().
    """
    event_handlers = DockerImageUseRecorder.event_handlers.copy()

    def new_container(self, event, container_hash):
        image_id = container_hash['Image']
        if not self.images.has_image(image_id):
            # When called from load_container, a 404 from inspect_image is
            # swallowed by that method's decorator, so the container is
            # simply not recorded.
            self.images.add_image(self.docker_client.inspect_image(image_id))
        return super().new_container(event, container_hash)

    @event_handlers.on('destroy')
    def clean_images(self, event=None):
        """Remove surplus images, but only while no containers are in use."""
        if not self.images.any_users():
            for image_id in self.images.should_delete():
                try:
                    self.docker_client.remove_image(image_id)
                except docker.errors.APIError as error:
                    logger.warning("Failed to remove image %s: %s", image_id, error)
                    continue
                logger.info("Removed image %s", image_id)
                self.images.del_image(image_id)
+
+
def human_size(size_str):
    """Convert a size such as ``'50G'`` or ``'512m'`` into a byte count.

    Matching is case-insensitive and ignores surrounding whitespace. An
    optional trailing ``b`` (as in ``'50GB'``) is ignored. A final
    k/m/g/t suffix multiplies by the matching SUFFIX_SIZES entry (powers of
    1024); with no suffix the number is taken as bytes.

    :raises ValueError: if nothing numeric remains or the numeric part is
        not an integer. argparse turns ValueError from a ``type=`` callable
        into a usage error, so a bad ``--keep`` value no longer ends in a
        traceback (the old code raised IndexError on empty input).
    """
    size_str = size_str.strip().lower().rstrip('b')
    if not size_str:
        raise ValueError("empty size string")
    multiplier = SUFFIX_SIZES.get(size_str[-1])
    if multiplier is None:
        multiplier = 1
    else:
        size_str = size_str[:-1]
    return int(size_str) * multiplier
+
+def parse_arguments(arguments):
+ parser = argparse.ArgumentParser(
+ prog="dockerclean",
+ description="clean old Docker images from Arvados compute nodes")
+ parser.add_argument(
+ '--keep', action='store', type=human_size, required=True,
+ help="size of Docker images to keep, suffixed with K/M/G/T")
+ parser.add_argument(
+ '--verbose', '-v', action='count', default=0,
+ help="log more information")
+ return parser.parse_args(arguments)
+
def setup_logging(args):
    """Attach a StreamHandler to the module logger and set its level.

    The base level is ERROR. Each ``-v`` (``args.verbose``) lowers it by one
    standard step: ERROR -> WARNING -> INFO -> DEBUG. The level is clamped at
    DEBUG. Without the clamp, ``-vvvv`` produced level 0 (NOTSET), which makes
    the logger defer to the root logger's level (WARNING by default) and so
    showed less output than ``-vvv``.
    """
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    logger.addHandler(log_handler)
    logger.setLevel(max(logging.DEBUG, logging.ERROR - (10 * args.verbose)))
+
def run(args, docker_client):
    """Replay past Docker activity, then clean images as containers finish.

    Events up to the start time are replayed only to learn when each image
    was last used. Events from the start time onward drive the cleaner,
    which makes one cleanup pass before it starts consuming them.
    """
    start_time = int(time.time())
    logger.debug("Loading Docker activity through present")
    images = DockerImages.from_daemon(args.keep, docker_client)

    history = docker_client.events(since=1, until=start_time)
    DockerImageUseRecorder(images, docker_client, history).run()

    live_events = docker_client.events(since=start_time)
    cleaner = DockerImageCleaner(images, docker_client, live_events)
    logger.info("Starting cleanup loop")
    cleaner.clean_images()
    cleaner.run()
+
def main(arguments):
    """Command-line entry point: parse ``arguments``, set up logging, run."""
    parsed = parse_arguments(arguments)
    setup_logging(parsed)
    client = docker.Client()
    run(parsed, client)
+
if __name__ == '__main__':
    # Script entry: hand everything after the program name to main().
    cli_arguments = sys.argv[1:]
    main(cli_arguments)
diff --git a/services/dockercleaner/gittaggers.py b/services/dockercleaner/gittaggers.py
new file mode 120000
index 0000000..a9ad861
--- /dev/null
+++ b/services/dockercleaner/gittaggers.py
@@ -0,0 +1 @@
+../../sdk/python/gittaggers.py
\ No newline at end of file
diff --git a/services/dockercleaner/setup.py b/services/dockercleaner/setup.py
new file mode 100644
index 0000000..a799ffe
--- /dev/null
+++ b/services/dockercleaner/setup.py
@@ -0,0 +1,33 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import setuptools.command.egg_info as egg_info_cmd
+
+from setuptools import setup, find_packages
+
try:
    import gittaggers
    tagger = gittaggers.EggInfoFromGit
except ImportError:
    # gittaggers.py is a symlink into the Python SDK; when it cannot be
    # imported, fall back to setuptools' stock egg_info command.
    tagger = egg_info_cmd.egg_info

setup(
    # Package identity.
    name="arvados-docker-cleaner",
    version="0.1",
    description="Arvados Docker cleaner",
    # Project metadata.
    author="Arvados",
    author_email="info at arvados.org",
    url="https://arvados.org",
    download_url="https://github.com/curoverse/arvados.git",
    license="GNU Affero General Public License version 3.0",
    # Contents, dependencies and tests.
    packages=find_packages(),
    install_requires=['docker-py'],
    tests_require=['mock'],
    test_suite='tests',
    zip_safe=False,
    # egg_info command chosen above (git-aware when available).
    cmdclass={'egg_info': tagger},
)
diff --git a/services/dockercleaner/tests/__init__.py b/services/dockercleaner/tests/__init__.py
new file mode 100644
index 0000000..ab92cab
--- /dev/null
+++ b/services/dockercleaner/tests/__init__.py
@@ -0,0 +1,4 @@
+#!/usr/bin/env python3
+
+import logging
# Raise the root logger's threshold to CRITICAL for the test run. Loggers
# with no level of their own inherit it, so their lower-severity records
# are dropped.
logging.getLogger().setLevel(logging.CRITICAL)
diff --git a/services/dockercleaner/tests/test_cleaner.py b/services/dockercleaner/tests/test_cleaner.py
new file mode 100644
index 0000000..818cc9d
--- /dev/null
+++ b/services/dockercleaner/tests/test_cleaner.py
@@ -0,0 +1,362 @@
+#!/usr/bin/env python3
+
+import collections
+import itertools
+import json
+import random
+import time
+import unittest
+
+import docker
+import mock
+
+from arvados_docker import cleaner
+
MAX_DOCKER_ID = (16 ** 64) - 1


def MockDockerId():
    """Return a random 64-hex-digit string shaped like a Docker object ID."""
    return '{:064x}'.format(random.randint(0, MAX_DOCKER_ID))


def MockContainer(image_hash):
    """Return a minimal container record that uses the image ``image_hash``."""
    return {'Id': MockDockerId(),
            'Image': image_hash['Id']}


def MockImage(*, size=0, vsize=None, tags=()):
    """Return a minimal image record with random IDs.

    ``vsize`` defaults to a random VirtualSize. ``tags`` defaults to an
    immutable empty tuple instead of a shared mutable list, and is always
    copied into a fresh RepoTags list.
    """
    if vsize is None:
        vsize = random.randint(100, 2000000)
    return {'Id': MockDockerId(),
            'ParentId': MockDockerId(),
            'RepoTags': list(tags),
            'Size': size,
            'VirtualSize': vsize}
+
class MockEvent(dict):
    """A Docker event record, as a dict, that can be JSON-encoded to bytes.

    ``status`` and ``docker_id`` fill the ``status`` and ``id`` keys; extra
    keyword arguments become additional keys. ``time`` defaults to the next
    value of a class-wide counter, so events built later have larger times.
    """
    ENCODING = 'utf-8'
    event_seq = itertools.count(1)

    def __init__(self, status, docker_id=None, **event_data):
        if docker_id is None:
            docker_id = MockDockerId()
        # Initialise from the keyword data only; the instance must not be
        # passed as its own positional mapping argument.
        super().__init__(**event_data)
        self['status'] = status
        self['id'] = docker_id
        self.setdefault('time', next(self.event_seq))

    def encoded(self):
        """Return this event as JSON-encoded bytes."""
        return json.dumps(self).encode(self.ENCODING)
+
+
class MockException(docker.errors.APIError):
    """APIError stand-in whose fake HTTP response reports ``status_code``."""

    def __init__(self, status_code):
        fake_response = mock.Mock(name='response')
        fake_response.status_code = status_code
        super().__init__("mock exception", fake_response)
+
+
class DockerImageTestCase(unittest.TestCase):
    """Tests for DockerImage.used_at() bookkeeping."""

    def record_uses(self, *use_times):
        # Build a fresh image and feed it the given use times, in order.
        subject = cleaner.DockerImage(MockImage())
        for when in use_times:
            subject.used_at(when)
        return subject

    def test_used_at_sets_last_used(self):
        self.assertEqual(5, self.record_uses(5).last_used)

    def test_used_at_moves_forward(self):
        self.assertEqual(8, self.record_uses(6, 8).last_used)

    def test_used_at_does_not_go_backward(self):
        self.assertEqual(4, self.record_uses(4, 2).last_used)
+
+
class DockerImagesTestCase(unittest.TestCase):
    """Tests for the DockerImages registry and its deletion policy."""

    def setUp(self):
        self.mock_images = []

    def setup_mock_images(self, *vsizes):
        # A None entry in vsizes means "pick a random VirtualSize".
        for vsize in vsizes:
            self.mock_images.append(MockImage(vsize=vsize))

    def setup_images(self, *vsizes, target_size=1000000):
        # Register mock images of the given sizes in a fresh registry.
        self.setup_mock_images(*vsizes)
        registry = cleaner.DockerImages(target_size)
        for image_hash in self.mock_images:
            registry.add_image(image_hash)
        return registry

    def test_has_image(self):
        registry = self.setup_images(None)
        known_id = self.mock_images[0]['Id']
        self.assertTrue(registry.has_image(known_id))
        self.assertFalse(registry.has_image(MockDockerId()))

    def test_del_image(self):
        registry = self.setup_images(None)
        doomed_id = self.mock_images[0]['Id']
        registry.del_image(doomed_id)
        self.assertFalse(registry.has_image(doomed_id))

    def test_del_nonexistent_image(self):
        registry = self.setup_images(None)
        registry.del_image(MockDockerId())
        self.assertTrue(registry.has_image(self.mock_images[0]['Id']))

    def test_no_users_at_start(self):
        registry = self.setup_images(None)
        self.assertFalse(registry.any_users())

    def test_users_recorded(self):
        registry = self.setup_images(None)
        registry.add_user(MockContainer(self.mock_images[-1]), 1)
        self.assertTrue(registry.any_users())

    def test_users_unrecorded(self):
        registry = self.setup_images(None)
        container = MockContainer(self.mock_images[-1])
        registry.add_user(container, 1)
        registry.end_user(container['Id'])
        self.assertFalse(registry.any_users())

    def test_users_can_restart(self):
        registry = self.setup_images(None)
        container = MockContainer(self.mock_images[-1])
        registry.add_user(container, 1)
        registry.end_user(container['Id'])
        registry.add_user(container, 2)
        self.assertTrue(registry.any_users())

    def test_multiple_users(self):
        registry = self.setup_images(None, None)
        containers = [MockContainer(image_hash) for image_hash in self.mock_images]
        for use_time, container in enumerate(containers, start=1):
            registry.add_user(container, use_time)
        registry.end_user(containers[0]['Id'])
        self.assertTrue(registry.any_users())
        registry.end_user(containers[1]['Id'])
        self.assertFalse(registry.any_users())

    def test_one_image_multiple_users(self):
        registry = self.setup_images(None)
        containers = [MockContainer(self.mock_images[0]) for _ in range(2)]
        for use_time, container in enumerate(containers, start=1):
            registry.add_user(container, use_time)
        registry.end_user(containers[0]['Id'])
        self.assertTrue(registry.any_users())
        registry.end_user(containers[1]['Id'])
        self.assertFalse(registry.any_users())

    def test_nonexistent_user_added(self):
        registry = self.setup_images()
        registry.add_user(MockContainer(MockImage()), 1)
        self.assertFalse(registry.any_users())

    def test_nonexistent_user_removed(self):
        registry = self.setup_images()
        registry.end_user('nonexistent')
        self.assertFalse(registry.any_users())

    def test_nonexistent_user_removed_amongst_real_users(self):
        registry = self.setup_images(None)
        real_user = MockContainer(self.mock_images[-1])
        registry.add_user(real_user, 1)
        registry.add_user(MockContainer(MockImage()), 2)
        self.assertTrue(registry.any_users())
        registry.end_user(real_user['Id'])
        self.assertFalse(registry.any_users())

    def test_del_image_removes_users(self):
        registry = self.setup_images(None)
        only_image = self.mock_images[0]
        registry.add_user(MockContainer(only_image), 1)
        registry.del_image(only_image['Id'])
        self.assertFalse(registry.any_users())

    def test_images_under_target_not_deletable(self):
        registry = self.setup_images(200, 100, 300, target_size=150)
        expected = {self.mock_images[0]['Id'], self.mock_images[2]['Id']}
        self.assertEqual(expected, set(registry.should_delete()))

    def test_all_images_deletable(self):
        registry = self.setup_images(None, None, target_size=1)
        expected = {image_hash['Id'] for image_hash in self.mock_images}
        self.assertEqual(expected, set(registry.should_delete()))

    def test_images_in_use_not_deletable(self):
        registry = self.setup_images(None, None, target_size=1)
        containers = [MockContainer(image_hash) for image_hash in self.mock_images]
        for use_time, container in enumerate(containers, start=1):
            registry.add_user(container, use_time)
        registry.end_user(containers[1]['Id'])
        self.assertEqual([self.mock_images[1]['Id']],
                         list(registry.should_delete()))

    def test_images_suggested_for_deletion_by_lru(self):
        registry = self.setup_images(10, 10, 10, target_size=1)
        containers = [MockContainer(image_hash) for image_hash in self.mock_images]
        for container, use_time in zip(containers, (3, 1, 2)):
            registry.add_user(container, use_time)
        for container in containers:
            registry.end_user(container['Id'])
        expected_order = [self.mock_images[index]['Id'] for index in (1, 2, 0)]
        self.assertEqual(expected_order, list(registry.should_delete()))

    def setup_from_daemon(self, *vsizes, target_size=1500000):
        # Build the registry through from_daemon(), using a mock client that
        # lists this test's mock images.
        self.setup_mock_images(*vsizes)
        docker_client = mock.MagicMock(name='docker_client')
        docker_client.images.return_value = iter(self.mock_images)
        return cleaner.DockerImages.from_daemon(target_size, docker_client)

    def test_images_loaded_from_daemon(self):
        registry = self.setup_from_daemon(None, None)
        for image_hash in self.mock_images:
            self.assertTrue(registry.has_image(image_hash['Id']))

    def test_target_size_set_from_daemon(self):
        registry = self.setup_from_daemon(20, 10, target_size=15)
        self.assertEqual([self.mock_images[0]['Id']],
                         list(registry.should_delete()))
+
+
class DockerImageUseRecorderTestCase(unittest.TestCase):
    """Tests for DockerImageUseRecorder, driven by mock events and a mock client.

    Subclasses can rerun these tests against another class by overriding
    TEST_CLASS.
    """
    TEST_CLASS = cleaner.DockerImageUseRecorder

    def setUp(self):
        self.images = mock.MagicMock(name='images')
        self.docker_client = mock.MagicMock(name='docker_client')
        self.events = []
        # encoded_events iterates self.events lazily, so events a test
        # appends after this point are still delivered by run().
        self.recorder = self.TEST_CLASS(
            self.images, self.docker_client, self.encoded_events)

    @property
    def encoded_events(self):
        return (mock_event.encoded() for mock_event in self.events)

    def add_event(self, status):
        # Queue one mock event with the given status and return it.
        new_event = MockEvent(status)
        self.events.append(new_event)
        return new_event

    def test_unknown_events_ignored(self):
        self.add_event('mock!event')
        self.recorder.run()  # Passing means no exception was raised.

    def test_fetches_container_on_create(self):
        create_event = self.add_event('create')
        self.recorder.run()
        self.docker_client.inspect_container.assert_called_with(
            create_event['id'])

    def test_adds_user_on_container_create(self):
        create_event = self.add_event('create')
        self.recorder.run()
        inspected = self.docker_client.inspect_container()
        self.images.add_user.assert_called_with(inspected, create_event['time'])

    def test_unknown_image_handling(self):
        # The plain use recorder must never fetch image details.
        self.add_event('create')
        self.recorder.run()
        self.assertFalse(self.docker_client.inspect_image.called)

    def test_unfetchable_containers_ignored(self):
        self.add_event('create')
        self.docker_client.inspect_container.side_effect = MockException(404)
        self.recorder.run()
        self.assertFalse(self.images.add_user.called)

    def test_ends_user_on_container_destroy(self):
        destroy_event = self.add_event('destroy')
        self.recorder.run()
        self.images.end_user.assert_called_with(destroy_event['id'])
+
+
class DockerImageCleanerTestCase(DockerImageUseRecorderTestCase):
    """Reruns the recorder tests against DockerImageCleaner and adds tests
    for image fetching and deletion."""
    TEST_CLASS = cleaner.DockerImageCleaner

    def queue_deletion(self):
        # Arrange for one image to be chosen for deletion with no users left.
        target_id = MockDockerId()
        self.images.any_users.return_value = False
        self.images.should_delete.return_value = [target_id]
        return target_id

    def test_unknown_image_handling(self):
        # Unlike the recorder, the cleaner fetches and registers new images.
        self.images.has_image.return_value = False
        self.events.append(MockEvent('create'))
        self.recorder.run()
        # Keep this order: the second assertion calls inspect_image() itself.
        self.docker_client.inspect_image.assert_called_with(
            self.docker_client.inspect_container()['Image'])
        self.images.add_image.assert_called_with(
            self.docker_client.inspect_image())

    def test_unfetchable_images_ignored(self):
        self.images.has_image.return_value = False
        self.docker_client.inspect_image.side_effect = MockException(404)
        self.events.append(MockEvent('create'))
        self.recorder.run()
        self.docker_client.inspect_image.assert_called_with(
            self.docker_client.inspect_container()['Image'])
        self.assertFalse(self.images.add_image.called)

    def test_no_deletions_when_containers_running(self):
        self.images.any_users.return_value = True
        self.events.append(MockEvent('destroy'))
        self.recorder.run()
        for never_called in (self.images.should_delete,
                             self.docker_client.remove_image):
            self.assertFalse(never_called.called)

    def test_deletions_after_destroy(self):
        target_id = self.queue_deletion()
        self.events.append(MockEvent('destroy'))
        self.recorder.run()
        self.docker_client.remove_image.assert_called_with(target_id)
        self.images.del_image.assert_called_with(target_id)

    def test_failed_deletion_handling(self):
        target_id = self.queue_deletion()
        self.docker_client.remove_image.side_effect = MockException(500)
        self.events.append(MockEvent('destroy'))
        self.recorder.run()
        self.docker_client.remove_image.assert_called_with(target_id)
        self.assertFalse(self.images.del_image.called)
+
+
class HumanSizeTestCase(unittest.TestCase):
    """Tests for cleaner.human_size with upper- and lower-case suffixes."""

    def check(self, human_str, count, exp):
        expected = count * 1024 ** exp
        self.assertEqual(expected, cleaner.human_size(human_str))

    def test_bytes(self):
        for text, count in (('1', 1), ('82', 82)):
            self.check(text, count, 0)

    def test_kibibytes(self):
        for text, count in (('2K', 2), ('3k', 3)):
            self.check(text, count, 1)

    def test_mebibytes(self):
        for text, count in (('4M', 4), ('5m', 5)):
            self.check(text, count, 2)

    def test_gibibytes(self):
        for text, count in (('6G', 6), ('7g', 7)):
            self.check(text, count, 3)

    def test_tebibytes(self):
        for text, count in (('8T', 8), ('9t', 9)):
            self.check(text, count, 4)
+
+
class RunTestCase(unittest.TestCase):
    """Checks that run() splits the event stream at a single start time."""

    def setUp(self):
        self.args = mock.MagicMock(name='args')
        self.args.keep = 1000000
        self.docker_client = mock.MagicMock(name='docker_client')

    def test_run(self):
        test_start_time = int(time.time())
        self.docker_client.events.return_value = []
        cleaner.run(self.args, self.docker_client)
        self.assertEqual(2, self.docker_client.events.call_count)
        history_kwargs, live_kwargs = (
            recorded[1] for recorded in self.docker_client.events.call_args_list)
        self.assertIn('since', history_kwargs)
        self.assertIn('until', history_kwargs)
        self.assertLessEqual(test_start_time, history_kwargs['until'])
        self.assertIn('since', live_kwargs)
        self.assertEqual(history_kwargs['until'], live_kwargs['since'])
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list