[ARVADOS] created: 00161497a817a58810419a316f014bb28806b966
Git user
git at public.curoverse.com
Wed Apr 12 11:06:12 EDT 2017
at 00161497a817a58810419a316f014bb28806b966 (commit)
commit 00161497a817a58810419a316f014bb28806b966
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Mar 31 17:49:38 2017 -0400
11100: a-c-r sets output_ttl and deletes intermediate collections on success.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 3b14701..a7e52bd 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -74,6 +74,8 @@ class ArvCwlRunner(object):
self.output_name = output_name
self.output_tags = output_tags
self.project_uuid = None
+ self.output_ttl = 0
+ self.intermediate_output_collections = []
if keep_client is not None:
self.keep_client = keep_client
@@ -199,6 +201,20 @@ class ArvCwlRunner(object):
def add_uploaded(self, src, pair):
self.uploaded[src] = pair
+ def add_intermediate_output(self, uuid):
+ if uuid:
+ self.intermediate_output_collections.append(uuid)
+
+ def trash_intermediate_output(self):
+ logger.info("Cleaning up intermediate output collections")
+ for i in self.intermediate_output_collections:
+ try:
+ self.api_client.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+ except:
+ logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ if sys.exc_info()[0] is KeyboardInterrupt:
+ break
+
def check_features(self, obj):
if isinstance(obj, dict):
if obj.get("writable"):
@@ -341,6 +357,10 @@ class ArvCwlRunner(object):
keep_client=self.keep_client)
self.fs_access = make_fs_access(kwargs["basedir"])
+ self.output_ttl = kwargs["intermediate_output_ttl"]
+ if self.output_ttl and self.work_api != "containers":
+ raise Exception("--intermediate-output-ttl is only supported when using the containers api.")
+
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
@@ -524,6 +544,9 @@ class ArvCwlRunner(object):
adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+ if self.output_ttl and self.final_status == "success":
+ self.trash_intermediate_output()
+
return (self.final_output, self.final_status)
@@ -623,6 +646,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
"Default is 'continue'.", default="continue", choices=("stop", "continue"))
+ parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
+ help="If N > 0, intermediate output collections will be trashed N seconds after creation, or on successful completion of workflow (whichever comes first).",
+ default=0)
+
parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 3090936..12ea2f3 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -41,7 +41,8 @@ class ArvadosContainer(object):
"cwd": self.outdir,
"priority": 1,
"state": "Committed",
- "properties": {}
+ "properties": {},
+ "output_ttl", self.arvrunner.output_ttl
}
runtime_constraints = {}
mounts = {
@@ -181,6 +182,8 @@ class ArvadosContainer(object):
def done(self, record):
try:
+ self.arvrunner.add_intermediate_output(record["output_uuid"])
+
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
).execute(num_retries=self.arvrunner.num_retries)
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits mailing list