[ARVADOS] created: c5e5db25e95289376a0df35695ab2b5f48131b1f
git at public.curoverse.com
git at public.curoverse.com
Mon Jun 29 16:14:21 EDT 2015
at c5e5db25e95289376a0df35695ab2b5f48131b1f (commit)
commit c5e5db25e95289376a0df35695ab2b5f48131b1f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 29 16:14:43 2015 -0400
6264: First pass complete, ready for testing.
diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index 9db55ee..3799507 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -33,6 +33,42 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
return dockerRequirement["dockerImageId"]
+class CollectionFsAccess(object):
+ def __init__(self):
+ self.collections = {}
+
+ def get_collection(self, path):
+ p = path.split("/")
+ if p[0] == "keep":
+ del p[0]
+ if p[0] not in self.collections:
+ self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
+ return (self.collections[p[0]], "/".join(p[1:]))
+
+ def _match(self, collection, patternsegments, parent):
+ ret = []
+ for i in collection:
+ if fnmatch.fnmatch(i, patternsegments[0]):
+ cur = os.path.join(parent, i)
+ if len(patternsegments) == 1:
+ ret.append(cur)
+ else:
+ ret.extend(self._match(collection[i], patternsegments[1:], cur))
+ return ret
+
+ def glob(self, pattern):
+ collection, rest = self.get_path(pattern)
+ patternsegments = rest.split("/")
+ return self._match(collection, patternsegments, collection.manifest_locator())
+
+ def open(self, fn, mode):
+ collection, rest = self.get_path(fn)
+ return c.open(rest, mode)
+
+ def exists(self, fn):
+ collection, rest = self.get_path(fn)
+ return c.exists(rest)
+
class ArvadosJob(object):
def __init__(self, runner):
@@ -65,44 +101,8 @@ class ArvadosJob(object):
self.arvrunner.jobs[response["uuid"]] = self
- class CollectionFsAccess(object):
- def __init__(self):
- self.collections = {}
-
- def get_collection(self, path):
- p = path.split("/")
- if p[0] == "keep":
- del p[0]
- if p[0] not in self.collections:
- self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
- return (self.collections[p[0]], "/".join(p[1:]))
-
- def _match(self, collection, patternsegments, parent):
- ret = []
- for i in collection:
- if fnmatch.fnmatch(i, patternsegments[0]):
- cur = os.path.join(parent, i)
- if len(patternsegments) == 1:
- ret.append(cur)
- else:
- ret.extend(self._match(collection[i], patternsegments[1:], cur))
- return ret
-
- def glob(self, pattern):
- collection, rest = self.get_path(pattern)
- patternsegments = rest.split("/")
- return self._match(collection, patternsegments, collection.manifest_locator())
-
- def open(self, fn, mode):
- collection, rest = self.get_path(fn)
- return c.open(rest, mode)
-
- def exists(self, fn):
- collection, rest = self.get_path(fn)
- return c.exists(rest)
-
def done(self, record):
- outputs = self.collect_outputs(record["output"], fs_access=ArvadosJob.CollectionFsAccess())
+ outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
if record["state"] == "Complete":
processStatus = "success"
@@ -112,6 +112,31 @@ class ArvadosJob(object):
self.output_callback(outputs, processStatus)
+class ArvPathMapper(cwltool.pathmapper.PathMapper):
+ def __init__(self, arvrunner, referenced_files, basedir):
+ self._pathmap = {}
+ uploadfiles = []
+
+ pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
+
+ for src in referenced_files:
+ ab = src if os.path.isabs(src) else os.path.join(basedir, src)
+ st = arvados.commands.run.statfile("", ab)
+ if isinstance(st, arvados.commands.run.UploadFile):
+ uploadfiles.append((src, ab, st))
+ elif isinstance(st, arvados.commands.run.ArvFile):
+ self._pathmap[src] = (ab, st.fn)
+ elif isinstance(st, basestring) and pdh_path.match(st):
+ self._pathmap[src] = (st, "$(file %s") % st)
+ else:
+ workflow.WorkflowException("Input file path '%s' is invalid", st)
+
+ arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api)
+
+ for src, ab, st in uploadfiles:
+ self._pathmap[src] = (ab, st.fn)
+
+
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
def __init__(self, arvrunner, toolpath_object, **kwargs):
super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
@@ -120,27 +145,9 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
def makeJobRunner(self):
return ArvadosJob(self.arvrunner)
- class ArvPathMapper(cwltool.pathmapper.PathMapper):
- def __init__(self, referenced_files, basedir):
- self._pathmap = {}
- uploadfiles = []
-
- pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
-
- for src in referenced_files:
- ab = src if os.path.isabs(src) else os.path.join(basedir, src)
- st = arvados.commands.run.statfile("", ab)
- if isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.append((src, ab))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = (ab, st.fn)
- elif isinstance(st, basestring) and pdh_path.match(st):
- self._pathmap[src] = (st, "$(file %s") % st)
- else:
- workflow.WorkflowException("Input file path '%s' is invalid", st)
-
def makePathMapper(self, reffiles, input_basedir):
- pass
+ return ArvadosCommandTool.ArvPathMapper(self.arvrunner, reffiles, input_basedir)
+
class ArvCwlRunner(object):
def __init__(self, api_client):
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index c303ef6..6b929ac 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -101,7 +101,7 @@ def statfile(prefix, fn):
return prefix+fn
-def uploadfiles(files):
+def uploadfiles(files, api):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
# until all file pathes no longer have a common parent.
@@ -237,8 +237,8 @@ def main(arguments=None):
n = True
pathprefix = "/"
files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
- if len(files) > 0:
- uploadfiles(files)
+ if files:
+ uploadfiles(files, api)
for i in xrange(1, len(slots)):
slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
commit 8f10d54cf1d1b03d566366bed3a04ceaa315dd5b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 29 14:10:54 2015 -0400
6264: Arvados CWL runner prototype work in progress.
diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
new file mode 100644
index 0000000..9db55ee
--- /dev/null
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+
+import argparse
+import arvados
+import arvados.events
+import arvados.commands.keepdocker
+import arvados.commands.run
+import cwltool.draft2tool
+import cwltool.workflow
+import cwltool.main
+import threading
+import cwltool.docker
+import fnmatch
+
+logger = logging.getLogger('arvados.cwl-runner')
+logger.setLevel(logging.INFO)
+
+def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+ if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+ dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+
+ sp = dockerRequirement["dockerImageId"].split(":")
+ image_name = sp[0]
+ image_tag = sp[1] if len(sp) > 1 else None
+
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag)
+
+ if not images:
+ imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
+ arvados.commands.keepdocker.main(dockerRequirement["dockerImageId"])
+
+ return dockerRequirement["dockerImageId"]
+
+
+class ArvadosJob(object):
+ def __init__(self, runner):
+ self.arvrunner = runner
+
+ def run(self, dry_run=False, pull_image=True, **kwargs):
+ script_parameters = {
+ "command": self.command_line
+ }
+ runtime_constraints = {}
+
+ if self.stdin:
+ command["stdin"] = self.stdin
+
+ if self.stdout:
+ command["stdout"] = self.stdout
+
+ (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ if docker_req and kwargs.get("use_container") is not False:
+ runtime_constraints["docker_image"] = arv_docker_get(docker_req, pull_image)
+ runtime_constraints["arvados_sdk_version"] = "master"
+
+ response = self.arvrunner.api.jobs().create(body={
+ "script": "run-command",
+ "repository": "arvados",
+ "script_version": "master",
+ "script_parameters": script_parameters,
+ "runtime_constraints": runtime_constraints
+ }).execute()
+
+ self.arvrunner.jobs[response["uuid"]] = self
+
+ class CollectionFsAccess(object):
+ def __init__(self):
+ self.collections = {}
+
+ def get_collection(self, path):
+ p = path.split("/")
+ if p[0] == "keep":
+ del p[0]
+ if p[0] not in self.collections:
+ self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
+ return (self.collections[p[0]], "/".join(p[1:]))
+
+ def _match(self, collection, patternsegments, parent):
+ ret = []
+ for i in collection:
+ if fnmatch.fnmatch(i, patternsegments[0]):
+ cur = os.path.join(parent, i)
+ if len(patternsegments) == 1:
+ ret.append(cur)
+ else:
+ ret.extend(self._match(collection[i], patternsegments[1:], cur))
+ return ret
+
+ def glob(self, pattern):
+ collection, rest = self.get_path(pattern)
+ patternsegments = rest.split("/")
+ return self._match(collection, patternsegments, collection.manifest_locator())
+
+ def open(self, fn, mode):
+ collection, rest = self.get_path(fn)
+ return c.open(rest, mode)
+
+ def exists(self, fn):
+ collection, rest = self.get_path(fn)
+ return c.exists(rest)
+
+ def done(self, record):
+ outputs = self.collect_outputs(record["output"], fs_access=ArvadosJob.CollectionFsAccess())
+
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ self.output_callback(outputs, processStatus)
+
+
+class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+ def __init__(self, arvrunner, toolpath_object, **kwargs):
+ super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+ self.arvrunner = arvrunner
+
+ def makeJobRunner(self):
+ return ArvadosJob(self.arvrunner)
+
+ class ArvPathMapper(cwltool.pathmapper.PathMapper):
+ def __init__(self, referenced_files, basedir):
+ self._pathmap = {}
+ uploadfiles = []
+
+ pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
+
+ for src in referenced_files:
+ ab = src if os.path.isabs(src) else os.path.join(basedir, src)
+ st = arvados.commands.run.statfile("", ab)
+ if isinstance(st, arvados.commands.run.UploadFile):
+ uploadfiles.append((src, ab))
+ elif isinstance(st, arvados.commands.run.ArvFile):
+ self._pathmap[src] = (ab, st.fn)
+ elif isinstance(st, basestring) and pdh_path.match(st):
+ self._pathmap[src] = (st, "$(file %s") % st)
+ else:
+ workflow.WorkflowException("Input file path '%s' is invalid", st)
+
+ def makePathMapper(self, reffiles, input_basedir):
+ pass
+
+class ArvCwlRunner(object):
+ def __init__(self, api_client):
+ self.api = api_client
+ self.jobs = {}
+ self.lock = threading.Lock()
+ self.cond = threading.Condition(self.lock)
+ self.final_output = None
+
+ def arvMakeTool(self, toolpath_object, **kwargs):
+ if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
+ return ArvadosCommandTool(self, toolpath_object, **kwargs)
+ else:
+ return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
+
+ def output_callback(out, processStatus):
+ if processStatus == "success":
+ _logger.info("Overall job status is %s", processStatus)
+ else:
+ _logger.warn("Overall job status is %s", processStatus)
+ self.final_output = out
+
+ def on_message(self, event):
+ if "object_uuid" in event:
+ if event["object_uuid"] in self.jobs and event["event_type"] == "update":
+ if ev["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ try:
+ self.cond.acquire()
+ self.jobs[event["object_uuid"]].done(ev["properties"]["new_attributes"])
+ self.cond.notify()
+ finally:
+ self.cond.release()
+
+ def arvExecutor(self, t, job_order, input_basedir, **kwargs):
+ events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
+
+ if kwargs.get("conformance_test"):
+ return cwltool.main.single_job_executor(t, job_order, input_basedir, **kwargs)
+ else:
+ jobiter = t.job(job_order,
+ input_basedir,
+ self.output_callback,
+ **kwargs)
+
+ for r in jobiter:
+ if r:
+ with self.lock:
+ r.run(**kwargs)
+ else:
+ if self.jobs:
+ try:
+ self.cond.acquire()
+ self.cond.wait()
+ finally:
+ self.cond.release()
+ else:
+ raise workflow.WorkflowException("Workflow deadlocked.")
+
+ while self.jobs:
+ try:
+ self.cond.acquire()
+ self.cond.wait()
+ finally:
+ self.cond.release()
+
+ events.close()
+
+ if self.final_output is None:
+ raise workflow.WorkflowException("Workflow did not return a result.")
+
+ return self.final_output
+
+
+def main(args, stdout, stderr, api_client=None):
+ runner = ArvCwlRunner(api_client=arvados.api('v1'))
+ args.append("--leave-outputs")
+ return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool)
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index f2bf0f3..c303ef6 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -14,6 +14,7 @@ import logging
import arvados.commands._util as arv_cmd
logger = logging.getLogger('arvados.arv-run')
+logger.setLevel(logging.INFO)
arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
@@ -100,6 +101,61 @@ def statfile(prefix, fn):
return prefix+fn
+def uploadfiles(files):
+ # Find the smallest path prefix that includes all the files that need to be uploaded.
+ # This starts at the root and iteratively removes common parent directory prefixes
+ # until all file pathes no longer have a common parent.
+ while n:
+ pathstep = None
+ for c in files:
+ if pathstep is None:
+ sp = c.fn.split('/')
+ if len(sp) < 2:
+ # no parent directories left
+ n = False
+ break
+ # path step takes next directory
+ pathstep = sp[0] + "/"
+ else:
+ # check if pathstep is common prefix for all files
+ if not c.fn.startswith(pathstep):
+ n = False
+ break
+ if n:
+ # pathstep is common parent directory for all files, so remove the prefix
+ # from each path
+ pathprefix += pathstep
+ for c in files:
+ c.fn = c.fn[len(pathstep):]
+
+ orgdir = os.getcwd()
+ os.chdir(pathprefix)
+
+ logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
+
+ if args.dry_run:
+ logger.info("$(input) is %s", pathprefix.rstrip('/'))
+ pdh = "$(input)"
+ else:
+ files = sorted(files, key=lambda x: x.fn)
+ collection = arvados.CollectionWriter(api, num_retries=args.retries)
+ stream = None
+ for f in files:
+ sp = os.path.split(f.fn)
+ if sp[0] != stream:
+ stream = sp[0]
+ collection.start_new_stream(stream)
+ collection.write_file(f.fn, sp[1])
+ item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
+ pdh = item["portable_data_hash"]
+ logger.info("Uploaded to %s", item["uuid"])
+
+ for c in files:
+ c.fn = "$(file %s/%s)" % (pdh, c.fn)
+
+ os.chdir(orgdir)
+
+
def main(arguments=None):
args = arvrun_parser.parse_args(arguments)
@@ -182,58 +238,7 @@ def main(arguments=None):
pathprefix = "/"
files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
if len(files) > 0:
- # Find the smallest path prefix that includes all the files that need to be uploaded.
- # This starts at the root and iteratively removes common parent directory prefixes
- # until all file pathes no longer have a common parent.
- while n:
- pathstep = None
- for c in files:
- if pathstep is None:
- sp = c.fn.split('/')
- if len(sp) < 2:
- # no parent directories left
- n = False
- break
- # path step takes next directory
- pathstep = sp[0] + "/"
- else:
- # check if pathstep is common prefix for all files
- if not c.fn.startswith(pathstep):
- n = False
- break
- if n:
- # pathstep is common parent directory for all files, so remove the prefix
- # from each path
- pathprefix += pathstep
- for c in files:
- c.fn = c.fn[len(pathstep):]
-
- orgdir = os.getcwd()
- os.chdir(pathprefix)
-
- print("Upload local files: \"%s\"" % '" "'.join([c.fn for c in files]))
-
- if args.dry_run:
- print("$(input) is %s" % pathprefix.rstrip('/'))
- pdh = "$(input)"
- else:
- files = sorted(files, key=lambda x: x.fn)
- collection = arvados.CollectionWriter(api, num_retries=args.retries)
- stream = None
- for f in files:
- sp = os.path.split(f.fn)
- if sp[0] != stream:
- stream = sp[0]
- collection.start_new_stream(stream)
- collection.write_file(f.fn, sp[1])
- item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
- pdh = item["portable_data_hash"]
- print "Uploaded to %s" % item["uuid"]
-
- for c in files:
- c.fn = "$(file %s/%s)" % (pdh, c.fn)
-
- os.chdir(orgdir)
+ uploadfiles(files)
for i in xrange(1, len(slots)):
slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
@@ -298,7 +303,7 @@ def main(arguments=None):
else:
pipeline["owner_uuid"] = project
pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
- print "Running pipeline %s" % pi["uuid"]
+ logger.info("Running pipeline %s", pi["uuid"])
if args.local:
subprocess.call(["arv-run-pipeline-instance", "--instance", pi["uuid"], "--run-jobs-here"] + (["--no-reuse"] if args.no_reuse else []))
@@ -306,11 +311,11 @@ def main(arguments=None):
ws.main(["--pipeline", pi["uuid"]])
pi = api.pipeline_instances().get(uuid=pi["uuid"]).execute()
- print "Pipeline is %s" % pi["state"]
+ logger.info("Pipeline is %s", pi["state"])
if "output_uuid" in pi["components"]["command"]:
- print "Output is %s" % pi["components"]["command"]["output_uuid"]
+ logger.info("Output is %s", pi["components"]["command"]["output_uuid"])
else:
- print "No output"
+ logger.info("No output")
if __name__ == '__main__':
main()
diff --git a/sdk/python/bin/cwl-runner b/sdk/python/bin/cwl-runner
new file mode 100644
index 0000000..7008ae4
--- /dev/null
+++ b/sdk/python/bin/cwl-runner
@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+
+import sys
+
+from arvados.commands.cwl_runner import main
+
+sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 5c0b09d..6288eca 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -33,7 +33,8 @@ setup(name='arvados-python-client',
'bin/arv-normalize',
'bin/arv-put',
'bin/arv-run',
- 'bin/arv-ws'
+ 'bin/arv-ws',
+ 'bin/cwl-runner'
],
install_requires=[
'ciso8601',
@@ -41,7 +42,8 @@ setup(name='arvados-python-client',
'httplib2',
'pycurl>=7.19.5.1',
'python-gflags',
- 'ws4py'
+ 'ws4py',
+ 'cwltool'
],
test_suite='tests',
tests_require=['mock>=1.0', 'PyYAML'],
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list