[ARVADOS] created: 55888e63181a0847d3e00344fa9c7c5e747082ae
git at public.curoverse.com
git at public.curoverse.com
Fri Oct 10 16:47:19 EDT 2014
at 55888e63181a0847d3e00344fa9c7c5e747082ae (commit)
commit 55888e63181a0847d3e00344fa9c7c5e747082ae
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 10 16:47:07 2014 -0400
Fix filters, subscribing to components of pipelines.
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index e0bf92f..b657b96 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -24,6 +24,7 @@ def main(arguments=None):
args = parser.parse_args(arguments)
+ global filters
filters = []
if args.uuid:
filters += [ ['object_uuid', '=', args.uuid] ]
@@ -35,21 +36,29 @@ def main(arguments=None):
filters += [ ['object_uuid', '=', args.pipeline] ]
if args.job:
- filters += [ ['object_uuid', '=', args.job], ['event_type', 'in', ['stderr', 'stdout'] ] ]
+ filters += [ ['object_uuid', '=', args.job] ]
api = arvados.api('v1', cache=False)
+ global known_component_jobs
+ global ws
+
known_component_jobs = set()
+ ws = None
def on_message(ev):
+ global known_component_jobs
+ global filters
+ global ws
+
logger.debug(ev)
if 'event_type' in ev and (args.pipeline or args.job):
if ev['event_type'] in ('stderr', 'stdout'):
sys.stdout.write(ev["properties"]["text"])
elif ev["event_type"] in ("create", "update"):
- if args.job or ev["object_kind"] == "arvados#pipeline_instance":
- if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
- ws.close_connection()
- if ev["object_kind"] == "arvados#pipeline_instance":
+ #if args.job or ev["object_kind"] == "arvados#pipelineInstance":
+ # if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ # ws.close()
+ if ev["object_kind"] == "arvados#pipelineInstance":
pipeline_jobs = set()
for c in ev["properties"]["new_attributes"]["components"]:
if "job" in ev["properties"]["new_attributes"]["components"][c]:
@@ -62,7 +71,6 @@ def main(arguments=None):
else:
print json.dumps(ev)
- ws = None
try:
ws = subscribe(api, filters, lambda ev: on_message(ev), poll_fallback=args.poll_fallback)
ws.run_forever()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index fbf635f..efdde94 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -19,7 +19,7 @@ class EventClient(WebSocketClient):
else:
ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
super(EventClient, self).__init__(url, ssl_options=ssl_options)
- self.filters = []
+ self.filters = filters
self.on_event = on_event
def opened(self):
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 50400ee..b7579d3 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -118,7 +118,7 @@ class EventBus
# Add a filter. This gets the :filters field which is the same
# format as used for regular index queries.
ws.filters << Filter.new(p)
- ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+ ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
# Send any pending events
push_events ws
commit 76d932c4916d15c2ca2668409905da4206cb38b3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 10 15:45:44 2014 -0400
3609: Add some options to control polling fallback, improve error logging a bit.
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 68fb87b..e0bf92f 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -14,7 +14,11 @@ def main(arguments=None):
parser.add_argument('-u', '--uuid', type=str, default="")
parser.add_argument('-f', '--filters', type=str, default="")
- group = parser.add_argument_group('group')
+ group = parser.add_argument_group('Polling fallback')
+ group.add_argument('--poll-fallback', default=15)
+ group.add_argument('--no-poll-fallback', action='store_false', dest='poll_fallback')
+
+ group = parser.add_argument_group('Jobs and Pipelines')
group.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
group.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
@@ -37,11 +41,10 @@ def main(arguments=None):
known_component_jobs = set()
def on_message(ev):
- print "\n"
- print ev
+ logger.debug(ev)
if 'event_type' in ev and (args.pipeline or args.job):
if ev['event_type'] in ('stderr', 'stdout'):
- print ev["properties"]["text"]
+ sys.stdout.write(ev["properties"]["text"])
elif ev["event_type"] in ("create", "update"):
if args.job or ev["object_kind"] == "arvados#pipeline_instance":
if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
@@ -61,8 +64,7 @@ def main(arguments=None):
ws = None
try:
- print filters
- ws = subscribe(api, filters, lambda ev: on_message(ev))
+ ws = subscribe(api, filters, lambda ev: on_message(ev), poll_fallback=args.poll_fallback)
ws.run_forever()
except KeyboardInterrupt:
pass
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 681a45f..fbf635f 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -18,8 +18,7 @@ class EventClient(WebSocketClient):
ssl_options={'cert_reqs': ssl.CERT_NONE}
else:
ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
-
- super(EventClient, self).__init__(url, ssl_options)
+ super(EventClient, self).__init__(url, ssl_options=ssl_options)
self.filters = []
self.on_event = on_event
@@ -46,7 +45,7 @@ class EventClient(WebSocketClient):
self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
class PollClient(threading.Thread):
- def __init__(self, api, filters, on_event):
+ def __init__(self, api, filters, on_event, poll_time):
self.api = api
self.filters = filters
self.on_event = on_event
@@ -55,11 +54,12 @@ class PollClient(threading.Thread):
self.id = items[0]["id"]
else:
self.id = 0
+ self.poll_time = poll_time
self.loop = True
def run_forever(self):
while self.loop:
- time.sleep(15)
+ time.sleep(self.poll_time)
items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
for i in items:
self.id = i['id']
@@ -74,21 +74,25 @@ class PollClient(threading.Thread):
def unsubscribe(self, filters):
del self.filters[self.filters.index(filters)]
-def subscribe(api, filters, on_event):
+def subscribe(api, filters, on_event, poll_fallback=15):
ws = None
try:
if 'websocketUrl' in api._rootDesc:
url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
ws = EventClient(url, filters, on_event)
ws.connect()
+ elif poll_fallback:
+ _logger.warn("Web sockets not available, falling back to log table polling")
+ ws = PollClient(api, filters, on_event, poll_fallback)
else:
- _logger.info("Web sockets not available, falling back to log table polling")
- ws = PollClient(api, filters, on_event)
+ _logger.error("Web sockets not available")
+ return None
return ws
except Exception:
- if (ws):
- ws.close_connection()
- try:
- return PollClient(api, filters, on_event)
- except:
+ if ws:
+ ws.close_connection()
+ if poll_fallback:
+ return PollClient(api, filters, on_event, poll_fallback)
+ else:
+ _logger.error("Web sockets not available at %s" % api._rootDesc['websocketUrl'])
raise
commit 2774760a4ccdb65336e2aad32b5fd57fc7bc5bb3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 10 15:43:52 2014 -0400
Websocket server-side fix: perform the database NOTIFY in an after_save callback on
the Log object instead of in log_change on ArvadosBase, because crunch-dispatch
was creating Log objects directly and bypassing the notification in log_change.
diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb
index 2e17747..823fd55 100644
--- a/services/api/app/models/arvados_model.rb
+++ b/services/api/app/models/arvados_model.rb
@@ -526,7 +526,6 @@ class ArvadosModel < ActiveRecord::Base
log = Log.new(event_type: event_type).fill_object(self)
yield log
log.save!
- connection.execute "NOTIFY logs, '#{log.id}'"
log_start_state
end
diff --git a/services/api/app/models/log.rb b/services/api/app/models/log.rb
index 34e6dfa..39f789e 100644
--- a/services/api/app/models/log.rb
+++ b/services/api/app/models/log.rb
@@ -5,6 +5,7 @@ class Log < ArvadosModel
serialize :properties, Hash
before_validation :set_default_event_at
attr_accessor :object, :object_kind
+ after_save :send_notify
api_accessible :user, extend: :common do |t|
t.add :id
@@ -80,4 +81,8 @@ class Log < ArvadosModel
# logs can have references to deleted objects
end
+ def send_notify
+ connection.execute "NOTIFY logs, '#{self.id}'"
+ end
+
end
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index 4a6141c..ea1c210 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -12,5 +12,8 @@ Server::Application.configure do
:mount => "/websocket",
:websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
}
+ Rails.logger.info "Websockets #{ENV['ARVADOS_WEBSOCKETS']}, running at /websocket"
+ else
+ Rails.logger.info "Websockets disabled"
end
end
commit 5d262f9b0206b79fca198d191d3678415cf5c338
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 10 11:25:48 2014 -0400
3609: Logging should work for jobs, but it is running into a bug in eventbus.
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 8656f92..68fb87b 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -31,24 +31,26 @@ def main(arguments=None):
filters += [ ['object_uuid', '=', args.pipeline] ]
if args.job:
- filters += [ ['object_uuid', '=', args.job] ], ['event_type', 'in', ['stderr', 'stdout'] ]
+ filters += [ ['object_uuid', '=', args.job], ['event_type', 'in', ['stderr', 'stdout'] ] ]
api = arvados.api('v1', cache=False)
known_component_jobs = set()
def on_message(ev):
- if args.pipeline or args.job:
+ print "\n"
+ print ev
+ if 'event_type' in ev and (args.pipeline or args.job):
if ev['event_type'] in ('stderr', 'stdout'):
- print x["properties"]["text"]
- elif x["event_type"] in ("create", "update"):
- if args.job or x["object_kind"] == "arvados#pipeline_instance":
- if x["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ print ev["properties"]["text"]
+ elif ev["event_type"] in ("create", "update"):
+ if args.job or ev["object_kind"] == "arvados#pipeline_instance":
+ if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
ws.close_connection()
- if x["object_kind"] == "arvados#pipeline_instance":
+ if ev["object_kind"] == "arvados#pipeline_instance":
pipeline_jobs = set()
- for c in x["properties"]["new_attributes"]["components"]:
- if "job" in x["properties"]["new_attributes"]["components"][c]:
- pipeline_jobs.add(x["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
+ for c in ev["properties"]["new_attributes"]["components"]:
+ if "job" in ev["properties"]["new_attributes"]["components"][c]:
+ pipeline_jobs.add(ev["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
if known_component_jobs != pipeline_jobs:
ws.unsubscribe(filters)
filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
@@ -59,6 +61,7 @@ def main(arguments=None):
ws = None
try:
+ print filters
ws = subscribe(api, filters, lambda ev: on_message(ev))
ws.run_forever()
except KeyboardInterrupt:
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index faf638f..681a45f 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -37,13 +37,13 @@ class EventClient(WebSocketClient):
pass
def subscribe(self, filters, last_log_id=None):
- m = {"method": "subscribe", "filters": self.filters}
+ m = {"method": "subscribe", "filters": filters}
if last_log_id is not None:
m["last_log_id"] = last_log_id
self.send(json.dumps(m))
def unsubscribe(self, filters):
- self.send(json.dumps({"method": "unsubscribe", "filters": self.filters}))
+ self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event):
commit a606b9a9996aa3e8a144a8328acf3f960d8d0057
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 10 10:52:26 2014 -0400
3609: --job and --pipeline logging implemented, needs testing
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index f421d62..8656f92 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -13,8 +13,11 @@ def main(arguments=None):
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--uuid', type=str, default="")
parser.add_argument('-f', '--filters', type=str, default="")
- parser.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
- parser.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+
+ group = parser.add_argument_group('group')
+ group.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
+ group.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+
args = parser.parse_args(arguments)
filters = []
@@ -28,12 +31,31 @@ def main(arguments=None):
filters += [ ['object_uuid', '=', args.pipeline] ]
if args.job:
- filters += [ ['object_uuid', '=', args.job] ]
+ filters += [ ['object_uuid', '=', args.job] ], ['event_type', 'in', ['stderr', 'stdout'] ]
api = arvados.api('v1', cache=False)
+ known_component_jobs = set()
def on_message(ev):
- print json.dumps(ev)
+ if args.pipeline or args.job:
+ if ev['event_type'] in ('stderr', 'stdout'):
+ print x["properties"]["text"]
+ elif x["event_type"] in ("create", "update"):
+ if args.job or x["object_kind"] == "arvados#pipeline_instance":
+ if x["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ ws.close_connection()
+ if x["object_kind"] == "arvados#pipeline_instance":
+ pipeline_jobs = set()
+ for c in x["properties"]["new_attributes"]["components"]:
+ if "job" in x["properties"]["new_attributes"]["components"][c]:
+ pipeline_jobs.add(x["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
+ if known_component_jobs != pipeline_jobs:
+ ws.unsubscribe(filters)
+ filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+ ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
+ known_component_jobs = pipeline_jobs
+ else:
+ print json.dumps(ev)
ws = None
try:
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index beb3454..faf638f 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -1,5 +1,5 @@
from ws4py.client.threadedclient import WebSocketClient
-import thread
+import threading
import json
import os
import time
@@ -20,11 +20,11 @@ class EventClient(WebSocketClient):
ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
super(EventClient, self).__init__(url, ssl_options)
- self.filters = filters
+ self.filters = []
self.on_event = on_event
def opened(self):
- self.send(json.dumps({"method": "subscribe", "filters": self.filters}))
+ self.subscribe(self.filters)
def received_message(self, m):
self.on_event(json.loads(str(m)))
@@ -36,6 +36,15 @@ class EventClient(WebSocketClient):
except:
pass
+ def subscribe(self, filters, last_log_id=None):
+ m = {"method": "subscribe", "filters": self.filters}
+ if last_log_id is not None:
+ m["last_log_id"] = last_log_id
+ self.send(json.dumps(m))
+
+ def unsubscribe(self, filters):
+ self.send(json.dumps({"method": "unsubscribe", "filters": self.filters}))
+
class PollClient(threading.Thread):
def __init__(self, api, filters, on_event):
self.api = api
@@ -59,6 +68,12 @@ class PollClient(threading.Thread):
def close_connection(self):
self.loop = False
+ def subscribe(self, filters):
+ self.filters += filters
+
+ def unsubscribe(self, filters):
+ del self.filters[self.filters.index(filters)]
+
def subscribe(api, filters, on_event):
ws = None
try:
@@ -67,6 +82,7 @@ def subscribe(api, filters, on_event):
ws = EventClient(url, filters, on_event)
ws.connect()
else:
+ _logger.info("Web sockets not available, falling back to log table polling")
ws = PollClient(api, filters, on_event)
return ws
except Exception:
commit 2f8d1a30eaca707dff086b52d5809963e6a8cc1c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Oct 10 10:17:40 2014 -0400
3609: Refactoring arv-ws into a reusable command module. Working on adding the
ability to monitor pipeline/job logs at the command line.
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
new file mode 100644
index 0000000..f421d62
--- /dev/null
+++ b/sdk/python/arvados/commands/ws.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+
+import sys
+import logging
+import argparse
+import arvados
+import json
+from arvados.events import subscribe
+
+def main(arguments=None):
+ logger = logging.getLogger('arvados.arv-ws')
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-u', '--uuid', type=str, default="")
+ parser.add_argument('-f', '--filters', type=str, default="")
+ parser.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
+ parser.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+ args = parser.parse_args(arguments)
+
+ filters = []
+ if args.uuid:
+ filters += [ ['object_uuid', '=', args.uuid] ]
+
+ if args.filters:
+ filters += json.loads(args.filters)
+
+ if args.pipeline:
+ filters += [ ['object_uuid', '=', args.pipeline] ]
+
+ if args.job:
+ filters += [ ['object_uuid', '=', args.job] ]
+
+ api = arvados.api('v1', cache=False)
+
+ def on_message(ev):
+ print json.dumps(ev)
+
+ ws = None
+ try:
+ ws = subscribe(api, filters, lambda ev: on_message(ev))
+ ws.run_forever()
+ except KeyboardInterrupt:
+ pass
+ except Exception:
+ logger.exception('')
+ finally:
+ if ws:
+ ws.close_connection()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index b7d610d..beb3454 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -36,14 +36,43 @@ class EventClient(WebSocketClient):
except:
pass
+class PollClient(threading.Thread):
+ def __init__(self, api, filters, on_event):
+ self.api = api
+ self.filters = filters
+ self.on_event = on_event
+ items = self.api.logs().list(limit=1, order=json.dumps(["id desc"]), filters=json.dumps(filters)).execute()['items']
+ if len(items) > 0:
+ self.id = items[0]["id"]
+ else:
+ self.id = 0
+ self.loop = True
+
+ def run_forever(self):
+ while self.loop:
+ time.sleep(15)
+ items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
+ for i in items:
+ self.id = i['id']
+ self.on_event(i)
+
+ def close_connection(self):
+ self.loop = False
+
def subscribe(api, filters, on_event):
ws = None
try:
- url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
- ws = EventClient(url, filters, on_event)
- ws.connect()
+ if 'websocketUrl' in api._rootDesc:
+ url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
+ ws = EventClient(url, filters, on_event)
+ ws.connect()
+ else:
+ ws = PollClient(api, filters, on_event)
return ws
except Exception:
if (ws):
ws.close_connection()
- raise
+ try:
+ return PollClient(api, filters, on_event)
+ except:
+ raise
diff --git a/sdk/python/bin/arv-ws b/sdk/python/bin/arv-ws
index ce7f066..4e663ce 100755
--- a/sdk/python/bin/arv-ws
+++ b/sdk/python/bin/arv-ws
@@ -1,30 +1,4 @@
#!/usr/bin/env python
-import sys
-import logging
-import argparse
-import arvados
-from arvados.events import subscribe
-
-logger = logging.getLogger('arvados.arv-ws')
-
-parser = argparse.ArgumentParser()
-parser.add_argument('-u', '--uuid', type=str, default="")
-args = parser.parse_args()
-
-filters = []
-if len(args.uuid)>0: filters = [ ['object_uuid', '=', args.uuid] ]
-
-api = arvados.api('v1', cache=False)
-
-def on_message(ev):
- print "\n", ev
-
-ws = None
-try:
- ws = subscribe(api, filters, lambda ev: on_message(ev))
- ws.run_forever()
-except Exception:
- logger.exception('')
- if (ws):
- ws.close_connection()
+from arvados.commands.ws import main
+main()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list