[ARVADOS] created: f72b0e8bcc350966ce54954711bed538c527eb00

Git user git at public.curoverse.com
Fri Sep 30 16:50:49 EDT 2016


        at  f72b0e8bcc350966ce54954711bed538c527eb00 (commit)


commit f72b0e8bcc350966ce54954711bed538c527eb00
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Sep 30 15:59:51 2016 -0400

    10165: Add FinalOutputPathMapper.  Delete basename/size/listing from output object because that information is captured by the enclosing Collection.  Sort keys.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0452486..21d3019 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .perf import Perf
-from .pathmapper import InitialWorkDirPathMapper
+from .pathmapper import FinalOutputPathMapper
 
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement
@@ -176,8 +176,7 @@ class ArvCwlRunner(object):
         adjustDirObjs(outputObj, capture)
         adjustFileObjs(outputObj, capture)
 
-        generatemapper = InitialWorkDirPathMapper(files, "", "",
-                                                  separateDirs=False)
+        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
 
         final = arvados.collection.Collection()
 
@@ -194,13 +193,19 @@ class ArvCwlRunner(object):
                 logger.warn("While preparing output collection: %s", e)
 
         def rewrite(fileobj):
-            fileobj["location"] = generatemapper.mapper(fileobj["location"])
+            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+            if "basename" in fileobj:
+                del fileobj["basename"]
+            if "size" in fileobj:
+                del fileobj["size"]
+            if "listing" in fileobj:
+                del fileobj["listing"]
 
-        adjustDirObjs(outputObj, capture)
-        adjustFileObjs(outputObj, capture)
+        adjustDirObjs(outputObj, rewrite)
+        adjustFileObjs(outputObj, rewrite)
 
         with final.open("cwl.output.json", "w") as f:
-            json.dump(outputObj, f, indent=4)
+            json.dump(outputObj, f, sort_keys=True, indent=4)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
 
@@ -338,15 +343,15 @@ class ArvCwlRunner(object):
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if kwargs.get("compute_checksum"):
-            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
-
         if kwargs.get("submit"):
             logger.info("Final output collection %s", runnerjob.final_output)
         else:
             self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
                                         self.final_output)
 
+        if kwargs.get("compute_checksum"):
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
         return self.final_output
 
 
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 0fd9a0e..bd71a13 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -150,11 +150,11 @@ class InitialWorkDirPathMapper(PathMapper):
 
     def visit(self, obj, stagedir, basedir, copy=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
+        loc = obj["location"]
         if obj["class"] == "Directory":
-            self._pathmap[obj["location"]] = MapperEnt(obj["location"], stagedir, "Directory")
+            self._pathmap[loc] = MapperEnt(obj["location"], stagedir, "Directory")
             self.visitlisting(obj.get("listing", []), stagedir, basedir)
         elif obj["class"] == "File":
-            loc = obj["location"]
             if loc in self._pathmap:
                 return
             tgt = os.path.join(stagedir, obj["basename"])
@@ -172,10 +172,26 @@ class InitialWorkDirPathMapper(PathMapper):
 
         # Go through each file and set the target to its own directory along
         # with any secondary files.
-        stagedir = self.stagedir
-        for fob in referenced_files:
-            self.visit(fob, stagedir, basedir)
+        self.visitlisting(referenced_files, self.stagedir, basedir)
 
         for path, (ab, tgt, type) in self._pathmap.items():
             if type in ("File", "Directory") and ab.startswith("keep:"):
                 self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
+
+
+class FinalOutputPathMapper(PathMapper):
+    def visit(self, obj, stagedir, basedir, copy=False):
+        # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
+        loc = obj["location"]
+        if obj["class"] == "Directory":
+            self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
+        elif obj["class"] == "File":
+            if loc in self._pathmap:
+                return
+            tgt = os.path.join(stagedir, obj["basename"])
+            self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+
+    def setup(self, referenced_files, basedir):
+        # type: (List[Any], unicode) -> None
+        self.visitlisting(referenced_files, self.stagedir, basedir)

commit ad4344e21db3c894869d09fdcb09e3202bf9cf78
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Sep 30 14:16:32 2016 -0400

    10165: Always make an output collection when a workflow completes.

diff --git a/crunch_scripts/cwl-runner b/crunch_scripts/cwl-runner
index 89699f5..3f2d53d 100755
--- a/crunch_scripts/cwl-runner
+++ b/crunch_scripts/cwl-runner
@@ -72,36 +72,14 @@ try:
     args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
     outputObj = runner.arv_executor(t, job_order_object, **vars(args))
 
-    files = {}
-    def capture(fileobj):
-        path = fileobj["location"]
-        sp = path.split("/")
-        col = sp[0][5:]
-        if col not in files:
-            files[col] = set()
-        files[col].add("/".join(sp[1:]))
-        fileobj["location"] = path
-
-    adjustFileObjs(outputObj, capture)
-
-    final = arvados.collection.Collection()
-
-    for k,v in files.iteritems():
-        with arvados.collection.Collection(k) as c:
-            for f in c:
-                final.copy(f, f, c, True)
-
-    def makeRelative(fileobj):
-        fileobj["location"] = "/".join(fileobj["location"].split("/")[1:])
-
-    adjustFileObjs(outputObj, makeRelative)
-
-    with final.open("cwl.output.json", "w") as f:
-        json.dump(outputObj, f, indent=4)
+    if runner.final_output_collection:
+        outputCollection = runner.final_output_collection.portable_data_hash()
+    else:
+        outputCollection = None
 
     api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                          body={
-                                             'output': final.save_new(create_collection_record=False),
+                                             'output': outputCollection,
                                              'success': True,
                                              'progress':1.0
                                          }).execute()
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5262cb4..0452486 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -9,6 +9,8 @@ import os
 import sys
 import threading
 import hashlib
+import copy
+import json
 from functools import partial
 import pkg_resources  # part of setuptools
 
@@ -26,10 +28,11 @@ from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess
 from .perf import Perf
-from cwltool.pack import pack
+from .pathmapper import InitialWorkDirPathMapper
 
+from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.pathmapper import adjustFileObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
@@ -58,6 +61,7 @@ class ArvCwlRunner(object):
         self.stop_polling = threading.Event()
         self.poll_api = None
         self.pipeline = None
+        self.final_output_collection = None
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -162,6 +166,48 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
+    def make_output_collection(self, name, outputObj):
+        outputObj = copy.deepcopy(outputObj)
+
+        files = []
+        def capture(fileobj):
+            files.append(fileobj)
+
+        adjustDirObjs(outputObj, capture)
+        adjustFileObjs(outputObj, capture)
+
+        generatemapper = InitialWorkDirPathMapper(files, "", "",
+                                                  separateDirs=False)
+
+        final = arvados.collection.Collection()
+
+        srccollections = {}
+        for k,v in generatemapper.items():
+            sp = k.split("/")
+            srccollection = sp[0][5:]
+            if srccollection not in srccollections:
+                srccollections[srccollection] = arvados.collection.CollectionReader(srccollection)
+            reader = srccollections[srccollection]
+            try:
+                final.copy("/".join(sp[1:]), v.target, source_collection=reader, overwrite=False)
+            except IOError as e:
+                logger.warn("While preparing output collection: %s", e)
+
+        def rewrite(fileobj):
+            fileobj["location"] = generatemapper.mapper(fileobj["location"])
+
+        adjustDirObjs(outputObj, capture)
+        adjustFileObjs(outputObj, capture)
+
+        with final.open("cwl.output.json", "w") as f:
+            json.dump(outputObj, f, indent=4)
+
+        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+
+        logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())
+
+        self.final_output_collection = final
+
     def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
@@ -295,6 +341,12 @@ class ArvCwlRunner(object):
         if kwargs.get("compute_checksum"):
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
+        if kwargs.get("submit"):
+            logger.info("Final output collection %s", runnerjob.final_output)
+        else:
+            self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
+                                        self.final_output)
+
         return self.final_output
 
 
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 07f85bb..7828bfd 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -121,6 +121,7 @@ class Runner(object):
         self.running = False
         self.enable_reuse = enable_reuse
         self.uuid = None
+        self.final_output = None
 
     def update_pipeline_component(self, record):
         pass
@@ -169,7 +170,8 @@ class Runner(object):
         outputs = None
         try:
             try:
-                outc = arvados.collection.Collection(record["output"])
+                self.final_output = record["output"]
+                outc = arvados.collection.Collection(self.final_output)
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
                 def keepify(fileobj):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list