[ARVADOS] created: 55888e63181a0847d3e00344fa9c7c5e747082ae

git at public.curoverse.com git at public.curoverse.com
Fri Oct 10 16:47:19 EDT 2014


        at  55888e63181a0847d3e00344fa9c7c5e747082ae (commit)


commit 55888e63181a0847d3e00344fa9c7c5e747082ae
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 10 16:47:07 2014 -0400

    Fix filters, subscribing to components of pipelines.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index e0bf92f..b657b96 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -24,6 +24,7 @@ def main(arguments=None):
 
     args = parser.parse_args(arguments)
 
+    global filters
     filters = []
     if args.uuid:
         filters += [ ['object_uuid', '=', args.uuid] ]
@@ -35,21 +36,29 @@ def main(arguments=None):
         filters += [ ['object_uuid', '=', args.pipeline] ]
 
     if args.job:
-        filters += [ ['object_uuid', '=', args.job], ['event_type', 'in', ['stderr', 'stdout'] ] ]
+        filters += [ ['object_uuid', '=', args.job] ]
 
     api = arvados.api('v1', cache=False)
 
+    global known_component_jobs
+    global ws
+
     known_component_jobs = set()
+    ws = None
     def on_message(ev):
+        global known_component_jobs
+        global filters
+        global ws
+
         logger.debug(ev)
         if 'event_type' in ev and (args.pipeline or args.job):
             if ev['event_type'] in ('stderr', 'stdout'):
                 sys.stdout.write(ev["properties"]["text"])
             elif ev["event_type"] in ("create", "update"):
-                if args.job or ev["object_kind"] == "arvados#pipeline_instance":
-                    if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                        ws.close_connection()
-                if ev["object_kind"] == "arvados#pipeline_instance":
+                #if args.job or ev["object_kind"] == "arvados#pipelineInstance":
+                #    if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                #        ws.close()
+                if ev["object_kind"] == "arvados#pipelineInstance":
                     pipeline_jobs = set()
                     for c in ev["properties"]["new_attributes"]["components"]:
                         if "job" in ev["properties"]["new_attributes"]["components"][c]:
@@ -62,7 +71,6 @@ def main(arguments=None):
         else:
             print json.dumps(ev)
 
-    ws = None
     try:
         ws = subscribe(api, filters, lambda ev: on_message(ev), poll_fallback=args.poll_fallback)
         ws.run_forever()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index fbf635f..efdde94 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -19,7 +19,7 @@ class EventClient(WebSocketClient):
         else:
             ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
         super(EventClient, self).__init__(url, ssl_options=ssl_options)
-        self.filters = []
+        self.filters = filters
         self.on_event = on_event
 
     def opened(self):
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index 50400ee..b7579d3 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -118,7 +118,7 @@ class EventBus
           # Add a filter.  This gets the :filters field which is the same
           # format as used for regular index queries.
           ws.filters << Filter.new(p)
-          ws.send ({status: 200, message: 'subscribe ok'}.to_json)
+          ws.send ({status: 200, message: 'subscribe ok', filter: p}.to_json)
 
           # Send any pending events
           push_events ws

commit 76d932c4916d15c2ca2668409905da4206cb38b3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 10 15:45:44 2014 -0400

    3609: Add some options to control polling fallback, improve error logging a bit.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 68fb87b..e0bf92f 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -14,7 +14,11 @@ def main(arguments=None):
     parser.add_argument('-u', '--uuid', type=str, default="")
     parser.add_argument('-f', '--filters', type=str, default="")
 
-    group = parser.add_argument_group('group')
+    group = parser.add_argument_group('Polling fallback')
+    group.add_argument('--poll-fallback', default=15)
+    group.add_argument('--no-poll-fallback', action='store_false', dest='poll_fallback')
+
+    group = parser.add_argument_group('Jobs and Pipelines')
     group.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
     group.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
 
@@ -37,11 +41,10 @@ def main(arguments=None):
 
     known_component_jobs = set()
     def on_message(ev):
-        print "\n"
-        print ev
+        logger.debug(ev)
         if 'event_type' in ev and (args.pipeline or args.job):
             if ev['event_type'] in ('stderr', 'stdout'):
-                print ev["properties"]["text"]
+                sys.stdout.write(ev["properties"]["text"])
             elif ev["event_type"] in ("create", "update"):
                 if args.job or ev["object_kind"] == "arvados#pipeline_instance":
                     if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
@@ -61,8 +64,7 @@ def main(arguments=None):
 
     ws = None
     try:
-        print filters
-        ws = subscribe(api, filters, lambda ev: on_message(ev))
+        ws = subscribe(api, filters, lambda ev: on_message(ev), poll_fallback=args.poll_fallback)
         ws.run_forever()
     except KeyboardInterrupt:
         pass
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index 681a45f..fbf635f 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -18,8 +18,7 @@ class EventClient(WebSocketClient):
             ssl_options={'cert_reqs': ssl.CERT_NONE}
         else:
             ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
-
-        super(EventClient, self).__init__(url, ssl_options)
+        super(EventClient, self).__init__(url, ssl_options=ssl_options)
         self.filters = []
         self.on_event = on_event
 
@@ -46,7 +45,7 @@ class EventClient(WebSocketClient):
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
 class PollClient(threading.Thread):
-    def __init__(self, api, filters, on_event):
+    def __init__(self, api, filters, on_event, poll_time):
         self.api = api
         self.filters = filters
         self.on_event = on_event
@@ -55,11 +54,12 @@ class PollClient(threading.Thread):
             self.id = items[0]["id"]
         else:
             self.id = 0
+        self.poll_time = poll_time
         self.loop = True
 
     def run_forever(self):
         while self.loop:
-            time.sleep(15)
+            time.sleep(self.poll_time)
             items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
             for i in items:
                 self.id = i['id']
@@ -74,21 +74,25 @@ class PollClient(threading.Thread):
     def unsubscribe(self, filters):
         del self.filters[self.filters.index(filters)]
 
-def subscribe(api, filters, on_event):
+def subscribe(api, filters, on_event, poll_fallback=15):
     ws = None
     try:
         if 'websocketUrl' in api._rootDesc:
             url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
             ws = EventClient(url, filters, on_event)
             ws.connect()
+        elif poll_fallback:
+            _logger.warn("Web sockets not available, falling back to log table polling")
+            ws = PollClient(api, filters, on_event, poll_fallback)
         else:
-            _logger.info("Web sockets not available, falling back to log table polling")
-            ws = PollClient(api, filters, on_event)
+            _logger.error("Web sockets not available")
+            return None
         return ws
     except Exception:
-        if (ws):
-          ws.close_connection()
-        try:
-            return PollClient(api, filters, on_event)
-        except:
+        if ws:
+            ws.close_connection()
+        if poll_fallback:
+            return PollClient(api, filters, on_event, poll_fallback)
+        else:
+            _logger.error("Web sockets not available at %s" % api._rootDesc['websocketUrl'])
             raise

commit 2774760a4ccdb65336e2aad32b5fd57fc7bc5bb3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 10 15:43:52 2014 -0400

    Websocket server-side fix: perform the database notify in an after_save
    callback on the Log object instead of in log_change on ArvadosModel, because
    crunch-dispatch was creating Log objects directly and bypassing the
    notification in log_change.

diff --git a/services/api/app/models/arvados_model.rb b/services/api/app/models/arvados_model.rb
index 2e17747..823fd55 100644
--- a/services/api/app/models/arvados_model.rb
+++ b/services/api/app/models/arvados_model.rb
@@ -526,7 +526,6 @@ class ArvadosModel < ActiveRecord::Base
     log = Log.new(event_type: event_type).fill_object(self)
     yield log
     log.save!
-    connection.execute "NOTIFY logs, '#{log.id}'"
     log_start_state
   end
 
diff --git a/services/api/app/models/log.rb b/services/api/app/models/log.rb
index 34e6dfa..39f789e 100644
--- a/services/api/app/models/log.rb
+++ b/services/api/app/models/log.rb
@@ -5,6 +5,7 @@ class Log < ArvadosModel
   serialize :properties, Hash
   before_validation :set_default_event_at
   attr_accessor :object, :object_kind
+  after_save :send_notify
 
   api_accessible :user, extend: :common do |t|
     t.add :id
@@ -80,4 +81,8 @@ class Log < ArvadosModel
     # logs can have references to deleted objects
   end
 
+  def send_notify
+    connection.execute "NOTIFY logs, '#{self.id}'"
+  end
+
 end
diff --git a/services/api/config/initializers/eventbus.rb b/services/api/config/initializers/eventbus.rb
index 4a6141c..ea1c210 100644
--- a/services/api/config/initializers/eventbus.rb
+++ b/services/api/config/initializers/eventbus.rb
@@ -12,5 +12,8 @@ Server::Application.configure do
       :mount => "/websocket",
       :websocket_only => (ENV['ARVADOS_WEBSOCKETS'] == "ws-only")
     }
+    Rails.logger.info "Websockets #{ENV['ARVADOS_WEBSOCKETS']}, running at /websocket"
+  else
+    Rails.logger.info "Websockets disabled"
   end
 end

commit 5d262f9b0206b79fca198d191d3678415cf5c338
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 10 11:25:48 2014 -0400

    3609: Logging should work for jobs, but it is running into a bug in eventbus

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 8656f92..68fb87b 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -31,24 +31,26 @@ def main(arguments=None):
         filters += [ ['object_uuid', '=', args.pipeline] ]
 
     if args.job:
-        filters += [ ['object_uuid', '=', args.job] ], ['event_type', 'in', ['stderr', 'stdout'] ]
+        filters += [ ['object_uuid', '=', args.job], ['event_type', 'in', ['stderr', 'stdout'] ] ]
 
     api = arvados.api('v1', cache=False)
 
     known_component_jobs = set()
     def on_message(ev):
-        if args.pipeline or args.job:
+        print "\n"
+        print ev
+        if 'event_type' in ev and (args.pipeline or args.job):
             if ev['event_type'] in ('stderr', 'stdout'):
-                print x["properties"]["text"]
-            elif x["event_type"] in ("create", "update"):
-                if args.job or x["object_kind"] == "arvados#pipeline_instance":
-                    if x["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                print ev["properties"]["text"]
+            elif ev["event_type"] in ("create", "update"):
+                if args.job or ev["object_kind"] == "arvados#pipeline_instance":
+                    if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                         ws.close_connection()
-                if x["object_kind"] == "arvados#pipeline_instance":
+                if ev["object_kind"] == "arvados#pipeline_instance":
                     pipeline_jobs = set()
-                    for c in x["properties"]["new_attributes"]["components"]:
-                        if "job" in x["properties"]["new_attributes"]["components"][c]:
-                            pipeline_jobs.add(x["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
+                    for c in ev["properties"]["new_attributes"]["components"]:
+                        if "job" in ev["properties"]["new_attributes"]["components"][c]:
+                            pipeline_jobs.add(ev["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
                     if known_component_jobs != pipeline_jobs:
                         ws.unsubscribe(filters)
                         filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
@@ -59,6 +61,7 @@ def main(arguments=None):
 
     ws = None
     try:
+        print filters
         ws = subscribe(api, filters, lambda ev: on_message(ev))
         ws.run_forever()
     except KeyboardInterrupt:
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index faf638f..681a45f 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -37,13 +37,13 @@ class EventClient(WebSocketClient):
             pass
 
     def subscribe(self, filters, last_log_id=None):
-        m = {"method": "subscribe", "filters": self.filters}
+        m = {"method": "subscribe", "filters": filters}
         if last_log_id is not None:
             m["last_log_id"] = last_log_id
         self.send(json.dumps(m))
 
     def unsubscribe(self, filters):
-        self.send(json.dumps({"method": "unsubscribe", "filters": self.filters}))
+        self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event):

commit a606b9a9996aa3e8a144a8328acf3f960d8d0057
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 10 10:52:26 2014 -0400

    3609: --job and --pipeline logging implemented, needs testing

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index f421d62..8656f92 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -13,8 +13,11 @@ def main(arguments=None):
     parser = argparse.ArgumentParser()
     parser.add_argument('-u', '--uuid', type=str, default="")
     parser.add_argument('-f', '--filters', type=str, default="")
-    parser.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
-    parser.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+
+    group = parser.add_argument_group('group')
+    group.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
+    group.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+
     args = parser.parse_args(arguments)
 
     filters = []
@@ -28,12 +31,31 @@ def main(arguments=None):
         filters += [ ['object_uuid', '=', args.pipeline] ]
 
     if args.job:
-        filters += [ ['object_uuid', '=', args.job] ]
+        filters += [ ['object_uuid', '=', args.job] ], ['event_type', 'in', ['stderr', 'stdout'] ]
 
     api = arvados.api('v1', cache=False)
 
+    known_component_jobs = set()
     def on_message(ev):
-        print json.dumps(ev)
+        if args.pipeline or args.job:
+            if ev['event_type'] in ('stderr', 'stdout'):
+                print x["properties"]["text"]
+            elif x["event_type"] in ("create", "update"):
+                if args.job or x["object_kind"] == "arvados#pipeline_instance":
+                    if x["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                        ws.close_connection()
+                if x["object_kind"] == "arvados#pipeline_instance":
+                    pipeline_jobs = set()
+                    for c in x["properties"]["new_attributes"]["components"]:
+                        if "job" in x["properties"]["new_attributes"]["components"][c]:
+                            pipeline_jobs.add(x["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
+                    if known_component_jobs != pipeline_jobs:
+                        ws.unsubscribe(filters)
+                        filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+                        ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
+                        known_component_jobs = pipeline_jobs
+        else:
+            print json.dumps(ev)
 
     ws = None
     try:
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index beb3454..faf638f 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -1,5 +1,5 @@
 from ws4py.client.threadedclient import WebSocketClient
-import thread
+import threading
 import json
 import os
 import time
@@ -20,11 +20,11 @@ class EventClient(WebSocketClient):
             ssl_options={'cert_reqs': ssl.CERT_REQUIRED}
 
         super(EventClient, self).__init__(url, ssl_options)
-        self.filters = filters
+        self.filters = []
         self.on_event = on_event
 
     def opened(self):
-        self.send(json.dumps({"method": "subscribe", "filters": self.filters}))
+        self.subscribe(self.filters)
 
     def received_message(self, m):
         self.on_event(json.loads(str(m)))
@@ -36,6 +36,15 @@ class EventClient(WebSocketClient):
         except:
             pass
 
+    def subscribe(self, filters, last_log_id=None):
+        m = {"method": "subscribe", "filters": self.filters}
+        if last_log_id is not None:
+            m["last_log_id"] = last_log_id
+        self.send(json.dumps(m))
+
+    def unsubscribe(self, filters):
+        self.send(json.dumps({"method": "unsubscribe", "filters": self.filters}))
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event):
         self.api = api
@@ -59,6 +68,12 @@ class PollClient(threading.Thread):
     def close_connection(self):
         self.loop = False
 
+    def subscribe(self, filters):
+        self.filters += filters
+
+    def unsubscribe(self, filters):
+        del self.filters[self.filters.index(filters)]
+
 def subscribe(api, filters, on_event):
     ws = None
     try:
@@ -67,6 +82,7 @@ def subscribe(api, filters, on_event):
             ws = EventClient(url, filters, on_event)
             ws.connect()
         else:
+            _logger.info("Web sockets not available, falling back to log table polling")
             ws = PollClient(api, filters, on_event)
         return ws
     except Exception:

commit 2f8d1a30eaca707dff086b52d5809963e6a8cc1c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Oct 10 10:17:40 2014 -0400

    3609: Refactoring arv-ws into a reusable command module.  Working on adding
    the ability to monitor pipeline/job logs at the command line.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
new file mode 100644
index 0000000..f421d62
--- /dev/null
+++ b/sdk/python/arvados/commands/ws.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python
+
+import sys
+import logging
+import argparse
+import arvados
+import json
+from arvados.events import subscribe
+
+def main(arguments=None):
+    logger = logging.getLogger('arvados.arv-ws')
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-u', '--uuid', type=str, default="")
+    parser.add_argument('-f', '--filters', type=str, default="")
+    parser.add_argument('-p', '--pipeline', type=str, default="", help="Print log output from a pipeline and its jobs")
+    parser.add_argument('-j', '--job', type=str, default="", help="Print log output from a job")
+    args = parser.parse_args(arguments)
+
+    filters = []
+    if args.uuid:
+        filters += [ ['object_uuid', '=', args.uuid] ]
+
+    if args.filters:
+        filters += json.loads(args.filters)
+
+    if args.pipeline:
+        filters += [ ['object_uuid', '=', args.pipeline] ]
+
+    if args.job:
+        filters += [ ['object_uuid', '=', args.job] ]
+
+    api = arvados.api('v1', cache=False)
+
+    def on_message(ev):
+        print json.dumps(ev)
+
+    ws = None
+    try:
+        ws = subscribe(api, filters, lambda ev: on_message(ev))
+        ws.run_forever()
+    except KeyboardInterrupt:
+        pass
+    except Exception:
+        logger.exception('')
+    finally:
+        if ws:
+            ws.close_connection()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index b7d610d..beb3454 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -36,14 +36,43 @@ class EventClient(WebSocketClient):
         except:
             pass
 
+class PollClient(threading.Thread):
+    def __init__(self, api, filters, on_event):
+        self.api = api
+        self.filters = filters
+        self.on_event = on_event
+        items = self.api.logs().list(limit=1, order=json.dumps(["id desc"]), filters=json.dumps(filters)).execute()['items']
+        if len(items) > 0:
+            self.id = items[0]["id"]
+        else:
+            self.id = 0
+        self.loop = True
+
+    def run_forever(self):
+        while self.loop:
+            time.sleep(15)
+            items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
+            for i in items:
+                self.id = i['id']
+                self.on_event(i)
+
+    def close_connection(self):
+        self.loop = False
+
 def subscribe(api, filters, on_event):
     ws = None
     try:
-        url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
-        ws = EventClient(url, filters, on_event)
-        ws.connect()
+        if 'websocketUrl' in api._rootDesc:
+            url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
+            ws = EventClient(url, filters, on_event)
+            ws.connect()
+        else:
+            ws = PollClient(api, filters, on_event)
         return ws
     except Exception:
         if (ws):
           ws.close_connection()
-        raise
+        try:
+            return PollClient(api, filters, on_event)
+        except:
+            raise
diff --git a/sdk/python/bin/arv-ws b/sdk/python/bin/arv-ws
index ce7f066..4e663ce 100755
--- a/sdk/python/bin/arv-ws
+++ b/sdk/python/bin/arv-ws
@@ -1,30 +1,4 @@
 #!/usr/bin/env python
 
-import sys
-import logging
-import argparse
-import arvados
-from arvados.events import subscribe
-
-logger = logging.getLogger('arvados.arv-ws')
-
-parser = argparse.ArgumentParser()
-parser.add_argument('-u', '--uuid', type=str, default="")
-args = parser.parse_args()
-
-filters = []
-if len(args.uuid)>0: filters = [ ['object_uuid', '=', args.uuid] ]
-
-api = arvados.api('v1', cache=False)
-
-def on_message(ev):
-  print "\n", ev
-
-ws = None
-try:
-  ws = subscribe(api, filters, lambda ev: on_message(ev))
-  ws.run_forever()
-except Exception:
-  logger.exception('')
-  if (ws):
-    ws.close_connection()
+from arvados.commands.ws import main
+main()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list