[arvados] updated: 2.4.1-17-g7b7b32206

From: git repository hosting <git at public.arvados.org>
Date: Thu Aug 4 16:26:08 UTC 2022
Summary of changes:
 apps/workbench/app/helpers/application_helper.rb |   2 +-
 sdk/cwl/arvados_cwl/executor.py                  |  16 +-
 sdk/cwl/arvados_cwl/runner.py                    | 204 +++++++++++++++--------
 sdk/cwl/setup.py                                 |   7 +-
 4 files changed, 152 insertions(+), 77 deletions(-)
       via  7b7b3220676a800ecc0a9b9562051c21f45aa904 (commit)
       via  d7fdbfafc049ef81d2a7453c7572aa589b4d35bf (commit)
      from  549ad3ac5dd288d549ec8310f2dc84509e260a60 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.
commit 7b7b3220676a800ecc0a9b9562051c21f45aa904
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Aug 4 12:24:53 2022 -0400

    Merge branch '19321-wb1-enum' refs #19321

    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/apps/workbench/app/helpers/application_helper.rb b/apps/workbench/app/helpers/application_helper.rb
index f22ab5016..697c469b5 100644
--- a/apps/workbench/app/helpers/application_helper.rb
+++ b/apps/workbench/app/helpers/application_helper.rb
@@ -564,7 +564,7 @@ module ApplicationHelper
           "data-emptytext" => "none",
           "data-placement" => "bottom",
           "data-type" => "select",
-          "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => i, :text => i} }).to_json,
+          "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => cwl_shortname(i), :text => cwl_shortname(i)} }).to_json,
           "data-url" => url_for(action: "update", id: object.uuid, controller: object.class.to_s.pluralize.underscore, merge: true),
           "data-title" => "Set value for #{cwl_shortname(input_schema[:id])}",
           "data-name" => dn,
commit d7fdbfafc049ef81d2a7453c7572aa589b4d35bf
Author: Peter Amstutz <peter.amstutz at curii.com>
Date:   Thu Aug 4 11:59:15 2022 -0400

    Merge branch '19280-cache-loadref' refs #19280

    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 778af58ac..3241fb607 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -565,8 +565,9 @@ The 'jobs' API is no longer supported.
         self.project_uuid = runtimeContext.project_uuid

         # Upload local file references in the job order.
-        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
-                                     updated_tool, job_order, runtimeContext)
+        with Perf(metrics, "upload_job_order"):
+            job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+                                         updated_tool, job_order, runtimeContext)

         # the last clause means: if it is a command line tool, and we
         # are going to wait for the result, and always_submit_runner
@@ -581,19 +582,23 @@ The 'jobs' API is no longer supported.
         loadingContext = self.loadingContext.copy()
         loadingContext.do_validate = False
+        loadingContext.disable_js_validation = True

         if submitting:
             loadingContext.do_update = False
             # Document may have been auto-updated. Reload the original
             # document with updating disabled because we want to
             # submit the document with its original CWL version, not
             # the auto-updated one.
-            tool = load_tool(updated_tool.tool["id"], loadingContext)
+            with Perf(metrics, "load_tool original"):
+                tool = load_tool(updated_tool.tool["id"], loadingContext)
         else:
             tool = updated_tool

         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
         # Also uploads docker images.
-        merged_map = upload_workflow_deps(self, tool, runtimeContext)
+        logger.info("Uploading workflow dependencies")
+        with Perf(metrics, "upload_workflow_deps"):
+            merged_map = upload_workflow_deps(self, tool, runtimeContext)

         # Recreate process object (ArvadosWorkflow or
         # ArvadosCommandTool) because tool document may have been
@@ -602,7 +607,8 @@ The 'jobs' API is no longer supported.
         loadingContext.loader = tool.doc_loader
         loadingContext.avsc_names = tool.doc_schema
         loadingContext.metadata = tool.metadata
-        tool = load_tool(tool.tool, loadingContext)
+        with Perf(metrics, "load_tool"):
+            tool = load_tool(tool.tool, loadingContext)

         if runtimeContext.update_workflow or runtimeContext.create_workflow:
             # Create a pipeline template or workflow record and exit.
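
Most of what this commit changes is instrumentation: expensive steps are
wrapped in `with Perf(metrics, "..."):` blocks so their wall-clock cost is
reported through the metrics logger. A minimal sketch of such a timing
context manager (the real one lives in arvados_cwl/perf.py; this
illustrative version only assumes it times the block and logs the result):

    import time

    class Perf:
        # Time a block of code and report it to the given logger.
        def __init__(self, logger, name):
            self.logger = logger
            self.name = name

        def __enter__(self):
            self.start = time.time()

        def __exit__(self, exc_type, exc_value, tb):
            self.logger.debug("%s took %.3f seconds",
                              self.name, time.time() - self.start)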
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 644713bce..225f4ae60 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)

 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -49,8 +72,10 @@ from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
 from ._version import __version__
 from . import done
 from .context import ArvRuntimeContext
+from .perf import Perf

 logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')

 def trim_anonymous_location(obj):
     """Remove 'location' field from File and Directory literals.
@@ -228,23 +253,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
             if sfname is None:
                 continue

-            p_location = primary["location"]
-            if "/" in p_location:
-                sfpath = (
-                    p_location[0 : p_location.rindex("/") + 1]
-                    + sfname
-                )
+            if isinstance(sfname, str):
+                p_location = primary["location"]
+                if "/" in p_location:
+                    sfpath = (
+                        p_location[0 : p_location.rindex("/") + 1]
+                        + sfname
+                    )

             required = builder.do_eval(required, context=primary)

-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)

         primary["secondaryFiles"] = cmap(found)

     if discovered is not None:
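
The rework above handles the fact that evaluating a secondaryFiles pattern
can return either a plain filename (str) or ready-made File objects (a
dict, or a list of them), and each shape needs its own existence check. A
rough standalone sketch of that normalization (hypothetical helper, not
the code path above):

    # Hypothetical illustration: turn a secondaryFiles evaluation result
    # into a list of File objects, whatever shape it came back in.
    def as_secondary_files(sfname, primary_location):
        if isinstance(sfname, str):
            # A bare name resolves next to the primary file.
            base, _, _ = primary_location.rpartition("/")
            return [{"class": "File", "location": base + "/" + sfname}]
        return sfname if isinstance(sfname, list) else [sfname]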
@@ -260,7 +295,8 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):

 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run, runtimeContext,
-                        include_primary=True, discovered_secondaryfiles=None):
+                        include_primary=True, discovered_secondaryfiles=None,
+                        cache=None):
     """Upload the dependencies of the workflowobj document to Keep.

     Returns a pathmapper object mapping local paths to keep references. Also
@@ -279,6 +315,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         defrg, _ = urllib.parse.urldefrag(joined)
         if defrg not in loaded:
             loaded.add(defrg)
+            if cache is not None and defrg in cache:
+                return cache[defrg]
             # Use fetch_text to get raw file (before preprocessing).
             text = document_loader.fetch_text(defrg)
             if isinstance(text, bytes):
@@ -286,7 +324,10 @@ def upload_dependencies(arvrunner, name, document_loader,
             else:
                 textIO = StringIO(text)
             yamlloader = YAML(typ='safe', pure=True)
-            return yamlloader.load(textIO)
+            result = yamlloader.load(textIO)
+            if cache is not None:
+                cache[defrg] = result
+            return result
         else:
             return {}
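
The new cache parameter (threaded through loadref, above) is plain
memoization keyed on the defragmented URI: each referenced document is
fetched and YAML-parsed at most once per run. The same pattern in
isolation (hypothetical function, mirroring the fetch-and-parse calls in
the diff):

    from io import StringIO
    from ruamel.yaml import YAML

    def load_document(uri, fetch_text, cache=None):
        # Return the cached parse if this URI has been loaded already.
        if cache is not None and uri in cache:
            return cache[uri]
        result = YAML(typ='safe', pure=True).load(StringIO(fetch_text(uri)))
        if cache is not None:
            cache[uri] = result
        return result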
@@ -297,25 +338,37 @@ def upload_dependencies(arvrunner, name, document_loader,

     scanobj = workflowobj
     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if cache is not None and defrg not in cache:
+            # If we haven't seen this file before, we want the raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
+            scanobj = loadref("", workflowobj["id"])

     metadata = scanobj

-    sc_result = scandeps(uri, scanobj,
-                         loadref_fields,
-                         set(("$include", "location")),
-                         loadref, urljoin=document_loader.fetcher.urljoin,
-                         nestdirs=False)
+    with Perf(metrics, "scandeps include, location"):
+        sc_result = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$include", "location")),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)

-    optional_deps = scandeps(uri, scanobj,
-                             loadref_fields,
-                             set(("$schemas",)),
-                             loadref, urljoin=document_loader.fetcher.urljoin,
-                             nestdirs=False)
+    with Perf(metrics, "scandeps $schemas"):
+        optional_deps = scandeps(uri, scanobj,
+                                 loadref_fields,
+                                 set(("$schemas",)),
+                                 loadref, urljoin=document_loader.fetcher.urljoin,
+                                 nestdirs=False)

-    sc_result.extend(optional_deps)
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)

     sc = []
     uuids = {}
@@ -343,35 +396,45 @@ def upload_dependencies(arvrunner, name, document_loader,
             sc.append(obj)
         collect_uuids(obj)

-    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
-    visit_class(sc_result, ("File", "Directory"), collect_uploads)
+    with Perf(metrics, "collect uuids"):
+        visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+    with Perf(metrics, "collect uploads"):
+        visit_class(sc_result, ("File", "Directory"), collect_uploads)

     # Resolve any collection uuids we found to portable data hashes
     # and assign them to uuid_map
     uuid_map = {}
     fetch_uuids = list(uuids.keys())
-    while fetch_uuids:
-        # For a large number of fetch_uuids, the API server may limit
-        # the response size, so keep fetching until the API server has
-        # nothing more to give us.
-        lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", fetch_uuids]],
-            count="none",
-            select=["uuid", "portable_data_hash"]).execute(
-                num_retries=arvrunner.num_retries)
+    with Perf(metrics, "fetch_uuids"):
+        while fetch_uuids:
+            # For a large number of fetch_uuids, the API server may limit
+            # the response size, so keep fetching until the API server has
+            # nothing more to give us.
+            lookups = arvrunner.api.collections().list(
+                filters=[["uuid", "in", fetch_uuids]],
+                count="none",
+                select=["uuid", "portable_data_hash"]).execute(
+                    num_retries=arvrunner.num_retries)

-        if not lookups["items"]:
-            break
+            if not lookups["items"]:
+                break

-        for l in lookups["items"]:
-            uuid_map[l["uuid"]] = l["portable_data_hash"]
+            for l in lookups["items"]:
+                uuid_map[l["uuid"]] = l["portable_data_hash"]

-        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+            fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]

     normalizeFilesDirs(sc)

-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]

     def visit_default(obj):
         def defaults_are_optional(f):
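
The fetch_uuids loop above is a batched lookup: the API server may
truncate the response to a large "in" filter, so the loop keeps
re-requesting whatever is still unresolved until a query returns no items.
The same pattern in isolation (hypothetical function name; the API calls
mirror the diff):

    def resolve_to_pdh(api, uuids, num_retries=2):
        # Map collection uuids to portable data hashes, a page at a time.
        uuid_map = {}
        pending = list(uuids)
        while pending:
            lookups = api.collections().list(
                filters=[["uuid", "in", pending]],
                count="none",
                select=["uuid", "portable_data_hash"]).execute(
                    num_retries=num_retries)
            if not lookups["items"]:
                break  # server has nothing more to give us
            for item in lookups["items"]:
                uuid_map[item["uuid"]] = item["portable_data_hash"]
            pending = [u for u in pending if u not in uuid_map]
        return uuid_map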
@@ -412,12 +475,13 @@ def upload_dependencies(arvrunner, name, document_loader,
         else:
             del discovered[d]

-    mapper = ArvPathMapper(arvrunner, sc, "",
-                           "keep:%s",
-                           "keep:%s/%s",
-                           name=name,
-                           single_collection=True,
-                           optional_deps=optional_deps)
+    with Perf(metrics, "mapper"):
+        mapper = ArvPathMapper(arvrunner, sc, "",
+                               "keep:%s",
+                               "keep:%s/%s",
+                               name=name,
+                               single_collection=True,
+                               optional_deps=optional_deps)

     keeprefs = set()
     def addkeepref(k):
@@ -461,8 +525,9 @@ def upload_dependencies(arvrunner, name, document_loader,
             p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
             p[collectionUUID] = uuid

-    visit_class(workflowobj, ("File", "Directory"), setloc)
-    visit_class(discovered, ("File", "Directory"), setloc)
+    with Perf(metrics, "setloc"):
+        visit_class(workflowobj, ("File", "Directory"), setloc)
+        visit_class(discovered, ("File", "Directory"), setloc)

     if discovered_secondaryfiles is not None:
         for d in discovered:
@@ -647,24 +712,27 @@ FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])

 def upload_workflow_deps(arvrunner, tool, runtimeContext):
     # Ensure that Docker images needed by this workflow are available
-    upload_docker(arvrunner, tool, runtimeContext)
+    with Perf(metrics, "upload_docker"):
+        upload_docker(arvrunner, tool, runtimeContext)

     document_loader = tool.doc_loader

     merged_map = {}
-
+    tool_dep_cache = {}
     def upload_tool_deps(deptool):
         if "id" in deptool:
             discovered_secondaryfiles = {}
-            pm = upload_dependencies(arvrunner,
-                                     "%s dependencies" % (shortname(deptool["id"])),
-                                     document_loader,
-                                     deptool,
-                                     deptool["id"],
-                                     False,
-                                     runtimeContext,
-                                     include_primary=False,
-                                     discovered_secondaryfiles=discovered_secondaryfiles)
+            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+                pm = upload_dependencies(arvrunner,
+                                         "%s dependencies" % (shortname(deptool["id"])),
+                                         document_loader,
+                                         deptool,
+                                         deptool["id"],
+                                         False,
+                                         runtimeContext,
+                                         include_primary=False,
+                                         discovered_secondaryfiles=discovered_secondaryfiles,
+                                         cache=tool_dep_cache)
             document_loader.idx[deptool["id"]] = deptool
             toolmap = {}
             for k,v in pm.items():
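
upload_workflow_deps now builds a single tool_dep_cache dict and hands it
to every upload_dependencies call, so a document referenced by many
workflow steps (a shared schema, an $include'd script) is fetched and
parsed once per run instead of once per step. A toy demonstration of why
the shared dict matters (hypothetical loader):

    cache = {}

    def load_once(uri):
        # The expensive fetch/parse happens only on the first request.
        if uri not in cache:
            print("fetching", uri)
            cache[uri] = {"id": uri}  # stand-in for the parsed document
        return cache[uri]

    for step in ("step1", "step2", "step3"):
        load_once("common-types.yml")  # fetches once, then hits the cache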
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index c885ebd4b..66cda19f4 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,12 +36,13 @@ setup(name='arvados-cwl-runner',
       # file to determine what version of cwltool and schema-salad to
       # build.
       install_requires=[
-          'cwltool==3.1.20220224085855',
-          'schema-salad==8.2.20211116214159',
+          'cwltool==3.1.20220623174452',
+          'schema-salad==8.3.20220801194920',
           'arvados-python-client{}'.format(pysdk_dep),
           'setuptools',
           'ciso8601 >= 2.0.0',
-          'networkx < 2.6'
+          'networkx < 2.6',
+          'msgpack==1.0.3'
       ],
       data_files=[
          ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
-----------------------------------------------------------------------

hooks/post-receive
--