[ARVADOS] updated: 39df0227516440638035168f5f89df4634f37e1a

From: Git user <git at public.curoverse.com>
Date: Mon Mar 21 21:43:32 EDT 2016


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 230 ++++++++++++++++++++++++----------------
 1 file changed, 136 insertions(+), 94 deletions(-)

       via  39df0227516440638035168f5f89df4634f37e1a (commit)
      from  e1655f12ed1cfec6b5763e1db81e7ec8dd0f7a14 (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full
below.


commit 39df0227516440638035168f5f89df4634f37e1a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Mon Mar 21 21:43:25 2016 -0400

    8654: Support waiting for submitted runner job to complete
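
    This change moves the submit logic out of ArvCwlRunner.submit() into
    a new RunnerJob class and adds a --wait flag. A typical invocation
    (assuming the package's arvados-cwl-runner console entry point) would
    look like:

        arvados-cwl-runner --submit --wait workflow.cwl inputs.json

    Without --wait the client submits the runner job, logs its uuid, and
    returns; with --wait it stays attached, subscribing to job events and
    blocking until the runner job reaches a final state.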

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index d534abd..82d7eb9 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -20,6 +20,7 @@ import re
 import os
 import sys
 import functools
+import json
 
 from cwltool.process import get_feature, adjustFiles, scandeps
 from arvados.api import OrderedJsonModel
@@ -248,6 +249,100 @@ class ArvadosJob(object):
             del self.arvrunner.jobs[record["uuid"]]
 
 
+class RunnerJob(object):
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.arvrunner = runner
+        self.tool = tool
+        self.job_order = job_order
+        self.running = False
+        self.enable_reuse = enable_reuse
+
+    def update_pipeline_component(self, record):
+        pass
+
+    def upload_docker(self, tool):
+        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+            if docker_req:
+                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+        elif isinstance(tool, cwltool.workflow.Workflow):
+            for s in tool.steps:
+                self.upload_docker(s.embedded_tool)
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        self.upload_docker(self.tool)
+
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(self.tool.tool["id"])
+
+        self.name = os.path.basename(self.tool.tool["id"])
+
+        def visitFiles(files, path):
+            files.add(path)
+            return path
+
+        document_loader, _, _ = cwltool.process.get_schema()
+        def loadref(b, u):
+            return document_loader.resolve_ref(u, base_url=b)[0]
+
+        adjustFiles(scandeps("", self.tool.tool,
+                             set(("run",)),
+                             set(("$schemas", "path")),
+                             loadref),
+                    functools.partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+
+        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=self.name,
+                                       **kwargs)
+
+        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(self.job_order.get("id", "#")),
+                                  **kwargs)
+
+        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+        if "id" in self.job_order:
+            del self.job_order["id"]
+
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+
+        response = self.arvrunner.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "8654-arv-jobs-cwl-runner",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+
+        self.arvrunner.jobs[response["uuid"]] = self
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
+    def done(self, record):
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+
+        outc = arvados.collection.Collection(record["output"])
+        with outc.open("cwl.output.json") as f:
+            outputs = json.load(f)
+
+        self.arvrunner.output_callback(outputs, processStatus)
+
+        del self.arvrunner.jobs[record["uuid"]]
+
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
     def __init__(self, arvrunner, referenced_files, basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
@@ -329,13 +424,15 @@ class ArvCwlRunner(object):
     def output_callback(self, out, processStatus):
         if processStatus == "success":
             logger.info("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
 
         else:
             logger.warn("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
         self.final_output = out
 
 
@@ -366,80 +463,7 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
-    def upload_docker(self, tool):
-        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
-            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
-            if docker_req:
-                arv_docker_get_image(self.api, docker_req, True, self.project_uuid)
-        elif isinstance(tool, cwltool.workflow.Workflow):
-            for s in tool.steps:
-                self.upload_docker(s.embedded_tool)
-
-    def submit(self, tool, job_order, input_basedir, args, **kwargs):
-        self.upload_docker(tool)
-
-        workflowfiles = set()
-        jobfiles = set()
-        workflowfiles.add(tool.tool["id"])
-
-        def visitFiles(files, path):
-            files.add(path)
-            return path
-
-        document_loader, _, _ = cwltool.process.get_schema()
-        def loadref(b, u):
-            return document_loader.resolve_ref(u, base_url=b)[0]
-
-        adjustFiles(scandeps("", tool.tool,
-                             set(("run",)),
-                             set(("$schemas", "path")),
-                             loadref),
-                    functools.partial(visitFiles, workflowfiles))
-        adjustFiles(job_order, functools.partial(visitFiles, jobfiles))
-
-        workflowmapper = ArvPathMapper(self, workflowfiles, "",
-                                       "%s",
-                                       "%s/%s",
-                                       name=os.path.basename(tool.tool["id"]),
-                                       **kwargs)
-
-        jobmapper = ArvPathMapper(self, jobfiles, "",
-                                  "%s",
-                                  "%s/%s",
-                                  name=os.path.basename(job_order.get("id", "#")),
-                                  **kwargs)
-
-        adjustFiles(job_order, lambda p: jobmapper.mapper(p)[1])
-
-        if "id" in job_order:
-            del job_order["id"]
-
-        job_order["cwl:tool"] = workflowmapper.mapper(tool.tool["id"])[1]
-
-        response = self.api.jobs().create(body={
-            "script": "cwl-runner",
-            "script_version": "8654-arv-jobs-cwl-runner",
-            "repository": "arvados",
-            "script_parameters": job_order,
-            "runtime_constraints": {
-                "docker_image": "arvados/jobs"
-            }
-        }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
-
-        logger.info("Submitted job %s", response["uuid"])
-
-        return
-
-
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
-
-        if args.submit:
-            return self.submit(tool, job_order, input_basedir, args, **kwargs)
-
-        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
-
         self.debug = args.debug
 
         try:
@@ -457,6 +481,18 @@ class ArvCwlRunner(object):
 
                 col.save_new("crunchrunner binary", ensure_unique_name=True)
 
+        useruuid = self.api.users().current().execute()["uuid"]
+        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+        self.pipeline = None
+
+        if args.submit:
+            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
+            if not args.wait:
+                runnerjob.run()
+                return
+
+        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
@@ -468,24 +504,27 @@ class ArvCwlRunner(object):
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            components = {}
-            if "cwl_runner_job" in kwargs:
-                components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
+            if args.submit:
+                jobiter = iter((runnerjob,))
+            else:
+                components = {}
+                if "cwl_runner_job" in kwargs:
+                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
 
-            self.pipeline = self.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.project_uuid,
-                    "name": shortname(tool.tool["id"]),
-                    "components": components,
-                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+                self.pipeline = self.api.pipeline_instances().create(
+                    body={
+                        "owner_uuid": self.project_uuid,
+                        "name": shortname(tool.tool["id"]),
+                        "components": components,
+                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
-            logger.info("Pipeline instance %s", self.pipeline["uuid"])
+                logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-            jobiter = tool.job(job_order,
-                               input_basedir,
-                               self.output_callback,
-                               docker_outdir="$(task.outdir)",
-                               **kwargs)
+                jobiter = tool.job(job_order,
+                                   input_basedir,
+                                   self.output_callback,
+                                   docker_outdir="$(task.outdir)",
+                                   **kwargs)
 
             try:
                 self.cond.acquire()
@@ -517,8 +556,9 @@ class ArvCwlRunner(object):
                     logger.error("Interrupted, marking pipeline as failed")
                 else:
                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
             finally:
                 self.cond.release()
 
@@ -538,6 +578,8 @@ def main(args, stdout, stderr, api_client=None):
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
     parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.",
                         default=False)
+    parser.add_argument("--wait", action="store_true", help="Wait for completion after submitting cwl-runner job.",
+                        default=False)
 
     try:
         if api_client is None:

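For context, the --wait path depends on the ArvCwlRunner.on_message
event dispatcher, which is referenced above but not shown in this diff.
Below is a minimal sketch of that plumbing, assuming the websocket event
payload shape used by Arvados logs (object_uuid plus
properties/new_attributes) and eliding error handling:

    import threading

    class ArvCwlRunnerSketch(object):
        """Simplified model of how job events drive RunnerJob.done()."""

        def __init__(self, api):
            self.api = api
            self.jobs = {}  # job uuid -> RunnerJob or ArvadosJob
            self.cond = threading.Condition()

        def on_message(self, event):
            # Websocket callback registered via arvados.events.subscribe().
            # Look the job up by uuid; once it reaches a final state, hand
            # the updated record to its done() method and wake the waiter.
            if event.get("object_uuid") not in self.jobs:
                return
            attrs = event.get("properties", {}).get("new_attributes", {})
            if attrs.get("state") in ("Complete", "Failed", "Cancelled"):
                self.cond.acquire()
                try:
                    self.jobs[event["object_uuid"]].done(attrs)
                    self.cond.notify()
                finally:
                    self.cond.release()

The real module registers this callback with arvados.events.subscribe()
as shown in the arvExecutor hunk above; RunnerJob.done() then reads
cwl.output.json from the job's output collection and reports the result
through output_callback.
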
-----------------------------------------------------------------------


hooks/post-receive
--