[ARVADOS] created: f6fcf9f51d326fa0a42e9ebdd1343a6a6ef6d8e8
git at public.curoverse.com
git at public.curoverse.com
Fri May 1 11:34:10 EDT 2015
at f6fcf9f51d326fa0a42e9ebdd1343a6a6ef6d8e8 (commit)
commit f6fcf9f51d326fa0a42e9ebdd1343a6a6ef6d8e8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 1 10:49:18 2015 -0400
5787: Copy to the right destination name, add docstring, fix syntax errors
diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py
index 9d5578a..4a97546 100644
--- a/crunch_scripts/crunchutil/vwd.py
+++ b/crunch_scripts/crunchutil/vwd.py
@@ -2,7 +2,7 @@ import arvados
import os
import robust_put
import stat
-import arvados.command.run
+import arvados.commands.run
# Implements "Virtual Working Directory"
# Provides a way of emulating a shared writable directory in Keep based
@@ -33,13 +33,17 @@ def checkout(source_collection, target_dir, keepmount=None):
for f in files:
os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
-def is_collection(fn):
- if os.path.exists
-
-# Delete all symlinks and check in any remaining normal files.
-# If merge == True, merge the manifest with source_collection and return a
-# CollectionReader for the combined collection.
def checkin(target_dir):
+ """Write files in the target_dir to Keep.
+
+ Symlinks into the keep mount in the output dir are efficiently added to the
+ collection with no data copying.
+
+ Returns a new Collection object, with data flushed but the collection record
+ not saved to the API.
+
+ """
+
# delete symlinks, commit directory, merge manifests and return combined
# collection.
@@ -56,7 +60,7 @@ def checkin(target_dir):
if stat.S_ISLNK(s.st_mode):
# 1. check if it is a link into a collection
real = os.path.split(os.path.realpath(os.path.join(root, f)))
- (pdh, branch) = arvados.command.run.is_in_collection(real[0], real[1])
+ (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
if pdh is not None:
# 2. load collection
if pdh not in collections:
@@ -65,7 +69,7 @@ def checkin(target_dir):
keep_client=outputcollection._my_keep(),
num_retries=5)
# 3. copy arvfile to new collection
- outputcollection.copy(branch, branch, source_collection=collections[pdh])
+ outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh])
elif stat.S_ISREG(s.st_mode):
reldir = root[len(target_dir):]
@@ -76,4 +80,4 @@ def checkin(target_dir):
writer.write(dat)
dat = reader.read(64*1024)
- return outputcollection.manifest_text()
+ return outputcollection
commit 1281ecab8f2396739ee9232c36796e46cd551426
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri May 1 10:32:01 2015 -0400
5787: run-command uploading uses new collection API and supports symlinks into Keep
diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py
index 3d54c9c..9d5578a 100644
--- a/crunch_scripts/crunchutil/vwd.py
+++ b/crunch_scripts/crunchutil/vwd.py
@@ -2,6 +2,7 @@ import arvados
import os
import robust_put
import stat
+import arvados.command.run
# Implements "Virtual Working Directory"
# Provides a way of emulating a shared writable directory in Keep based
@@ -32,23 +33,47 @@ def checkout(source_collection, target_dir, keepmount=None):
for f in files:
os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
+def is_collection(fn):
+ if os.path.exists
+
# Delete all symlinks and check in any remaining normal files.
# If merge == True, merge the manifest with source_collection and return a
# CollectionReader for the combined collection.
-def checkin(source_collection, target_dir, merge=True):
+def checkin(target_dir):
# delete symlinks, commit directory, merge manifests and return combined
# collection.
+
+ outputcollection = arvados.collection.Collection(num_retries=5)
+
+ if target_dir[-1:] != '/':
+ target_dir += '/'
+
+ collections = {}
+
for root, dirs, files in os.walk(target_dir):
for f in files:
s = os.lstat(os.path.join(root, f))
if stat.S_ISLNK(s.st_mode):
- os.unlink(os.path.join(root, f))
-
- uuid = robust_put.upload(target_dir)
- if merge:
- cr1 = arvados.CollectionReader(source_collection)
- cr2 = arvados.CollectionReader(uuid)
- combined = arvados.CollectionReader(cr1.manifest_text() + cr2.manifest_text())
- return combined
- else:
- return arvados.CollectionReader(uuid)
+ # 1. check if it is a link into a collection
+ real = os.path.split(os.path.realpath(os.path.join(root, f)))
+ (pdh, branch) = arvados.command.run.is_in_collection(real[0], real[1])
+ if pdh is not None:
+ # 2. load collection
+ if pdh not in collections:
+ collections[pdh] = arvados.collection.CollectionReader(pdh,
+ api_client=outputcollection._my_api(),
+ keep_client=outputcollection._my_keep(),
+ num_retries=5)
+ # 3. copy arvfile to new collection
+ outputcollection.copy(branch, branch, source_collection=collections[pdh])
+
+ elif stat.S_ISREG(s.st_mode):
+ reldir = root[len(target_dir):]
+ with outputcollection.open(os.path.join(reldir, f), "wb") as writer:
+ with open(os.path.join(root, f), "rb") as reader:
+ dat = reader.read(64*1024)
+ while dat:
+ writer.write(dat)
+ dat = reader.read(64*1024)
+
+ return outputcollection.manifest_text()
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 1fcdb40..ae2233e 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -61,8 +61,6 @@ else:
if 'TASK_KEEPMOUNT' not in os.environ:
os.environ['TASK_KEEPMOUNT'] = '/keep'
-links = []
-
def sub_tmpdir(v):
return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
@@ -415,24 +413,20 @@ signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-for l in links:
- os.unlink(l)
-
logger.info("the following output files will be saved to keep:")
-subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
+subprocess.call(["find", ".", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
logger.info("start writing output to keep")
-if "task.vwd" in taskp:
- if "task.foreach" in jobp:
- # This is a subtask, so don't merge with the original collection, that will happen at the end
- outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
- else:
- # Just a single task, so do merge with the original collection
- outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
-else:
- outcollection = robust_put.upload(outdir, logger)
+if "task.vwd" in taskp and "task.foreach" in jobp:
+ for root, dirs, files in os.walk(outdir):
+ for f in files:
+ s = os.lstat(os.path.join(root, f))
+ if stat.S_ISLNK(s.st_mode):
+ os.unlink(os.path.join(root, f))
+
+outcollection = vwd.checkin(outdir).manifest_text()
# Success if we ran any subprocess, and they all exited 0.
success = rcode and all(status == 0 for status in rcode.itervalues())
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list