[ARVADOS] created: 86af9b2ebc9240e9041870786e87021c94f11eea

Git user git at public.curoverse.com
Mon Aug 29 15:25:51 EDT 2016


        at  86af9b2ebc9240e9041870786e87021c94f11eea (commit)


commit 86af9b2ebc9240e9041870786e87021c94f11eea
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Aug 26 17:11:50 2016 -0400

    9766: Support packing workflows, setting input defaults, uploading dependent files and Docker images, and creating/updating workflow records.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0d0d416..80734d2 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -23,6 +23,7 @@ from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
@@ -172,6 +173,9 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -335,6 +339,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, help="Update Arvados workflow.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -361,7 +367,7 @@ def main(args, stdout, stderr, api_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if arvargs.create_template and not arvargs.job_order:
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     try:
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
new file mode 100644
index 0000000..9a649eb
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -0,0 +1,63 @@
+import os
+import json
+
+from cwltool.pack import pack
+from cwltool.process import shortname
+
+from .runner import upload_docker, upload_dependencies
+
+
+def make_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
+    """Save a packed CWL document as an Arvados workflow record.
+
+    Uploads the Docker images the tool requires, copies matching values
+    from job_order into the document's top-level inputs as "default"
+    values, packs the document into a single object, uploads its
+    file/directory dependencies (rewriting their locations to "keep:"
+    references), and stores the packed document as JSON in the record.
+
+    arvRunner -- ArvCwlRunner providing the API client and num_retries.
+    tool -- cwltool tool object (e.g. CommandLineTool or Workflow); uses
+        its doc_loader, metadata, and the "id", "label" and "doc" fields
+        of tool.tool.
+    job_order -- mapping of input short names to values.
+    project_uuid -- owner_uuid for the workflow record.
+    update_uuid -- if set, update this existing workflow record instead
+        of creating a new one.
+
+    Returns the UUID of the created or updated workflow record.
+    """
+    upload_docker(arvRunner, tool)
+
+    uri = tool.tool["id"]
+    document_loader = tool.doc_loader
+    workflowobj = document_loader.fetch(uri)
+
+    # Store values supplied in the job order as defaults on matching inputs.
+    for inp in workflowobj["inputs"]:
+        sn = shortname(inp["id"])
+        if sn in job_order:
+            inp["default"] = job_order[sn]
+
+    packed = pack(document_loader, workflowobj, uri, tool.metadata)
+
+    name = os.path.basename(uri)
+    # loadref_run=False, presumably because pack() has already inlined
+    # the documents that "run" fields refer to.
+    upload_dependencies(arvRunner, name, document_loader,
+                        packed, uri, "keep:", False)
+
+    body = {
+        "workflow": {
+            "owner_uuid": project_uuid,
+            "name": tool.tool.get("label", name),
+            "description": tool.tool.get("doc", ""),
+            "workflow": json.dumps(packed, sort_keys=True, indent=4)
+        }}
+
+    workflows = arvRunner.api.workflows()
+    if update_uuid:
+        request = workflows.update(uuid=update_uuid, body=body)
+    else:
+        request = workflows.create(body=body)
+    return request.execute(num_retries=arvRunner.num_retries)["uuid"]
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index d7d5d2b..315be0c 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -21,6 +21,79 @@ logger = logging.getLogger('arvados.cwl-runner')
 
 cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
 
+def upload_dependencies(arvrunner, name, document_loader,
+                        workflowobj, uri, keepprefix, loadref_run):
+    """Upload the files and directories a CWL document depends on.
+
+    Uses scandeps to collect the File and Directory objects referenced
+    from workflowobj (whose base URI is uri), following "$import" -- and
+    also "run" when loadref_run is true -- into other documents.  Those
+    objects are uploaded through an ArvPathMapper named `name` whose
+    targets are built from keepprefix, and the "location" of every File
+    and Directory object in workflowobj is rewritten in place to its
+    mapped target.
+
+    Returns the ArvPathMapper.
+    """
+    loaded = set()
+
+    def loadref(b, u):
+        # Fetch each referenced document only once; repeats yield {}.
+        joined = urlparse.urljoin(b, u)
+        if joined in loaded:
+            return {}
+        loaded.add(joined)
+        return document_loader.fetch(joined)
+
+    if loadref_run:
+        loadref_fields = set(("$import", "run"))
+    else:
+        loadref_fields = set(("$import",))
+
+    sc = scandeps(uri, workflowobj,
+                  loadref_fields,
+                  set(("$include", "$schemas", "path", "location")),
+                  loadref)
+
+    # Collect every File and Directory object that was found.
+    files = []
+    adjustFileObjs(sc, files.append)
+    adjustDirObjs(sc, files.append)
+
+    normalizeFilesDirs(files)
+
+    mapper = ArvPathMapper(arvrunner, files, "",
+                           keepprefix+"%s",
+                           keepprefix+"%s/%s",
+                           name=name)
+
+    def setloc(p):
+        p["location"] = mapper.mapper(p["location"]).target
+
+    adjustFileObjs(workflowobj, setloc)
+    adjustDirObjs(workflowobj, setloc)
+
+    return mapper
+
+
+def upload_docker(arvrunner, tool):
+    """Make the Docker images required by a tool available on Arvados.
+
+    For a CommandLineTool with a DockerRequirement, calls
+    arv_docker_get_image with arvrunner.project_uuid as the project.
+    For a Workflow, recurses into each step's embedded tool.  Any other
+    tool type is left alone.
+    """
+    if isinstance(tool, CommandLineTool):
+        docker_req, _ = get_feature(tool, "DockerRequirement")
+        if docker_req:
+            arv_docker_get_image(arvrunner.api, docker_req, True,
+                                 arvrunner.project_uuid)
+    elif isinstance(tool, cwltool.workflow.Workflow):
+        for step in tool.steps:
+            upload_docker(arvrunner, step.embedded_tool)
+
+
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
@@ -33,67 +106,26 @@ class Runner(object):
     def update_pipeline_component(self, record):
         pass
 
-    def upload_docker(self, tool):
-        if isinstance(tool, CommandLineTool):
-            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
-            if docker_req:
-                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
-        elif isinstance(tool, cwltool.workflow.Workflow):
-            for s in tool.steps:
-                self.upload_docker(s.embedded_tool)
-
-
     def arvados_job_spec(self, *args, **kwargs):
-        self.upload_docker(self.tool)
-
-        workflowfiles = []
-        jobfiles = []
-        workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
+        upload_docker(self.arvrunner, self.tool)
 
         self.name = os.path.basename(self.tool.tool["id"])
 
-        def visitFiles(files, path):
-            files.append(path)
-
-        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
-        loaded = set()
-        def loadref(b, u):
-            joined = urlparse.urljoin(b, u)
-            if joined not in loaded:
-                loaded.add(joined)
-                return document_loader.fetch(urlparse.urljoin(b, u))
-            else:
-                return {}
-
-        sc = scandeps(uri, workflowobj,
-                      set(("$import", "run")),
-                      set(("$include", "$schemas", "path", "location")),
-                      loadref)
-        adjustFileObjs(sc, partial(visitFiles, workflowfiles))
-        adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
-        adjustDirObjs(sc, partial(visitFiles, workflowfiles))
-        adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))
-
-        normalizeFilesDirs(jobfiles)
-        normalizeFilesDirs(workflowfiles)
-
-        keepprefix = kwargs.get("keepprefix", "")
-        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
-                                       keepprefix+"%s",
-                                       keepprefix+"%s/%s",
-                                       name=self.name,
-                                       **kwargs)
-
-        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
-                                  keepprefix+"%s",
-                                  keepprefix+"%s/%s",
-                                  name=os.path.basename(self.job_order.get("id", "#")),
-                                  **kwargs)
-
-        def setloc(p):
-            p["location"] = jobmapper.mapper(p["location"])[1]
-        adjustFileObjs(self.job_order, setloc)
-        adjustDirObjs(self.job_order, setloc)
+        workflowmapper = upload_dependencies(self.arvrunner,
+                                             self.name,
+                                             self.tool.doc_loader,
+                                             self.tool.tool,
+                                             self.tool.tool["id"],
+                                             kwargs.get("keepprefix", ""),
+                                             True)
+
+        jobmapper = upload_dependencies(self.arvrunner,
+                                        os.path.basename(self.job_order.get("id", "#")),
+                                        self.tool.doc_loader,
+                                        self.job_order,
+                                        self.job_order.get("id", "#"),
+                                        kwargs.get("keepprefix", ""),
+                                        False)
 
         if "id" in self.job_order:
             del self.job_order["id"]
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index e22a74f..7ff823a 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -32,7 +32,7 @@ setup(name='arvados-cwl-runner',
       # Make sure to update arvados/build/run-build-packages.sh as well
       # when updating the cwltool version pin.
       install_requires=[
-          'cwltool==1.0.20160811184335',
+          'cwltool==1.0.20160829192223',
           'arvados-python-client>=0.1.20160714204738',
       ],
       data_files=[

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list