[ARVADOS] updated: a262ca9978c9faef9f0363f94c4166a635d81f5f

Git user git at public.curoverse.com
Wed Jun 15 14:51:21 EDT 2016


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 47 +++++++++++++++++++++++------------------
 sdk/cwl/arvados_cwl/runner.py   |  9 +++++++-
 2 files changed, 35 insertions(+), 21 deletions(-)

       via  a262ca9978c9faef9f0363f94c4166a635d81f5f (commit)
      from  d58bfc39208835e745f34d3c1f05fcc09f7db99f (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.


commit a262ca9978c9faef9f0363f94c4166a635d81f5f
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jun 15 14:51:14 2016 -0400

    8442: Submit CommandLineTool containers directly without an intermediate
    workflow runner.  Handle unsupported features gracefully.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 050b7b9..e14eb4b 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -37,6 +37,7 @@ class ArvCwlRunner(object):
         self.lock = threading.Lock()
         self.cond = threading.Condition(self.lock)
         self.final_output = None
+        self.final_status = None
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
@@ -106,9 +107,29 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
+        self.debug = kwargs.get("debug")
+        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+        self.fs_access = CollectionFsAccess(kwargs["basedir"])
+
+        kwargs["fs_access"] = self.fs_access
+        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+
+        if self.crunch2:
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+        else:
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
+
+        runnerjob = None
         if kwargs.get("submit"):
             if self.crunch2:
-                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+                if tool.tool["class"] == "CommandLineTool":
+                    runnerjob = tool.job(job_order,
+                                         self.output_callback,
+                                         **kwargs).next()
+                else:
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
             else:
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
 
@@ -122,30 +143,16 @@ class ArvCwlRunner(object):
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if kwargs.get("submit") and not kwargs.get("wait"):
-                runnerjob.run()
-                return runnerjob.uuid
+        if runnerjob and not kwargs.get("wait"):
+            runnerjob.run()
+            return runnerjob.uuid
 
         if self.crunch2:
             events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
         else:
             events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
-        self.debug = kwargs.get("debug")
-        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.fs_access = CollectionFsAccess(kwargs["basedir"])
-
-        kwargs["fs_access"] = self.fs_access
-        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-
-        if self.crunch2:
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["tmpdir"] = "/tmp"
-        else:
-            kwargs["outdir"] = "$(task.outdir)"
-            kwargs["tmpdir"] = "$(task.tmpdir)"
-
-        if kwargs.get("submit"):
+        if runnerjob:
             jobiter = iter((runnerjob,))
         else:
             if "cwl_runner_job" in kwargs:
@@ -185,7 +192,7 @@ class ArvCwlRunner(object):
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            if runnerjob and self.crunch2:
+            if runnerjob and runnerjob.uuid and self.crunch2:
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 002c0ca..639426a 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -23,6 +23,7 @@ class Runner(object):
         self.job_order = job_order
         self.running = False
         self.enable_reuse = enable_reuse
+        self.uuid = None
 
     def update_pipeline_component(self, record):
         pass
@@ -51,8 +52,14 @@ class Runner(object):
             return path
 
         document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
+        loaded = set()
         def loadref(b, u):
-            return document_loader.fetch(urlparse.urljoin(b, u))
+            joined = urlparse.urljoin(b, u)
+            if joined not in loaded:
+                loaded.add(joined)
+                return document_loader.fetch(urlparse.urljoin(b, u))
+            else:
+                return {}
 
         sc = scandeps(uri, workflowobj,
                       set(("$import", "run")),

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list