[ARVADOS] created: 67fbf00f0d868384f1585f2473b5f89455001638

Git user git at public.curoverse.com
Mon May 22 12:52:08 EDT 2017


        at  67fbf00f0d868384f1585f2473b5f89455001638 (commit)


commit 67fbf00f0d868384f1585f2473b5f89455001638
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Mar 31 17:49:38 2017 -0400

    11100: a-c-r sets output_ttl and deletes intermediate collections on success.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 46436b5..e0d103c 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -74,6 +74,8 @@ class ArvCwlRunner(object):
         self.output_name = output_name
         self.output_tags = output_tags
         self.project_uuid = None
+        self.output_ttl = 0
+        self.intermediate_output_collections = []
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -202,6 +204,20 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def add_intermediate_output(self, uuid):  # record a collection UUID so trash_intermediate_output() can delete it later
+        if uuid:  # falsy values (None, "") are ignored rather than queued for deletion
+            self.intermediate_output_collections.append(uuid)
+
+    def trash_intermediate_output(self):
+        logger.info("Cleaning up intermediate output collections")
+        for i in self.intermediate_output_collections:
+            try:
+                self.api_client.collections().delete(uuid=i).execute(num_retries=self.num_retries)
+            except:
+                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+            if sys.exc_info()[0] is KeyboardInterrupt:
+                break
+
     def check_features(self, obj):
         if isinstance(obj, dict):
             if obj.get("writable"):
@@ -329,6 +345,10 @@ class ArvCwlRunner(object):
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
+        self.output_ttl = kwargs["intermediate_output_ttl"]
+        if self.output_ttl and self.work_api != "containers":
+            raise Exception("--intermediate-output-ttl is only supported when using the containers api.")
+
         if not kwargs.get("name"):
             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
@@ -512,6 +532,9 @@ class ArvCwlRunner(object):
             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
+        if self.output_ttl and self.final_status == "success":
+            self.trash_intermediate_output()
+
         return (self.final_output, self.final_status)
 
 
@@ -619,6 +642,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     parser.add_argument("--enable-dev", action="store_true",
                         help="Enable loading and running development versions "
                              "of CWL spec.", default=False)
+    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
+                        help="If N > 0, intermediate output collections will be trashed N seconds after creation, or on successful completion of workflow (whichever comes first).",
+                        default=0)
 
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 0b302b6..fe3efcc 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -42,7 +42,8 @@ class ArvadosContainer(object):
             "cwd": self.outdir,
             "priority": 1,
             "state": "Committed",
-            "properties": {}
+            "properties": {},
+            "output_ttl", self.arvrunner.output_ttl
         }
         runtime_constraints = {}
 
@@ -200,6 +201,8 @@ class ArvadosContainer(object):
 
     def done(self, record):
         try:
+            self.arvrunner.add_intermediate_output(record["output_uuid"])
+
             container = self.arvrunner.api.containers().get(
                 uuid=record["container_uuid"]
             ).execute(num_retries=self.arvrunner.num_retries)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list