[ARVADOS] created: f72b0e8bcc350966ce54954711bed538c527eb00
Git user
git at public.curoverse.com
Fri Sep 30 16:50:49 EDT 2016
at f72b0e8bcc350966ce54954711bed538c527eb00 (commit)
commit f72b0e8bcc350966ce54954711bed538c527eb00
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Sep 30 15:59:51 2016 -0400
10165: Add FinalOutputPathMapper. Delete basename/size/listing from the output object, because that information is captured by the enclosing Collection. Write cwl.output.json with sorted keys.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0452486..21d3019 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -28,7 +28,7 @@ from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
-from .pathmapper import InitialWorkDirPathMapper
+from .pathmapper import FinalOutputPathMapper
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement
@@ -176,8 +176,7 @@ class ArvCwlRunner(object):
adjustDirObjs(outputObj, capture)
adjustFileObjs(outputObj, capture)
- generatemapper = InitialWorkDirPathMapper(files, "", "",
- separateDirs=False)
+ generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
final = arvados.collection.Collection()
@@ -194,13 +193,19 @@ class ArvCwlRunner(object):
logger.warn("While preparing output collection: %s", e)
def rewrite(fileobj):
- fileobj["location"] = generatemapper.mapper(fileobj["location"])
+ fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+ if "basename" in fileobj:
+ del fileobj["basename"]
+ if "size" in fileobj:
+ del fileobj["size"]
+ if "listing" in fileobj:
+ del fileobj["listing"]
- adjustDirObjs(outputObj, capture)
- adjustFileObjs(outputObj, capture)
+ adjustDirObjs(outputObj, rewrite)
+ adjustFileObjs(outputObj, rewrite)
with final.open("cwl.output.json", "w") as f:
- json.dump(outputObj, f, indent=4)
+ json.dump(outputObj, f, sort_keys=True, indent=4)
final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
@@ -338,15 +343,15 @@ class ArvCwlRunner(object):
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
- if kwargs.get("compute_checksum"):
- adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
-
if kwargs.get("submit"):
logger.info("Final output collection %s", runnerjob.final_output)
else:
self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
self.final_output)
+ if kwargs.get("compute_checksum"):
+ adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
return self.final_output
diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py
index 0fd9a0e..bd71a13 100644
--- a/sdk/cwl/arvados_cwl/pathmapper.py
+++ b/sdk/cwl/arvados_cwl/pathmapper.py
@@ -150,11 +150,11 @@ class InitialWorkDirPathMapper(PathMapper):
def visit(self, obj, stagedir, basedir, copy=False):
# type: (Dict[unicode, Any], unicode, unicode, bool) -> None
+ loc = obj["location"]
if obj["class"] == "Directory":
- self._pathmap[obj["location"]] = MapperEnt(obj["location"], stagedir, "Directory")
+ self._pathmap[loc] = MapperEnt(obj["location"], stagedir, "Directory")
self.visitlisting(obj.get("listing", []), stagedir, basedir)
elif obj["class"] == "File":
- loc = obj["location"]
if loc in self._pathmap:
return
tgt = os.path.join(stagedir, obj["basename"])
@@ -172,10 +172,26 @@ class InitialWorkDirPathMapper(PathMapper):
# Go through each file and set the target to its own directory along
# with any secondary files.
- stagedir = self.stagedir
- for fob in referenced_files:
- self.visit(fob, stagedir, basedir)
+ self.visitlisting(referenced_files, self.stagedir, basedir)
for path, (ab, tgt, type) in self._pathmap.items():
if type in ("File", "Directory") and ab.startswith("keep:"):
self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
+
+
+class FinalOutputPathMapper(PathMapper):
+ def visit(self, obj, stagedir, basedir, copy=False):
+ # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
+ loc = obj["location"]
+ if obj["class"] == "Directory":
+ self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
+ elif obj["class"] == "File":
+ if loc in self._pathmap:
+ return
+ tgt = os.path.join(stagedir, obj["basename"])
+ self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+ self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+
+ def setup(self, referenced_files, basedir):
+ # type: (List[Any], unicode) -> None
+ self.visitlisting(referenced_files, self.stagedir, basedir)
commit ad4344e21db3c894869d09fdcb09e3202bf9cf78
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Sep 30 14:16:32 2016 -0400
10165: Always make an output collection when a workflow completes.
diff --git a/crunch_scripts/cwl-runner b/crunch_scripts/cwl-runner
index 89699f5..3f2d53d 100755
--- a/crunch_scripts/cwl-runner
+++ b/crunch_scripts/cwl-runner
@@ -72,36 +72,14 @@ try:
args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
outputObj = runner.arv_executor(t, job_order_object, **vars(args))
- files = {}
- def capture(fileobj):
- path = fileobj["location"]
- sp = path.split("/")
- col = sp[0][5:]
- if col not in files:
- files[col] = set()
- files[col].add("/".join(sp[1:]))
- fileobj["location"] = path
-
- adjustFileObjs(outputObj, capture)
-
- final = arvados.collection.Collection()
-
- for k,v in files.iteritems():
- with arvados.collection.Collection(k) as c:
- for f in c:
- final.copy(f, f, c, True)
-
- def makeRelative(fileobj):
- fileobj["location"] = "/".join(fileobj["location"].split("/")[1:])
-
- adjustFileObjs(outputObj, makeRelative)
-
- with final.open("cwl.output.json", "w") as f:
- json.dump(outputObj, f, indent=4)
+ if runner.final_output_collection:
+ outputCollection = runner.final_output_collection.portable_data_hash()
+ else:
+ outputCollection = None
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
- 'output': final.save_new(create_collection_record=False),
+ 'output': outputCollection,
'success': True,
'progress':1.0
}).execute()
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 5262cb4..0452486 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -9,6 +9,8 @@ import os
import sys
import threading
import hashlib
+import copy
+import json
from functools import partial
import pkg_resources # part of setuptools
@@ -26,10 +28,11 @@ from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess
from .perf import Perf
-from cwltool.pack import pack
+from .pathmapper import InitialWorkDirPathMapper
+from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement
-from cwltool.pathmapper import adjustFileObjs
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
@@ -58,6 +61,7 @@ class ArvCwlRunner(object):
self.stop_polling = threading.Event()
self.poll_api = None
self.pipeline = None
+ self.final_output_collection = None
if self.work_api is None:
# todo: autodetect API to use.
@@ -162,6 +166,48 @@ class ArvCwlRunner(object):
for v in obj:
self.check_writable(v)
+ def make_output_collection(self, name, outputObj):
+ outputObj = copy.deepcopy(outputObj)
+
+ files = []
+ def capture(fileobj):
+ files.append(fileobj)
+
+ adjustDirObjs(outputObj, capture)
+ adjustFileObjs(outputObj, capture)
+
+ generatemapper = InitialWorkDirPathMapper(files, "", "",
+ separateDirs=False)
+
+ final = arvados.collection.Collection()
+
+ srccollections = {}
+ for k,v in generatemapper.items():
+ sp = k.split("/")
+ srccollection = sp[0][5:]
+ if srccollection not in srccollections:
+ srccollections[srccollection] = arvados.collection.CollectionReader(srccollection)
+ reader = srccollections[srccollection]
+ try:
+ final.copy("/".join(sp[1:]), v.target, source_collection=reader, overwrite=False)
+ except IOError as e:
+ logger.warn("While preparing output collection: %s", e)
+
+ def rewrite(fileobj):
+ fileobj["location"] = generatemapper.mapper(fileobj["location"])
+
+ adjustDirObjs(outputObj, capture)
+ adjustFileObjs(outputObj, capture)
+
+ with final.open("cwl.output.json", "w") as f:
+ json.dump(outputObj, f, indent=4)
+
+ final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
+
+ logger.info("Final output collection %s (%s)", final.portable_data_hash(), final.manifest_locator())
+
+ self.final_output_collection = final
+
def arv_executor(self, tool, job_order, **kwargs):
self.debug = kwargs.get("debug")
@@ -295,6 +341,12 @@ class ArvCwlRunner(object):
if kwargs.get("compute_checksum"):
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+ if kwargs.get("submit"):
+ logger.info("Final output collection %s", runnerjob.final_output)
+ else:
+ self.make_output_collection("Output of %s" % (shortname(tool.tool["id"])),
+ self.final_output)
+
return self.final_output
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 07f85bb..7828bfd 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -121,6 +121,7 @@ class Runner(object):
self.running = False
self.enable_reuse = enable_reuse
self.uuid = None
+ self.final_output = None
def update_pipeline_component(self, record):
pass
@@ -169,7 +170,8 @@ class Runner(object):
outputs = None
try:
try:
- outc = arvados.collection.Collection(record["output"])
+ self.final_output = record["output"]
+ outc = arvados.collection.Collection(self.final_output)
with outc.open("cwl.output.json") as f:
outputs = json.load(f)
def keepify(fileobj):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list