[arvados] created: 2.4.2-20-gc1a2ea038
git repository hosting
git at public.arvados.org
Mon Sep 12 19:14:11 UTC 2022
at c1a2ea038d331749aa1409259a76ac8fde1683f6 (commit)
commit c1a2ea038d331749aa1409259a76ac8fde1683f6
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Fri Sep 9 16:59:27 2022 -0400
Merge branch '19464-capture-git-info' refs #19464
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 5094ea3bf..e9b58bc83 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -453,7 +453,7 @@ class ArvadosContainer(JobBase):
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
- def arvados_job_spec(self, runtimeContext):
+ def arvados_job_spec(self, runtimeContext, git_info):
"""Create an Arvados container request for this workflow.
The returned dict can be used to create a container passed as
@@ -515,7 +515,7 @@ class RunnerContainer(Runner):
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
@@ -524,6 +524,8 @@ class RunnerContainer(Runner):
if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+ container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
+
properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
if properties_req:
builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
@@ -595,7 +597,7 @@ class RunnerContainer(Runner):
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
- job_spec = self.arvados_job_spec(runtimeContext)
+ job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
if runtimeContext.project_uuid:
job_spec["owner_uuid"] = runtimeContext.project_uuid
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index 51e7cd8b9..5f3feabf8 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -40,9 +40,10 @@ sum_res_pars = ("outdirMin", "outdirMax")
def upload_workflow(arvRunner, tool, job_order, project_uuid,
runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
- submit_runner_image=None):
+ submit_runner_image=None,
+ git_info=None):
- packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
+ packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 8635d5fcf..5c74eb1f9 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -17,6 +17,7 @@ import copy
import json
import re
from functools import partial
+import subprocess
import time
import urllib
@@ -24,6 +25,7 @@ from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
+from schema_salad.ref_resolver import file_uri, uri_file_path
import arvados
import arvados.config
@@ -518,9 +520,75 @@ The 'jobs' API is no longer supported.
for req in job_reqs:
tool.requirements.append(req)
+ @staticmethod
+ def get_git_info(tool):
+ in_a_git_repo = False
+ cwd = None
+ filepath = None
+
+ if tool.tool["id"].startswith("file://"):
+ # check if git is installed
+ try:
+ filepath = uri_file_path(tool.tool["id"])
+ cwd = os.path.dirname(filepath)
+ subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
+ in_a_git_repo = True
+ except Exception as e:
+ pass
+
+ gitproperties = {}
+
+ if in_a_git_repo:
+ git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
+ git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
+ git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
+ git_branch = subprocess.run(["git", "branch", "--show-current"], cwd=cwd, capture_output=True, text=True).stdout
+ git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
+ git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
+ git_describe = subprocess.run(["git", "describe", "--always"], cwd=cwd, capture_output=True, text=True).stdout
+ git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
+ git_path = filepath[len(git_toplevel):]
+
+ gitproperties = {
+ "http://arvados.org/cwl#gitCommit": git_commit.strip(),
+ "http://arvados.org/cwl#gitDate": git_date.strip(),
+ "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
+ "http://arvados.org/cwl#gitBranch": git_branch.strip(),
+ "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
+ "http://arvados.org/cwl#gitStatus": git_status.strip(),
+ "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
+ "http://arvados.org/cwl#gitPath": git_path.strip(),
+ }
+ else:
+ for g in ("http://arvados.org/cwl#gitCommit",
+ "http://arvados.org/cwl#gitDate",
+ "http://arvados.org/cwl#gitCommitter",
+ "http://arvados.org/cwl#gitBranch",
+ "http://arvados.org/cwl#gitOrigin",
+ "http://arvados.org/cwl#gitStatus",
+ "http://arvados.org/cwl#gitDescribe",
+ "http://arvados.org/cwl#gitPath"):
+ if g in tool.metadata:
+ gitproperties[g] = tool.metadata[g]
+
+ return gitproperties
+
+ def set_container_request_properties(self, container, properties):
+ resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
+ for cr in resp["items"]:
+ cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
+ self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
+
def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
+ git_info = self.get_git_info(updated_tool)
+ if git_info:
+ logger.info("Git provenance")
+ for g in git_info:
+ if git_info[g]:
+ logger.info(" %s: %s", g.split("#", 1)[1], git_info[g])
+
workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
@@ -554,7 +622,10 @@ The 'jobs' API is no longer supported.
runtimeContext.intermediate_storage_classes = default_storage_classes
if not runtimeContext.name:
- runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ if git_info.get("http://arvados.org/cwl#gitDescribe"):
+ self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
+ runtimeContext.name = self.name
if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
# When creating or updating workflow record, by default
@@ -628,7 +699,8 @@ The 'jobs' API is no longer supported.
submit_runner_ram=runtimeContext.submit_runner_ram,
name=runtimeContext.name,
merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image)
+ submit_runner_image=runtimeContext.submit_runner_image,
+ git_info=git_info)
self.stdout.write(uuid + "\n")
return (None, "success")
@@ -690,7 +762,8 @@ The 'jobs' API is no longer supported.
priority=runtimeContext.priority,
secret_store=self.secret_store,
collection_cache_size=runtimeContext.collection_cache_size,
- collection_cache_is_default=self.should_estimate_cache_size)
+ collection_cache_is_default=self.should_estimate_cache_size,
+ git_info=git_info)
else:
runtimeContext.runnerjob = tool.tool["id"]
@@ -710,6 +783,7 @@ The 'jobs' API is no longer supported.
current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current_container:
logger.info("Running inside container %s", current_container.get("uuid"))
+ self.set_container_request_properties(current_container, git_info)
self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
self.polling_thread = threading.Thread(target=self.poll_states)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 225f4ae60..1544d05cd 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -604,7 +604,7 @@ def upload_docker(arvrunner, tool, runtimeContext):
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
@@ -644,6 +644,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
for l in v:
visit(l, cur_id)
visit(packed, None)
+
+ if git_info:
+ for g in git_info:
+ packed[g] = git_info[g]
+
return packed
@@ -794,7 +799,8 @@ class Runner(Process):
intermediate_output_ttl=0, merged_map=None,
priority=None, secret_store=None,
collection_cache_size=256,
- collection_cache_is_default=True):
+ collection_cache_is_default=True,
+ git_info=None):
loadingContext = loadingContext.copy()
loadingContext.metadata = updated_tool.metadata.copy()
@@ -823,6 +829,7 @@ class Runner(Process):
self.priority = priority
self.secret_store = secret_store
self.enable_dev = loadingContext.enable_dev
+ self.git_info = git_info
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # defaut 1 GiB
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index a726ec501..b44f6feb5 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -19,6 +19,7 @@ import sys
import unittest
import cwltool.process
import re
+import os
from io import BytesIO
@@ -45,7 +46,7 @@ import ruamel.yaml as yaml
_rootDesc = None
-def stubs(wfname='submit_wf.cwl'):
+def stubs(wfdetails=('submit_wf.cwl', None)):
def outer_wrapper(func, *rest):
@functools.wraps(func)
@mock.patch("arvados_cwl.arvdocker.determine_image_id")
@@ -58,6 +59,10 @@ def stubs(wfname='submit_wf.cwl'):
uuid4, determine_image_id, *args, **kwargs):
class Stubs(object):
pass
+
+ wfname = wfdetails[0]
+ wfpath = wfdetails[1]
+
stubs = Stubs()
stubs.events = events
stubs.keepdocker = keepdocker
@@ -273,9 +278,28 @@ def stubs(wfname='submit_wf.cwl'):
stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
- with open("tests/wf/submit_wf_packed.cwl") as f:
+ cwd = os.getcwd()
+ filepath = os.path.join(cwd, "tests/wf/submit_wf_packed.cwl")
+ with open(filepath) as f:
expect_packed_workflow = yaml.round_trip_load(f)
+ if wfpath is None:
+ wfpath = wfname
+
+ gitinfo_workflow = copy.deepcopy(expect_packed_workflow)
+ gitinfo_workflow["$graph"][0]["id"] = "file://%s/tests/wf/%s" % (cwd, wfpath)
+ mocktool = mock.NonCallableMock(tool=gitinfo_workflow["$graph"][0], metadata=gitinfo_workflow)
+
+ git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
+ expect_packed_workflow.update(git_info)
+
+ git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in git_info.items()}
+
+ if wfname == wfpath:
+ container_name = "%s (%s)" % (wfpath, git_props["arv:gitDescribe"])
+ else:
+ container_name = wfname
+
stubs.expect_container_spec = {
'priority': 500,
'mounts': {
@@ -321,12 +345,12 @@ def stubs(wfname='submit_wf.cwl'):
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
- '--output-name=Output from workflow '+wfname,
+ '--output-name=Output from workflow '+container_name,
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
- 'name': wfname,
+ 'name': container_name,
'container_image': '999999999999999999999999999999d3+99',
- 'output_name': 'Output from workflow '+wfname,
+ 'output_name': 'Output from workflow %s' % (container_name),
'output_path': '/var/spool/cwl',
'cwd': '/var/spool/cwl',
'runtime_constraints': {
@@ -335,7 +359,7 @@ def stubs(wfname='submit_wf.cwl'):
'ram': (1024+256)*1024*1024
},
'use_existing': False,
- 'properties': {},
+ 'properties': git_props,
'secret_mounts': {}
}
@@ -444,7 +468,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs('submit_wf_no_reuse.cwl')
+ @stubs(('submit_wf_no_reuse.cwl', None))
def test_submit_container_reuse_disabled_by_workflow(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@@ -453,13 +477,7 @@ class TestSubmit(unittest.TestCase):
self.assertEqual(exited, 0)
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = [
- 'arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--disable-reuse', "--collection-cache-size=256",
- '--output-name=Output from workflow submit_wf_no_reuse.cwl', '--debug', '--on-error=continue',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = ["--disable-reuse" if v == "--enable-reuse" else v for v in expect_container["command"]]
expect_container["use_existing"] = False
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
@@ -891,7 +909,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs('hello container 123')
+ @stubs(('hello container 123', 'submit_wf.cwl'))
def test_submit_container_name(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--name=hello container 123",
@@ -1042,7 +1060,7 @@ class TestSubmit(unittest.TestCase):
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs('submit_wf_runner_resources.cwl')
+ @stubs(('submit_wf_runner_resources.cwl', None))
def test_submit_wf_runner_resources(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@@ -1066,13 +1084,7 @@ class TestSubmit(unittest.TestCase):
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$namespaces"] = {
"arv": "http://arvados.org/cwl#",
}
- expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=512",
- '--output-name=Output from workflow submit_wf_runner_resources.cwl',
- '--debug', '--on-error=continue',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = ["--collection-cache-size=512" if v == "--collection-cache-size=256" else v for v in expect_container["command"]]
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
@@ -1470,7 +1482,7 @@ class TestSubmit(unittest.TestCase):
finally:
cwltool_logger.removeHandler(stderr_logger)
- @stubs('submit_wf_process_properties.cwl')
+ @stubs(('submit_wf_process_properties.cwl', None))
def test_submit_set_process_properties(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
@@ -1500,14 +1512,14 @@ class TestSubmit(unittest.TestCase):
"arv": "http://arvados.org/cwl#"
}
- expect_container["properties"] = {
+ expect_container["properties"].update({
"baz": "blorp.txt",
"foo": "bar",
"quux": {
"q1": 1,
"q2": 2
}
- }
+ })
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
commit 857f9369b549d1d9dfef4d941a6aecbf747334f1
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Thu Sep 8 15:05:54 2022 -0400
Merge branch '19454-cwl-output-json' refs #19454
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 19b12a5e9..63d3f1331 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -36,8 +36,8 @@ setup(name='arvados-cwl-runner',
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220623174452',
- 'schema-salad==8.3.20220801194920',
+ 'cwltool==3.1.20220907141119',
+ 'schema-salad==8.3.20220825114525',
'arvados-python-client{}'.format(pysdk_dep),
'ciso8601 >= 2.0.0',
'networkx < 2.6',
commit a1a34a40b6fd5681e11175a8e0fa17d105828b13
Author: Peter Amstutz <peter.amstutz at curii.com>
Date: Wed Aug 24 14:35:43 2022 -0400
Merge branch '19413-warning-update' refs #19413
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz at curii.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 3241fb607..8635d5fcf 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -252,6 +252,11 @@ The 'jobs' API is no longer supported.
Called when there's a need to report errors, warnings or just
activity statuses, for example in the RuntimeStatusLoggingHandler.
"""
+
+ if kind not in ('error', 'warning'):
+ # Ignore any other status kind
+ return
+
with self.workflow_eval_lock:
current = None
try:
@@ -261,32 +266,35 @@ The 'jobs' API is no longer supported.
if current is None:
return
runtime_status = current.get('runtime_status', {})
- if kind in ('error', 'warning'):
- updatemessage = runtime_status.get(kind, "")
- if not updatemessage:
- updatemessage = message
-
- # Subsequent messages tacked on in detail
- updatedetail = runtime_status.get(kind+'Detail', "")
- maxlines = 40
- if updatedetail.count("\n") < maxlines:
- if updatedetail:
- updatedetail += "\n"
- updatedetail += message + "\n"
-
- if detail:
- updatedetail += detail + "\n"
-
- if updatedetail.count("\n") >= maxlines:
- updatedetail += "\nSome messages may have been omitted. Check the full log."
-
- runtime_status.update({
- kind: updatemessage,
- kind+'Detail': updatedetail,
- })
- else:
- # Ignore any other status kind
+
+ original_updatemessage = updatemessage = runtime_status.get(kind, "")
+ if not updatemessage:
+ updatemessage = message
+
+ # Subsequent messages tacked on in detail
+ original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
+ maxlines = 40
+ if updatedetail.count("\n") < maxlines:
+ if updatedetail:
+ updatedetail += "\n"
+ updatedetail += message + "\n"
+
+ if detail:
+ updatedetail += detail + "\n"
+
+ if updatedetail.count("\n") >= maxlines:
+ updatedetail += "\nSome messages may have been omitted. Check the full log."
+
+ if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
+ # don't waste time doing an update if nothing changed
+ # (usually because we exceeded the max lines)
return
+
+ runtime_status.update({
+ kind: updatemessage,
+ kind+'Detail': updatedetail,
+ })
+
try:
self.api.containers().update(uuid=current['uuid'],
body={
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list