[ARVADOS] created: bd49a6c49d66f89930545a212961280eacb0398a

git at public.curoverse.com git at public.curoverse.com
Mon Feb 1 03:02:48 EST 2016


        at  bd49a6c49d66f89930545a212961280eacb0398a (commit)


commit bd49a6c49d66f89930545a212961280eacb0398a
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Feb 1 03:01:31 2016 -0500

    8288: Add test case for --exec mode.

diff --git a/services/fuse/tests/test_exec.py b/services/fuse/tests/test_exec.py
new file mode 100644
index 0000000..66013a4
--- /dev/null
+++ b/services/fuse/tests/test_exec.py
@@ -0,0 +1,60 @@
+import arvados_fuse.command
+import json
+import multiprocessing
+import os
+import run_test_server
+import tempfile
+import unittest
+
+try:
+    from shlex import quote
+except:
+    from pipes import quote
+
+def try_exec(mnt, cmd):  # run arv-mount on mnt with "--exec cmd"; it must end by exiting
+    try:
+        arvados_fuse.command.Mount(
+            arvados_fuse.command.ArgumentParser().parse_args([
+                '--read-write',
+                '--mount-tmp=zzz',  # presumably a writable tmp dir "zzz" under mnt -- ExecMode writes there
+                '--unmount-timeout=0.1',  # short grace period after unmount so tests stay fast
+                mnt,
+                '--exec'] + cmd)).run()  # --exec is nargs=REMAINDER: cmd must come last
+    except SystemExit:
+        pass  # expected: --exec mode calls exit(rc); rc is not checked here
+    else:
+        raise AssertionError('should have exited')
+
+
+class ExecMode(unittest.TestCase):  # end-to-end test of arv-mount --exec against test servers
+    @classmethod
+    def setUpClass(cls):
+        run_test_server.run()
+        run_test_server.run_keep(enforce_permissions=True, num_servers=2)
+        run_test_server.authorize_with('active')
+
+    @classmethod
+    def tearDownClass(cls):
+        run_test_server.stop_keep(num_servers=2)
+
+    def setUp(self):
+        self.mnt = tempfile.mkdtemp()
+        okfd, self.okfile = tempfile.mkstemp()  # only the path is needed...
+        os.close(okfd)  # ...so close the descriptor mkstemp opened instead of leaking it
+        self.pool = multiprocessing.Pool(1)  # try_exec runs in this pool's worker process
+
+    def tearDown(self):
+        self.pool.terminate()
+        self.pool.join()
+        os.rmdir(self.mnt)
+        os.unlink(self.okfile)
+
+    def test_exec(self):  # write a file under --mount-tmp, then copy the dir's collection JSON out
+        self.pool.apply(try_exec, (self.mnt, [
+            'sh', '-c',
+            'printf foo >{}; cp {} {}'.format(  # printf, not "echo -n": -n is implementation-defined in sh
+                quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
+                quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
+                quote(self.okfile))]))
+        with open(self.okfile) as okf:  # close the handle deterministically
+            self.assertRegexpMatches(json.load(okf)['manifest_text'], r' 0:3:foo\.txt\n')

commit 92f8e281a5c20a41aaccd75a0c3f2248065aecf1
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon Feb 1 01:58:34 2016 -0500

    8288: Add timeout option to close() method of event clients.
    
    Previously in EventClient, close() didn't wait for anything. Now, if a
    timeout is given, it waits up to that many seconds for ws4py to call
    the closed() callback, which indicates that the connection has closed.
    
    Previously in PollClient, close() waited indefinitely for the polling
    thread to terminate.  This can take a very long time if, for example,
    there are multiple subscriptions and the "get logs" API transaction is
    slow.
    
    The only apparent reason a caller would want to wait here at all is to
    guarantee the simplifying assumption that the on_event() callback is never
    called after close().  Now, instead of letting the thread run until
    all events are received and handled, PollClient achieves this the same
    way EventClient does: ignore events that arrive after close().

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 3132da3..94b8a9d 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -13,6 +13,7 @@ from ws4py.client.threadedclient import WebSocketClient
 
 _logger = logging.getLogger('arvados.events')
 
+
 class EventClient(WebSocketClient):
     def __init__(self, url, filters, on_event, last_log_id):
         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
@@ -29,23 +30,33 @@ class EventClient(WebSocketClient):
         self.filters = filters
         self.on_event = on_event
         self.last_log_id = last_log_id
-        self._closed_lock = threading.RLock()
-        self._closed = False
+        self._closing_lock = threading.RLock()
+        self._closing = False
+        self._closed = threading.Event()
 
     def opened(self):
         self.subscribe(self.filters, self.last_log_id)
 
+    def closed(self, code, reason=None):  # ws4py callback: the websocket connection has closed
+        self._closed.set()  # releases close(), which waits on this event for up to its timeout
+
     def received_message(self, m):
-        with self._closed_lock:
-            if not self._closed:
+        with self._closing_lock:
+            if not self._closing:
                 self.on_event(json.loads(str(m)))
 
-    def close(self, code=1000, reason=''):
-        """Close event client and wait for it to finish."""
+    def close(self, code=1000, reason='', timeout=0):
+        """Close event client and optionally wait for it to finish.
+
+        :timeout: is the number of seconds to wait for ws4py to
+        indicate that the connection has closed.
+        """
         super(EventClient, self).close(code, reason)
-        with self._closed_lock:
+        with self._closing_lock:
             # make sure we don't process any more messages.
-            self._closed = True
+            self._closing = True
+        # wait for ws4py to tell us the connection is closed.
+        self._closed.wait(timeout=timeout)
 
     def subscribe(self, filters, last_log_id=None):
         m = {"method": "subscribe", "filters": filters}
@@ -56,6 +67,7 @@ class EventClient(WebSocketClient):
     def unsubscribe(self, filters):
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
         super(PollClient, self).__init__()
@@ -67,8 +79,9 @@ class PollClient(threading.Thread):
         self.on_event = on_event
         self.poll_time = poll_time
         self.daemon = True
-        self.stop = threading.Event()
         self.last_log_id = last_log_id
+        self._closing = threading.Event()
+        self._closing_lock = threading.RLock()
 
     def run(self):
         self.id = 0
@@ -83,7 +96,7 @@ class PollClient(threading.Thread):
 
         self.on_event({'status': 200})
 
-        while not self.stop.isSet():
+        while not self._closing.is_set():
             max_id = self.id
             moreitems = False
             for f in self.filters:
@@ -91,24 +104,38 @@ class PollClient(threading.Thread):
                 for i in items["items"]:
                     if i['id'] > max_id:
                         max_id = i['id']
-                    self.on_event(i)
+                    with self._closing_lock:
+                        if self._closing.is_set():
+                            return
+                        self.on_event(i)
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
             self.id = max_id
             if not moreitems:
-                self.stop.wait(self.poll_time)
+                self._closing.wait(self.poll_time)
 
     def run_forever(self):
         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
-        while not self.stop.is_set():
-            self.stop.wait(1)
+        while not self._closing.is_set():
+            self._closing.wait(1)
+
+    def close(self, code=None, reason=None, timeout=0):
+        """Close poll client and optionally wait for it to finish.
+
+        If an :on_event: handler is running in a different thread,
+        first wait (indefinitely) for it to return.
+
+        After closing, wait up to :timeout: seconds for the thread to
+        finish the poll request in progress (if any).
 
-    def close(self):
-        """Close poll client and wait for it to finish."""
+        :code: and :reason: are ignored. They are present for
+        interface compatibility with EventClient.
+        """
 
-        self.stop.set()
+        with self._closing_lock:
+            self._closing.set()
         try:
-            self.join()
+            self.join(timeout=timeout)
         except RuntimeError:
             # "join() raises a RuntimeError if an attempt is made to join the
             # current thread as that would cause a deadlock. It is also an

commit 5f381fd8a047411e9ff9836519f69a60335a475e
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Jan 31 21:43:30 2016 -0500

    8288: Give fusermount -u a chance to work before resorting to operations.destroy().
    
    Log a warning when resorting to operations.destroy().
    
    De-duplicate setup/teardown code so more of the --exec code path is exercised in tests.

diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py
index 71623a5..ae0c920 100644
--- a/services/fuse/arvados_fuse/command.py
+++ b/services/fuse/arvados_fuse/command.py
@@ -82,6 +82,10 @@ class ArgumentParser(argparse.ArgumentParser):
 
         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
 
+        self.add_argument('--unmount-timeout',
+                          type=float, default=2.0,
+                          help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
+
         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                             dest="exec_args", metavar=('command', 'args', '...', '--'),
                             help="""Mount, run a command, then unmount and exit""")
@@ -108,13 +112,20 @@ class Mount(object):
         llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
         if self.args.mode != 'by_pdh':
             self.operations.listen_for_events()
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         self.operations.initlock.wait()
 
     def __exit__(self, exc_type, exc_value, traceback):
         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-        self.operations.destroy()
+        self.llfuse_thread.join(timeout=self.args.unmount_timeout)
+        if self.llfuse_thread.is_alive():
+            self.logger.warning("Mount.__exit__:"
+                                " llfuse thread still alive %fs after umount"
+                                " -- resorting to operations.destroy()",
+                                self.args.unmount_timeout)
+            self.operations.destroy()
 
     def run(self):
         if self.args.exec_args:
@@ -277,63 +288,56 @@ From here, the following directories are available:
 '''.format(api_host, user_email)
 
     def _run_exec(self):
-        # Initialize the fuse connection
-        llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-
-        # Subscribe to change events from API server
-        if self.args.mode != 'by_pdh':
-            self.operations.listen_for_events()
-
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
-
-        # wait until the driver is finished initializing
-        self.operations.initlock.wait()
-
         rc = 255
-        try:
-            sp = subprocess.Popen(self.args.exec_args, shell=False)
-
-            # forward signals to the process.
-            signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
-
-            # wait for process to complete.
-            rc = sp.wait()
-
-            # restore default signal handlers.
-            signal.signal(signal.SIGINT, signal.SIG_DFL)
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-        except Exception as e:
-            self.logger.exception(
-                'arv-mount: exception during exec %s', self.args.exec_args)
+        with self:
             try:
-                rc = e.errno
-            except AttributeError:
-                pass
-        finally:
-            subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-            self.operations.destroy()
+                sp = subprocess.Popen(self.args.exec_args, shell=False)
+
+                # forward signals to the process.
+                signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+                # wait for process to complete.
+                rc = sp.wait()
+
+                # restore default signal handlers.
+                signal.signal(signal.SIGINT, signal.SIG_DFL)
+                signal.signal(signal.SIGTERM, signal.SIG_DFL)
+                signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+            except Exception as e:
+                self.logger.exception(
+                    'arv-mount: exception during exec %s', self.args.exec_args)
+                try:
+                    rc = e.errno
+                except AttributeError:
+                    pass
         exit(rc)
 
     def _run_standalone(self):
         try:
             llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
 
-            if not (self.args.exec_args or self.args.foreground):
-                self.daemon_ctx = daemon.DaemonContext(working_directory=os.path.dirname(self.args.mountpoint),
-                                                       files_preserve=range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
+            if not self.args.foreground:
+                self.daemon_ctx = daemon.DaemonContext(
+                    working_directory=os.path.dirname(self.args.mountpoint),
+                    files_preserve=range(
+                        3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
                 self.daemon_ctx.open()
 
             # Subscribe to change events from API server
             self.operations.listen_for_events()
 
-            llfuse.main()
+            self._llfuse_main()
         except Exception as e:
             self.logger.exception('arv-mount: exception during mount: %s', e)
             exit(getattr(e, 'errno', 1))
-        finally:
-            self.operations.destroy()
         exit(0)
+
+    def _llfuse_main(self):  # run the llfuse main loop, then always call llfuse.close()
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)  # error path: close without unmounting...
+            raise  # ...then re-raise the original exception (bare except is deliberate)
+        llfuse.close()  # normal return: default close() -- presumably unmount=True; confirm in llfuse docs
diff --git a/services/fuse/tests/integration_test.py b/services/fuse/tests/integration_test.py
index faa4a55..5a45bfc 100644
--- a/services/fuse/tests/integration_test.py
+++ b/services/fuse/tests/integration_test.py
@@ -62,7 +62,9 @@ class IntegrationTest(unittest.TestCase):
             def wrapper(self, *args, **kwargs):
                 with arvados_fuse.command.Mount(
                         arvados_fuse.command.ArgumentParser().parse_args(
-                            argv + ['--foreground', self.mnt])):
+                            argv + ['--foreground',
+                                    '--unmount-timeout=0.1',
+                                    self.mnt])):
                     return func(self, *args, **kwargs)
             return wrapper
         return decorator
diff --git a/services/fuse/tests/mount_test_base.py b/services/fuse/tests/mount_test_base.py
index 44ec199..91a4929 100644
--- a/services/fuse/tests/mount_test_base.py
+++ b/services/fuse/tests/mount_test_base.py
@@ -37,6 +37,16 @@ class MountTestBase(unittest.TestCase):
         run_test_server.authorize_with("admin")
         self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
+    # This is a copy of Mount's method.  TODO: Refactor MountTestBase
+    # to use a Mount instead of copying its code.
+    def _llfuse_main(self):  # run the llfuse main loop, then always call llfuse.close()
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)  # error path: close without unmounting...
+            raise  # ...then re-raise the original exception (bare except is deliberate)
+        llfuse.close()  # normal return: default close() -- presumably unmount=True; confirm in llfuse docs
+
     def make_mount(self, root_class, **root_kwargs):
         self.operations = fuse.Operations(
             os.getuid(), os.getgid(),
@@ -45,7 +55,9 @@ class MountTestBase(unittest.TestCase):
         self.operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(self.operations, self.mounttmp, [])
-        threading.Thread(None, llfuse.main).start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         # wait until the driver is finished initializing
         self.operations.initlock.wait()
         return self.operations.inodes[llfuse.ROOT_INODE]
@@ -55,17 +67,13 @@ class MountTestBase(unittest.TestCase):
         self.pool.join()
         del self.pool
 
-        # llfuse.close is buggy, so use fusermount instead.
-        #llfuse.close(unmount=True)
-
-        count = 0
-        success = 1
-        while (count < 9 and success != 0):
-          success = subprocess.call(["fusermount", "-u", self.mounttmp])
-          time.sleep(0.1)
-          count += 1
-
-        self.operations.destroy()
+        subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
+        self.llfuse_thread.join(timeout=0.1)
+        if self.llfuse_thread.is_alive():
+            logger.warning("MountTestBase.tearDown():"
+                           " llfuse thread still alive 100ms after umount"
+                           " -- resorting to operations.destroy()")
+            self.operations.destroy()
 
         os.rmdir(self.mounttmp)
         if self.keeptmp:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list