[arvados] updated: 2.4.1-17-g7b7b32206

git repository hosting git at public.arvados.org
Thu Aug 4 16:26:08 UTC 2022


Summary of changes:
 apps/workbench/app/helpers/application_helper.rb |   2 +-
 sdk/cwl/arvados_cwl/executor.py                  |  16 +-
 sdk/cwl/arvados_cwl/runner.py                    | 204 +++++++++++++++--------
 sdk/cwl/setup.py                                 |   7 +-
 4 files changed, 152 insertions(+), 77 deletions(-)

       via  7b7b3220676a800ecc0a9b9562051c21f45aa904 (commit)
       via  d7fdbfafc049ef81d2a7453c7572aa589b4d35bf (commit)
      from  549ad3ac5dd288d549ec8310f2dc84509e260a60 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7b7b3220676a800ecc0a9b9562051c21f45aa904
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Aug 4 12:24:53 2022 -0400

    Merge branch '19321-wb1-enum' refs #19321
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/apps/workbench/app/helpers/application_helper.rb b/apps/workbench/app/helpers/application_helper.rb
index f22ab5016..697c469b5 100644
--- a/apps/workbench/app/helpers/application_helper.rb
+++ b/apps/workbench/app/helpers/application_helper.rb
@@ -564,7 +564,7 @@ module ApplicationHelper
                      "data-emptytext" => "none",
                      "data-placement" => "bottom",
                      "data-type" => "select",
-                     "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => i, :text => i} }).to_json,
+                     "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => cwl_shortname(i), :text => cwl_shortname(i)} }).to_json,
                      "data-url" => url_for(action: "update", id: object.uuid, controller: object.class.to_s.pluralize.underscore, merge: true),
                      "data-title" => "Set value for #{cwl_shortname(input_schema[:id])}",
                      "data-name" => dn,

commit d7fdbfafc049ef81d2a7453c7572aa589b4d35bf
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Aug 4 11:59:15 2022 -0400

    Merge branch '19280-cache-loadref' refs #19280
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>

diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 778af58ac..3241fb607 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -565,8 +565,9 @@ The 'jobs' API is no longer supported.
         self.project_uuid = runtimeContext.project_uuid
 
         # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     updated_tool, job_order, runtimeContext)
+        with Perf(metrics, "upload_job_order"):
+            job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+                                         updated_tool, job_order, runtimeContext)
 
         # the last clause means: if it is a command line tool, and we
         # are going to wait for the result, and always_submit_runner
@@ -581,19 +582,23 @@ The 'jobs' API is no longer supported.
 
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
+        loadingContext.disable_js_validation = True
         if submitting:
             loadingContext.do_update = False
             # Document may have been auto-updated. Reload the original
             # document with updating disabled because we want to
             # submit the document with its original CWL version, not
             # the auto-updated one.
-            tool = load_tool(updated_tool.tool["id"], loadingContext)
+            with Perf(metrics, "load_tool original"):
+                tool = load_tool(updated_tool.tool["id"], loadingContext)
         else:
             tool = updated_tool
 
         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        logger.info("Uploading workflow dependencies")
+        with Perf(metrics, "upload_workflow_deps"):
+            merged_map = upload_workflow_deps(self, tool, runtimeContext)
 
         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been
@@ -602,7 +607,8 @@ The 'jobs' API is no longer supported.
         loadingContext.loader = tool.doc_loader
         loadingContext.avsc_names = tool.doc_schema
         loadingContext.metadata = tool.metadata
-        tool = load_tool(tool.tool, loadingContext)
+        with Perf(metrics, "load_tool"):
+            tool = load_tool(tool.tool, loadingContext)
 
         if runtimeContext.update_workflow or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 644713bce..225f4ae60 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -49,8 +72,10 @@ from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, col
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
+from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
@@ -228,23 +253,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                 if sfname is None:
                     continue
 
-                p_location = primary["location"]
-                if "/" in p_location:
-                    sfpath = (
-                        p_location[0 : p_location.rindex("/") + 1]
-                        + sfname
-                    )
+                if isinstance(sfname, str):
+                    p_location = primary["location"]
+                    if "/" in p_location:
+                        sfpath = (
+                            p_location[0 : p_location.rindex("/") + 1]
+                            + sfname
+                        )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
@@ -260,7 +295,8 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run, runtimeContext,
-                        include_primary=True, discovered_secondaryfiles=None):
+                        include_primary=True, discovered_secondaryfiles=None,
+                        cache=None):
     """Upload the dependencies of the workflowobj document to Keep.
 
     Returns a pathmapper object mapping local paths to keep references.  Also
@@ -279,6 +315,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
+            if cache is not None and defrg in cache:
+                return cache[defrg]
             # Use fetch_text to get raw file (before preprocessing).
             text = document_loader.fetch_text(defrg)
             if isinstance(text, bytes):
@@ -286,7 +324,10 @@ def upload_dependencies(arvrunner, name, document_loader,
             else:
                 textIO = StringIO(text)
             yamlloader = YAML(typ='safe', pure=True)
-            return yamlloader.load(textIO)
+            result = yamlloader.load(textIO)
+            if cache is not None:
+                cache[defrg] = result
+            return result
         else:
             return {}
 
@@ -297,25 +338,37 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     scanobj = workflowobj
     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if cache is not None and defrg not in cache:
+            # if we haven't seen this file before, want raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
+            scanobj = loadref("", workflowobj["id"])
 
     metadata = scanobj
 
-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
 
-    optional_deps = scandeps(uri, scanobj,
-                                  loadref_fields,
-                                  set(("$schemas",)),
-                                  loadref, urljoin=document_loader.fetcher.urljoin,
-                                  nestdirs=False)
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                      loadref_fields,
+                                      set(("$schemas",)),
+                                      loadref, urljoin=document_loader.fetcher.urljoin,
+                                      nestdirs=False)
 
-    sc_result.extend(optional_deps)
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -343,35 +396,45 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
         collect_uuids(obj)
 
-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, API server may limit
-        # response size, so keep fetching from API server has nothing
-        # more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching from API server has nothing
+            # more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)
 
-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break
 
-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -412,12 +475,13 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]
 
-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)
 
     keeprefs = set()
     def addkeepref(k):
@@ -461,8 +525,9 @@ def upload_dependencies(arvrunner, name, document_loader,
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)
 
     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -647,24 +712,27 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
 
-    upload_docker(arvrunner, tool, runtimeContext)
+    with Perf(metrics, "upload_docker"):
+        upload_docker(arvrunner, tool, runtimeContext)
 
     document_loader = tool.doc_loader
 
     merged_map = {}
-
+    tool_dep_cache = {}
     def upload_tool_deps(deptool):
         if "id" in deptool:
             discovered_secondaryfiles = {}
-            pm = upload_dependencies(arvrunner,
-                                     "%s dependencies" % (shortname(deptool["id"])),
-                                     document_loader,
-                                     deptool,
-                                     deptool["id"],
-                                     False,
-                                     runtimeContext,
-                                     include_primary=False,
-                                     discovered_secondaryfiles=discovered_secondaryfiles)
+            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+                pm = upload_dependencies(arvrunner,
+                                         "%s dependencies" % (shortname(deptool["id"])),
+                                         document_loader,
+                                         deptool,
+                                         deptool["id"],
+                                         False,
+                                         runtimeContext,
+                                         include_primary=False,
+                                         discovered_secondaryfiles=discovered_secondaryfiles,
+                                         cache=tool_dep_cache)
             document_loader.idx[deptool["id"]] = deptool
             toolmap = {}
             for k,v in pm.items():
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index c885ebd4b..66cda19f4 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,12 +36,13 @@ setup(name='arvados-cwl-runner',
       # file to determine what version of cwltool and schema-salad to
       # build.
       install_requires=[
-          'cwltool==3.1.20220224085855',
-          'schema-salad==8.2.20211116214159',
+          'cwltool==3.1.20220623174452',
+          'schema-salad==8.3.20220801194920',
           'arvados-python-client{}'.format(pysdk_dep),
           'setuptools',
           'ciso8601 >= 2.0.0',
-          'networkx < 2.6'
+          'networkx < 2.6',
+          'msgpack==1.0.3'
       ],
       data_files=[
           ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list