[ARVADOS] created: b2f2eee87c7c5e93b9e439f32aa0dddc2b1a8296
Git user
git at public.curoverse.com
Thu Jun 9 16:33:10 EDT 2016
at b2f2eee87c7c5e93b9e439f32aa0dddc2b1a8296 (commit)
commit b2f2eee87c7c5e93b9e439f32aa0dddc2b1a8296
Merge: 3333173 84b538b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 9 16:23:00 2016 -0400
Merge branch 'master' into 8442-cwl-crunch2
commit 3333173032fffece1a6b092ae307ae3fe6e8a2b0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 9 15:16:09 2016 -0400
8442: Raise UnsupportedRequirement for unsupported features in the conformance
tests. Bump cwltool dependency.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 171d92d..af74808 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -31,7 +31,7 @@ from .arvcontainer import ArvadosContainer
from .arvjob import ArvadosJob
from .arvdocker import arv_docker_get_image
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
+from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps, UnsupportedRequirement
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
@@ -477,7 +477,7 @@ class ArvCwlRunner(object):
if kwargs.get("submit"):
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
@@ -543,11 +543,13 @@ class ArvCwlRunner(object):
self.cond.wait(1)
events.close()
+ except UnsupportedRequirement:
+ raise
except:
if sys.exc_info()[0] is KeyboardInterrupt:
logger.error("Interrupted, marking pipeline as failed")
else:
- logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 0cb885f..17fe8cb 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -4,6 +4,7 @@ from cwltool.process import get_feature, adjustFiles
from .arvdocker import arv_docker_get_image
from . import done
from cwltool.errors import WorkflowException
+from cwltool.process import UnsupportedRequirement
logger = logging.getLogger('arvados.cwl-runner')
@@ -42,6 +43,8 @@ class ArvadosContainer(object):
}
if self.generatefiles:
+ raise UnsupportedRequirement("Generate files currently not supported")
+
vwd = arvados.collection.Collection()
container_request["task.vwd"] = {}
for t in self.generatefiles:
@@ -60,13 +63,12 @@ class ArvadosContainer(object):
if self.environment:
container_request["environment"].update(self.environment)
- # TODO, not supported by crunchv2 yet
- #if self.stdin:
- # container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+ if self.stdin:
+ raise UnsupportedRequirement("Stdin redirection currently not supported")
if self.stdout:
mounts["stdout"] = {"kind": "file",
- "path": self.stdout}
+ "path": "/var/spool/cwl/%s" % (self.stdout)}
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if not docker_req:
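With the container's cwd and output_path both set to /var/spool/cwl, the stdout mount needs an absolute in-container path rather than the bare file name CWL provides. A small illustration (file name hypothetical):

    stdout_name = "output.txt"  # hypothetical value of self.stdout
    mounts = {"stdout": {"kind": "file",
                         "path": "/var/spool/cwl/%s" % stdout_name}}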
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 591bdde..b1ff7f3 100644
--- a/sdk/cwl/setup.py
+++ b/sdk/cwl/setup.py
@@ -30,7 +30,7 @@ setup(name='arvados-cwl-runner',
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool==1.0.20160519182434',
+ 'cwltool==1.0.20160609160402',
'arvados-python-client>=0.1.20160322001610'
],
data_files=[
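The pin moves cwltool to the build that exports UnsupportedRequirement. A quick sketch for verifying an installed environment against the new pin (assuming setuptools is available):

    import pkg_resources
    # Raises VersionConflict/DistributionNotFound if the pin is not satisfied.
    pkg_resources.require("cwltool==1.0.20160609160402")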
commit 9cb3fb78406631a68533a85e8b1c8364d5ed900f
Merge: 1bcd619 427da80
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 9 10:58:00 2016 -0400
Merge branch 'master' into 8442-cwl-crunch2
commit 1bcd61945d2265dae09cb402b393f46bcc800be3
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Thu Jun 9 08:21:34 2016 -0400
8442: Setting up mount points works. Capturing output works.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 387bc4a..171d92d 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,6 +28,7 @@ from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
import urlparse
from .arvcontainer import ArvadosContainer
+from .arvjob import ArvadosJob
from .arvdocker import arv_docker_get_image
from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
@@ -36,10 +37,6 @@ from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
@@ -98,194 +95,6 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
else:
return os.path.exists(self._abs(fn))
-class ArvadosJob(object):
- """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-
- def __init__(self, runner):
- self.arvrunner = runner
- self.running = False
-
- def run(self, dry_run=False, pull_image=True, **kwargs):
- script_parameters = {
- "command": self.command_line
- }
- runtime_constraints = {}
-
- if self.generatefiles:
- vwd = arvados.collection.Collection()
- script_parameters["task.vwd"] = {}
- for t in self.generatefiles:
- if isinstance(self.generatefiles[t], dict):
- src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
- vwd.copy(rest, t, source_collection=src)
- else:
- with vwd.open(t, "w") as f:
- f.write(self.generatefiles[t])
- vwd.save_new()
- for t in self.generatefiles:
- script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
- script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
- if self.environment:
- script_parameters["task.env"].update(self.environment)
-
- if self.stdin:
- script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
- if self.stdout:
- script_parameters["task.stdout"] = self.stdout
-
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
- if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
- else:
- runtime_constraints["docker_image"] = "arvados/jobs"
-
- resources = self.builder.resources
- if resources is not None:
- runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
- runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
- runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
- filters = [["repository", "=", "arvados"],
- ["script", "=", "crunchrunner"],
- ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
- if not self.arvrunner.ignore_docker_for_reuse:
- filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-
- try:
- response = self.arvrunner.api.jobs().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "script": "crunchrunner",
- "repository": "arvados",
- "script_version": "master",
- "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
- "script_parameters": {"tasks": [script_parameters]},
- "runtime_constraints": runtime_constraints
- },
- filters=filters,
- find_or_create=kwargs.get("enable_reuse", True)
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.arvrunner.jobs[response["uuid"]] = self
-
- self.update_pipeline_component(response)
-
- logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
- except Exception as e:
- logger.error("Got error %s" % str(e))
- self.output_callback({}, "permanentFail")
-
- def update_pipeline_component(self, record):
- if self.arvrunner.pipeline:
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
- if self.arvrunner.uuid:
- try:
- job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
- if job:
- components = job["components"]
- components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
- body={
- "components": components
- }).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.info("Error adding to components: %s", e)
-
- def done(self, record):
- try:
- self.update_pipeline_component(record)
- except:
- pass
-
- try:
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
-
- try:
- outputs = {}
- if record["output"]:
- logc = arvados.collection.Collection(record["log"])
- log = logc.open(logc.keys()[0])
- tmpdir = None
- outdir = None
- keepdir = None
- for l in log:
- # Determine the tmpdir, outdir and keepdir paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
- # will differ between runs, and we need to know about the
- # final run that actually produced output.
-
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
- if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
-
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
- # check if collection already exists with same owner, name and content
- collection_exists = self.arvrunner.api.collections().list(
- filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ['portable_data_hash', '=', record["output"]],
- ["name", "=", colname]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collection_exists["items"]:
- # Create a collection located in the same project as the
- # pipeline with the contents of the output.
- # First, get output record.
- collections = self.arvrunner.api.collections().list(
- limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["manifest_text"]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collections["items"]:
- raise WorkflowException(
- "Job output '%s' cannot be found on API server" % (
- record["output"]))
-
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": record["output"],
- "manifest_text": collections["items"][0]["manifest_text"]
- }, ensure_unique_name=True).execute(
- num_retries=self.arvrunner.num_retries)
-
- self.builder.outdir = outdir
- self.builder.pathmapper.keepdir = keepdir
- outputs = self.collect_outputs("keep:" + record["output"])
- except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
- processStatus = "permanentFail"
- except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
- processStatus = "permanentFail"
-
- self.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
class RunnerJob(object):
@@ -574,10 +383,16 @@ class ArvadosCommandTool(CommandLineTool):
return ArvadosJob(self.arvrunner)
def makePathMapper(self, reffiles, **kwargs):
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
+ if self.crunch2:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "/keep/%s",
+ "/keep/%s/%s",
+ **kwargs)
+ else:
+ return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ "$(task.keep)/%s",
+ "$(task.keep)/%s/%s",
+ **kwargs)
class ArvCwlRunner(object):
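The two branches differ only in the target-path templates: crunch v2 containers see Keep mounted at a real /keep path, while crunch v1 jobs get $(task.keep) placeholders that crunchrunner substitutes at run time. With a hypothetical portable data hash:

    pdh = "99999999999999999999999999999999+99"  # hypothetical PDH
    crunch2_path = "/keep/%s" % pdh          # real mount point inside the container
    crunch1_path = "$(task.keep)/%s" % pdh   # expanded by crunchrunner at run time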
@@ -676,7 +491,10 @@ class ArvCwlRunner(object):
runnerjob.run()
return runnerjob.uuid
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+ if self.crunch2:
+ events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
+ else:
+ events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
self.debug = kwargs.get("debug")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
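Since the only difference between the two subscriptions is the object type in the websocket filter, the branch could also be written as a single call; a sketch with hypothetical free names crunch2 and on_message:

    import arvados
    import arvados.events

    object_kind = "arvados#container" if crunch2 else "arvados#job"
    events = arvados.events.subscribe(arvados.api('v1'),
                                      [["object_uuid", "is_a", object_kind]],
                                      on_message)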
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 2179398..0cb885f 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,7 +1,9 @@
import logging
import arvados.collection
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles
from .arvdocker import arv_docker_get_image
+from . import done
+from cwltool.errors import WorkflowException
logger = logging.getLogger('arvados.cwl-runner')
@@ -12,6 +14,9 @@ class ArvadosContainer(object):
self.arvrunner = runner
self.running = False
+ def update_pipeline_component(self, r):
+ pass
+
def run(self, dry_run=False, pull_image=True, **kwargs):
container_request = {
"command": self.command_line,
@@ -26,13 +31,15 @@ class ArvadosContainer(object):
mounts = {
"/var/spool/cwl": {
"kind": "tmp"
- },
- "/tmp": {
- "kind": "tmp"
}
}
- # TODO mount normal inputs...
+ for f in self.pathmapper.files():
+ _, p = self.pathmapper.mapper(f)
+ mounts[p] = {
+ "kind": "collection",
+ "portable_data_hash": p[6:]
+ }
if self.generatefiles:
vwd = arvados.collection.Collection()
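The mapped targets produced by the path mapper look like /keep/&lt;pdh&gt;, so p[6:] strips the leading "/keep/" to recover the portable data hash for the collection mount. A hypothetical example:

    p = "/keep/99999999999999999999999999999999+99"  # hypothetical mapped target
    mount = {"kind": "collection",
             "portable_data_hash": p[6:]}  # "99999999999999999999999999999999+99"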
@@ -53,7 +60,7 @@ class ArvadosContainer(object):
if self.environment:
container_request["environment"].update(self.environment)
- # TODO, not supported
+ # TODO, not supported by crunchv2 yet
#if self.stdin:
# container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
@@ -84,11 +91,11 @@ class ArvadosContainer(object):
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
- self.arvrunner.jobs[response["uuid"]] = self
+ self.arvrunner.jobs[response["container_uuid"]] = self
- logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+ logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])
- if response["state"] in ("Complete", "Cancelled"):
+ if response["state"] == "Final":
self.done(response)
except Exception as e:
logger.error("Got error %s" % str(e))
@@ -104,69 +111,9 @@ class ArvadosContainer(object):
try:
outputs = {}
if record["output"]:
- logc = arvados.collection.Collection(record["log"])
- log = logc.open(logc.keys()[0])
- tmpdir = None
- outdir = None
- keepdir = None
- for l in log:
- # Determine the tmpdir, outdir and keepdir paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
- # will differ between runs, and we need to know about the
- # final run that actually produced output.
-
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
- if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
-
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
- # check if collection already exists with same owner, name and content
- collection_exists = self.arvrunner.api.collections().list(
- filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ['portable_data_hash', '=', record["output"]],
- ["name", "=", colname]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collection_exists["items"]:
- # Create a collection located in the same project as the
- # pipeline with the contents of the output.
- # First, get output record.
- collections = self.arvrunner.api.collections().list(
- limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["manifest_text"]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collections["items"]:
- raise WorkflowException(
- "Job output '%s' cannot be found on API server" % (
- record["output"]))
-
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": record["output"],
- "manifest_text": collections["items"][0]["manifest_text"]
- }, ensure_unique_name=True).execute(
- num_retries=self.arvrunner.num_retries)
-
- self.builder.outdir = outdir
- self.builder.pathmapper.keepdir = keepdir
- outputs = self.collect_outputs("keep:" + record["output"])
+ outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
except Exception as e:
logger.exception("Got unknown exception while collecting job outputs:")
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
new file mode 100644
index 0000000..88a8eeb
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -0,0 +1,166 @@
+import logging
+import re
+from . import done
+from .arvdocker import arv_docker_get_image
+from cwltool.process import get_feature
+from cwltool.errors import WorkflowException
+import arvados.collection
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+
+class ArvadosJob(object):
+ """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+ def __init__(self, runner):
+ self.arvrunner = runner
+ self.running = False
+
+ def run(self, dry_run=False, pull_image=True, **kwargs):
+ script_parameters = {
+ "command": self.command_line
+ }
+ runtime_constraints = {}
+
+ if self.generatefiles:
+ vwd = arvados.collection.Collection()
+ script_parameters["task.vwd"] = {}
+ for t in self.generatefiles:
+ if isinstance(self.generatefiles[t], dict):
+ src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+ vwd.copy(rest, t, source_collection=src)
+ else:
+ with vwd.open(t, "w") as f:
+ f.write(self.generatefiles[t])
+ vwd.save_new()
+ for t in self.generatefiles:
+ script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+ script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
+ if self.environment:
+ script_parameters["task.env"].update(self.environment)
+
+ if self.stdin:
+ script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+ if self.stdout:
+ script_parameters["task.stdout"] = self.stdout
+
+ (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ if docker_req and kwargs.get("use_container") is not False:
+ runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+ else:
+ runtime_constraints["docker_image"] = "arvados/jobs"
+
+ resources = self.builder.resources
+ if resources is not None:
+ runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+ runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+ runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+ filters = [["repository", "=", "arvados"],
+ ["script", "=", "crunchrunner"],
+ ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+ if not self.arvrunner.ignore_docker_for_reuse:
+ filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
+
+ try:
+ response = self.arvrunner.api.jobs().create(
+ body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "script": "crunchrunner",
+ "repository": "arvados",
+ "script_version": "master",
+ "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+ "script_parameters": {"tasks": [script_parameters]},
+ "runtime_constraints": runtime_constraints
+ },
+ filters=filters,
+ find_or_create=kwargs.get("enable_reuse", True)
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.arvrunner.jobs[response["uuid"]] = self
+
+ self.update_pipeline_component(response)
+
+ logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+ if response["state"] in ("Complete", "Failed", "Cancelled"):
+ self.done(response)
+ except Exception as e:
+ logger.error("Got error %s" % str(e))
+ self.output_callback({}, "permanentFail")
+
+ def update_pipeline_component(self, record):
+ if self.arvrunner.pipeline:
+ self.arvrunner.pipeline["components"][self.name] = {"job": record}
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+ body={
+ "components": self.arvrunner.pipeline["components"]
+ }).execute(num_retries=self.arvrunner.num_retries)
+ if self.arvrunner.uuid:
+ try:
+ job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+ if job:
+ components = job["components"]
+ components[self.name] = record["uuid"]
+ self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
+ body={
+ "components": components
+ }).execute(num_retries=self.arvrunner.num_retries)
+ except Exception as e:
+ logger.info("Error adding to components: %s", e)
+
+ def done(self, record):
+ try:
+ self.update_pipeline_component(record)
+ except:
+ pass
+
+ try:
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ outputs = {}
+ try:
+ if record["output"]:
+ logc = arvados.collection.Collection(record["log"])
+ log = logc.open(logc.keys()[0])
+ tmpdir = None
+ outdir = None
+ keepdir = None
+ for l in log:
+ # Determine the tmpdir, outdir and keepdir paths from
+ # the job run. Unfortunately, we can't take the first
+ # values we find (which are expected to be near the
+ # top) and stop scanning because if the node fails and
+ # the job restarts on a different node these values
+ # will differ between runs, and we need to know about the
+ # final run that actually produced output.
+
+ g = tmpdirre.match(l)
+ if g:
+ tmpdir = g.group(1)
+ g = outdirre.match(l)
+ if g:
+ outdir = g.group(1)
+ g = keepre.match(l)
+ if g:
+ keepdir = g.group(1)
+
+ outputs = done.done(self, record, tmpdir, outdir, keepdir)
+ except WorkflowException as e:
+ logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ processStatus = "permanentFail"
+ except Exception as e:
+ logger.exception("Got unknown exception while collecting job outputs:")
+ processStatus = "permanentFail"
+
+ self.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
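The three regexes scan crunchrunner's stderr stream for the lines that report the task's tmpdir, outdir and keep mount. A hypothetical log line of the expected shape, to show what group(1) captures:

    import re

    tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")

    # hypothetical crunchrunner log line
    line = ("2016-06-09_12:00:00 zzzzz-8i9sb-aaaaaaaaaaaaaaa 12345 0 stderr "
            "run-command crunchrunner crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work")
    g = tmpdirre.match(line)
    assert g and g.group(1) == "/tmp/crunch-job-task-work"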
diff --git a/sdk/cwl/arvados_cwl/done.py b/sdk/cwl/arvados_cwl/done.py
new file mode 100644
index 0000000..8a6fc9d
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/done.py
@@ -0,0 +1,38 @@
+def done(self, record, tmpdir, outdir, keepdir):
+ colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+ # check if collection already exists with same owner, name and content
+ collection_exists = self.arvrunner.api.collections().list(
+ filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ['portable_data_hash', '=', record["output"]],
+ ["name", "=", colname]]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collection_exists["items"]:
+ # Create a collection located in the same project as the
+ # pipeline with the contents of the output.
+ # First, get output record.
+ collections = self.arvrunner.api.collections().list(
+ limit=1,
+ filters=[['portable_data_hash', '=', record["output"]]],
+ select=["manifest_text"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collections["items"]:
+ raise WorkflowException(
+ "Job output '%s' cannot be found on API server" % (
+ record["output"]))
+
+ # Create new collection in the parent project
+ # with the output contents.
+ self.arvrunner.api.collections().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": colname,
+ "portable_data_hash": record["output"],
+ "manifest_text": collections["items"][0]["manifest_text"]
+ }, ensure_unique_name=True).execute(
+ num_retries=self.arvrunner.num_retries)
+
+ self.builder.outdir = outdir
+ self.builder.pathmapper.keepdir = keepdir
+ return self.collect_outputs("keep:" + record["output"])
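Note that, as committed, done.py raises WorkflowException without importing it; a likely fix, matching the import already used in arvjob.py and arvcontainer.py, is to add at the top of the file:

    from cwltool.errors import WorkflowException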
commit 2e2efff6b6e7bff2880996953ed23a17e8757b7a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 8 11:45:00 2016 -0400
8442: Return PDH for Docker container. Working on setting up mount points.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5a1ff07..387bc4a 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,9 +40,6 @@ tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
-
-
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 4ea63a0..2179398 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -23,7 +23,16 @@ class ArvadosContainer(object):
"state": "Committed"
}
runtime_constraints = {}
- mounts = {}
+ mounts = {
+ "/var/spool/cwl": {
+ "kind": "tmp"
+ },
+ "/tmp": {
+ "kind": "tmp"
+ }
+ }
+
+ # TODO mount normal inputs...
if self.generatefiles:
vwd = arvados.collection.Collection()
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 8f76bbf..253df99 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -26,6 +26,11 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
logger.info("Uploading Docker image %s", ":".join(args[1:]))
arvados.commands.keepdocker.main(args, stdout=sys.stderr)
- # XXX return PDH instead
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag)
+
- return dockerRequirement["dockerImageId"]
+ #return dockerRequirement["dockerImageId"]
+ pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+ return pdh
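For reference, images[0][0] is the first match's collection UUID as returned by list_images_in_arv, and the collection record carries the portable data hash that the containers API expects as container_image. A standalone sketch of the lookup (names as in the diff):

    collection_uuid = images[0][0]
    pdh = api_client.collections().get(
        uuid=collection_uuid).execute()["portable_data_hash"]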
commit 79838b34419f3091e1562aabca9ee8a9b29c61db
Merge: 2f7418e 3c4bcfb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 8 11:26:11 2016 -0400
Merge branch '9187-requeued-containers' into 8442-cwl-crunch2
commit 2f7418e07bae597216c8d69a93f3059ef51f972a
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jun 8 11:15:58 2016 -0400
8442: Submit containers. Work in progress.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0878998..5a1ff07 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -27,6 +27,8 @@ import threading
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
import urlparse
+from .arvcontainer import ArvadosContainer
+from .arvdocker import arv_docker_get_image
from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
from arvados.api import OrderedJsonModel
@@ -39,31 +41,6 @@ outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
- """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
-
- if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
- dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
-
- sp = dockerRequirement["dockerImageId"].split(":")
- image_name = sp[0]
- image_tag = sp[1] if len(sp) > 1 else None
-
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
- image_name=image_name,
- image_tag=image_tag)
-
- if not images:
- imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
- args = ["--project-uuid="+project_uuid, image_name]
- if image_tag:
- args.append(image_tag)
- logger.info("Uploading Docker image %s", ":".join(args[1:]))
- arvados.commands.keepdocker.main(args, stdout=sys.stderr)
-
- # XXX return PDH instead
-
- return dockerRequirement["dockerImageId"]
class CollectionFsAccess(cwltool.process.StdFsAccess):
@@ -588,12 +565,13 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
+ def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
self.arvrunner = arvrunner
+ self.crunch2 = crunch2
def makeJobRunner(self):
- if kwargs.get("crunch2"):
+ if self.crunch2:
return ArvadosContainer(self.arvrunner)
else:
return ArvadosJob(self.arvrunner)
@@ -609,7 +587,7 @@ class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit crunch jobs, wait for them to
complete, and report output."""
- def __init__(self, api_client):
+ def __init__(self, api_client, crunch2):
self.api = api_client
self.jobs = {}
self.lock = threading.Lock()
@@ -618,10 +596,11 @@ class ArvCwlRunner(object):
self.uploaded = {}
self.num_retries = 4
self.uuid = None
+ self.crunch2 = crunch2
def arvMakeTool(self, toolpath_object, **kwargs):
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, **kwargs)
+ return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
else:
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
@@ -709,7 +688,7 @@ class ArvCwlRunner(object):
kwargs["fs_access"] = self.fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
- if kwargs.get("crunch2"):
+ if self.crunch2:
kwargs["outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
else:
@@ -849,7 +828,7 @@ def main(args, stdout, stderr, api_client=None):
try:
if api_client is None:
api_client=arvados.api('v1', model=OrderedJsonModel())
- runner = ArvCwlRunner(api_client)
+ runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
except Exception as e:
logger.error(e)
return 1
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index fe0f7ca..4ea63a0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -1,3 +1,10 @@
+import logging
+import arvados.collection
+from cwltool.process import get_feature
+from .arvdocker import arv_docker_get_image
+
+logger = logging.getLogger('arvados.cwl-runner')
+
class ArvadosContainer(object):
"""Submit and manage a Crunch job for executing a CWL CommandLineTool."""
@@ -7,12 +14,13 @@ class ArvadosContainer(object):
def run(self, dry_run=False, pull_image=True, **kwargs):
container_request = {
- "command": self.command_line
+ "command": self.command_line,
"owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
- "output_path", "/var/spool/cwl",
- "cwd", "/var/spool/cwl",
- "priority": 1
+ "output_path": "/var/spool/cwl",
+ "cwd": "/var/spool/cwl",
+ "priority": 1,
+ "state": "Committed"
}
runtime_constraints = {}
mounts = {}
@@ -46,7 +54,7 @@ class ArvadosContainer(object):
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if not docker_req:
- docker_req = "arvados/jobs"
+ docker_req = {"dockerImageId": "arvados/jobs"}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
new file mode 100644
index 0000000..8f76bbf
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -0,0 +1,31 @@
+import logging
+import cwltool.docker
+import arvados.commands.keepdocker
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+ """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
+
+ if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
+ dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+
+ sp = dockerRequirement["dockerImageId"].split(":")
+ image_name = sp[0]
+ image_tag = sp[1] if len(sp) > 1 else None
+
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag)
+
+ if not images:
+ imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
+ args = ["--project-uuid="+project_uuid, image_name]
+ if image_tag:
+ args.append(image_tag)
+ logger.info("Uploading Docker image %s", ":".join(args[1:]))
+ arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+
+ # XXX return PDH instead
+
+ return dockerRequirement["dockerImageId"]
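Also note that, as committed, arvdocker.py references sys.stderr in the keepdocker call without importing sys; a likely fix is adding it to the module's imports:

    import sys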
commit 268250130d4927d49dbac3497affb98fda158645
Merge: c46dd60 b7db50d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jun 7 16:59:39 2016 -0400
Merge branch 'master' into 8442-cwl-crunch2
commit c46dd60f1dceac0e55da7bb328c3ad6313066993
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 6 15:54:35 2016 -0400
8442: Add --crunch1/--crunch2 switch
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 9b97dfc..f774b66 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -806,6 +806,15 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
default=True, dest="wait")
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--crunch1", action="store_false",
+ default=False, dest="crunch2",
+ help="Use Crunch v1 Jobs API")
+
+ exgroup.add_argument("--crunch2", action="store_true",
+ default=False, dest="crunch2",
+ help="Use Crunch v2 Containers API")
+
parser.add_argument("workflow", type=str, nargs="?", default=None)
parser.add_argument("job_order", nargs=argparse.REMAINDER)
commit 4503c419a8c752f4ff6c0c57c343989405cd8ebb
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Mon Jun 6 15:24:50 2016 -0400
8442: CWL create crunch2 containers WIP
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 1740a90..9b97dfc 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -61,6 +61,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
logger.info("Uploading Docker image %s", ":".join(args[1:]))
arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+ # XXX return PDH instead
+
return dockerRequirement["dockerImageId"]
@@ -574,7 +576,10 @@ class ArvadosCommandTool(CommandLineTool):
self.arvrunner = arvrunner
def makeJobRunner(self):
- return ArvadosJob(self.arvrunner)
+ if kwargs.get("crunch2"):
+ return ArvadosContainer(self.arvrunner)
+ else:
+ return ArvadosJob(self.arvrunner)
def makePathMapper(self, reffiles, **kwargs):
return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
@@ -675,8 +680,12 @@ class ArvCwlRunner(object):
kwargs["fs_access"] = self.fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
- kwargs["outdir"] = "$(task.outdir)"
- kwargs["tmpdir"] = "$(task.tmpdir)"
+ if kwargs.get("crunch2"):
+ kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["tmpdir"] = "/tmp"
+ else:
+ kwargs["outdir"] = "$(task.outdir)"
+ kwargs["tmpdir"] = "$(task.tmpdir)"
if kwargs.get("conformance_test"):
return cwltool.main.single_job_executor(tool, job_order, **kwargs)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
new file mode 100644
index 0000000..fe0f7ca
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -0,0 +1,160 @@
+class ArvadosContainer(object):
+ """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+ def __init__(self, runner):
+ self.arvrunner = runner
+ self.running = False
+
+ def run(self, dry_run=False, pull_image=True, **kwargs):
+ container_request = {
+ "command": self.command_line
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": self.name,
+ "output_path", "/var/spool/cwl",
+ "cwd", "/var/spool/cwl",
+ "priority": 1
+ }
+ runtime_constraints = {}
+ mounts = {}
+
+ if self.generatefiles:
+ vwd = arvados.collection.Collection()
+ container_request["task.vwd"] = {}
+ for t in self.generatefiles:
+ if isinstance(self.generatefiles[t], dict):
+ src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+ vwd.copy(rest, t, source_collection=src)
+ else:
+ with vwd.open(t, "w") as f:
+ f.write(self.generatefiles[t])
+ vwd.save_new()
+ # TODO
+ # for t in self.generatefiles:
+ # container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+ container_request["environment"] = {"TMPDIR": "/tmp"}
+ if self.environment:
+ container_request["environment"].update(self.environment)
+
+ # TODO, not supported
+ #if self.stdin:
+ # container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+ if self.stdout:
+ mounts["stdout"] = {"kind": "file",
+ "path": self.stdout}
+
+ (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+ if not docker_req:
+ docker_req = "arvados/jobs"
+
+ container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
+ docker_req,
+ pull_image,
+ self.arvrunner.project_uuid)
+
+ resources = self.builder.resources
+ if resources is not None:
+ runtime_constraints["vcpus"] = resources.get("cores", 1)
+ runtime_constraints["ram"] = resources.get("ram") * 2**20
+ #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+ container_request["mounts"] = mounts
+ container_request["runtime_constraints"] = runtime_constraints
+
+ try:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ self.arvrunner.jobs[response["uuid"]] = self
+
+ logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+ if response["state"] in ("Complete", "Cancelled"):
+ self.done(response)
+ except Exception as e:
+ logger.error("Got error %s" % str(e))
+ self.output_callback({}, "permanentFail")
+
+ def done(self, record):
+ try:
+ if record["state"] == "Complete":
+ processStatus = "success"
+ else:
+ processStatus = "permanentFail"
+
+ try:
+ outputs = {}
+ if record["output"]:
+ logc = arvados.collection.Collection(record["log"])
+ log = logc.open(logc.keys()[0])
+ tmpdir = None
+ outdir = None
+ keepdir = None
+ for l in log:
+ # Determine the tmpdir, outdir and keepdir paths from
+ # the job run. Unfortunately, we can't take the first
+ # values we find (which are expected to be near the
+ # top) and stop scanning because if the node fails and
+ # the job restarts on a different node these values
+ # will differ between runs, and we need to know about the
+ # final run that actually produced output.
+
+ g = tmpdirre.match(l)
+ if g:
+ tmpdir = g.group(1)
+ g = outdirre.match(l)
+ if g:
+ outdir = g.group(1)
+ g = keepre.match(l)
+ if g:
+ keepdir = g.group(1)
+
+ colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+ # check if collection already exists with same owner, name and content
+ collection_exists = self.arvrunner.api.collections().list(
+ filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+ ['portable_data_hash', '=', record["output"]],
+ ["name", "=", colname]]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collection_exists["items"]:
+ # Create a collection located in the same project as the
+ # pipeline with the contents of the output.
+ # First, get output record.
+ collections = self.arvrunner.api.collections().list(
+ limit=1,
+ filters=[['portable_data_hash', '=', record["output"]]],
+ select=["manifest_text"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ if not collections["items"]:
+ raise WorkflowException(
+ "Job output '%s' cannot be found on API server" % (
+ record["output"]))
+
+ # Create new collection in the parent project
+ # with the output contents.
+ self.arvrunner.api.collections().create(body={
+ "owner_uuid": self.arvrunner.project_uuid,
+ "name": colname,
+ "portable_data_hash": record["output"],
+ "manifest_text": collections["items"][0]["manifest_text"]
+ }, ensure_unique_name=True).execute(
+ num_retries=self.arvrunner.num_retries)
+
+ self.builder.outdir = outdir
+ self.builder.pathmapper.keepdir = keepdir
+ outputs = self.collect_outputs("keep:" + record["output"])
+ except WorkflowException as e:
+ logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ processStatus = "permanentFail"
+ except Exception as e:
+ logger.exception("Got unknown exception while collecting job outputs:")
+ processStatus = "permanentFail"
+
+ self.output_callback(outputs, processStatus)
+ finally:
+ del self.arvrunner.jobs[record["uuid"]]
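One detail worth calling out in the runtime constraints above: CWL ResourceRequirement ram values are in mebibytes, while the containers API expects bytes, hence the 2**20 factor. A hypothetical example:

    ram_mb = 1024                      # hypothetical CWL "ram" value, in MiB
    runtime_constraints = {"vcpus": 1,
                           "ram": ram_mb * 2**20}  # 1073741824 bytes = 1 GiB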
-----------------------------------------------------------------------
hooks/post-receive
--