[ARVADOS] created: 0e98d5f2f9c827deb5beb4a1765a4718d7cffd88

Git user git at public.curoverse.com
Tue Nov 1 11:30:02 EDT 2016


        at  0e98d5f2f9c827deb5beb4a1765a4718d7cffd88 (commit)


commit 0e98d5f2f9c827deb5beb4a1765a4718d7cffd88
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 1 11:28:49 2016 -0400

    10224: Choose a recent-event threshold without querying the entire event history.

diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index a1b4638..cf26f9e 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -155,39 +155,66 @@ class PollClient(threading.Thread):
         self._closing_lock = threading.RLock()
 
     def run(self):
-        self.id = 0
         if self.last_log_id != None:
-            self.id = self.last_log_id
+            # Caller supplied the last-seen event ID from a previous
+            # connection
+            skip_old_events = [["id", ">", str(self.last_log_id)]]
         else:
-            for f in self.filters:
-                for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
-                    try:
-                        items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
-                        break
-                    except errors.ApiError as error:
-                        pass
-                    else:
-                        tries_left = 0
-                        break
-                if tries_left == 0:
-                    _logger.exception("PollClient thread could not contact API server.")
-                    with self._closing_lock:
-                        self._closing.set()
-                    thread.interrupt_main()
-                    return
-                if items:
-                    if items[0]['id'] > self.id:
-                        self.id = items[0]['id']
+            # We need to do a reverse-order query to find the most
+            # recent event ID (see "if not skip_old_events" below).
+            skip_old_events = False
 
         self.on_event({'status': 200})
 
         while not self._closing.is_set():
-            max_id = self.id
             moreitems = False
             for f in self.filters:
                 for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
                     try:
-                        items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+                        if not skip_old_events:
+                            # If the caller didn't provide a known
+                            # recent ID, our first request will ask
+                            # for the single most recent event from
+                            # the last 2 hours (the time restriction
+                            # avoids doing an expensive database
+                            # query, and leaves a big enough margin to
+                            # account for clock skew). If we do find a
+                            # recent event, we remember its ID but
+                            # then discard it (we are supposed to be
+                            # returning new/current events, not old
+                            # ones).
+                            #
+                            # Subsequent requests will get multiple
+                            # events in chronological order, and
+                            # filter on that same cutoff time, or
+                            # (once we see our first matching event)
+                            # the ID of the last-seen event.
+                            skip_old_events = [[
+                                "created_at", ">=",
+                                time.strftime(
+                                    "%Y-%m-%dT%H:%M:%SZ",
+                                    time.gmtime(time.time()-7200))]]
+                            items = self.api.logs().list(
+                                order="id desc",
+                                limit=1,
+                                filters=f+skip_old_events).execute()
+                            if items["items"]:
+                                skip_old_events = [
+                                    ["id", ">", str(items["items"][0]["id"])]]
+                                items = {
+                                    "items": [],
+                                    "items_available": 0,
+                                }
+                        else:
+                            # In this case, either we know the most
+                            # recent matching ID, or we know there
+                            # were no matching events in the 2-hour
+                            # window before subscribing. Either way we
+                            # can safely ask for events in ascending
+                            # order.
+                            items = self.api.logs().list(
+                                order="id asc",
+                                filters=f+skip_old_events).execute()
                         break
                     except errors.ApiError as error:
                         pass
@@ -201,8 +228,7 @@ class PollClient(threading.Thread):
                     thread.interrupt_main()
                     return
                 for i in items["items"]:
-                    if i['id'] > max_id:
-                        max_id = i['id']
+                    skip_old_events = [["id", ">", str(i["id"])]]
                     with self._closing_lock:
                         if self._closing.is_set():
                             return
@@ -213,7 +239,6 @@ class PollClient(threading.Thread):
                             thread.interrupt_main()
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
-            self.id = max_id
             if not moreitems:
                 self._closing.wait(self.poll_time)
 
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index f32a5db..1828e15 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -367,9 +367,10 @@ class Operations(llfuse.Operations):
         return True
 
     def listen_for_events(self):
-        self.events = arvados.events.subscribe(self._api_client,
-                                 [["event_type", "in", ["create", "update", "delete"]]],
-                                 self.on_event)
+        self.events = arvados.events.subscribe(
+            self._api_client,
+            [["event_type", "in", ["create", "update", "delete"]]],
+            self.on_event)
 
     @catch_exceptions
     def on_event(self, ev):

commit 23099b0854fc3b38712dcbcdb6484115b2a1fdf3
Author: Tom Clegg <tom at curoverse.com>
Date:   Tue Nov 1 11:29:42 2016 -0400

    10224: Change event_type to "delete" to match HTTP verb and Python SDK code.

diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb
index 672374b..18d5647 100644
--- a/services/api/app/models/arvados_model.rb
+++ b/services/api/app/models/arvados_model.rb
@@ -636,7 +636,7 @@ class ArvadosModel < ActiveRecord::Base
   end
 
   def log_destroy
-    log_change('destroy') do |log|
+    log_change('delete') do |log|
       log.fill_properties('old', etag(@old_attributes), @old_logged_attributes)
       log.update_to nil
     end
diff --git a/services/api/app/models/log.rb b/services/api/app/models/log.rb
index f8d624a..7eab402 100644
--- a/services/api/app/models/log.rb
+++ b/services/api/app/models/log.rb
@@ -47,7 +47,7 @@ class Log < ArvadosModel
       self.event_at = thing.created_at
     when "update"
       self.event_at = thing.modified_at
-    when "destroy"
+    when "delete"
       self.event_at = db_current_time
     end
     self

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list