[ARVADOS] updated: 1.1.2-53-g2702b79

Git user git at public.curoverse.com
Tue Jan 16 07:27:46 EST 2018


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py     | 27 ++++++++++--------------
 sdk/cwl/arvados_cwl/arvcontainer.py |  3 ++-
 sdk/cwl/arvados_cwl/arvjob.py       |  7 ++++---
 sdk/cwl/arvados_cwl/arvworkflow.py  |  4 ++--
 sdk/cwl/arvados_cwl/fsaccess.py     |  7 +------
 sdk/cwl/arvados_cwl/runner.py       | 42 ++++++++++++++++++++++++++++++-------
 sdk/cwl/setup.py                    |  2 +-
 7 files changed, 56 insertions(+), 36 deletions(-)

       via  2702b79d8981e562aa9848f41d96bd0a37a278c6 (commit)
      from  f44a15adce692614ecb816dbe2d0205704d9a4ab (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 2702b79d8981e562aa9848f41d96bd0a37a278c6
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Mon Jan 15 22:22:39 2018 -0500

    12934: Rewrite file paths in pack instead of using "overrides"
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
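
This commit drops the CollectionFetcher "overrides" mechanism in favor of rewriting file locations directly in the packed workflow: upload_workflow_deps() now returns a merged_map of {tool id: {original file location: keep reference}}, and packed_workflow() walks the packed document and substitutes those keep references for any location that does not already start with "keep:". A minimal standalone sketch of that walk follows (it mirrors the visit() helper in the runner.py hunk below; the toy document, tool id, and keep locator are made up for illustration):

    # Sketch only, not the shipped arvados_cwl code.  pack() reports its id
    # rewrites as {original id: packed id}; merged_map is keyed by original
    # ids, so invert the rewrites before walking the packed document.
    def rewrite_locations(packed, merged_map, rewrites):
        rewrite_to_orig = {v: k for k, v in rewrites.items()}

        def visit(node, cur_id):
            if isinstance(node, dict):
                if node.get("class") in ("CommandLineTool", "Workflow"):
                    cur_id = rewrite_to_orig.get(node["id"], node["id"])
                if "location" in node and not node["location"].startswith("keep:"):
                    node["location"] = merged_map[cur_id][node["location"]]
                for value in node.values():
                    visit(value, cur_id)
            elif isinstance(node, list):
                for item in node:
                    visit(item, cur_id)

        visit(packed, None)
        return packed

    # Hypothetical input: a packed tool whose InitialWorkDirRequirement still
    # points at a local file, plus the merged_map entry produced for it.
    packed = {
        "class": "CommandLineTool",
        "id": "#main",
        "requirements": [{"class": "InitialWorkDirRequirement",
                          "listing": [{"class": "File",
                                       "location": "file:///tmp/script.py"}]}],
    }
    merged_map = {"file:///tmp/tool.cwl":
                  {"file:///tmp/script.py":
                   "keep:d41d8cd98f00b204e9800998ecf8427e+0/script.py"}}
    rewrites = {"file:///tmp/tool.cwl": "#main"}
    print(rewrite_locations(packed, merged_map, rewrites))
    # -> the File location becomes keep:d41d8cd98f00b204e9800998ecf8427e+0/script.py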

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 6aa4c8f..e82fd9f 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -110,8 +110,7 @@ class ArvCwlRunner(object):
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
-                                                num_retries=self.num_retries,
-                                                overrides=kwargs.get("override_tools"))
+                                                num_retries=self.num_retries)
         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
@@ -366,8 +365,7 @@ class ArvCwlRunner(object):
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        override_tools = {}
-        upload_workflow_deps(self, tool, override_tools)
+        merged_map = upload_workflow_deps(self, tool)
 
         # Reload tool object which may have been updated by
         # upload_workflow_deps
@@ -375,14 +373,7 @@ class ArvCwlRunner(object):
                                   makeTool=self.arv_make_tool,
                                   loader=tool.doc_loader,
                                   avsc_names=tool.doc_schema,
-                                  metadata=tool.metadata,
-                                  override_tools=override_tools)
-
-        tool.doc_loader.fetcher_constructor = partial(CollectionFetcher,
-                                                api_client=self.api,
-                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
-                                                num_retries=self.num_retries,
-                                                overrides=override_tools)
+                                  metadata=tool.metadata)
 
         # Upload local file references in the job order.
         job_order = upload_job_order(self, "%s input" % kwargs["name"],
@@ -396,7 +387,8 @@ class ArvCwlRunner(object):
                                       kwargs.get("enable_reuse"),
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                      name=kwargs["name"])
+                                      name=kwargs["name"],
+                                      merged_map=merged_map)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")
@@ -405,7 +397,8 @@ class ArvCwlRunner(object):
                                         self.project_uuid,
                                         uuid=existing_uuid,
                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
-                                        name=kwargs["name"]),
+                                        name=kwargs["name"],
+                                        merged_map=merged_map),
                         "success")
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
@@ -443,7 +436,8 @@ class ArvCwlRunner(object):
                                                 name=kwargs.get("name"),
                                                 on_error=kwargs.get("on_error"),
                                                 submit_runner_image=kwargs.get("submit_runner_image"),
-                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"))
+                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
+                                                merged_map=merged_map)
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                       self.output_name,
@@ -451,7 +445,8 @@ class ArvCwlRunner(object):
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"),
                                       on_error=kwargs.get("on_error"),
-                                      submit_runner_image=kwargs.get("submit_runner_image"))
+                                      submit_runner_image=kwargs.get("submit_runner_image"),
+                                      merged_map=merged_map)
         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index e8e2a51..014e1b9 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -330,7 +330,7 @@ class RunnerContainer(Runner):
                 "portable_data_hash": "%s" % workflowcollection
             }
         else:
-            packed = packed_workflow(self.arvrunner, self.tool)
+            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
@@ -339,6 +339,7 @@ class RunnerContainer(Runner):
             if self.tool.tool.get("id", "").startswith("arvwf:"):
                 container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
 
+
         command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
         if self.output_name:
             command.append("--output-name=" + self.output_name)
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 25f64ea..2731b26 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -280,7 +280,7 @@ class RunnerJob(Runner):
         if self.tool.tool["id"].startswith("keep:"):
             self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
         else:
-            packed = packed_workflow(self.arvrunner, self.tool)
+            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
             wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
@@ -370,7 +370,7 @@ class RunnerTemplate(object):
     }
 
     def __init__(self, runner, tool, job_order, enable_reuse, uuid,
-                 submit_runner_ram=0, name=None):
+                 submit_runner_ram=0, name=None, merged_map=None):
         self.runner = runner
         self.tool = tool
         self.job = RunnerJob(
@@ -381,7 +381,8 @@ class RunnerTemplate(object):
             output_name=None,
             output_tags=None,
             submit_runner_ram=submit_runner_ram,
-            name=name)
+            name=name,
+            merged_map=merged_map)
         self.uuid = uuid
 
     def pipeline_component_spec(self):
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 4acdc32..f0f9c77 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -27,9 +27,9 @@ logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
-                    submit_runner_ram=0, name=None):
+                    submit_runner_ram=0, name=None, merged_map=None):
 
-    packed = packed_workflow(arvRunner, tool)
+    packed = packed_workflow(arvRunner, tool, merged_map)
 
     adjustDirObjs(job_order, trim_listing)
     adjustFileObjs(job_order, trim_anonymous_location)
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 47749ee..69f918e 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -179,16 +179,13 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
             return os.path.realpath(path)
 
 class CollectionFetcher(DefaultFetcher):
-    def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4, overrides=None):
+    def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
         super(CollectionFetcher, self).__init__(cache, session)
         self.api_client = api_client
         self.fsaccess = fs_access
         self.num_retries = num_retries
-        self.overrides = overrides if overrides else {}
 
     def fetch_text(self, url):
-        if url in self.overrides:
-            return self.overrides[url]
         if url.startswith("keep:"):
             with self.fsaccess.open(url, "r") as f:
                 return f.read()
@@ -199,8 +196,6 @@ class CollectionFetcher(DefaultFetcher):
         return super(CollectionFetcher, self).fetch_text(url)
 
     def check_exists(self, url):
-        if url in self.overrides:
-            return True
         try:
             if url.startswith("http://arvados.org/cwl"):
                 return True
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 28c35ad..2ca63cf 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -172,13 +172,32 @@ def upload_docker(arvrunner, tool):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
 
-def packed_workflow(arvrunner, tool):
+def packed_workflow(arvrunner, tool, merged_map):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
 
-    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                tool.tool["id"], tool.metadata)
+    rewrites = {}
+    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
+                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+
+    rewrite_to_orig = {}
+    for k,v in rewrites.items():
+        rewrite_to_orig[v] = k
+
+    def visit(v, cur_id):
+        if isinstance(v, dict):
+            if v.get("class") in ("CommandLineTool", "Workflow"):
+                cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "location" in v and not v["location"].startswith("keep:"):
+                v["location"] = merged_map[cur_id][v["location"]]
+            for l in v:
+                visit(v[l], cur_id)
+        if isinstance(v, list):
+            for l in v:
+                visit(l, cur_id)
+    visit(packed, None)
+    return packed
 
 def tag_git_version(packed):
     if tool.tool["id"].startswith("file://"):
@@ -229,16 +248,18 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
     return job_order
 
-def upload_workflow_deps(arvrunner, tool, override_tools):
+def upload_workflow_deps(arvrunner, tool):
     # Ensure that Docker images needed by this workflow are available
 
     upload_docker(arvrunner, tool)
 
     document_loader = tool.doc_loader
 
+    merged_map = {}
+
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            upload_dependencies(arvrunner,
+            pm = upload_dependencies(arvrunner,
                                 "%s dependencies" % (shortname(deptool["id"])),
                                 document_loader,
                                 deptool,
@@ -246,10 +267,15 @@ def upload_workflow_deps(arvrunner, tool, override_tools):
                                 False,
                                 include_primary=False)
             document_loader.idx[deptool["id"]] = deptool
-            override_tools[deptool["id"]] = yaml.round_trip_dump(deptool)
+            toolmap = {}
+            for k,v in pm.items():
+                toolmap[k] = v.resolved
+            merged_map[deptool["id"]] = toolmap
 
     tool.visit(upload_tool_deps)
 
+    return merged_map
+
 def arvados_jobs_image(arvrunner, img):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
@@ -291,7 +317,7 @@ class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
-                 intermediate_output_ttl=0):
+                 intermediate_output_ttl=0, merged_map=None):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
@@ -320,6 +346,8 @@ class Runner(object):
         if self.submit_runner_ram <= 0:
             raise Exception("Value of --submit-runner-ram must be greater than zero")
 
+        self.merged_map = merged_map or {}
+
     def update_pipeline_component(self, record):
         pass
 
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index de47098..9645a33 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -41,7 +41,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180111185617',
+          'cwltool==1.0.20180116032016',
           'schema-salad==2.6.20171201034858',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',

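For orientation, the merged_map threaded from upload_workflow_deps() through Runner into packed_workflow() is a plain nested dict. A hedged sketch of how it is assembled, assuming a PathMapper-like object whose items() yield (original location, entry with a .resolved keep reference) pairs as in the runner.py hunk above; the ids and locator are hypothetical:

    # Illustration only; MapperEnt stands in for cwltool's PathMapper entries.
    from collections import namedtuple

    MapperEnt = namedtuple("MapperEnt", ["resolved"])

    def build_merged_map(pathmaps_by_tool):
        # pathmaps_by_tool: {tool id: {original location: MapperEnt}}
        return {tool_id: {loc: ent.resolved for loc, ent in pm.items()}
                for tool_id, pm in pathmaps_by_tool.items()}

    print(build_merged_map({
        "file:///tmp/tool.cwl": {
            "file:///tmp/script.py":
                MapperEnt("keep:d41d8cd98f00b204e9800998ecf8427e+0/script.py"),
        },
    }))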
-----------------------------------------------------------------------


hooks/post-receive