[ARVADOS] created: f34c10a5b22e3f9941f2cfe60f6f9d7a81319b6f

git at public.curoverse.com git at public.curoverse.com
Tue Nov 17 22:00:55 EST 2015


        at  f34c10a5b22e3f9941f2cfe60f6f9d7a81319b6f (commit)


commit f34c10a5b22e3f9941f2cfe60f6f9d7a81319b6f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Nov 17 22:00:45 2015 -0500

    3137: Refactor stats to record keep & fuse operations as well as bytes.

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 346147e..0d5eeb3 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -669,6 +669,8 @@ class KeepClient(object):
         self._user_agent_pool = Queue.LifoQueue()
         self.upload_counter = Counter()
         self.download_counter = Counter()
+        self.put_counter = Counter()
+        self.get_counter = Counter()
 
         if local_store:
             self.local_store = local_store
@@ -875,6 +877,9 @@ class KeepClient(object):
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
+
+        self.get_counter.add(1)
+
         locator = KeepLocator(loc_s)
         slot, first = self.block_cache.reserve_cache(locator.md5sum)
         if not first:
@@ -980,6 +985,8 @@ class KeepClient(object):
         elif not isinstance(data, str):
             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
 
+        self.put_counter.add(1)
+
         data_hash = hashlib.md5(data).hexdigest()
         loc_s = data_hash + '+' + str(len(data))
         if copies < 1:
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index fc7aace..fd25aa9 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -326,6 +326,8 @@ class Operations(llfuse.Operations):
 
         self.read_counter = arvados.keep.Counter()
         self.write_counter = arvados.keep.Counter()
+        self.read_ops_counter = arvados.keep.Counter()
+        self.write_ops_counter = arvados.keep.Counter()
 
         self.events = None
 
@@ -488,6 +490,8 @@ class Operations(llfuse.Operations):
     @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read %i %i %i", fh, off, size)
+        self.read_ops_counter.add(1)
+
         if fh in self._filehandles:
             handle = self._filehandles[fh]
         else:
@@ -503,6 +507,8 @@ class Operations(llfuse.Operations):
     @catch_exceptions
     def write(self, fh, off, buf):
         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
+        self.write_ops_counter.add(1)
+
         if fh in self._filehandles:
             handle = self._filehandles[fh]
         else:
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index d155534..96116a8 100755
--- a/services/fuse/bin/arv-mount
+++ b/services/fuse/bin/arv-mount
@@ -16,6 +16,61 @@ import arvados.keep
 
 logger = logging.getLogger('arvados.arv-mount')
 
+class Stat(object):  # Samples a pair of values (egress/ingress) and reports them as a "crunchstat:" line on stderr.
+    def __init__(self, prefix, interval,  # prefix: label printed right after "crunchstat:"; interval: seconds value printed in the delta part
+                 egr_name, ing_name,  # labels printed after the egress and ingress values
+                 egr_func, ing_func):  # zero-argument callables returning the current egress/ingress value
+        self.prefix = prefix
+        self.interval = interval
+        self.egr_name = egr_name
+        self.ing_name = ing_name
+        self.egress = egr_func
+        self.ingress = ing_func
+        self.egr = self.egress()  # initial sample, so the first update() has a baseline to diff against
+        self.ing = self.ingress()
+
+    def update(self):  # Take a new sample and write the current values plus the change since the previous sample.
+        self.egr_prev = self.egr  # keep the previous sample for the delta below
+        self.ing_prev = self.ing
+        self.egr = self.egress()
+        self.ing = self.ingress()
+
+        delta = " -- interval %.4f seconds %d %s %d %s" % (self.interval,  # NOTE(review): value given to __init__, not a measured elapsed time
+                                                           self.egr-self.egr_prev,
+                                                           self.egr_name,
+                                                           self.ing-self.ing_prev,
+                                                           self.ing_name)
+
+        sys.stderr.write("crunchstat: %s %d %s %d %s%s\n" % (self.prefix,  # one line per call: current values first, then the delta suffix
+                                                             self.egr,
+                                                             self.egr_name,
+                                                             self.ing,
+                                                             self.ing_name,
+                                                             delta))
+
+def statlogger(keep, ops):  # Periodically write keep and fuse counters to stderr; loops forever (started as a daemon thread under --stats).
+    interval = 10  # seconds slept between reports; also the interval value each Stat prints
+    calls = Stat("keepcalls", interval, "put", "get",  # counters bumped by 1 in KeepClient.put() / get()
+                 keep.put_counter.get,
+                 keep.get_counter.get)
+    net = Stat("net:keep0", interval, "tx", "rx",  # counters fed with len(body) per PUT and per GET response in keep.py
+               keep.upload_counter.get,
+               keep.download_counter.get)
+    fuseops = Stat("fuseops", interval,"write", "read",  # counters bumped by 1 in Operations.write() / read()
+                   ops.write_ops_counter.get,
+                   ops.read_ops_counter.get)
+    blk = Stat("blkio:0:0", interval, "write", "read",  # counters fed with the writeto() result and len() of the readfrom() result
+               ops.write_counter.get,
+               ops.read_counter.get)
+
+    while True:  # no exit condition here
+        time.sleep(interval)
+        calls.update()
+        net.update()
+        fuseops.update()
+        blk.update()
+
+
 if __name__ == '__main__':
     # Handle command line parameters
     parser = argparse.ArgumentParser(
@@ -55,6 +110,8 @@ with "--".
     parser.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
     parser.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
 
+    parser.add_argument('--stats', action='store_true', help="Write stats to stderr", default=False)
+
     parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                         dest="exec_args", metavar=('command', 'args', '...', '--'),
                         help="""Mount, run a command, then unmount and exit""")
@@ -101,6 +158,11 @@ with "--".
         api = ThreadSafeApiCache(apiconfig=arvados.config.settings(),
                                  keep_params={"block_cache": arvados.keep.KeepBlockCache(args.file_cache)})
 
+        if args.stats:
+            statsthread = threading.Thread(target=statlogger, args=(api.keep, operations))
+            statsthread.daemon = True
+            statsthread.start()
+
         usr = api.users().current().execute(num_retries=args.retries)
         now = time.time()
         dir_class = None

commit f3b78f5c0690a93d4cc9370519f970ded430c453
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Nov 17 11:11:20 2015 -0500

    3137: Add stat counters for bytes uploaded/downloaded (keep) and read/written (fuse).

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index ec9f6f6..346147e 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -210,6 +210,21 @@ class KeepBlockCache(object):
                 self._cache.insert(0, n)
                 return n, True
 
+
+class Counter(object):  # Accumulator whose value is updated and read while holding a lock.
+    def __init__(self, v=0):  # v: starting value (default 0)
+        self._lk = threading.Lock()  # held around every access to _val in add() and get()
+        self._val = v
+
+    def add(self, v):  # Add v to the stored value while holding the lock.
+        with self._lk:
+            self._val += v
+
+    def get(self):  # Return the stored value, read while holding the lock.
+        with self._lk:
+            return self._val
+
+
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
@@ -290,7 +305,6 @@ class KeepClient(object):
             with self._done_lock:
                 return self._done
 
-
     class KeepService(object):
         """Make requests to a single Keep service, and track results.
 
@@ -309,7 +323,9 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+                     upload_counter=None,
+                     download_counter=None, **headers):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -318,6 +334,8 @@ class KeepClient(object):
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
+            self.upload_counter = upload_counter
+            self.download_counter = download_counter
 
         def usable(self):
             """Is it worth attempting a request?"""
@@ -403,11 +421,13 @@ class KeepClient(object):
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return None
-            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.download_counter:
+                self.download_counter.add(len(self._result['body']))
             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
             if resp_md5 != locator.md5sum:
                 _logger.warning("Checksum fail: md5(%s) = %s",
@@ -422,36 +442,37 @@ class KeepClient(object):
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
             try:
-                self._headers = {}
-                body_reader = cStringIO.StringIO(body)
-                response_body = cStringIO.StringIO()
-                curl.setopt(pycurl.NOSIGNAL, 1)
-                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
-                curl.setopt(pycurl.URL, url.encode('utf-8'))
-                # Using UPLOAD tells cURL to wait for a "go ahead" from the
-                # Keep server (in the form of a HTTP/1.1 "100 Continue"
-                # response) instead of sending the request body immediately.
-                # This allows the server to reject the request if the request
-                # is invalid or the server is read-only, without waiting for
-                # the client to send the entire block.
-                curl.setopt(pycurl.UPLOAD, True)
-                curl.setopt(pycurl.INFILESIZE, len(body))
-                curl.setopt(pycurl.READFUNCTION, body_reader.read)
-                curl.setopt(pycurl.HTTPHEADER, [
-                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
-                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
-                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
-                self._setcurltimeouts(curl, timeout)
-                try:
-                    curl.perform()
-                except Exception as e:
-                    raise arvados.errors.HttpError(0, str(e))
-                self._result = {
-                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                    'body': response_body.getvalue(),
-                    'headers': self._headers,
-                    'error': False,
-                }
+                with timer.Timer() as t:
+                    self._headers = {}
+                    body_reader = cStringIO.StringIO(body)
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
+                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
+                    # response) instead of sending the request body immediately.
+                    # This allows the server to reject the request if the request
+                    # is invalid or the server is read-only, without waiting for
+                    # the client to send the entire block.
+                    curl.setopt(pycurl.UPLOAD, True)
+                    curl.setopt(pycurl.INFILESIZE, len(body))
+                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
                 ok = retry.check_http_response_success(self._result['status_code'])
                 if not ok:
                     self._result['error'] = arvados.errors.HttpError(
@@ -472,6 +493,13 @@ class KeepClient(object):
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return False
+            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(body),
+                         t.msecs,
+                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.upload_counter:
+                self.upload_counter.add(len(body))
             return True
 
         def _setcurltimeouts(self, curl, timeouts):
@@ -639,6 +667,8 @@ class KeepClient(object):
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self._user_agent_pool = Queue.LifoQueue()
+        self.upload_counter = Counter()
+        self.download_counter = Counter()
 
         if local_store:
             self.local_store = local_store
@@ -790,7 +820,10 @@ class KeepClient(object):
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(
-                    root, self._user_agent_pool, **headers)
+                    root, self._user_agent_pool,
+                    upload_counter=self.upload_counter,
+                    download_counter=self.download_counter,
+                    **headers)
         return local_roots
 
     @staticmethod
@@ -862,7 +895,9 @@ class KeepClient(object):
                                    )])
         # Map root URLs to their KeepService objects.
         roots_map = {
-            root: self.KeepService(root, self._user_agent_pool)
+            root: self.KeepService(root, self._user_agent_pool,
+                                   upload_counter=self.upload_counter,
+                                   download_counter=self.download_counter)
             for root in hint_roots
         }
 
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index ada0dac..735b938 100644
--- a/sdk/python/tests/test_keep_client.py
+++ b/sdk/python/tests/test_keep_client.py
@@ -29,14 +29,21 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
                                              proxy='', local_store='')
 
     def test_KeepBasicRWTest(self):
+        self.assertEqual(0, self.keep_client.upload_counter.get())
         foo_locator = self.keep_client.put('foo')
         self.assertRegexpMatches(
             foo_locator,
             '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
             'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
+
+        # 6 bytes because uploaded 2 copies
+        self.assertEqual(6, self.keep_client.upload_counter.get())
+
+        self.assertEqual(0, self.keep_client.download_counter.get())
         self.assertEqual(self.keep_client.get(foo_locator),
                          'foo',
                          'wrong content from Keep.get(md5("foo"))')
+        self.assertEqual(3, self.keep_client.download_counter.get())
 
     def test_KeepBinaryRWTest(self):
         blob_str = '\xff\xfe\xf7\x00\x01\x02'
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 4915131..fc7aace 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -66,6 +66,7 @@ import itertools
 import ciso8601
 import collections
 import functools
+import arvados.keep
 
 import Queue
 
@@ -323,6 +324,9 @@ class Operations(llfuse.Operations):
 
         self.num_retries = num_retries
 
+        self.read_counter = arvados.keep.Counter()
+        self.write_counter = arvados.keep.Counter()
+
         self.events = None
 
     def init(self):
@@ -491,7 +495,10 @@ class Operations(llfuse.Operations):
 
         self.inodes.touch(handle.obj)
 
-        return handle.obj.readfrom(off, size, self.num_retries)
+        r = handle.obj.readfrom(off, size, self.num_retries)
+        if r:
+            self.read_counter.add(len(r))
+        return r
 
     @catch_exceptions
     def write(self, fh, off, buf):
@@ -506,7 +513,10 @@ class Operations(llfuse.Operations):
 
         self.inodes.touch(handle.obj)
 
-        return handle.obj.writeto(off, buf, self.num_retries)
+        w = handle.obj.writeto(off, buf, self.num_retries)
+        if w:
+            self.write_counter.add(w)
+        return w
 
     @catch_exceptions
     def release(self, fh):
diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py
index cc8693b..1d7b908 100644
--- a/services/fuse/tests/test_mount.py
+++ b/services/fuse/tests/test_mount.py
@@ -427,12 +427,16 @@ class FuseWriteFileTest(MountTestBase):
 
         self.assertNotIn("file1.txt", collection)
 
+        self.assertEqual(0, self.operations.write_counter.get())
         self.pool.apply(fuseWriteFileTestHelperWriteFile, (self.mounttmp,))
+        self.assertEqual(12, self.operations.write_counter.get())
 
         with collection.open("file1.txt") as f:
             self.assertEqual(f.read(), "Hello world!")
 
+        self.assertEqual(0, self.operations.read_counter.get())
         self.pool.apply(fuseWriteFileTestHelperReadFile, (self.mounttmp,))
+        self.assertEqual(12, self.operations.read_counter.get())
 
         collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
         self.assertRegexpMatches(collection2["manifest_text"],

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list