[ARVADOS] updated: 58014b961247c55093afdd1dab3e290518a54e3e

git at public.curoverse.com git at public.curoverse.com
Thu Oct 16 09:30:54 EDT 2014


Summary of changes:
 sdk/python/arvados/commands/ws.py | 53 +++++++++++++++++++++++----------------
 1 file changed, 32 insertions(+), 21 deletions(-)

       via  58014b961247c55093afdd1dab3e290518a54e3e (commit)
      from  fe21e5236f596001d22e4b28519d1c5ef32a7e3c (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 58014b961247c55093afdd1dab3e290518a54e3e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Thu Oct 16 09:30:48 2014 -0400

    3609: Prefetch pipeline components to find out which jobs to subscribe to.

diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 142a205..457880e 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -26,28 +26,41 @@ def main(arguments=None):
     args = parser.parse_args(arguments)
 
     global filters
+    global known_component_jobs
+    global ws
+
     filters = []
+    known_component_jobs = set()
+    ws = None
+
+    def update_subscribed_components(components):
+        global known_component_jobs
+        global filters
+        pipeline_jobs = set()
+        for c in components:
+            if "job" in components[c]:
+                pipeline_jobs.add(components[c]["job"]["uuid"])
+        if known_component_jobs != pipeline_jobs:
+            ws.unsubscribe(filters)
+            filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+            ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
+            known_component_jobs = pipeline_jobs
+
+    api = arvados.api('v1', cache=False)
+
     if args.uuid:
         filters += [ ['object_uuid', '=', args.uuid] ]
 
     if args.filters:
         filters += json.loads(args.filters)
 
-    if args.pipeline:
-        filters += [ ['object_uuid', '=', args.pipeline] ]
-
     if args.job:
         filters += [ ['object_uuid', '=', args.job] ]
 
-    api = arvados.api('v1', cache=False)
-
-    global known_component_jobs
-    global ws
+    if args.pipeline:
+        filters += [ ['object_uuid', '=', args.pipeline] ]
 
-    known_component_jobs = set()
-    ws = None
     def on_message(ev):
-        global known_component_jobs
         global filters
         global ws
 
@@ -57,27 +70,25 @@ def main(arguments=None):
                 sys.stdout.write(ev["properties"]["text"])
             elif ev["event_type"] in ("create", "update"):
                 if ev["object_kind"] == "arvados#pipelineInstance":
-                    pipeline_jobs = set()
-                    for c in ev["properties"]["new_attributes"]["components"]:
-                        if "job" in ev["properties"]["new_attributes"]["components"][c]:
-                            pipeline_jobs.add(ev["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
-                    if known_component_jobs != pipeline_jobs:
-                        ws.unsubscribe(filters)
-                        filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
-                        ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
-                        known_component_jobs = pipeline_jobs
+                    update_subscribed_components(ev["properties"]["new_attributes"]["components"])
+        elif 'status' in ev and ev['status'] == 200:
+            pass
         else:
             print json.dumps(ev)
 
     try:
         ws = subscribe(api, filters, on_message, poll_fallback=args.poll_interval)
         if ws:
+            if args.pipeline:
+                c = api.pipeline_instances().get(uuid=args.pipeline).execute()
+                update_subscribed_components(c["components"])
+
             while True:
                 time.sleep(60)
     except KeyboardInterrupt:
         pass
-    except Exception:
-        logger.exception('')
+    except Exception as e:
+        logger.error(e)
     finally:
         if ws:
             ws.close()

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list