[ARVADOS] updated: 77c077a48513ebcb1f17ba7e57cf6804b2af910f
git at public.curoverse.com
git at public.curoverse.com
Tue Sep 16 22:32:15 EDT 2014
Summary of changes:
sdk/cli/bin/arv-run-pipeline-instance | 4 +-
sdk/python/arvados/commands/run.py | 90 ++++++++++++++++++-----------------
sdk/python/arvados/events.py | 38 +++++++++++++--
services/api/app/models/commit.rb | 4 ++
4 files changed, 88 insertions(+), 48 deletions(-)
via 77c077a48513ebcb1f17ba7e57cf6804b2af910f (commit)
via 5e655f244603b5101baec7a3091f492c7854c25d (commit)
from 546448eb3ebe16b8806d63cb6cadc064835cbe42 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 77c077a48513ebcb1f17ba7e57cf6804b2af910f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Sep 16 22:32:08 2014 -0400
3609: Now print log messages for submitted pipeline.
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index 17ed3f6..3778c57 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -28,16 +28,16 @@ def statfile(prefix, fn, pattern):
if os.path.exists(absfn):
fn = os.path.abspath(fn)
st = os.stat(fn)
- mount = os.path.dirname(fn)+"/.arvados#collection"
- if os.path.exists(mount):
- with file(mount, 'r') as f:
- c = json.load(f)
- return prefix+"$(file "+c["portable_data_hash"]+"/" + os.path.basename(fn) + ")"
- else:
- needupload_files.append(fn)
- return ArvFile(prefix, fn[1:])
- else:
- return prefix+fn
+ if stat.S_ISREG(st.st_mode):
+ mount = os.path.dirname(fn)+"/.arvados#collection"
+ if os.path.exists(mount):
+ with file(mount, 'r') as f:
+ c = json.load(f)
+ return prefix+"$(file "+c["portable_data_hash"]+"/" + os.path.basename(fn) + ")"
+ else:
+ needupload_files.append(fn)
+ return ArvFile(prefix, fn[1:])
+ return prefix+fn
def main(arguments=None):
args = arvrun_parser.parse_args(arguments)
@@ -64,32 +64,34 @@ def main(arguments=None):
n = True
pathprefix = "/"
- while n:
- pathstep = None
- for c in [c for c in commandargs if isinstance(c, ArvFile)]:
- if pathstep is None:
- sp = c.fn.split('/')
- if len(sp) < 2:
- n = False
- break
- pathstep = sp[0] + "/"
- else:
- if not c.fn.startswith(pathstep):
- n = False
- break
- if n:
- pathprefix += pathstep
- for c in [c for c in commandargs if isinstance(c, ArvFile)]:
- c.fn = c.fn[len(pathstep):]
-
- os.chdir(pathprefix)
-
- if args.dry_run:
- print("cd %s" % pathprefix)
- print("arv-put \"%s\"" % '" "'.join([c.fn for c in commandargs if isinstance(c, ArvFile)]))
- pdh = "$(input)"
- else:
- pdh = put.main(["--portable-data-hash"]+[c.fn for c in commandargs if isinstance(c, ArvFile)])
+ files = [c for c in commandargs if isinstance(c, ArvFile)]
+ if len(files) > 0:
+ while n:
+ pathstep = None
+ for c in files:
+ if pathstep is None:
+ sp = c.fn.split('/')
+ if len(sp) < 2:
+ n = False
+ break
+ pathstep = sp[0] + "/"
+ else:
+ if not c.fn.startswith(pathstep):
+ n = False
+ break
+ if n:
+ pathprefix += pathstep
+ for c in files:
+ c.fn = c.fn[len(pathstep):]
+
+ os.chdir(pathprefix)
+
+ if args.dry_run:
+ print("cd %s" % pathprefix)
+ print("arv-put \"%s\"" % '" "'.join([c.fn for c in files]))
+ pdh = "$(input)"
+ else:
+ pdh = put.main(["--portable-data-hash"]+[c.fn for c in files])
commandargs = [("%s$(file %s/%s)" % (c.prefix, pdh, c.fn)) if isinstance(c, ArvFile) else c for c in commandargs]
@@ -141,13 +143,15 @@ def main(arguments=None):
else:
api = arvados.api('v1')
pi = api.pipeline_instances().create(body=pipeline).execute()
- def stuff(x):
- print x
- arvados.events.subscribe(api, [["object_uuid", "=", pi["uuid"]]], stuff)
- state = pi["state"]
- while state == "RunningOnServer":
- time.sleep(5)
- state = api.pipeline_instances().get(uuid=pi["uuid"]).execute()["state"]
+ ws = None
+ def report(x):
+ if x["event_type"] == "stderr":
+ print x["properties"]["text"]
+ elif x["event_type"] == "update" and x["properties"]["new_attributes"]["state"] in ["Complete", "Failed"]:
+ ws.close_connection()
+
+ ws = arvados.events.subscribe(api, [["object_uuid", "=", pi["uuid"]], ["event_type", "in", ["stderr", "update"]]], report)
+ ws.run_forever()
if __name__ == '__main__':
main()
diff --git a/sdk/python/arvados/events.py b/sdk/python/arvados/events.py
index b7d610d..343683f 100644
--- a/sdk/python/arvados/events.py
+++ b/sdk/python/arvados/events.py
@@ -7,6 +7,7 @@ import ssl
import re
import config
import logging
+import threading
_logger = logging.getLogger('arvados.events')
@@ -36,14 +37,43 @@ class EventClient(WebSocketClient):
except:
pass
+class PollClient(threading.Thread):
+ def __init__(self, api, filters, on_event):
+ self.api = api
+ self.filters = filters
+ self.on_event = on_event
+ items = self.api.logs().list(limit=1, order=json.dumps(["id desc"]), filters=json.dumps(filters)).execute()['items']
+ if len(items) > 0:
+ self.id = items[0]["id"]
+ else:
+ self.id = 0
+ self.loop = True
+
+ def run_forever(self):
+ while self.loop:
+ time.sleep(15)
+ items = self.api.logs().list(limit=1, order=json.dumps(["id asc"]), filters=json.dumps(self.filters+[["id", ">", str(self.id)]])).execute()['items']
+ for i in items:
+ self.id = i['id']
+ self.on_event(i)
+
+ def close_connection(self):
+ self.loop = False
+
def subscribe(api, filters, on_event):
ws = None
try:
- url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
- ws = EventClient(url, filters, on_event)
- ws.connect()
+ if 'websocketUrl' in api._rootDesc:
+ url = "{}?api_token={}".format(api._rootDesc['websocketUrl'], config.get('ARVADOS_API_TOKEN'))
+ ws = EventClient(url, filters, on_event)
+ ws.connect()
+ else:
+ ws = PollClient(api, filters, on_event)
return ws
except Exception:
if (ws):
ws.close_connection()
- raise
+ try:
+ return PollClient(api, filters, on_event)
+ except:
+ raise
diff --git a/services/api/app/models/commit.rb b/services/api/app/models/commit.rb
index 0f62737..5ed23a6 100644
--- a/services/api/app/models/commit.rb
+++ b/services/api/app/models/commit.rb
@@ -41,6 +41,10 @@ class Commit < ActiveRecord::Base
readable = readable.where(name: repository)
end
+ if readable.empty?
+ logger.warn "Repository #{repository} not readable by user"
+ end
+
commits = []
readable.each do |r|
if on_disk_repos[r.name]
commit 5e655f244603b5101baec7a3091f492c7854c25d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Sep 16 22:30:21 2014 -0400
Fix arv-run-pipeline-instance to not crash when template is null. no issue #
diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance
index b19bf04..2996ca2 100755
--- a/sdk/cli/bin/arv-run-pipeline-instance
+++ b/sdk/cli/bin/arv-run-pipeline-instance
@@ -569,9 +569,11 @@ class WhRunPipelineInstance
debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
if (not @instance[:name].nil?) and (not @instance[:name].empty?)
pipeline_name = @instance[:name]
- else
+ elsif @instance[:pipeline_template_uuid]
fetch_template(@instance[:pipeline_template_uuid])
pipeline_name = @template[:name]
+ else
+ pipeline_name = "pipeline started #{@instance[:started_at]}"
end
if c[:output_name] != false
# Create a collection located in the same project as the pipeline with the contents of the output.
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list