[ARVADOS] created: 0e98d5f2f9c827deb5beb4a1765a4718d7cffd88
Git user
git at public.curoverse.com
Tue Nov 1 11:30:02 EDT 2016
at 0e98d5f2f9c827deb5beb4a1765a4718d7cffd88 (commit)
commit 0e98d5f2f9c827deb5beb4a1765a4718d7cffd88
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 1 11:28:49 2016 -0400
10224: Choose a recent-event threshold without querying the entire event history.
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index a1b4638..cf26f9e 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -155,39 +155,66 @@ class PollClient(threading.Thread):
self._closing_lock = threading.RLock()
def run(self):
- self.id = 0
if self.last_log_id != None:
- self.id = self.last_log_id
+ # Caller supplied the last-seen event ID from a previous
+ # connection
+ skip_old_events = [["id", ">", str(self.last_log_id)]]
else:
- for f in self.filters:
- for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
- try:
- items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
- break
- except errors.ApiError as error:
- pass
- else:
- tries_left = 0
- break
- if tries_left == 0:
- _logger.exception("PollClient thread could not contact API server.")
- with self._closing_lock:
- self._closing.set()
- thread.interrupt_main()
- return
- if items:
- if items[0]['id'] > self.id:
- self.id = items[0]['id']
+ # We need to do a reverse-order query to find the most
+ # recent event ID (see "if not skip_old_events" below).
+ skip_old_events = False
self.on_event({'status': 200})
while not self._closing.is_set():
- max_id = self.id
moreitems = False
for f in self.filters:
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
try:
- items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ if not skip_old_events:
+ # If the caller didn't provide a known
+ # recent ID, our first request will ask
+ # for the single most recent event from
+ # the last 2 hours (the time restriction
+ # avoids doing an expensive database
+ # query, and leaves a big enough margin to
+ # account for clock skew). If we do find a
+ # recent event, we remember its ID but
+ # then discard it (we are supposed to be
+ # returning new/current events, not old
+ # ones).
+ #
+ # Subsequent requests will get multiple
+ # events in chronological order, and
+ # filter on that same cutoff time, or
+ # (once we see our first matching event)
+ # the ID of the last-seen event.
+ skip_old_events = [[
+ "created_at", ">=",
+ time.strftime(
+ "%Y-%m-%dT%H:%M:%SZ",
+ time.gmtime(time.time()-7200))]]
+ items = self.api.logs().list(
+ order="id desc",
+ limit=1,
+ filters=f+skip_old_events).execute()
+ if items["items"]:
+ skip_old_events = [
+ ["id", ">", str(items["items"][0]["id"])]]
+ items = {
+ "items": [],
+ "items_available": 0,
+ }
+ else:
+ # In this case, either we know the most
+ # recent matching ID, or we know there
+ # were no matching events in the 2-hour
+ # window before subscribing. Either way we
+ # can safely ask for events in ascending
+ # order.
+ items = self.api.logs().list(
+ order="id asc",
+ filters=f+skip_old_events).execute()
break
except errors.ApiError as error:
pass
@@ -201,8 +228,7 @@ class PollClient(threading.Thread):
thread.interrupt_main()
return
for i in items["items"]:
- if i['id'] > max_id:
- max_id = i['id']
+ skip_old_events = [["id", ">", str(i["id"])]]
with self._closing_lock:
if self._closing.is_set():
return
@@ -213,7 +239,6 @@ class PollClient(threading.Thread):
thread.interrupt_main()
if items["items_available"] > len(items["items"]):
moreitems = True
- self.id = max_id
if not moreitems:
self._closing.wait(self.poll_time)
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index f32a5db..1828e15 100644
--- a/services/fuse/arvados_fuse/__init__.py
+++ b/services/fuse/arvados_fuse/__init__.py
@@ -367,9 +367,10 @@ class Operations(llfuse.Operations):
return True
def listen_for_events(self):
- self.events = arvados.events.subscribe(self._api_client,
- [["event_type", "in", ["create", "update", "delete"]]],
- self.on_event)
+ self.events = arvados.events.subscribe(
+ self._api_client,
+ [["event_type", "in", ["create", "update", "delete"]]],
+ self.on_event)
@catch_exceptions
def on_event(self, ev):
commit 23099b0854fc3b38712dcbcdb6484115b2a1fdf3
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Nov 1 11:29:42 2016 -0400
10224: Change event_type to "delete" to match HTTP verb and Python SDK code.
diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb
index 672374b..18d5647 100644
--- a/services/api/app/models/arvados_model.rb
+++ b/services/api/app/models/arvados_model.rb
@@ -636,7 +636,7 @@ class ArvadosModel < ActiveRecord::Base
end
def log_destroy
- log_change('destroy') do |log|
+ log_change('delete') do |log|
log.fill_properties('old', etag(@old_attributes), @old_logged_attributes)
log.update_to nil
end
diff --git a/services/api/app/models/log.rb b/services/api/app/models/log.rb
index f8d624a..7eab402 100644
--- a/services/api/app/models/log.rb
+++ b/services/api/app/models/log.rb
@@ -47,7 +47,7 @@ class Log < ArvadosModel
self.event_at = thing.created_at
when "update"
self.event_at = thing.modified_at
- when "destroy"
+ when "delete"
self.event_at = db_current_time
end
self
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list