[ARVADOS] created: 1.2.0-299-gabfb5bea8
Git user
git at public.curoverse.com
Tue Oct 30 14:05:43 EDT 2018
at abfb5bea8d7e6e4026c887fe0b7a512a87e2b42e (commit)
commit abfb5bea8d7e6e4026c887fe0b7a512a87e2b42e
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 17:21:05 2018 -0400
14198: Fix typo current -> current_container, add copyright header
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 7256e1d0d..bf81853be 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
import argparse
import logging
import os
@@ -618,7 +622,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
return (runnerjob.uuid, "success")
current_container = get_current_container(self.api, self.num_retries, logger)
- if current:
+ if current_container:
logger.info("Running inside container %s", current_container.get("uuid"))
self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
commit 124f097faf9ebd1295350699e03d3084de3a39d9
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 17:12:14 2018 -0400
14198: Only assign runtime token when api token is for local cluster
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/fed_containers.go b/lib/controller/fed_containers.go
index a3c292583..ebe324de7 100644
--- a/lib/controller/fed_containers.go
+++ b/lib/controller/fed_containers.go
@@ -76,17 +76,19 @@ func remoteContainerRequestCreate(
return true
}
- if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" {
- httpserver.Error(w, "Token scope is not [all]", http.StatusForbidden)
- return true
- }
-
- newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusForbidden)
- return true
+ if currentUser.Authorization.UUID[0:5] == h.handler.Cluster.ClusterID {
+ if len(currentUser.Authorization.Scopes) != 1 || currentUser.Authorization.Scopes[0] != "all" {
+ httpserver.Error(w, "Token scope is not [all]", http.StatusForbidden)
+ return true
+ }
+
+ newtok, err := h.handler.createAPItoken(req, currentUser.UUID, nil)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusForbidden)
+ return true
+ }
+ containerRequest["runtime_token"] = newtok.TokenV2()
}
- containerRequest["runtime_token"] = newtok.TokenV2()
}
newbody, err := json.Marshal(request)
commit 966719c51c98419ebc12303cfbea90f4197e4746
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 17:11:59 2018 -0400
14198: Log current container
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 6eaa4b92c..7256e1d0d 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -617,6 +617,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
runnerjob.run(submitargs)
return (runnerjob.uuid, "success")
+ current_container = get_current_container(self.api, self.num_retries, logger)
+ if current:
+ logger.info("Running inside container %s", current_container.get("uuid"))
+
self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
commit 7958d434f8a08b62b265ff7b62a5b0ea545af1b0
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 16:14:50 2018 -0400
14198: Support expressions in TargetCluster[clusterID, ownerUUID]
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index d9466a83a..823b41ce9 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -36,7 +36,7 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
class ArvadosContainer(JobBase):
"""Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
- def __init__(self, runner,
+ def __init__(self, runner, cluster_target,
builder, # type: Builder
joborder, # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
make_path_mapper, # type: Callable[..., PathMapper]
@@ -46,6 +46,7 @@ class ArvadosContainer(JobBase):
):
super(ArvadosContainer, self).__init__(builder, joborder, make_path_mapper, requirements, hints, name)
self.arvrunner = runner
+ self.cluster_target = cluster_target
self.running = False
self.uuid = None
@@ -251,13 +252,11 @@ class ArvadosContainer(JobBase):
scheduling_parameters["max_run_time"] = self.timelimit
extra_submit_params = {}
- cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
- if cluster_target_req:
- cluster_id = cluster_target_req.get("clusterID")
- if cluster_id:
- extra_submit_params["cluster_id"] = cluster_id
- if cluster_target_req.get("ownerUUID"):
- container_request["owner_uuid"] = cluster_target_req.get("ownerUUID")
+ if self.cluster_target is not None:
+ if self.cluster_target.cluster_id:
+ extra_submit_params["cluster_id"] = self.cluster_target.cluster_id
+ if self.cluster_target.owner_uuid:
+ container_request["owner_uuid"] = self.cluster_target.owner_uuid
container_request["output_name"] = "Output for step %s" % (self.name)
container_request["output_ttl"] = self.output_ttl
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index 119acc303..e0997db5b 100644
--- a/sdk/cwl/arvados_cwl/arvtool.py
+++ b/sdk/cwl/arvados_cwl/arvtool.py
@@ -6,6 +6,7 @@ from cwltool.command_line_tool import CommandLineTool
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
+from .context import ClusterTarget
from functools import partial
class ArvadosCommandTool(CommandLineTool):
@@ -17,7 +18,7 @@ class ArvadosCommandTool(CommandLineTool):
def make_job_runner(self, runtimeContext):
if runtimeContext.work_api == "containers":
- return partial(ArvadosContainer, self.arvrunner)
+ return partial(ArvadosContainer, self.arvrunner, runtimeContext.cluster_target)
elif runtimeContext.work_api == "jobs":
return partial(ArvadosJob, self.arvrunner)
else:
@@ -44,6 +45,12 @@ class ArvadosCommandTool(CommandLineTool):
runtimeContext = runtimeContext.copy()
+ cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
+ if runtimeContext.cluster_target is None or runtimeContext.cluster_target.instance != id(cluster_target_req):
+ runtimeContext.cluster_target = ClusterTarget(id(cluster_target_req),
+ builder.do_eval(cluster_target_req.get("clusterID")),
+ builder.do_eval(cluster_target_req.get("ownerUUID")))
+
if runtimeContext.work_api == "containers":
dockerReq, is_req = self.get_requirement("DockerRequirement")
if dockerReq and dockerReq.get("dockerOutputDirectory"):
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index ae9062510..f86641bfd 100644
--- a/sdk/cwl/arvados_cwl/arvworkflow.py
+++ b/sdk/cwl/arvados_cwl/arvworkflow.py
@@ -131,158 +131,167 @@ class ArvadosWorkflow(Workflow):
self.loadingContext = loadingContext
def job(self, joborder, output_callback, runtimeContext):
+
+ cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
+ if runtimeContext.cluster_target is None or runtimeContext.cluster_target.instance != id(cluster_target_req):
+ runtimeContext.cluster_target = ClusterTarget(id(cluster_target_req),
+ builder.do_eval(cluster_target_req.get("clusterID")),
+ builder.do_eval(cluster_target_req.get("ownerUUID")))
+
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
- if req:
- with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
- if "id" not in self.tool:
- raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
- document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
+ if not req:
+ return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
- discover_secondary_files(self.tool["inputs"], joborder)
+ # RunInSingleContainer is true
- with Perf(metrics, "subworkflow upload_deps"):
- upload_dependencies(self.arvrunner,
- os.path.basename(joborder.get("id", "#")),
- document_loader,
- joborder,
- joborder.get("id", "#"),
- False)
+ with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+ if "id" not in self.tool:
+ raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
+ document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
+
+ discover_secondary_files(self.tool["inputs"], joborder)
+
+ with Perf(metrics, "subworkflow upload_deps"):
+ upload_dependencies(self.arvrunner,
+ os.path.basename(joborder.get("id", "#")),
+ document_loader,
+ joborder,
+ joborder.get("id", "#"),
+ False)
+
+ if self.wf_pdh is None:
+ workflowobj["requirements"] = dedup_reqs(self.requirements)
+ workflowobj["hints"] = dedup_reqs(self.hints)
+
+ packed = pack(document_loader, workflowobj, uri, self.metadata)
- if self.wf_pdh is None:
- workflowobj["requirements"] = dedup_reqs(self.requirements)
- workflowobj["hints"] = dedup_reqs(self.hints)
-
- packed = pack(document_loader, workflowobj, uri, self.metadata)
-
- builder = Builder(joborder,
- requirements=workflowobj["requirements"],
- hints=workflowobj["hints"],
- resources={})
-
- def visit(item):
- for t in ("hints", "requirements"):
- if t not in item:
- continue
- for req in item[t]:
- if req["class"] == "ResourceRequirement":
- dyn = False
- for k in max_res_pars + sum_res_pars:
- if k in req:
- if isinstance(req[k], basestring):
- if item["id"] == "#main":
- # only the top-level requirements/hints may contain expressions
- self.dynamic_resource_req.append(req)
- dyn = True
- break
- else:
- with SourceLine(req, k, WorkflowException):
- raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
- if not dyn:
- self.static_resource_req.append(req)
-
- visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
-
- if self.static_resource_req:
- self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
-
- upload_dependencies(self.arvrunner,
- runtimeContext.name,
- document_loader,
- packed,
- uri,
- False)
-
- # Discover files/directories referenced by the
- # workflow (mainly "default" values)
- visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
-
-
- if self.dynamic_resource_req:
builder = Builder(joborder,
- requirements=self.requirements,
- hints=self.hints,
+ requirements=workflowobj["requirements"],
+ hints=workflowobj["hints"],
resources={})
- # Evaluate dynamic resource requirements using current builder
- rs = copy.copy(self.static_resource_req)
- for dyn_rs in self.dynamic_resource_req:
- eval_req = {"class": "ResourceRequirement"}
- for a in max_res_pars + sum_res_pars:
- if a in dyn_rs:
- eval_req[a] = builder.do_eval(dyn_rs[a])
- rs.append(eval_req)
- job_res_reqs = [get_overall_res_req(rs)]
- else:
- job_res_reqs = self.static_resource_req
-
- with Perf(metrics, "subworkflow adjust"):
- joborder_resolved = copy.deepcopy(joborder)
- joborder_keepmount = copy.deepcopy(joborder)
-
- reffiles = []
- visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
-
- mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
- "/keep/%s",
- "/keep/%s/%s")
-
- # For containers API, we need to make sure any extra
- # referenced files (ie referenced by the workflow but
- # not in the inputs) are included in the mounts.
- if self.wf_reffiles:
- runtimeContext = runtimeContext.copy()
- runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
-
- def keepmount(obj):
- remove_redundant_fields(obj)
- with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
- if "location" not in obj:
- raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
- with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
- if obj["location"].startswith("keep:"):
- obj["location"] = mapper.mapper(obj["location"]).target
- if "listing" in obj:
- del obj["listing"]
- elif obj["location"].startswith("_:"):
- del obj["location"]
- else:
- raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
-
- visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
-
- def resolved(obj):
- if obj["location"].startswith("keep:"):
- obj["location"] = mapper.mapper(obj["location"]).resolved
-
- visit_class(joborder_resolved, ("File", "Directory"), resolved)
-
- if self.wf_pdh is None:
- adjustFileObjs(packed, keepmount)
- adjustDirObjs(packed, keepmount)
- self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
-
- wf_runner = cmap({
- "class": "CommandLineTool",
- "baseCommand": "cwltool",
- "inputs": self.tool["inputs"],
- "outputs": self.tool["outputs"],
- "stdout": "cwl.output.json",
- "requirements": self.requirements+job_res_reqs+[
- {"class": "InlineJavascriptRequirement"},
- {
- "class": "InitialWorkDirRequirement",
- "listing": [{
- "entryname": "workflow.cwl",
- "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
- }, {
- "entryname": "cwl.input.yml",
- "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
- }]
- }],
- "hints": self.hints,
- "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
- "id": "#"
- })
- return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+ def visit(item):
+ for t in ("hints", "requirements"):
+ if t not in item:
+ continue
+ for req in item[t]:
+ if req["class"] == "ResourceRequirement":
+ dyn = False
+ for k in max_res_pars + sum_res_pars:
+ if k in req:
+ if isinstance(req[k], basestring):
+ if item["id"] == "#main":
+ # only the top-level requirements/hints may contain expressions
+ self.dynamic_resource_req.append(req)
+ dyn = True
+ break
+ else:
+ with SourceLine(req, k, WorkflowException):
+ raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
+ if not dyn:
+ self.static_resource_req.append(req)
+
+ visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
+
+ if self.static_resource_req:
+ self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
+
+ upload_dependencies(self.arvrunner,
+ runtimeContext.name,
+ document_loader,
+ packed,
+ uri,
+ False)
+
+ # Discover files/directories referenced by the
+ # workflow (mainly "default" values)
+ visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
+
+
+ if self.dynamic_resource_req:
+ builder = Builder(joborder,
+ requirements=self.requirements,
+ hints=self.hints,
+ resources={})
+
+ # Evaluate dynamic resource requirements using current builder
+ rs = copy.copy(self.static_resource_req)
+ for dyn_rs in self.dynamic_resource_req:
+ eval_req = {"class": "ResourceRequirement"}
+ for a in max_res_pars + sum_res_pars:
+ if a in dyn_rs:
+ eval_req[a] = builder.do_eval(dyn_rs[a])
+ rs.append(eval_req)
+ job_res_reqs = [get_overall_res_req(rs)]
else:
- return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)
+ job_res_reqs = self.static_resource_req
+
+ with Perf(metrics, "subworkflow adjust"):
+ joborder_resolved = copy.deepcopy(joborder)
+ joborder_keepmount = copy.deepcopy(joborder)
+
+ reffiles = []
+ visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
+
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+ "/keep/%s",
+ "/keep/%s/%s")
+
+ # For containers API, we need to make sure any extra
+ # referenced files (ie referenced by the workflow but
+ # not in the inputs) are included in the mounts.
+ if self.wf_reffiles:
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
+
+ def keepmount(obj):
+ remove_redundant_fields(obj)
+ with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+ if "location" not in obj:
+ raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
+ with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
+ if obj["location"].startswith("keep:"):
+ obj["location"] = mapper.mapper(obj["location"]).target
+ if "listing" in obj:
+ del obj["listing"]
+ elif obj["location"].startswith("_:"):
+ del obj["location"]
+ else:
+ raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
+
+ visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
+
+ def resolved(obj):
+ if obj["location"].startswith("keep:"):
+ obj["location"] = mapper.mapper(obj["location"]).resolved
+
+ visit_class(joborder_resolved, ("File", "Directory"), resolved)
+
+ if self.wf_pdh is None:
+ adjustFileObjs(packed, keepmount)
+ adjustDirObjs(packed, keepmount)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+
+ wf_runner = cmap({
+ "class": "CommandLineTool",
+ "baseCommand": "cwltool",
+ "inputs": self.tool["inputs"],
+ "outputs": self.tool["outputs"],
+ "stdout": "cwl.output.json",
+ "requirements": self.requirements+job_res_reqs+[
+ {"class": "InlineJavascriptRequirement"},
+ {
+ "class": "InitialWorkDirRequirement",
+ "listing": [{
+ "entryname": "workflow.cwl",
+ "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
+ }, {
+ "entryname": "cwl.input.yml",
+ "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ }]
+ }],
+ "hints": self.hints,
+ "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
+ "id": "#"
+ })
+ return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 4e1334c1c..23e7b91a0 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -3,11 +3,14 @@
# SPDX-License-Identifier: Apache-2.0
from cwltool.context import LoadingContext, RuntimeContext
+from collections import namedtuple
class ArvLoadingContext(LoadingContext):
def __init__(self, kwargs=None):
super(ArvLoadingContext, self).__init__(kwargs)
+ClusterTarget = namedtuple("ClusterTarget", ("instance", "cluster_id", "owner_uuid"))
+
class ArvRuntimeContext(RuntimeContext):
def __init__(self, kwargs=None):
self.work_api = None
@@ -31,6 +34,7 @@ class ArvRuntimeContext(RuntimeContext):
self.current_container = None
self.http_timeout = 300
self.submit_runner_cluster = None
+ self.cluster_target = None
super(ArvRuntimeContext, self).__init__(kwargs)
commit d5196aa73549e5becfbcb982a65d7e3d632c7068
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 15:19:05 2018 -0400
14198: Resolve to Docker images to PDH and set "http://arvados.org/cwl#dockerCollectionPDH"
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 7e22a7d39..d9466a83a 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -214,8 +214,7 @@ class ArvadosContainer(JobBase):
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
runtimeContext.pull_image,
- self.arvrunner.project_uuid,
- runtimeContext.submit_runner_cluster)
+ self.arvrunner.project_uuid)
api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 6bca07c88..84006b47d 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -21,6 +21,9 @@ cached_lookups_lock = threading.Lock()
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+ if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
+ return dockerRequirement["http://arvados.org/cwl#dockerCollectionPDH"]
+
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement = copy.deepcopy(dockerRequirement)
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index bbfb8ffc6..6eaa4b92c 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -37,6 +37,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
class RuntimeStatusLoggingHandler(logging.Handler):
"""
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 3b40552ac..31a424d30 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -26,7 +26,7 @@ from cwltool.pack import pack
import arvados.collection
import ruamel.yaml as yaml
-from .arvdocker import arv_docker_get_image
+import arvdocker
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
@@ -215,9 +215,9 @@ def upload_docker(arvrunner, tool):
# TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
else:
- arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+ arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
@@ -244,6 +244,9 @@ def packed_workflow(arvrunner, tool, merged_map):
v["location"] = merged_map[cur_id].resolved[v["location"]]
if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+ if v.get("class") == "DockerRequirement":
+ img = v.get("dockerImageId") or v.get("dockerPull")
+ v["http://arvados.org/cwl#dockerCollectionPDH"] = arvdocker.cached_lookups[img]
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
@@ -324,7 +327,7 @@ def arvados_jobs_image(arvrunner, img):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- return arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+ return arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
commit 523cb8d312a4683eba66fb38b35ddea762381b31
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 15:03:23 2018 -0400
14198: Add genericFederatedRequestHandler support for links
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index 0e016f301..8ad34157c 100644
--- a/lib/controller/federation.go
+++ b/lib/controller/federation.go
@@ -28,6 +28,7 @@ var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz
var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
var collectionRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
+var linksRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "links", "o0j2j"))
func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, context.CancelFunc, error) {
remote, ok := h.Cluster.RemoteClusters[remoteID]
@@ -90,9 +91,12 @@ func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil}
containerRequestsHandler := &genericFederatedRequestHandler{next, h, containerRequestsRe,
[]federatedRequestDelegate{remoteContainerRequestCreate}}
+ linksHandler := &genericFederatedRequestHandler{next, h, linksRe, nil}
mux.Handle("/arvados/v1/workflows", wfHandler)
mux.Handle("/arvados/v1/workflows/", wfHandler)
+ mux.Handle("/arvados/v1/links", linksHandler)
+ mux.Handle("/arvados/v1/links/", linksHandler)
mux.Handle("/arvados/v1/containers", containersHandler)
mux.Handle("/arvados/v1/containers/", containersHandler)
mux.Handle("/arvados/v1/container_requests", containerRequestsHandler)
commit 4443107224506875604655ea202fd94c4632a048
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 15:02:23 2018 -0400
14198: Refactor and add support for --submit-runner-cluster
Rename ArvCwlRunner to ArvCwlExecutor and move into its own file.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0866f69d6..63fc3ea47 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -10,19 +10,9 @@ import argparse
import logging
import os
import sys
-import threading
-import hashlib
-import copy
-import json
import re
-from functools import partial
import pkg_resources # part of setuptools
-import Queue
-import time
-import signal
-import thread
-from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
@@ -36,23 +26,12 @@ from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd
-from .arvcontainer import ArvadosContainer, RunnerContainer
-from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool
-from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
-from .pathmapper import NoFollowPathMapper
-from .task_queue import TaskQueue
-from .context import ArvLoadingContext, ArvRuntimeContext
-from .util import get_current_container
from ._version import __version__
+from .executor import ArvCwlExecutor
-from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel
@@ -66,677 +45,6 @@ arvados.log_handler.setFormatter(logging.Formatter(
DEFAULT_PRIORITY = 500
-class RuntimeStatusLoggingHandler(logging.Handler):
- """
- Intercepts logging calls and report them as runtime statuses on runner
- containers.
- """
- def __init__(self, runtime_status_update_func):
- super(RuntimeStatusLoggingHandler, self).__init__()
- self.runtime_status_update = runtime_status_update_func
-
- def emit(self, record):
- kind = None
- if record.levelno >= logging.ERROR:
- kind = 'error'
- elif record.levelno >= logging.WARNING:
- kind = 'warning'
- if kind is not None:
- log_msg = record.getMessage()
- if '\n' in log_msg:
- # If the logged message is multi-line, use its first line as status
- # and the rest as detail.
- status, detail = log_msg.split('\n', 1)
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, status),
- detail
- )
- else:
- self.runtime_status_update(
- kind,
- "%s: %s" % (record.name, record.getMessage())
- )
-
-class ArvCwlRunner(object):
- """Execute a CWL tool or workflow, submit work (using either jobs or
- containers API), wait for them to complete, and report output.
-
- """
-
- def __init__(self, api_client,
- arvargs=None,
- keep_client=None,
- num_retries=4,
- thread_count=4):
-
- if arvargs is None:
- arvargs = argparse.Namespace()
- arvargs.work_api = None
- arvargs.output_name = None
- arvargs.output_tags = None
- arvargs.thread_count = 1
-
- self.api = api_client
- self.processes = {}
- self.workflow_eval_lock = threading.Condition(threading.RLock())
- self.final_output = None
- self.final_status = None
- self.num_retries = num_retries
- self.uuid = None
- self.stop_polling = threading.Event()
- self.poll_api = None
- self.pipeline = None
- self.final_output_collection = None
- self.output_name = arvargs.output_name
- self.output_tags = arvargs.output_tags
- self.project_uuid = None
- self.intermediate_output_ttl = 0
- self.intermediate_output_collections = []
- self.trash_intermediate = False
- self.thread_count = arvargs.thread_count
- self.poll_interval = 12
- self.loadingContext = None
-
- if keep_client is not None:
- self.keep_client = keep_client
- else:
- self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
-
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
-
- self.fetcher_constructor = partial(CollectionFetcher,
- api_client=self.api,
- fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
- num_retries=self.num_retries)
-
- self.work_api = None
- expected_api = ["jobs", "containers"]
- for api in expected_api:
- try:
- methods = self.api._rootDesc.get('resources')[api]['methods']
- if ('httpMethod' in methods['create'] and
- (arvargs.work_api == api or arvargs.work_api is None)):
- self.work_api = api
- break
- except KeyError:
- pass
-
- if not self.work_api:
- if arvargs.work_api is None:
- raise Exception("No supported APIs")
- else:
- raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
-
- if self.work_api == "jobs":
- logger.warn("""
-*******************************
-Using the deprecated 'jobs' API.
-
-To get rid of this warning:
-
-Users: read about migrating at
-http://doc.arvados.org/user/cwl/cwl-style.html#migrate
-and use the option --api=containers
-
-Admins: configure the cluster to disable the 'jobs' API as described at:
-http://doc.arvados.org/install/install-api-server.html#disable_api_methods
-*******************************""")
-
- self.loadingContext = ArvLoadingContext(vars(arvargs))
- self.loadingContext.fetcher_constructor = self.fetcher_constructor
- self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
- self.loadingContext.construct_tool_object = self.arv_make_tool
-
- # Add a custom logging handler to the root logger for runtime status reporting
- # if running inside a container
- if get_current_container(self.api, self.num_retries, logger):
- root_logger = logging.getLogger('')
- handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
- root_logger.addHandler(handler)
-
- def arv_make_tool(self, toolpath_object, loadingContext):
- if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, loadingContext)
- elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
- return ArvadosWorkflow(self, toolpath_object, loadingContext)
- else:
- return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
-
- def output_callback(self, out, processStatus):
- with self.workflow_eval_lock:
- if processStatus == "success":
- logger.info("Overall process status is %s", processStatus)
- state = "Complete"
- else:
- logger.error("Overall process status is %s", processStatus)
- state = "Failed"
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": state}).execute(num_retries=self.num_retries)
- self.final_status = processStatus
- self.final_output = out
- self.workflow_eval_lock.notifyAll()
-
-
- def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
-
- def process_submitted(self, container):
- with self.workflow_eval_lock:
- self.processes[container.uuid] = container
-
- def process_done(self, uuid, record):
- with self.workflow_eval_lock:
- j = self.processes[uuid]
- logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
- del self.processes[uuid]
-
- def runtime_status_update(self, kind, message, detail=None):
- """
- Updates the runtime_status field on the runner container.
- Called when there's a need to report errors, warnings or just
- activity statuses, for example in the RuntimeStatusLoggingHandler.
- """
- with self.workflow_eval_lock:
- current = get_current_container(self.api, self.num_retries, logger)
- if current is None:
- return
- runtime_status = current.get('runtime_status', {})
- # In case of status being an error, only report the first one.
- if kind == 'error':
- if not runtime_status.get('error'):
- runtime_status.update({
- 'error': message
- })
- if detail is not None:
- runtime_status.update({
- 'errorDetail': detail
- })
- # Further errors are only mentioned as a count.
- else:
- # Get anything before an optional 'and N more' string.
- try:
- error_msg = re.match(
- r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
- more_failures = re.match(
- r'.*\(and (\d+) more\)', runtime_status.get('error'))
- except TypeError:
- # Ignore tests stubbing errors
- return
- if more_failures:
- failure_qty = int(more_failures.groups()[0])
- runtime_status.update({
- 'error': "%s (and %d more)" % (error_msg, failure_qty+1)
- })
- else:
- runtime_status.update({
- 'error': "%s (and 1 more)" % error_msg
- })
- elif kind in ['warning', 'activity']:
- # Record the last warning/activity status without regard of
- # previous occurences.
- runtime_status.update({
- kind: message
- })
- if detail is not None:
- runtime_status.update({
- kind+"Detail": detail
- })
- else:
- # Ignore any other status kind
- return
- try:
- self.api.containers().update(uuid=current['uuid'],
- body={
- 'runtime_status': runtime_status,
- }).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.info("Couldn't update runtime_status: %s", e)
-
- def wrapped_callback(self, cb, obj, st):
- with self.workflow_eval_lock:
- cb(obj, st)
- self.workflow_eval_lock.notifyAll()
-
- def get_wrapped_callback(self, cb):
- return partial(self.wrapped_callback, cb)
-
- def on_message(self, event):
- if event.get("object_uuid") in self.processes and event["event_type"] == "update":
- uuid = event["object_uuid"]
- if event["properties"]["new_attributes"]["state"] == "Running":
- with self.workflow_eval_lock:
- j = self.processes[uuid]
- if j.running is False:
- j.running = True
- j.update_pipeline_component(event["properties"]["new_attributes"])
- logger.info("%s %s is Running", self.label(j), uuid)
- elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
- self.process_done(uuid, event["properties"]["new_attributes"])
-
- def label(self, obj):
- return "[%s %s]" % (self.work_api[0:-1], obj.name)
-
- def poll_states(self):
- """Poll status of jobs or containers listed in the processes dict.
-
- Runs in a separate thread.
- """
-
- try:
- remain_wait = self.poll_interval
- while True:
- if remain_wait > 0:
- self.stop_polling.wait(remain_wait)
- if self.stop_polling.is_set():
- break
- with self.workflow_eval_lock:
- keys = list(self.processes.keys())
- if not keys:
- remain_wait = self.poll_interval
- continue
-
- begin_poll = time.time()
- if self.work_api == "containers":
- table = self.poll_api.container_requests()
- elif self.work_api == "jobs":
- table = self.poll_api.jobs()
-
- try:
- proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warn("Error checking states on API server: %s", e)
- remain_wait = self.poll_interval
- continue
-
- for p in proc_states["items"]:
- self.on_message({
- "object_uuid": p["uuid"],
- "event_type": "update",
- "properties": {
- "new_attributes": p
- }
- })
- finish_poll = time.time()
- remain_wait = self.poll_interval - (finish_poll - begin_poll)
- except:
- logger.exception("Fatal error in state polling thread.")
- with self.workflow_eval_lock:
- self.processes.clear()
- self.workflow_eval_lock.notifyAll()
- finally:
- self.stop_polling.set()
-
- def add_intermediate_output(self, uuid):
- if uuid:
- self.intermediate_output_collections.append(uuid)
-
- def trash_intermediate_output(self):
- logger.info("Cleaning up intermediate output collections")
- for i in self.intermediate_output_collections:
- try:
- self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
- except:
- logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
- break
-
- def check_features(self, obj):
- if isinstance(obj, dict):
- if obj.get("writable") and self.work_api != "containers":
- raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
- if obj.get("class") == "DockerRequirement":
- if obj.get("dockerOutputDirectory"):
- if self.work_api != "containers":
- raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
- if not obj.get("dockerOutputDirectory").startswith('/'):
- raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
- "Option 'dockerOutputDirectory' must be an absolute path.")
- if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
- raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
- for v in obj.itervalues():
- self.check_features(v)
- elif isinstance(obj, list):
- for i,v in enumerate(obj):
- with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
- self.check_features(v)
-
- def make_output_collection(self, name, storage_classes, tagsString, outputObj):
- outputObj = copy.deepcopy(outputObj)
-
- files = []
- def capture(fileobj):
- files.append(fileobj)
-
- adjustDirObjs(outputObj, capture)
- adjustFileObjs(outputObj, capture)
-
- generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
-
- final = arvados.collection.Collection(api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
-
- for k,v in generatemapper.items():
- if k.startswith("_:"):
- if v.type == "Directory":
- continue
- if v.type == "CreateFile":
- with final.open(v.target, "wb") as f:
- f.write(v.resolved.encode("utf-8"))
- continue
-
- if not k.startswith("keep:"):
- raise Exception("Output source is not in keep or a literal")
- sp = k.split("/")
- srccollection = sp[0][5:]
- try:
- reader = self.collection_cache.get(srccollection)
- srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
- final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
- except arvados.errors.ArgumentError as e:
- logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
- raise
- except IOError as e:
- logger.warn("While preparing output collection: %s", e)
-
- def rewrite(fileobj):
- fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
- if k in fileobj:
- del fileobj[k]
-
- adjustDirObjs(outputObj, rewrite)
- adjustFileObjs(outputObj, rewrite)
-
- with final.open("cwl.output.json", "w") as f:
- json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
-
- final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
-
- logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
- final.api_response()["name"],
- final.manifest_locator())
-
- final_uuid = final.manifest_locator()
- tags = tagsString.split(',')
- for tag in tags:
- self.api.links().create(body={
- "head_uuid": final_uuid, "link_class": "tag", "name": tag
- }).execute(num_retries=self.num_retries)
-
- def finalcollection(fileobj):
- fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
-
- adjustDirObjs(outputObj, finalcollection)
- adjustFileObjs(outputObj, finalcollection)
-
- return (outputObj, final)
-
- def set_crunch_output(self):
- if self.work_api == "containers":
- current = get_current_container(self.api, self.num_retries, logger)
- if current is None:
- return
- try:
- self.api.containers().update(uuid=current['uuid'],
- body={
- 'output': self.final_output_collection.portable_data_hash(),
- }).execute(num_retries=self.num_retries)
- self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
- body={
- 'is_trashed': True
- }).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.info("Setting container output: %s", e)
- elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
- self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
- body={
- 'output': self.final_output_collection.portable_data_hash(),
- 'success': self.final_status == "success",
- 'progress':1.0
- }).execute(num_retries=self.num_retries)
-
- def arv_executor(self, tool, job_order, runtimeContext, logger=None):
- self.debug = runtimeContext.debug
-
- tool.visit(self.check_features)
-
- self.project_uuid = runtimeContext.project_uuid
- self.pipeline = None
- self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
- self.secret_store = runtimeContext.secret_store
-
- self.trash_intermediate = runtimeContext.trash_intermediate
- if self.trash_intermediate and self.work_api != "containers":
- raise Exception("--trash-intermediate is only supported with --api=containers.")
-
- self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
- if self.intermediate_output_ttl and self.work_api != "containers":
- raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
- if self.intermediate_output_ttl < 0:
- raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
-
- if runtimeContext.submit_request_uuid and self.work_api != "containers":
- raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
-
- if not runtimeContext.name:
- runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
-
- # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
- # Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
-
- # Reload tool object which may have been updated by
- # upload_workflow_deps
- # Don't validate this time because it will just print redundant errors.
- loadingContext = self.loadingContext.copy()
- loadingContext.loader = tool.doc_loader
- loadingContext.avsc_names = tool.doc_schema
- loadingContext.metadata = tool.metadata
- loadingContext.do_validate = False
-
- tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- loadingContext)
-
- # Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- tool, job_order)
-
- existing_uuid = runtimeContext.update_workflow
- if existing_uuid or runtimeContext.create_workflow:
- # Create a pipeline template or workflow record and exit.
- if self.work_api == "jobs":
- tmpl = RunnerTemplate(self, tool, job_order,
- runtimeContext.enable_reuse,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map)
- tmpl.save()
- # cwltool.main will write our return value to stdout.
- return (tmpl.uuid, "success")
- elif self.work_api == "containers":
- return (upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map),
- "success")
-
- self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
- self.eval_timeout = runtimeContext.eval_timeout
-
- runtimeContext = runtimeContext.copy()
- runtimeContext.use_container = True
- runtimeContext.tmpdir_prefix = "tmp"
- runtimeContext.work_api = self.work_api
-
- if self.work_api == "containers":
- if self.ignore_docker_for_reuse:
- raise Exception("--ignore-docker-for-reuse not supported with containers API.")
- runtimeContext.outdir = "/var/spool/cwl"
- runtimeContext.docker_outdir = "/var/spool/cwl"
- runtimeContext.tmpdir = "/tmp"
- runtimeContext.docker_tmpdir = "/tmp"
- elif self.work_api == "jobs":
- if runtimeContext.priority != DEFAULT_PRIORITY:
- raise Exception("--priority not implemented for jobs API.")
- runtimeContext.outdir = "$(task.outdir)"
- runtimeContext.docker_outdir = "$(task.outdir)"
- runtimeContext.tmpdir = "$(task.tmpdir)"
-
- if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
- raise Exception("--priority must be in the range 1..1000.")
-
- runnerjob = None
- if runtimeContext.submit:
- # Submit a runner job to run the workflow for us.
- if self.work_api == "containers":
- if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
- runtimeContext.runnerjob = tool.tool["id"]
- runnerjob = tool.job(job_order,
- self.output_callback,
- runtimeContext).next()
- else:
- runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
- self.output_name,
- self.output_tags,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- on_error=runtimeContext.on_error,
- submit_runner_image=runtimeContext.submit_runner_image,
- intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
- merged_map=merged_map,
- priority=runtimeContext.priority,
- secret_store=self.secret_store)
- elif self.work_api == "jobs":
- runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
- self.output_name,
- self.output_tags,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- on_error=runtimeContext.on_error,
- submit_runner_image=runtimeContext.submit_runner_image,
- merged_map=merged_map)
- elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
- # Create pipeline for local run
- self.pipeline = self.api.pipeline_instances().create(
- body={
- "owner_uuid": self.project_uuid,
- "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
- logger.info("Pipeline instance %s", self.pipeline["uuid"])
-
- if runnerjob and not runtimeContext.wait:
- submitargs = runtimeContext.copy()
- submitargs.submit = False
- runnerjob.run(submitargs)
- return (runnerjob.uuid, "success")
-
- self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
- self.polling_thread = threading.Thread(target=self.poll_states)
- self.polling_thread.start()
-
- self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
-
- if runnerjob:
- jobiter = iter((runnerjob,))
- else:
- if runtimeContext.cwl_runner_job is not None:
- self.uuid = runtimeContext.cwl_runner_job.get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- runtimeContext)
-
- try:
- self.workflow_eval_lock.acquire()
- # Holds the lock while this code runs and releases it when
- # it is safe to do so in self.workflow_eval_lock.wait(),
- # at which point on_message can update job state and
- # process output callbacks.
-
- loopperf = Perf(metrics, "jobiter")
- loopperf.__enter__()
- for runnable in jobiter:
- loopperf.__exit__()
-
- if self.stop_polling.is_set():
- break
-
- if self.task_queue.error is not None:
- raise self.task_queue.error
-
- if runnable:
- with Perf(metrics, "run"):
- self.start_run(runnable, runtimeContext)
- else:
- if (self.task_queue.in_flight + len(self.processes)) > 0:
- self.workflow_eval_lock.wait(3)
- else:
- logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
- break
- loopperf.__enter__()
- loopperf.__exit__()
-
- while (self.task_queue.in_flight + len(self.processes)) > 0:
- if self.task_queue.error is not None:
- raise self.task_queue.error
- self.workflow_eval_lock.wait(3)
-
- except UnsupportedRequirement:
- raise
- except:
- if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
- logger.error("Interrupted, workflow will be cancelled")
- else:
- logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- if runnerjob and runnerjob.uuid and self.work_api == "containers":
- self.api.container_requests().update(uuid=runnerjob.uuid,
- body={"priority": "0"}).execute(num_retries=self.num_retries)
- finally:
- self.workflow_eval_lock.release()
- self.task_queue.drain()
- self.stop_polling.set()
- self.polling_thread.join()
- self.task_queue.join()
-
- if self.final_status == "UnsupportedRequirement":
- raise UnsupportedRequirement("Check log for details.")
-
- if self.final_output is None:
- raise WorkflowException("Workflow did not return a result.")
-
- if runtimeContext.submit and isinstance(runnerjob, Runner):
- logger.info("Final output collection %s", runnerjob.final_output)
- else:
- if self.output_name is None:
- self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
- if self.output_tags is None:
- self.output_tags = ""
-
- storage_classes = runtimeContext.storage_classes.strip().split(",")
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
- self.set_crunch_output()
-
- if runtimeContext.compute_checksum:
- adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
- adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
-
- if self.trash_intermediate and self.final_status == "success":
- self.trash_intermediate_output()
-
- return (self.final_output, self.final_status)
-
-
def versionstring():
"""Print version string of key packages for provenance and debugging."""
@@ -831,9 +139,13 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
default=None)
- parser.add_argument("--submit-request-uuid", type=str,
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--submit-request-uuid", type=str,
default=None,
- help="Update and commit supplied container request instead of creating a new one (containers API only).")
+ help="Update and commit to supplied container request instead of creating a new one (containers API only).")
+ exgroup.add_argument("--submit-runner-cluster", type=str,
+ help="Submit toplevel runner to a remote cluster (containers API only)",
+ default=None)
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
@@ -942,6 +254,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
add_arv_hints()
+ for key, val in cwltool.argparser.get_default_args().items():
+ if not hasattr(arvargs, key):
+ setattr(arvargs, key, val)
+
try:
if api_client is None:
api_client = arvados.safeapi.ThreadSafeApiCache(
@@ -952,7 +268,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
api_client.users().current().execute()
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
+ executor = ArvCwlExecutor(api_client, arvargs, keep_client=keep_client, num_retries=4)
except Exception as e:
logger.error(e)
return 1
@@ -977,22 +293,13 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
- for key, val in cwltool.argparser.get_default_args().items():
- if not hasattr(arvargs, key):
- setattr(arvargs, key, val)
-
- runtimeContext = ArvRuntimeContext(vars(arvargs))
- runtimeContext.make_fs_access = partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
- runtimeContext.http_timeout = arvargs.http_timeout
-
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
- executor=runner.arv_executor,
+ executor=executor.arv_executor,
versionfunc=versionstring,
job_order_object=job_order_object,
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
- loadingContext=runner.loadingContext,
- runtimeContext=runtimeContext)
+ loadingContext=executor.loadingContext,
+ runtimeContext=executor.runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index b46711af4..7e22a7d39 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -212,9 +212,10 @@ class ArvadosContainer(JobBase):
docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
- docker_req,
- runtimeContext.pull_image,
- self.arvrunner.project_uuid)
+ docker_req,
+ runtimeContext.pull_image,
+ self.arvrunner.project_uuid,
+ runtimeContext.submit_runner_cluster)
api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
@@ -490,14 +491,20 @@ class RunnerContainer(Runner):
if self.arvrunner.project_uuid:
job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ extra_submit_params = {}
+ if runtimeContext.submit_runner_cluster:
+ extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
+
if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
uuid=runtimeContext.submit_request_uuid,
- body=job_spec
+ body=job_spec,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
else:
response = self.arvrunner.api.container_requests().create(
- body=job_spec
+ body=job_spec,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 48a3edec5..4e1334c1c 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -30,5 +30,9 @@ class ArvRuntimeContext(RuntimeContext):
self.storage_classes = "default"
self.current_container = None
self.http_timeout = 300
+ self.submit_runner_cluster = None
super(ArvRuntimeContext, self).__init__(kwargs)
+
+ if self.submit_request_uuid:
+ self.submit_runner_cluster = self.submit_request_uuid[0:5]
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/executor.py
similarity index 69%
copy from sdk/cwl/arvados_cwl/__init__.py
copy to sdk/cwl/arvados_cwl/executor.py
index 0866f69d6..bbfb8ffc6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -1,43 +1,26 @@
-#!/usr/bin/env python
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-# Implement cwl-runner interface for submitting and running work on Arvados, using
-# either the Crunch jobs API or Crunch containers API.
-
import argparse
import logging
import os
import sys
import threading
-import hashlib
import copy
import json
import re
from functools import partial
-import pkg_resources # part of setuptools
-import Queue
import time
-import signal
-import thread
from cwltool.errors import WorkflowException
-import cwltool.main
import cwltool.workflow
-import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
-import cwltool.argparser
import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
-import arvados.commands._util as arv_cmd
-from .arvcontainer import ArvadosContainer, RunnerContainer
-from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .arvcontainer import RunnerContainer
+from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
@@ -49,22 +32,11 @@ from .context import ArvLoadingContext, ArvRuntimeContext
from .util import get_current_container
from ._version import __version__
-from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
-from arvados.api import OrderedJsonModel
-
logger = logging.getLogger('arvados.cwl-runner')
-metrics = logging.getLogger('arvados.cwl-runner.metrics')
-logger.setLevel(logging.INFO)
-
-arvados.log_handler.setFormatter(logging.Formatter(
- '%(asctime)s %(name)s %(levelname)s: %(message)s',
- '%Y-%m-%d %H:%M:%S'))
-
-DEFAULT_PRIORITY = 500
class RuntimeStatusLoggingHandler(logging.Handler):
"""
@@ -98,7 +70,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
"%s: %s" % (record.name, record.getMessage())
)
-class ArvCwlRunner(object):
+class ArvCwlExecutor(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
@@ -195,6 +167,11 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
+ self.runtimeContext = ArvRuntimeContext(vars(arvargs))
+ self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ collection_cache=self.collection_cache)
+
+
def arv_make_tool(self, toolpath_object, loadingContext):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, loadingContext)
@@ -735,264 +712,3 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
self.trash_intermediate_output()
return (self.final_output, self.final_status)
-
-
-def versionstring():
- """Print version string of key packages for provenance and debugging."""
-
- arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
- arvpkg = pkg_resources.require("arvados-python-client")
- cwlpkg = pkg_resources.require("cwltool")
-
- return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
- "arvados-python-client", arvpkg[0].version,
- "cwltool", cwlpkg[0].version)
-
-
-def arg_parser(): # type: () -> argparse.ArgumentParser
- parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
-
- parser.add_argument("--basedir", type=str,
- help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
- parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
- help="Output directory, default current directory")
-
- parser.add_argument("--eval-timeout",
- help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
- type=float,
- default=20)
-
- exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--print-dot", action="store_true",
- help="Print workflow visualization in graphviz format and exit")
- exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
- exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
-
- exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--verbose", action="store_true", help="Default logging")
- exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
- exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
-
- parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
-
- parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
-
- exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--enable-reuse", action="store_true",
- default=True, dest="enable_reuse",
- help="Enable job or container reuse (default)")
- exgroup.add_argument("--disable-reuse", action="store_false",
- default=True, dest="enable_reuse",
- help="Disable job or container reuse")
-
- parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
- parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
- parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
- parser.add_argument("--ignore-docker-for-reuse", action="store_true",
- help="Ignore Docker image version when deciding whether to reuse past jobs.",
- default=False)
-
- exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
- default=True, dest="submit")
- exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
- default=True, dest="submit")
- exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
- dest="create_workflow")
- exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
- exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
-
- exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
- default=True, dest="wait")
- exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
- default=True, dest="wait")
-
- exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
- default=True, dest="log_timestamps")
- exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
- default=True, dest="log_timestamps")
-
- parser.add_argument("--api", type=str,
- default=None, dest="work_api",
- choices=("jobs", "containers"),
- help="Select work submission API. Default is 'jobs' if that API is available, otherwise 'containers'.")
-
- parser.add_argument("--compute-checksum", action="store_true", default=False,
- help="Compute checksum of contents while collecting outputs",
- dest="compute_checksum")
-
- parser.add_argument("--submit-runner-ram", type=int,
- help="RAM (in MiB) required for the workflow runner job (default 1024)",
- default=None)
-
- parser.add_argument("--submit-runner-image", type=str,
- help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
- default=None)
-
- parser.add_argument("--submit-request-uuid", type=str,
- default=None,
- help="Update and commit supplied container request instead of creating a new one (containers API only).")
-
- parser.add_argument("--name", type=str,
- help="Name to use for workflow execution instance.",
- default=None)
-
- parser.add_argument("--on-error", type=str,
- help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
- "Default is 'continue'.", default="continue", choices=("stop", "continue"))
-
- parser.add_argument("--enable-dev", action="store_true",
- help="Enable loading and running development versions "
- "of CWL spec.", default=False)
- parser.add_argument('--storage-classes', default="default", type=str,
- help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
-
- parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
- help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
- default=0)
-
- parser.add_argument("--priority", type=int,
- help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
- default=DEFAULT_PRIORITY)
-
- parser.add_argument("--disable-validate", dest="do_validate",
- action="store_false", default=True,
- help=argparse.SUPPRESS)
-
- parser.add_argument("--disable-js-validation",
- action="store_true", default=False,
- help=argparse.SUPPRESS)
-
- parser.add_argument("--thread-count", type=int,
- default=4, help="Number of threads to use for job submit and output collection.")
-
- parser.add_argument("--http-timeout", type=int,
- default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
-
- exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--trash-intermediate", action="store_true",
- default=False, dest="trash_intermediate",
- help="Immediately trash intermediate outputs on workflow success.")
- exgroup.add_argument("--no-trash-intermediate", action="store_false",
- default=False, dest="trash_intermediate",
- help="Do not trash intermediate outputs (default).")
-
- parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
- parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
-
- return parser
-
-def add_arv_hints():
- cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
- cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
- res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
- use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
- res.close()
- cwltool.process.supportedProcessRequirements.extend([
- "http://arvados.org/cwl#RunInSingleContainer",
- "http://arvados.org/cwl#OutputDirType",
- "http://arvados.org/cwl#RuntimeConstraints",
- "http://arvados.org/cwl#PartitionRequirement",
- "http://arvados.org/cwl#APIRequirement",
- "http://commonwl.org/cwltool#LoadListingRequirement",
- "http://arvados.org/cwl#IntermediateOutput",
- "http://arvados.org/cwl#ReuseRequirement",
- "http://arvados.org/cwl#ClusterTarget"
- ])
-
-def exit_signal_handler(sigcode, frame):
- logger.error("Caught signal {}, exiting.".format(sigcode))
- sys.exit(-sigcode)
-
-def main(args, stdout, stderr, api_client=None, keep_client=None,
- install_sig_handlers=True):
- parser = arg_parser()
-
- job_order_object = None
- arvargs = parser.parse_args(args)
-
- if len(arvargs.storage_classes.strip().split(',')) > 1:
- logger.error("Multiple storage classes are not supported currently.")
- return 1
-
- arvargs.use_container = True
- arvargs.relax_path_checks = True
- arvargs.print_supported_versions = False
-
- if install_sig_handlers:
- arv_cmd.install_signal_handlers()
-
- if arvargs.update_workflow:
- if arvargs.update_workflow.find('-7fd4e-') == 5:
- want_api = 'containers'
- elif arvargs.update_workflow.find('-p5p6p-') == 5:
- want_api = 'jobs'
- else:
- want_api = None
- if want_api and arvargs.work_api and want_api != arvargs.work_api:
- logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
- arvargs.update_workflow, want_api, arvargs.work_api))
- return 1
- arvargs.work_api = want_api
-
- if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
- job_order_object = ({}, "")
-
- add_arv_hints()
-
- try:
- if api_client is None:
- api_client = arvados.safeapi.ThreadSafeApiCache(
- api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
- keep_params={"num_retries": 4})
- keep_client = api_client.keep
- # Make an API object now so errors are reported early.
- api_client.users().current().execute()
- if keep_client is None:
- keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
- runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
- except Exception as e:
- logger.error(e)
- return 1
-
- if arvargs.debug:
- logger.setLevel(logging.DEBUG)
- logging.getLogger('arvados').setLevel(logging.DEBUG)
-
- if arvargs.quiet:
- logger.setLevel(logging.WARN)
- logging.getLogger('arvados').setLevel(logging.WARN)
- logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
- if arvargs.metrics:
- metrics.setLevel(logging.DEBUG)
- logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
-
- if arvargs.log_timestamps:
- arvados.log_handler.setFormatter(logging.Formatter(
- '%(asctime)s %(name)s %(levelname)s: %(message)s',
- '%Y-%m-%d %H:%M:%S'))
- else:
- arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
-
- for key, val in cwltool.argparser.get_default_args().items():
- if not hasattr(arvargs, key):
- setattr(arvargs, key, val)
-
- runtimeContext = ArvRuntimeContext(vars(arvargs))
- runtimeContext.make_fs_access = partial(CollectionFsAccess,
- collection_cache=runner.collection_cache)
- runtimeContext.http_timeout = arvargs.http_timeout
-
- return cwltool.main.main(args=arvargs,
- stdout=stdout,
- stderr=stderr,
- executor=runner.arv_executor,
- versionfunc=versionstring,
- job_order_object=job_order_object,
- logger_handler=arvados.log_handler,
- custom_schema_callback=add_arv_hints,
- loadingContext=runner.loadingContext,
- runtimeContext=runtimeContext)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 41166c512..3b40552ac 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -324,10 +324,10 @@ def arvados_jobs_image(arvrunner, img):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+ return arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )
- return img
+
def upload_workflow_collection(arvrunner, name, packed):
collection = arvados.collection.Collection(api_client=arvrunner.api,
commit ad2c44b49893d3dd226a54b0a11a5951f68907c0
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Oct 25 10:36:47 2018 -0400
14198: Use PDH for container_image instead of docker repo+tag
Needed to support federated container requests.
This reverts 6ea807b2caf6c934f170b2e4d89c23c4a08ca69c
Based on the commit comment, that change was made to accomodate the
Docker v1 to v2 image format migration (to enable the API server to
select the image with the correct format). However, the API server
subsequently gained the ability to detect if it needed to substitute a
PDH with a migrated image PDH in commit
a72205728f94f5261b657766e01f5767dc15d4b5 so now we want restore the
original behavior of locally resolving the image PDH and using that in
the container request.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 7508febb0..6bca07c88 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -31,7 +31,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
global cached_lookups_lock
with cached_lookups_lock:
if dockerRequirement["dockerImageId"] in cached_lookups:
- return dockerRequirement["dockerImageId"]
+ return cached_lookups[dockerRequirement["dockerImageId"]]
with SourceLine(dockerRequirement, "dockerImageId", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
sp = dockerRequirement["dockerImageId"].split(":")
@@ -70,10 +70,12 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
if not images:
raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
+ pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+
with cached_lookups_lock:
- cached_lookups[dockerRequirement["dockerImageId"]] = True
+ cached_lookups[dockerRequirement["dockerImageId"]] = pdh
- return dockerRequirement["dockerImageId"]
+ return pdh
def arv_docker_clear_cache():
global cached_lookups
commit 3a4c8c8f88723941c86252a8c72be75204c6fe8e
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Wed Oct 24 14:45:21 2018 -0400
14198: Initial support ClusterTarget hint
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 2e1ea50a3..0866f69d6 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -898,7 +898,8 @@ def add_arv_hints():
"http://arvados.org/cwl#APIRequirement",
"http://commonwl.org/cwltool#LoadListingRequirement",
"http://arvados.org/cwl#IntermediateOutput",
- "http://arvados.org/cwl#ReuseRequirement"
+ "http://arvados.org/cwl#ReuseRequirement",
+ "http://arvados.org/cwl#ClusterTarget"
])
def exit_signal_handler(sigcode, frame):
diff --git a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
index 4f762192a..94eaf9560 100644
--- a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
+++ b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
@@ -232,4 +232,24 @@ $graph:
coresMin:
type: int?
doc: Minimum cores allocated to cwl-runner
- jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
\ No newline at end of file
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+
+- name: ClusterTarget
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify where a workflow step should run
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:ClusterTarget'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ clusterID:
+ type: string?
+ doc: The cluster to run the container
+ ownerUUID:
+ type: string?
+ doc: The project that will own the container requests and intermediate collections
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index b4d01019f..b46711af4 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -250,6 +250,15 @@ class ArvadosContainer(JobBase):
if self.timelimit is not None:
scheduling_parameters["max_run_time"] = self.timelimit
+ extra_submit_params = {}
+ cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
+ if cluster_target_req:
+ cluster_id = cluster_target_req.get("clusterID")
+ if cluster_id:
+ extra_submit_params["cluster_id"] = cluster_id
+ if cluster_target_req.get("ownerUUID"):
+ container_request["owner_uuid"] = cluster_target_req.get("ownerUUID")
+
container_request["output_name"] = "Output for step %s" % (self.name)
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
@@ -277,11 +286,13 @@ class ArvadosContainer(JobBase):
if runtimeContext.submit_request_uuid:
response = self.arvrunner.api.container_requests().update(
uuid=runtimeContext.submit_request_uuid,
- body=container_request
+ body=container_request,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
else:
response = self.arvrunner.api.container_requests().create(
- body=container_request
+ body=container_request,
+ **extra_submit_params
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list