[ARVADOS] updated: 58014b961247c55093afdd1dab3e290518a54e3e
git at public.curoverse.com
git at public.curoverse.com
Thu Oct 16 09:30:54 EDT 2014
Summary of changes:
sdk/python/arvados/commands/ws.py | 53 +++++++++++++++++++++++----------------
1 file changed, 32 insertions(+), 21 deletions(-)
via 58014b961247c55093afdd1dab3e290518a54e3e (commit)
from fe21e5236f596001d22e4b28519d1c5ef32a7e3c (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 58014b961247c55093afdd1dab3e290518a54e3e
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Oct 16 09:30:48 2014 -0400
3609: Prefetch pipeline components to find out which jobs to subscribe to.
diff --git a/sdk/python/arvados/commands/ws.py b/sdk/python/arvados/commands/ws.py
index 142a205..457880e 100644
--- a/sdk/python/arvados/commands/ws.py
+++ b/sdk/python/arvados/commands/ws.py
@@ -26,28 +26,41 @@ def main(arguments=None):
args = parser.parse_args(arguments)
global filters
+ global known_component_jobs
+ global ws
+
filters = []
+ known_component_jobs = set()
+ ws = None
+
+ def update_subscribed_components(components):
+ global known_component_jobs
+ global filters
+ pipeline_jobs = set()
+ for c in components:
+ if "job" in components[c]:
+ pipeline_jobs.add(components[c]["job"]["uuid"])
+ if known_component_jobs != pipeline_jobs:
+ ws.unsubscribe(filters)
+ filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
+ ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
+ known_component_jobs = pipeline_jobs
+
+ api = arvados.api('v1', cache=False)
+
if args.uuid:
filters += [ ['object_uuid', '=', args.uuid] ]
if args.filters:
filters += json.loads(args.filters)
- if args.pipeline:
- filters += [ ['object_uuid', '=', args.pipeline] ]
-
if args.job:
filters += [ ['object_uuid', '=', args.job] ]
- api = arvados.api('v1', cache=False)
-
- global known_component_jobs
- global ws
+ if args.pipeline:
+ filters += [ ['object_uuid', '=', args.pipeline] ]
- known_component_jobs = set()
- ws = None
def on_message(ev):
- global known_component_jobs
global filters
global ws
@@ -57,27 +70,25 @@ def main(arguments=None):
sys.stdout.write(ev["properties"]["text"])
elif ev["event_type"] in ("create", "update"):
if ev["object_kind"] == "arvados#pipelineInstance":
- pipeline_jobs = set()
- for c in ev["properties"]["new_attributes"]["components"]:
- if "job" in ev["properties"]["new_attributes"]["components"][c]:
- pipeline_jobs.add(ev["properties"]["new_attributes"]["components"][c]["job"]["uuid"])
- if known_component_jobs != pipeline_jobs:
- ws.unsubscribe(filters)
- filters = [['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]]
- ws.subscribe([['object_uuid', 'in', [args.pipeline] + list(pipeline_jobs)]])
- known_component_jobs = pipeline_jobs
+ update_subscribed_components(ev["properties"]["new_attributes"]["components"])
+ elif 'status' in ev and ev['status'] == 200:
+ pass
else:
print json.dumps(ev)
try:
ws = subscribe(api, filters, on_message, poll_fallback=args.poll_interval)
if ws:
+ if args.pipeline:
+ c = api.pipeline_instances().get(uuid=args.pipeline).execute()
+ update_subscribed_components(c["components"])
+
while True:
time.sleep(60)
except KeyboardInterrupt:
pass
- except Exception:
- logger.exception('')
+ except Exception as e:
+ logger.error(e)
finally:
if ws:
ws.close()
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list