[ARVADOS] updated: da2492bfc43032c3374b6509a7208127ec48093a

git at public.curoverse.com git at public.curoverse.com
Wed Feb 11 14:31:00 EST 2015


Summary of changes:
 services/arv-web/arv-web.py            | 428 +++++++++++++++++----------------
 services/fuse/arvados_fuse/__init__.py |   5 +
 2 files changed, 224 insertions(+), 209 deletions(-)

       via  da2492bfc43032c3374b6509a7208127ec48093a (commit)
      from  8233babd1d979b545e0b8f15455787af66307d9a (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit da2492bfc43032c3374b6509a7208127ec48093a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Feb 11 14:33:12 2015 -0500

    4904: Refactor arv-web main() into ArvWeb class.  Add CollectionDirectory.change_collection to FUSE.

diff --git a/services/arv-web/arv-web.py b/services/arv-web/arv-web.py
index d3f7b84..1fd61fd 100755
--- a/services/arv-web/arv-web.py
+++ b/services/arv-web/arv-web.py
@@ -18,231 +18,241 @@ import signal
 import sys
 import functools
 
-# Run an arvados_fuse mount under the control of the local process.  This lets
-# us switch out the contents of the directory without having to unmount and
-# remount.
-def run_fuse_mount(api, collection):
-    mountdir = tempfile.mkdtemp()
-
-    operations = Operations(os.getuid(), os.getgid(), "utf-8")
-    cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
-    operations.inodes.add_entry(cdir)
-
-    # Initialize the fuse connection
-    llfuse.init(operations, mountdir, ['allow_other'])
-
-    t = threading.Thread(None, llfuse.main)
-    t.start()
-
-    # wait until the driver is finished initializing
-    operations.initlock.wait()
-
-    return (mountdir, cdir)
-
-# Handle messages from Arvados event bus.
-def on_message(project, evqueue, ev):
-    if 'event_type' in ev:
-        old_attr = None
-        if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
-            old_attr = ev['properties']['old_attributes']
-        if project not in (ev['properties']['new_attributes']['owner_uuid'],
-                                old_attr['owner_uuid'] if old_attr else None):
-            return
-
-        et = ev['event_type']
-        if ev['event_type'] == 'update':
-            if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
-                if args.project_uuid == ev['properties']['new_attributes']['owner_uuid']:
-                    et = 'add'
-                else:
+logger = logging.getLogger('arvados.arv-web')  # shared by ArvWeb methods and main()
+logger.setLevel(logging.INFO)
+
+class ArvWeb(object):
+    def __init__(self, project, docker_image, port):  # docker_image, when set, overrides the collection's 'docker_image' file
+        self.project = project
+        self.loop = True
+        self.cid = None
+        self.docker_proc = None
+        self.prev_docker_image = None
+        self.mountdir = None
+        self.collection = None
+        self.override_docker_image = docker_image
+        self.port = port
+        self.evqueue = Queue.Queue()
+        self.api = SafeApi(arvados.config)
+
+        if arvados.util.group_uuid_pattern.match(project) is None:
+            raise arvados.errors.ArgumentError("Project uuid is not valid")
+
+        collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
+                        limit=1,
+                        order='modified_at desc').execute()['items']
+        self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+
+        self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
+
+    # Handle messages from Arvados event bus; queue (project, event_type, uuid) tuples for wait_for_events().
+    def on_message(self, ev):
+        if 'event_type' in ev:
+            old_attr = None
+            if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
+                old_attr = ev['properties']['old_attributes']
+            if self.project not in (ev['properties']['new_attributes']['owner_uuid'],
+                                    old_attr['owner_uuid'] if old_attr else None):
+                return
+
+            et = ev['event_type']
+            if ev['event_type'] == 'update':
+                if old_attr and ev['properties']['new_attributes']['owner_uuid'] != old_attr['owner_uuid']:
+                    if self.project == ev['properties']['new_attributes']['owner_uuid']:
+                        et = 'add'
+                    else:
+                        et = 'remove'
+                if ev['properties']['new_attributes']['expires_at'] is not None:
                     et = 'remove'
-            if ev['properties']['new_attributes']['expires_at'] is not None:
-                et = 'remove'
 
-        evqueue.put((project, et, ev['object_uuid']))
+            self.evqueue.put((self.project, et, ev['object_uuid']))
 
-def main(argv):
-    logger = logging.getLogger('arvados.arv-web')
-    logger.setLevel(logging.INFO)
+    # Run an arvados_fuse mount under the control of the local process.  This lets
+    # us switch out the contents of the directory without having to unmount and
+    # remount.
+    def run_fuse_mount(self):
+        self.mountdir = tempfile.mkdtemp()
 
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
-    parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
-    parser.add_argument('--image', type=str, help="Docker image to run")
+        self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
+        self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
+        self.operations.inodes.add_entry(self.cdir)
 
-    args = parser.parse_args(argv)
+        # Initialize the fuse connection
+        llfuse.init(self.operations, self.mountdir, ['allow_other'])
 
-    api = SafeApi(arvados.config)
-    project = args.project_uuid
-    docker_image = args.image
-    port = args.port
-    evqueue = Queue.Queue()
+        t = threading.Thread(None, llfuse.main)
+        t.start()
 
-    collections = api.collections().list(filters=[["owner_uuid", "=", project]],
-                        limit=1,
-                        order='modified_at desc').execute()['items']
-    newcollection = collections[0]['uuid'] if len(collections) > 0 else None
-    collection = None
+        # wait until the driver is finished initializing
+        self.operations.initlock.wait()
 
-    ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], functools.partial(on_message, project, evqueue))
-
-    signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
-
-    loop = True
-    cid = None
-    docker_proc = None
-    prev_docker_image = None
-    mountdir = None
-
-    try:
-        while loop:
-            loop = False
-            if newcollection != collection:
-                collection = newcollection
-                if not mountdir:
-                    (mountdir, cdir) = run_fuse_mount(api, collection)
+    def mount_collection(self):  # switch the FUSE mount to self.newcollection if it changed
+        if self.newcollection != self.collection:
+            self.collection = self.newcollection
+            if not self.mountdir and self.collection:
+                self.run_fuse_mount()
 
+            if self.mountdir:
                 with llfuse.lock:
-                    cdir.clear()
-                    if collection:
+                    self.cdir.clear()
+                    if self.collection:
                         # Switch the FUSE directory object so that it stores
                         # the newly selected collection
-                        logger.info("Mounting %s", collection)
-                        cdir.collection_locator = collection
-                        cdir.collection_object = None
-                        cdir.update()
+                        logger.info("Mounting %s", self.collection)
+                        self.cdir.change_collection(self.collection)
 
+    def stop_docker(self):  # stop the running container (if any) and clear its state
+        if self.cid:
+            logger.info("Stopping Docker container")
+            subprocess.check_call(["docker", "stop", self.cid])
+            self.cid = None
+            self.docker_proc = None
+
+    def run_docker(self):  # start, reload, or stop the container to match the current collection
+        try:
+            if self.collection is None:
+                self.stop_docker()
+                return
+
+            docker_image = None
+            if self.override_docker_image:
+                docker_image = self.override_docker_image
+            else:
+                try:
+                    with llfuse.lock:
+                        if "docker_image" in self.cdir:
+                            docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
+                except IOError:
+                    pass
+
+            has_reload = False
             try:
+                with llfuse.lock:
+                    has_reload = "reload" in self.cdir
+            except IOError:
+                pass
+
+            if docker_image is None:
+                logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
+                self.stop_docker()
+                return
+
+            if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
+                logger.info("Running container reload command")
+                subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
+                return
+
+            self.stop_docker()
+
+            logger.info("Starting Docker container %s", docker_image)
+            ciddir = tempfile.mkdtemp()
+            cidfilepath = os.path.join(ciddir, "cidfile")
+            self.docker_proc = subprocess.Popen(["docker", "run",
+                                            "--cidfile=%s" % (cidfilepath),
+                                            "--publish=%i:80" % (self.port),
+                                            "--volume=%s:/mnt:ro" % self.mountdir,
+                                            docker_image])
+            self.cid = None
+            while self.cid is None and self.docker_proc.poll() is None:
                 try:
-                    if collection:
-                        if not args.image:
-                            docker_image = None
-
-                            # FUSE is asynchronous, so there is a race between
-                            # the directory being updated above and the kernel
-                            # cache being refreshed.  This manifests as the
-                            # bizare behavior where os.path.exists() returns
-                            # True, but open() raises "file not found".  The
-                            # workaround is to keep trying until the kernel
-                            # catches up.
-                            while not docker_image and os.path.exists(os.path.join(mountdir, "docker_image")):
-                                try:
-                                    with open(os.path.join(mountdir, "docker_image")) as di:
-                                        docker_image = di.read().strip()
-                                except IOError as e:
-                                    pass
-
-                        if not docker_image:
-                            logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
-
-                        if docker_image and ((docker_image != prev_docker_image) or cid is None):
-                            if cid:
-                                logger.info("Stopping Docker container")
-                                subprocess.check_call(["docker", "stop", cid])
-                                cid = None
-                                docker_proc = None
-
-                            if docker_image:
-                                logger.info("Starting Docker container %s", docker_image)
-                                ciddir = tempfile.mkdtemp()
-                                cidfilepath = os.path.join(ciddir, "cidfile")
-                                docker_proc = subprocess.Popen(["docker", "run",
-                                                                "--cidfile=%s" % (cidfilepath),
-                                                                "--publish=%i:80" % (port),
-                                                                "--volume=%s:/mnt:ro" % mountdir,
-                                                                docker_image])
-                                cid = None
-                                while not cid and docker_proc.poll() is None:
-                                    try:
-                                        with open(cidfilepath) as cidfile:
-                                            cid = cidfile.read().strip()
-                                    except IOError:
-                                        pass
-                                try:
-                                    os.unlink(cidfilepath)
-                                    os.rmdir(ciddir)
-                                except OSError:
-                                    pass
-
-                                prev_docker_image = docker_image
-                                logger.info("Container id %s", cid)
-                        elif cid:
-                            logger.info("Sending refresh signal to container")
-                            # Send SIGHUP to all the processes inside the
-                            # container.  By convention, services are expected
-                            # to reload their configuration.  If they die
-                            # instead, that's okay, because then we'll just
-                            # start a new container.
-                            #
-                            # Getting the services inside the container to
-                            # refresh turned out to be really hard.  Here are
-                            # some of the other things I tried:
-                            #
-                            # docker kill --signal=HUP               # no effect
-                            # docker_proc.send_signal(signal.SIGHUP) # no effect
-                            # os.killpg(os.getpgid(docker_proc.pid), signal.SIGHUP) # docker-proxy dies as collatoral damage
-                            # docker exec apache2ctl restart         # only works if service is using apache.
-                            # Sending HUP directly to the processes inside the container: permission denied
-
-                            subprocess.check_call(["docker", "exec", cid, "killall", "--regexp", ".*", "--signal", "HUP"])
-                    elif cid:
-                        logger.info("Stopping docker container")
-                        subprocess.check_call(["docker", "stop", cid])
-                except subprocess.CalledProcessError:
-                    cid = None
-
-                if not cid:
-                    logger.warning("No service running!  Will wait for a new collection to appear in the project.")
-                else:
-                    logger.info("Waiting for events")
-
-                running = True
-                loop = True
-                while running:
-                    # Main run loop.  Wait on project events, signals, or the
-                    # Docker container stopping.
-
-                    try:
-                        # Poll the queue with a 1 second timeout, if we have no
-                        # timeout the Python runtime doesn't have a chance to
-                        # process SIGINT or SIGTERM.
-                        eq = evqueue.get(True, 1)
-                        logger.info("%s %s", eq[1], eq[2])
-                        newcollection = collection
-                        if eq[1] in ('add', 'update', 'create'):
-                            newcollection = eq[2]
-                        elif eq[1] == 'remove':
-                            collections = api.collections().list(filters=[["owner_uuid", "=", project]],
-                                                                limit=1,
-                                                                order='modified_at desc').execute()['items']
-                            newcollection = collections[0]['uuid'] if len(collections) > 0 else None
-                        running = False
-                    except Queue.Empty:
-                        pass
-
-                    if docker_proc and docker_proc.poll() is not None:
-                        logger.warning("Service has terminated.  Will try to restart.")
-                        cid = None
-                        docker_proc = None
-                        running = False
-
-            except (KeyboardInterrupt):
-                logger.info("Got keyboard interrupt")
-                ws.close()
-                loop = False
-            except Exception as e:
-                logger.exception(e)
-                ws.close()
-                loop = False
-    finally:
-        if cid:
-            logger.info("Stopping docker container")
-            subprocess.check_call(["docker", "stop", cid])
+                    with open(cidfilepath) as cidfile:
+                        self.cid = cidfile.read().strip()
+                except IOError:
+                    # XXX check for ENOENT
+                    pass
+
+            try:
+                if os.path.exists(cidfilepath):
+                    os.unlink(cidfilepath)
+                os.rmdir(ciddir)
+            except OSError:
+                pass
+
+            self.prev_docker_image = docker_image
+            logger.info("Container id %s", self.cid)
+
+        except subprocess.CalledProcessError:
+            self.cid = None
+
+    def wait_for_events(self):  # block until a project event is dequeued or the container exits
+        if not self.cid:
+            logger.warning("No service running!  Will wait for a new collection to appear in the project.")
+        else:
+            logger.info("Waiting for events")
+
+        running = True
+        self.loop = True
+        while running:
+            # Main run loop.  Wait on project events, signals, or the
+            # Docker container stopping.
+
+            try:
+                # Poll the queue with a 1 second timeout; without a
+                # timeout the Python runtime doesn't get a chance to
+                # process SIGINT or SIGTERM.
+                eq = self.evqueue.get(True, 1)
+                logger.info("%s %s", eq[1], eq[2])
+                self.newcollection = self.collection
+                if eq[1] in ('add', 'update', 'create'):
+                    self.newcollection = eq[2]
+                elif eq[1] == 'remove':
+                    collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
+                                                              limit=1,
+                                                              order='modified_at desc').execute()['items']
+                    self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+                running = False
+            except Queue.Empty:
+                pass
+
+            if self.docker_proc and self.docker_proc.poll() is not None:
+                logger.warning("Service has terminated.  Will try to restart.")
+                self.cid = None
+                self.docker_proc = None
+                running = False
+
+
+    def run(self):  # main loop; the finally clause always stops the container and unmounts
+        try:
+            while self.loop:
+                self.loop = False
+                self.mount_collection()
+                try:
+                    self.run_docker()
+                    self.wait_for_events()
+                except KeyboardInterrupt:
+                    logger.info("Got keyboard interrupt")
+                    self.ws.close()
+                    self.loop = False
+                except Exception:
+                    logger.exception("Caught fatal exception, shutting down")
+                    self.ws.close()
+                    self.loop = False
+        finally:
+            if self.cid:
+                logger.info("Stopping docker container")
+                subprocess.call(["docker", "stop", self.cid])
+
+            if self.mountdir:
+                logger.info("Unmounting")
+                subprocess.call(["fusermount", "-u", self.mountdir])
+                os.rmdir(self.mountdir)
+
+
+def main(argv):
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
+    parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
+    parser.add_argument('--image', type=str, help="Docker image to run")
 
-        if mountdir:
-            logger.info("Unmounting")
-            subprocess.call(["fusermount", "-u", "-z", mountdir])
-            os.rmdir(mountdir)
+    args = parser.parse_args(argv)
+
+    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))  # SystemExit lets ArvWeb.run()'s finally clause clean up
+
+    try:
+        arvweb = ArvWeb(args.project_uuid, args.image, args.port)
+        arvweb.run()
+    except arvados.errors.ArgumentError as e:
+        logger.error(e)
 
 if __name__ == '__main__':
     main(sys.argv[1:])
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 26098a6..870b9a0 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -305,6 +305,11 @@ class CollectionDirectory(Directory):
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
 
+    def change_collection(self, new_locator):  # point this directory at new_locator, then call update()
+        self.collection_locator = new_locator
+        self.collection_object = None  # drop the previously loaded object; presumably update() reloads from collection_locator -- confirm
+        self.update()
+
     def new_collection(self, new_collection_object, coll_reader):
         self.collection_object = new_collection_object
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list