[ARVADOS] created: c5e5db25e95289376a0df35695ab2b5f48131b1f

git at public.curoverse.com git at public.curoverse.com
Mon Jun 29 16:14:21 EDT 2015


        at  c5e5db25e95289376a0df35695ab2b5f48131b1f (commit)


commit c5e5db25e95289376a0df35695ab2b5f48131b1f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 29 16:14:43 2015 -0400

    6264: First pass complete, ready for testing.

diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
index 9db55ee..3799507 100644
--- a/sdk/python/arvados/commands/cwl_runner.py
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -33,6 +33,42 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
 
     return dockerRequirement["dockerImageId"]
 
+class CollectionFsAccess(object):  # cwltool fs_access backend that reads from Keep collections
+    def __init__(self):
+        self.collections = {}  # collection locator -> CollectionReader, created on first use
+    # Split "keep/<locator>/<path>" or "<locator>/<path>" into (CollectionReader, "<path>").
+    def get_collection(self, path):
+        p = path.split("/")
+        if p[0] == "keep":
+            del p[0]
+        if p[0] not in self.collections:
+            self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
+        return (self.collections[p[0]], "/".join(p[1:]))
+    # Match one fnmatch pattern segment per directory level; returns full paths under parent.
+    def _match(self, collection, patternsegments, parent):
+        ret = []
+        for i in collection:
+            if fnmatch.fnmatch(i, patternsegments[0]):
+                cur = os.path.join(parent, i)
+                if len(patternsegments) == 1:
+                    ret.append(cur)
+                else:
+                    ret.extend(self._match(collection[i], patternsegments[1:], cur))
+        return ret
+    # Results look like "<manifest locator>/<path>", a form get_collection() accepts back.
+    def glob(self, pattern):
+        collection, rest = self.get_collection(pattern)
+        patternsegments = rest.split("/")
+        return self._match(collection, patternsegments, collection.manifest_locator())
+    # Delegates to the collection's open() for the path inside it.
+    def open(self, fn, mode):
+        collection, rest = self.get_collection(fn)
+        return collection.open(rest, mode)
+    # Delegates to the collection's exists() for the path inside it.
+    def exists(self, fn):
+        collection, rest = self.get_collection(fn)
+        return collection.exists(rest)
+
 
 class ArvadosJob(object):
     def __init__(self, runner):
@@ -65,44 +101,8 @@ class ArvadosJob(object):
 
         self.arvrunner.jobs[response["uuid"]] = self
 
-    class CollectionFsAccess(object):
-        def __init__(self):
-            self.collections = {}
-
-        def get_collection(self, path):
-            p = path.split("/")
-            if p[0] == "keep":
-                del p[0]
-            if p[0] not in self.collections:
-                self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
-            return (self.collections[p[0]], "/".join(p[1:]))
-
-        def _match(self, collection, patternsegments, parent):
-            ret = []
-            for i in collection:
-                if fnmatch.fnmatch(i, patternsegments[0]):
-                    cur = os.path.join(parent, i)
-                    if len(patternsegments) == 1:
-                        ret.append(cur)
-                    else:
-                        ret.extend(self._match(collection[i], patternsegments[1:], cur))
-            return ret
-
-        def glob(self, pattern):
-            collection, rest = self.get_path(pattern)
-            patternsegments = rest.split("/")
-            return self._match(collection, patternsegments, collection.manifest_locator())
-
-        def open(self, fn, mode):
-            collection, rest = self.get_path(fn)
-            return c.open(rest, mode)
-
-        def exists(self, fn):
-            collection, rest = self.get_path(fn)
-            return c.exists(rest)
-
     def done(self, record):
-        outputs = self.collect_outputs(record["output"], fs_access=ArvadosJob.CollectionFsAccess())
+        outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
 
         if record["state"] == "Complete":
             processStatus = "success"
@@ -112,6 +112,31 @@ class ArvadosJob(object):
         self.output_callback(outputs, processStatus)
 
 
+class ArvPathMapper(cwltool.pathmapper.PathMapper):  # maps CWL input paths to Keep references
+    def __init__(self, arvrunner, referenced_files, basedir):
+        self._pathmap = {}  # src -> (resolved path, reference handed to the job)
+        uploadfiles = []  # (src, abs path, UploadFile) for local files that still need uploading
+        # matches "<portable data hash>/<path inside collection>"
+        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
+
+        for src in referenced_files:
+            ab = src if os.path.isabs(src) else os.path.join(basedir, src)
+            st = arvados.commands.run.statfile("", ab)
+            if isinstance(st, arvados.commands.run.UploadFile):
+                uploadfiles.append((src, ab, st))
+            elif isinstance(st, arvados.commands.run.ArvFile):
+                self._pathmap[src] = (ab, st.fn)
+            elif isinstance(st, basestring) and pdh_path.match(st):
+                self._pathmap[src] = (st, "$(file %s)" % st)
+            else:
+                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % (st,))
+        # run.main() only calls uploadfiles() with a non-empty list; do the same here.
+        if uploadfiles:
+            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api)
+        for src, ab, st in uploadfiles:  # uploadfiles() rewrote st.fn to "$(file <pdh>/<path>)"
+            self._pathmap[src] = (ab, st.fn)
+
+
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def __init__(self, arvrunner, toolpath_object, **kwargs):
         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
@@ -120,27 +145,9 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
     def makeJobRunner(self):
         return ArvadosJob(self.arvrunner)
 
-    class ArvPathMapper(cwltool.pathmapper.PathMapper):
-        def __init__(self, referenced_files, basedir):
-            self._pathmap = {}
-            uploadfiles = []
-
-            pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
-
-            for src in referenced_files:
-                ab = src if os.path.isabs(src) else os.path.join(basedir, src)
-                st = arvados.commands.run.statfile("", ab)
-                if isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab))
-                elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
-                elif isinstance(st, basestring) and pdh_path.match(st):
-                    self._pathmap[src] = (st, "$(file %s") % st)
-                else:
-                    workflow.WorkflowException("Input file path '%s' is invalid", st)
-
     def makePathMapper(self, reffiles, input_basedir):
-        pass
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir)  # ArvPathMapper is module-level
+
 
 class ArvCwlRunner(object):
     def __init__(self, api_client):
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index c303ef6..6b929ac 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -101,7 +101,7 @@ def statfile(prefix, fn):
 
     return prefix+fn
 
-def uploadfiles(files):
+def uploadfiles(files, api):
     # Find the smallest path prefix that includes all the files that need to be uploaded.
     # This starts at the root and iteratively removes common parent directory prefixes
     # until all file pathes no longer have a common parent.
@@ -237,8 +237,8 @@ def main(arguments=None):
     n = True
     pathprefix = "/"
     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
-    if len(files) > 0:
-        uploadfiles(files)
+    if files:
+        uploadfiles(files, api)
 
     for i in xrange(1, len(slots)):
         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]

commit 8f10d54cf1d1b03d566366bed3a04ceaa315dd5b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Jun 29 14:10:54 2015 -0400

    6264: Arvados CWL runner prototype work in progress.

diff --git a/sdk/python/arvados/commands/cwl_runner.py b/sdk/python/arvados/commands/cwl_runner.py
new file mode 100644
index 0000000..9db55ee
--- /dev/null
+++ b/sdk/python/arvados/commands/cwl_runner.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+
+import argparse
+import fnmatch
+import logging
+import os
+import re
+import threading
+import arvados
+import arvados.commands.keepdocker, arvados.commands.run
+import arvados.events
+import cwltool.docker, cwltool.draft2tool, cwltool.main
+import cwltool.pathmapper, cwltool.workflow
+
+logger = logging.getLogger('arvados.cwl-runner')
+logger.setLevel(logging.INFO)
+
+def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+    # Split "name[:tag]"; the tag part is optional.
+    sp = dockerRequirement["dockerImageId"].split(":")
+    image_name = sp[0]
+    image_tag = sp[1] if len(sp) > 1 else None
+    # Images already in Arvados under this name/tag (NOTE(review): 3 is presumably num_retries).
+    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+                                                            image_name=image_name,
+                                                            image_tag=image_tag)
+    # Missing: get it into local Docker, then upload via arv-keepdocker (argv list: image [tag]).
+    if not images:
+        cwltool.docker.get_image(dockerRequirement, pull_image)
+        arvados.commands.keepdocker.main([image_name] + ([image_tag] if image_tag else []))
+
+    return dockerRequirement["dockerImageId"]
+
+
+class ArvadosJob(object):
+    def __init__(self, runner):
+        self.arvrunner = runner  # the ArvCwlRunner; provides .api and the .jobs registry
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        script_parameters = {
+            "command": self.command_line
+        }
+        runtime_constraints = {}
+        # run-command takes redirections as the task.stdin / task.stdout script parameters.
+        if self.stdin:
+            script_parameters["task.stdin"] = self.stdin
+
+        if self.stdout:
+            script_parameters["task.stdout"] = self.stdout
+        # NOTE(review): get_feature is neither defined nor imported in this module.
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if docker_req and kwargs.get("use_container") is not False:
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
+            runtime_constraints["arvados_sdk_version"] = "master"
+        # Submit as a run-command job; on_message() later calls done() on it.
+        response = self.arvrunner.api.jobs().create(body={
+            "script": "run-command",
+            "repository": "arvados",
+            "script_version": "master",
+            "script_parameters": script_parameters,
+            "runtime_constraints": runtime_constraints
+        }).execute()
+
+        self.arvrunner.jobs[response["uuid"]] = self
+
+    class CollectionFsAccess(object):
+        def __init__(self):
+            self.collections = {}
+
+        def get_collection(self, path):
+            p = path.split("/")
+            if p[0] == "keep":
+                del p[0]
+            if p[0] not in self.collections:
+                self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
+            return (self.collections[p[0]], "/".join(p[1:]))
+
+        def _match(self, collection, patternsegments, parent):
+            ret = []
+            for i in collection:
+                if fnmatch.fnmatch(i, patternsegments[0]):
+                    cur = os.path.join(parent, i)
+                    if len(patternsegments) == 1:
+                        ret.append(cur)
+                    else:
+                        ret.extend(self._match(collection[i], patternsegments[1:], cur))
+            return ret
+
+        def glob(self, pattern):
+            collection, rest = self.get_path(pattern)
+            patternsegments = rest.split("/")
+            return self._match(collection, patternsegments, collection.manifest_locator())
+
+        def open(self, fn, mode):
+            collection, rest = self.get_path(fn)
+            return c.open(rest, mode)
+
+        def exists(self, fn):
+            collection, rest = self.get_path(fn)
+            return c.exists(rest)
+
+    def done(self, record):
+        outputs = self.collect_outputs(record["output"], fs_access=ArvadosJob.CollectionFsAccess())
+
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+
+        self.output_callback(outputs, processStatus)
+
+
+class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+    def __init__(self, arvrunner, toolpath_object, **kwargs):
+        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
+        self.arvrunner = arvrunner
+
+    def makeJobRunner(self):
+        return ArvadosJob(self.arvrunner)
+
+    class ArvPathMapper(cwltool.pathmapper.PathMapper):
+        def __init__(self, referenced_files, basedir):
+            self._pathmap = {}
+            uploadfiles = []
+
+            pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')
+
+            for src in referenced_files:
+                ab = src if os.path.isabs(src) else os.path.join(basedir, src)
+                st = arvados.commands.run.statfile("", ab)
+                if isinstance(st, arvados.commands.run.UploadFile):
+                    uploadfiles.append((src, ab))
+                elif isinstance(st, arvados.commands.run.ArvFile):
+                    self._pathmap[src] = (ab, st.fn)
+                elif isinstance(st, basestring) and pdh_path.match(st):
+                    self._pathmap[src] = (st, "$(file %s") % st)
+                else:
+                    workflow.WorkflowException("Input file path '%s' is invalid", st)
+
+    def makePathMapper(self, reffiles, input_basedir):
+        pass
+
+class ArvCwlRunner(object):
+    def __init__(self, api_client):
+        self.api = api_client
+        self.jobs = {}  # job uuid -> ArvadosJob submitted by this runner
+        self.lock = threading.Lock()
+        self.cond = threading.Condition(self.lock)  # shares self.lock; notified after a job's done()
+        self.final_output = None  # set by output_callback()
+
+    def arvMakeTool(self, toolpath_object, **kwargs):
+        if toolpath_object.get("class") != "CommandLineTool":
+            # Anything that is not a CommandLineTool is built by stock cwltool.
+            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
+        return ArvadosCommandTool(self, toolpath_object, **kwargs)
+
+    def output_callback(self, out, processStatus):  # passed to t.job() by arvExecutor
+        if processStatus == "success":
+            logger.info("Overall job status is %s", processStatus)
+        else:
+            logger.warning("Overall job status is %s", processStatus)
+        self.final_output = out
+
+    def on_message(self, event):  # websocket callback; completes tracked jobs that reach a final state
+        if "object_uuid" not in event or event.get("event_type") != "update":
+            return
+        attrs = event.get("properties", {}).get("new_attributes", {})
+        if attrs.get("state") in ("Complete", "Failed", "Cancelled"):
+            with self.cond:  # self.cond wraps self.lock, which arvExecutor holds while submitting
+                j = self.jobs.pop(event["object_uuid"], None)  # pop so "while self.jobs" can end
+                if j is not None:
+                    j.done(attrs)
+                    self.cond.notify()
+
+    def arvExecutor(self, t, job_order, input_basedir, **kwargs):
+        # Conformance tests run through plain cwltool; no Arvados jobs are submitted.
+        if kwargs.get("conformance_test"):
+            return cwltool.main.single_job_executor(t, job_order, input_basedir, **kwargs)
+
+        # Job state changes arrive over the websocket and are handled by on_message().
+        events = arvados.events.subscribe(self.api, [["object_kind", "=", "arvados#job"]], self.on_message)
+        try:
+            jobiter = t.job(job_order,
+                            input_basedir,
+                            self.output_callback,
+                            **kwargs)
+
+            for r in jobiter:
+                if r:
+                    # Submit while holding the lock that self.cond (used by on_message) wraps.
+                    with self.lock:
+                        r.run(**kwargs)
+                else:
+                    # Nothing runnable yet: wait for a submitted job to finish. Testing
+                    # self.jobs under the lock means a notify() cannot slip in unseen.
+                    with self.cond:
+                        if not self.jobs:
+                            raise cwltool.workflow.WorkflowException("Workflow deadlocked.")
+                        self.cond.wait()
+
+            # Every step has been submitted; wait for the outstanding jobs to drain.
+            with self.cond:
+                while self.jobs:
+                    self.cond.wait()
+        finally:
+            events.close()  # close the subscription even when a step raises
+
+        # output_callback() stores the workflow result in self.final_output.
+        if self.final_output is None:
+            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+
+        return self.final_output
+
+
+def main(args, stdout, stderr, api_client=None):  # cwl-runner entry point: cwltool driving Arvados jobs
+    runner = ArvCwlRunner(api_client=api_client if api_client is not None else arvados.api('v1'))
+    args = list(args) + ["--leave-outputs"]  # copy: do not mutate the caller's list
+    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool)
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index f2bf0f3..c303ef6 100644
--- a/sdk/python/arvados/commands/run.py
+++ b/sdk/python/arvados/commands/run.py
@@ -14,6 +14,7 @@ import logging
 import arvados.commands._util as arv_cmd
 
 logger = logging.getLogger('arvados.arv-run')
+logger.setLevel(logging.INFO)
 
 arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
 arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
@@ -100,6 +101,61 @@ def statfile(prefix, fn):
 
     return prefix+fn
 
+def uploadfiles(files):
+    # Find the smallest path prefix that includes all the files that need to be uploaded.
+    # This starts at the root and iteratively removes common parent directory prefixes
+    # until all file pathes no longer have a common parent.
+    while n:  # NOTE(review): n and pathprefix were main()'s locals; both are unbound in this function
+        pathstep = None
+        for c in files:
+            if pathstep is None:
+                sp = c.fn.split('/')
+                if len(sp) < 2:
+                    # no parent directories left
+                    n = False
+                    break
+                # path step takes next directory
+                pathstep = sp[0] + "/"
+            else:
+                # check if pathstep is common prefix for all files
+                if not c.fn.startswith(pathstep):
+                    n = False
+                    break
+        if n:
+            # pathstep is common parent directory for all files, so remove the prefix
+            # from each path
+            pathprefix += pathstep
+            for c in files:
+                c.fn = c.fn[len(pathstep):]
+
+    orgdir = os.getcwd()
+    os.chdir(pathprefix)  # NOTE(review): the original cwd is not restored if anything below raises
+    # Write all files (paths now relative to pathprefix) into a single new collection.
+    logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
+
+    if args.dry_run:  # NOTE(review): args looks like main()'s local; not defined in this function
+        logger.info("$(input) is %s", pathprefix.rstrip('/'))
+        pdh = "$(input)"
+    else:
+        files = sorted(files, key=lambda x: x.fn)
+        collection = arvados.CollectionWriter(api, num_retries=args.retries)
+        stream = None
+        for f in files:
+            sp = os.path.split(f.fn)
+            if sp[0] != stream:
+                stream = sp[0]
+                collection.start_new_stream(stream)
+            collection.write_file(f.fn, sp[1])
+        item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()  # NOTE(review): project is not defined in this function
+        pdh = item["portable_data_hash"]
+        logger.info("Uploaded to %s", item["uuid"])
+    # Point each UploadFile at its copy inside the new collection.
+    for c in files:
+        c.fn = "$(file %s/%s)" % (pdh, c.fn)
+
+    os.chdir(orgdir)
+
+
 def main(arguments=None):
     args = arvrun_parser.parse_args(arguments)
 
@@ -182,58 +238,7 @@ def main(arguments=None):
     pathprefix = "/"
     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
     if len(files) > 0:
-        # Find the smallest path prefix that includes all the files that need to be uploaded.
-        # This starts at the root and iteratively removes common parent directory prefixes
-        # until all file pathes no longer have a common parent.
-        while n:
-            pathstep = None
-            for c in files:
-                if pathstep is None:
-                    sp = c.fn.split('/')
-                    if len(sp) < 2:
-                        # no parent directories left
-                        n = False
-                        break
-                    # path step takes next directory
-                    pathstep = sp[0] + "/"
-                else:
-                    # check if pathstep is common prefix for all files
-                    if not c.fn.startswith(pathstep):
-                        n = False
-                        break
-            if n:
-                # pathstep is common parent directory for all files, so remove the prefix
-                # from each path
-                pathprefix += pathstep
-                for c in files:
-                    c.fn = c.fn[len(pathstep):]
-
-        orgdir = os.getcwd()
-        os.chdir(pathprefix)
-
-        print("Upload local files: \"%s\"" % '" "'.join([c.fn for c in files]))
-
-        if args.dry_run:
-            print("$(input) is %s" % pathprefix.rstrip('/'))
-            pdh = "$(input)"
-        else:
-            files = sorted(files, key=lambda x: x.fn)
-            collection = arvados.CollectionWriter(api, num_retries=args.retries)
-            stream = None
-            for f in files:
-                sp = os.path.split(f.fn)
-                if sp[0] != stream:
-                    stream = sp[0]
-                    collection.start_new_stream(stream)
-                collection.write_file(f.fn, sp[1])
-            item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
-            pdh = item["portable_data_hash"]
-            print "Uploaded to %s" % item["uuid"]
-
-        for c in files:
-            c.fn = "$(file %s/%s)" % (pdh, c.fn)
-
-        os.chdir(orgdir)
+        uploadfiles(files)
 
     for i in xrange(1, len(slots)):
         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
@@ -298,7 +303,7 @@ def main(arguments=None):
     else:
         pipeline["owner_uuid"] = project
         pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
-        print "Running pipeline %s" % pi["uuid"]
+        logger.info("Running pipeline %s", pi["uuid"])
 
         if args.local:
             subprocess.call(["arv-run-pipeline-instance", "--instance", pi["uuid"], "--run-jobs-here"] + (["--no-reuse"] if args.no_reuse else []))
@@ -306,11 +311,11 @@ def main(arguments=None):
             ws.main(["--pipeline", pi["uuid"]])
 
         pi = api.pipeline_instances().get(uuid=pi["uuid"]).execute()
-        print "Pipeline is %s" % pi["state"]
+        logger.info("Pipeline is %s", pi["state"])
         if "output_uuid" in pi["components"]["command"]:
-            print "Output is %s" % pi["components"]["command"]["output_uuid"]
+            logger.info("Output is %s", pi["components"]["command"]["output_uuid"])
         else:
-            print "No output"
+            logger.info("No output")
 
 if __name__ == '__main__':
     main()
diff --git a/sdk/python/bin/cwl-runner b/sdk/python/bin/cwl-runner
new file mode 100644
index 0000000..7008ae4
--- /dev/null
+++ b/sdk/python/bin/cwl-runner
@@ -0,0 +1,7 @@
+#!/usr/bin/env python
+# Console entry point for the Arvados CWL runner; all logic is in arvados.commands.cwl_runner.main.
+import sys
+
+from arvados.commands.cwl_runner import main
+
+sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))
diff --git a/sdk/python/setup.py b/sdk/python/setup.py
index 5c0b09d..6288eca 100644
--- a/sdk/python/setup.py
+++ b/sdk/python/setup.py
@@ -33,7 +33,8 @@ setup(name='arvados-python-client',
           'bin/arv-normalize',
           'bin/arv-put',
           'bin/arv-run',
-          'bin/arv-ws'
+          'bin/arv-ws',
+          'bin/cwl-runner'
       ],
       install_requires=[
           'ciso8601',
@@ -41,7 +42,8 @@ setup(name='arvados-python-client',
           'httplib2',
           'pycurl>=7.19.5.1',
           'python-gflags',
-          'ws4py'
+          'ws4py',
+          'cwltool'
       ],
       test_suite='tests',
       tests_require=['mock>=1.0', 'PyYAML'],

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list