[ARVADOS] updated: da2492bfc43032c3374b6509a7208127ec48093a
git at public.curoverse.com
git at public.curoverse.com
Wed Feb 11 14:31:00 EST 2015
Summary of changes:
services/arv-web/arv-web.py | 428 +++++++++++++++++----------------
services/fuse/arvados_fuse/__init__.py | 5 +
2 files changed, 224 insertions(+), 209 deletions(-)
via da2492bfc43032c3374b6509a7208127ec48093a (commit)
from 8233babd1d979b545e0b8f15455787af66307d9a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit da2492bfc43032c3374b6509a7208127ec48093a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Feb 11 14:33:12 2015 -0500
4904: Refactor arv-web main() into ArvWeb class. Add CollectionDirectory.change_collection to FUSE.
diff --git a/services/arv-web/arv-web.py b/services/arv-web/arv-web.py
index d3f7b84..1fd61fd 100755
--- a/services/arv-web/arv-web.py
+++ b/services/arv-web/arv-web.py
@@ -18,231 +18,241 @@ import signal
import sys
import functools
-# Run an arvados_fuse mount under the control of the local process. This lets
-# us switch out the contents of the directory without having to unmount and
-# remount.
-def run_fuse_mount(api, collection):
- mountdir = tempfile.mkdtemp()
-
- operations = Operations(os.getuid(), os.getgid(), "utf-8")
- cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
- operations.inodes.add_entry(cdir)
-
- # Initialize the fuse connection
- llfuse.init(operations, mountdir, ['allow_other'])
-
- t = threading.Thread(None, llfuse.main)
- t.start()
-
- # wait until the driver is finished initializing
- operations.initlock.wait()
-
- return (mountdir, cdir)
-
-# Handle messages from Arvados event bus.
-def on_message(project, evqueue, ev):
- if 'event_type' in ev:
- old_attr = None
- if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
- old_attr = ev['properties']['old_attributes']
- if project not in (ev['properties']['new_attributes']['owner_uuid'],
- old_attr['owner_uuid'] if old_attr else None):
- return
-
- et = ev['event_type']
- if ev['event_type'] == 'update':
- if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
- if args.project_uuid == ev['properties']['new_attributes']['owner_uuid']:
- et = 'add'
- else:
+logger = logging.getLogger('arvados.arv-web')
+logger.setLevel(logging.INFO)
+
+class ArvWeb(object):
+ def __init__(self, project, docker_image, port):
+ self.project = project
+ self.loop = True
+ self.cid = None
+ self.docker_proc = None
+ self.prev_docker_image = None
+ self.mountdir = None
+ self.collection = None
+ self.override_docker_image = docker_image
+ self.port = port
+ self.evqueue = Queue.Queue()
+ self.api = SafeApi(arvados.config)
+
+ if arvados.util.group_uuid_pattern.match(project) is None:
+ raise arvados.errors.ArgumentError("Project uuid is not valid")
+
+ collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
+ limit=1,
+ order='modified_at desc').execute()['items']
+ self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+
+ self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
+
+ # Handle messages from Arvados event bus.
+ def on_message(self, ev):
+ if 'event_type' in ev:
+ old_attr = None
+ if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
+ old_attr = ev['properties']['old_attributes']
+ if self.project not in (ev['properties']['new_attributes']['owner_uuid'],
+ old_attr['owner_uuid'] if old_attr else None):
+ return
+
+ et = ev['event_type']
+ if ev['event_type'] == 'update':
+ if old_attr and ev['properties']['new_attributes']['owner_uuid'] != old_attr['owner_uuid']:
+ if self.project == ev['properties']['new_attributes']['owner_uuid']:
+ et = 'add'
+ else:
+ et = 'remove'
+ if ev['properties']['new_attributes']['expires_at'] is not None:
et = 'remove'
- if ev['properties']['new_attributes']['expires_at'] is not None:
- et = 'remove'
- evqueue.put((project, et, ev['object_uuid']))
+ self.evqueue.put((self.project, et, ev['object_uuid']))
-def main(argv):
- logger = logging.getLogger('arvados.arv-web')
- logger.setLevel(logging.INFO)
+ # Run an arvados_fuse mount under the control of the local process. This lets
+ # us switch out the contents of the directory without having to unmount and
+ # remount.
+ def run_fuse_mount(self):
+ self.mountdir = tempfile.mkdtemp()
- parser = argparse.ArgumentParser()
- parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
- parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
- parser.add_argument('--image', type=str, help="Docker image to run")
+ self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
+ self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
+ self.operations.inodes.add_entry(self.cdir)
- args = parser.parse_args(argv)
+ # Initialize the fuse connection
+ llfuse.init(self.operations, self.mountdir, ['allow_other'])
- api = SafeApi(arvados.config)
- project = args.project_uuid
- docker_image = args.image
- port = args.port
- evqueue = Queue.Queue()
+ t = threading.Thread(None, llfuse.main)
+ t.start()
- collections = api.collections().list(filters=[["owner_uuid", "=", project]],
- limit=1,
- order='modified_at desc').execute()['items']
- newcollection = collections[0]['uuid'] if len(collections) > 0 else None
- collection = None
+ # wait until the driver is finished initializing
+ self.operations.initlock.wait()
- ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], functools.partial(on_message, project, evqueue))
-
- signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
-
- loop = True
- cid = None
- docker_proc = None
- prev_docker_image = None
- mountdir = None
-
- try:
- while loop:
- loop = False
- if newcollection != collection:
- collection = newcollection
- if not mountdir:
- (mountdir, cdir) = run_fuse_mount(api, collection)
+ def mount_collection(self):
+ if self.newcollection != self.collection:
+ self.collection = self.newcollection
+ if not self.mountdir and self.collection:
+ self.run_fuse_mount()
+ if self.mountdir:
with llfuse.lock:
- cdir.clear()
- if collection:
+ self.cdir.clear()
+ if self.collection:
# Switch the FUSE directory object so that it stores
# the newly selected collection
- logger.info("Mounting %s", collection)
- cdir.collection_locator = collection
- cdir.collection_object = None
- cdir.update()
+ logger.info("Mounting %s", self.collection)
+ self.cdir.change_collection(self.collection)
+ def stop_docker(self):
+ if self.cid:
+ logger.info("Stopping Docker container")
+ subprocess.check_call(["docker", "stop", self.cid])
+ self.cid = None
+ self.docker_proc = None
+
+ def run_docker(self):
+ try:
+ if self.collection is None:
+ self.stop_docker()
+ return
+
+ docker_image = None
+ if self.override_docker_image:
+ docker_image = self.override_docker_image
+ else:
+ try:
+ with llfuse.lock:
+ if "docker_image" in self.cdir:
+ docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
+ except IOError:
+ pass
+
+ has_reload = False
try:
+ with llfuse.lock:
+ has_reload = "reload" in self.cdir
+ except IOError:
+ pass
+
+ if docker_image is None:
+ logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
+ self.stop_docker()
+ return
+
+ if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
+ logger.info("Running container reload command")
+ subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
+ return
+
+ self.stop_docker()
+
+ logger.info("Starting Docker container %s", docker_image)
+ ciddir = tempfile.mkdtemp()
+ cidfilepath = os.path.join(ciddir, "cidfile")
+ self.docker_proc = subprocess.Popen(["docker", "run",
+ "--cidfile=%s" % (cidfilepath),
+ "--publish=%i:80" % (self.port),
+ "--volume=%s:/mnt:ro" % self.mountdir,
+ docker_image])
+ self.cid = None
+ while not self.cid and self.docker_proc.poll() is None:
try:
- if collection:
- if not args.image:
- docker_image = None
-
- # FUSE is asynchronous, so there is a race between
- # the directory being updated above and the kernel
- # cache being refreshed. This manifests as the
- # bizare behavior where os.path.exists() returns
- # True, but open() raises "file not found". The
- # workaround is to keep trying until the kernel
- # catches up.
- while not docker_image and os.path.exists(os.path.join(mountdir, "docker_image")):
- try:
- with open(os.path.join(mountdir, "docker_image")) as di:
- docker_image = di.read().strip()
- except IOError as e:
- pass
-
- if not docker_image:
- logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
-
- if docker_image and ((docker_image != prev_docker_image) or cid is None):
- if cid:
- logger.info("Stopping Docker container")
- subprocess.check_call(["docker", "stop", cid])
- cid = None
- docker_proc = None
-
- if docker_image:
- logger.info("Starting Docker container %s", docker_image)
- ciddir = tempfile.mkdtemp()
- cidfilepath = os.path.join(ciddir, "cidfile")
- docker_proc = subprocess.Popen(["docker", "run",
- "--cidfile=%s" % (cidfilepath),
- "--publish=%i:80" % (port),
- "--volume=%s:/mnt:ro" % mountdir,
- docker_image])
- cid = None
- while not cid and docker_proc.poll() is None:
- try:
- with open(cidfilepath) as cidfile:
- cid = cidfile.read().strip()
- except IOError:
- pass
- try:
- os.unlink(cidfilepath)
- os.rmdir(ciddir)
- except OSError:
- pass
-
- prev_docker_image = docker_image
- logger.info("Container id %s", cid)
- elif cid:
- logger.info("Sending refresh signal to container")
- # Send SIGHUP to all the processes inside the
- # container. By convention, services are expected
- # to reload their configuration. If they die
- # instead, that's okay, because then we'll just
- # start a new container.
- #
- # Getting the services inside the container to
- # refresh turned out to be really hard. Here are
- # some of the other things I tried:
- #
- # docker kill --signal=HUP # no effect
- # docker_proc.send_signal(signal.SIGHUP) # no effect
- # os.killpg(os.getpgid(docker_proc.pid), signal.SIGHUP) # docker-proxy dies as collatoral damage
- # docker exec apache2ctl restart # only works if service is using apache.
- # Sending HUP directly to the processes inside the container: permission denied
-
- subprocess.check_call(["docker", "exec", cid, "killall", "--regexp", ".*", "--signal", "HUP"])
- elif cid:
- logger.info("Stopping docker container")
- subprocess.check_call(["docker", "stop", cid])
- except subprocess.CalledProcessError:
- cid = None
-
- if not cid:
- logger.warning("No service running! Will wait for a new collection to appear in the project.")
- else:
- logger.info("Waiting for events")
-
- running = True
- loop = True
- while running:
- # Main run loop. Wait on project events, signals, or the
- # Docker container stopping.
-
- try:
- # Poll the queue with a 1 second timeout, if we have no
- # timeout the Python runtime doesn't have a chance to
- # process SIGINT or SIGTERM.
- eq = evqueue.get(True, 1)
- logger.info("%s %s", eq[1], eq[2])
- newcollection = collection
- if eq[1] in ('add', 'update', 'create'):
- newcollection = eq[2]
- elif eq[1] == 'remove':
- collections = api.collections().list(filters=[["owner_uuid", "=", project]],
- limit=1,
- order='modified_at desc').execute()['items']
- newcollection = collections[0]['uuid'] if len(collections) > 0 else None
- running = False
- except Queue.Empty:
- pass
-
- if docker_proc and docker_proc.poll() is not None:
- logger.warning("Service has terminated. Will try to restart.")
- cid = None
- docker_proc = None
- running = False
-
- except (KeyboardInterrupt):
- logger.info("Got keyboard interrupt")
- ws.close()
- loop = False
- except Exception as e:
- logger.exception(e)
- ws.close()
- loop = False
- finally:
- if cid:
- logger.info("Stopping docker container")
- subprocess.check_call(["docker", "stop", cid])
+ with open(cidfilepath) as cidfile:
+ self.cid = cidfile.read().strip()
+ except IOError:
+ # XXX check for ENOENT
+ pass
+
+ try:
+ if os.path.exists(cidfilepath):
+ os.unlink(cidfilepath)
+ os.rmdir(ciddir)
+ except OSError:
+ pass
+
+ self.prev_docker_image = docker_image
+ logger.info("Container id %s", self.cid)
+
+ except subprocess.CalledProcessError:
+ self.cid = None
+
+ def wait_for_events(self):
+ if not self.cid:
+ logger.warning("No service running! Will wait for a new collection to appear in the project.")
+ else:
+ logger.info("Waiting for events")
+
+ running = True
+ self.loop = True
+ while running:
+ # Main run loop. Wait on project events, signals, or the
+ # Docker container stopping.
+
+ try:
+ # Poll the queue with a 1 second timeout, if we have no
+ # timeout the Python runtime doesn't have a chance to
+ # process SIGINT or SIGTERM.
+ eq = self.evqueue.get(True, 1)
+ logger.info("%s %s", eq[1], eq[2])
+ self.newcollection = self.collection
+ if eq[1] in ('add', 'update', 'create'):
+ self.newcollection = eq[2]
+ elif eq[1] == 'remove':
+ collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
+ limit=1,
+ order='modified_at desc').execute()['items']
+ self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+ running = False
+ except Queue.Empty:
+ pass
+
+ if self.docker_proc and self.docker_proc.poll() is not None:
+ logger.warning("Service has terminated. Will try to restart.")
+ self.cid = None
+ self.docker_proc = None
+ running = False
+
+
+ def run(self):
+ try:
+ while self.loop:
+ self.loop = False
+ self.mount_collection()
+ try:
+ self.run_docker()
+ self.wait_for_events()
+ except (KeyboardInterrupt):
+ logger.info("Got keyboard interrupt")
+ self.ws.close()
+ self.loop = False
+ except Exception:
+ logger.exception("Caught fatal exception, shutting down")
+ self.ws.close()
+ self.loop = False
+ finally:
+ if self.cid:
+ logger.info("Stopping docker container")
+ subprocess.call(["docker", "stop", self.cid])
+
+ if self.mountdir:
+ logger.info("Unmounting")
+ subprocess.call(["fusermount", "-u", "-z", self.mountdir])
+ os.rmdir(self.mountdir)
+
+
+def main(argv):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
+ parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
+ parser.add_argument('--image', type=str, help="Docker image to run")
- if mountdir:
- logger.info("Unmounting")
- subprocess.call(["fusermount", "-u", "-z", mountdir])
- os.rmdir(mountdir)
+ args = parser.parse_args(argv)
+
+ signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
+
+ try:
+ arvweb = ArvWeb(args.project_uuid, args.image, args.port)
+ arvweb.run()
+ except arvados.errors.ArgumentError as e:
+ logger.error(e)
if __name__ == '__main__':
main(sys.argv[1:])
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 26098a6..870b9a0 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -305,6 +305,11 @@ class CollectionDirectory(Directory):
def same(self, i):
return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
+ def change_collection(self, new_locator):
+ self.collection_locator = new_locator
+ self.collection_object = None
+ self.update()
+
def new_collection(self, new_collection_object, coll_reader):
self.collection_object = new_collection_object
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list