[ARVADOS] created: a437c40e832b7e2038d6c3de8771328ca84491c9
git at public.curoverse.com
Wed Aug 13 11:06:04 EDT 2014
at a437c40e832b7e2038d6c3de8771328ca84491c9 (commit)
commit a437c40e832b7e2038d6c3de8771328ca84491c9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Aug 13 11:04:52 2014 -0400
Added a virtual working directory "task.vwd" option to run-command and a "task.cwd" option to
set the starting directory. Refactored the upload retry logic from run-command into
robust_put.py and modified decompress-all to use it. No issue #
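
For illustration, a job using the new options might pass parameters roughly like the following
(a hypothetical sketch: the collection locator, the command, and the $(task.outdir) substitution
shown here are placeholders and assumptions, not taken from this commit):

    # Hypothetical run-command job parameters (Python 2 dict, for illustration only).
    jobp = {
        "command": ["bash", "-c", "echo finished > done.txt"],
        # Placeholder locator of the base collection checked out as the virtual working directory.
        "task.vwd": "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz+0",
        # Assumed substitution syntax: start the command inside the task output directory.
        "task.cwd": "$(task.outdir)",
        # Capture the command's stdout into this file.
        "save.stdout": "stdout.txt"
    }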
diff --git a/crunch_scripts/decompress-all.py b/crunch_scripts/decompress-all.py
index c1e1e82..261c78e 100755
--- a/crunch_scripts/decompress-all.py
+++ b/crunch_scripts/decompress-all.py
@@ -18,6 +18,7 @@ import re
import subprocess
import os
import sys
+import robust_put
arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True,
input_as_path=True)
@@ -51,9 +52,7 @@ m = re.match(r'.*\.(gz|Z|bz2|tgz|tbz|zip|rar|7z|cab|deb|rpm|cpio|gem)$', arvados
if m != None:
rc = subprocess.call(["dtrx", "-r", "-n", "-q", arvados.get_task_param_mount('input')])
if rc == 0:
- out = arvados.CollectionWriter()
- out.write_directory_tree(outdir, max_manifest_depth=0)
- task.set_output(out.finish())
+ task.set_output(robust_put.upload(outdir))
else:
sys.exit(rc)
else:
diff --git a/crunch_scripts/robust_put.py b/crunch_scripts/robust_put.py
new file mode 100644
index 0000000..ce05415
--- /dev/null
+++ b/crunch_scripts/robust_put.py
@@ -0,0 +1,49 @@
+import arvados
+import arvados.commands.put as put
+import os
+import logging
+import time
+
+def machine_progress(bytes_written, bytes_expected):
+    return "upload wrote {} total {}\n".format(
+        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
+class Args(object):
+    def __init__(self, fn):
+        self.filename = None
+        self.paths = [fn]
+        self.max_manifest_depth = 0
+
+# Upload to Keep with error recovery.
+# Return a uuid or raise an exception if there are too many failures.
+def upload(source_dir):
+    source_dir = os.path.abspath(source_dir)
+    done = False
+    if 'TASK_WORK' in os.environ:
+        resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
+    else:
+        resume_cache = put.ResumeCache(put.ResumeCache.make_path(Args(source_dir)))
+    reporter = put.progress_writer(machine_progress)
+    bytes_expected = put.expected_bytes_for([source_dir])
+    backoff = 1
+    outuuid = None
+    while not done:
+        try:
+            out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
+            out.do_queued_work()
+            out.write_directory_tree(source_dir, max_manifest_depth=0)
+            outuuid = out.finish()
+            done = True
+        except KeyboardInterrupt as e:
+            logging.critical("caught interrupt signal 2")
+            raise e
+        except Exception as e:
+            logging.exception("caught exception:")
+            backoff *= 2
+            if backoff > 256:
+                logging.critical("Too many upload failures, giving up")
+                raise e
+            else:
+                logging.warning("Sleeping for %s seconds before trying again" % backoff)
+                time.sleep(backoff)
+    return outuuid
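
A minimal usage sketch of the new helper (the path is a placeholder; assumes the arvados Python
SDK is configured and, like the other crunch scripts here, Python 2):

    import robust_put

    # Upload everything under a local directory to Keep, retrying with exponential
    # backoff (capped at 256 seconds, per the loop above) on transient failures.
    locator = robust_put.upload("/tmp/example-task-output")  # placeholder path
    print "uploaded collection:", locator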
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 7d77248..7599f5c 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -16,6 +16,8 @@ import traceback
import pprint
import multiprocessing
import logging
+import robust_put
+import vwd
os.umask(0077)
logging.basicConfig(format="run-command: %(message)s")
@@ -39,12 +41,6 @@ if len(arvados.current_task()['parameters']) > 0:
links = []
-def sub_link(v):
- r = os.path.join(outdir, os.path.basename(v))
- os.symlink(v, r)
- links.append(r)
- return r
-
def sub_tmpdir(v):
return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
@@ -63,7 +59,6 @@ def sub_taskid(v):
def sub_jobsrc(v):
return os.environ['CRUNCH_SRC']
-subst.default_subs["link "] = sub_link
subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["task.outdir"] = sub_outdir
subst.default_subs["job.srcdir"] = sub_jobsrc
@@ -71,10 +66,6 @@ subst.default_subs["node.cores"] = sub_cores
subst.default_subs["job.uuid"] = sub_jobid
subst.default_subs["task.uuid"] = sub_taskid
-def machine_progress(bytes_written, bytes_expected):
- return "run-command: wrote {} total {}\n".format(
- bytes_written, -1 if (bytes_expected is None) else bytes_expected)
-
class SigHandler(object):
def __init__(self):
self.sig = None
@@ -142,7 +133,12 @@ try:
'parameters': params
}
).execute()
- arvados.current_task().set_output(None)
+ if "task.vwd" in jobp:
+ # Base vwd collection will be merged with output fragments from
+ # the other tasks by crunch.
+ arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
+ else:
+ arvados.current_task().set_output(None)
sys.exit(0)
else:
sys.exit(1)
@@ -151,10 +147,17 @@ try:
cmd = expand_list(taskp, taskp["command"])
+ if "task.vwd" in taskp:
+ # Populate output directory with symlinks to files in collection
+ vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
+
if "save.stdout" in taskp:
stdoutname = subst.do_substitution(taskp, taskp["save.stdout"])
stdoutfile = open(stdoutname, "wb")
+ if "task.cwd" in taskp:
+ os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
+
logging.info("{}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
except Exception as e:
@@ -198,28 +201,21 @@ subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/
logging.info("start writing output to keep")
-done = False
-resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
-reporter = put.progress_writer(machine_progress)
-bytes_expected = put.expected_bytes_for(".")
-while not done:
- try:
- out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
- out.do_queued_work()
- out.write_directory_tree(".", max_manifest_depth=0)
- outuuid = out.finish()
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output':outuuid,
- 'success': (rcode == 0),
- 'progress':1.0
- }).execute()
- done = True
- except KeyboardInterrupt:
- logging.critical("terminating on signal 2")
- sys.exit(2)
- except Exception as e:
- logging.exception("caught exception:")
- time.sleep(5)
+if "task.vwd" in taskp:
+ if "task.foreach" in jobp:
+ # This is a subtask, so don't merge with the original collection; that will happen at the end
+ outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
+ else:
+ # Just a single task, so do merge with the original collection
+ outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
+else:
+ outcollection = robust_put.upload(outdir)
+
+api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+ body={
+ 'output': outcollection,
+ 'success': (rcode == 0),
+ 'progress':1.0
+ }).execute()
sys.exit(rcode)
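
Condensed, the per-task flow that these run-command changes implement looks roughly like this
(a simplified sketch; taskp, jobp, outdir and subst stand in for the script's real variables):

    # 1. Before running the command: mirror the base collection into outdir as symlinks.
    if "task.vwd" in taskp:
        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)

    # 2. The command runs (optionally chdir'd via "task.cwd") and writes new files into outdir.

    # 3. After the command exits: check the directory back in.
    if "task.vwd" in taskp:
        # Merge with the base collection for a single task; for "task.foreach" subtasks
        # the merge with the base collection is deferred to crunch.
        output = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir,
                             merge=("task.foreach" not in jobp)).manifest_text()
    else:
        # No virtual working directory: upload outdir with retry.
        output = robust_put.upload(outdir)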
diff --git a/crunch_scripts/vwd.py b/crunch_scripts/vwd.py
new file mode 100644
index 0000000..ad922bc
--- /dev/null
+++ b/crunch_scripts/vwd.py
@@ -0,0 +1,53 @@
+import arvados
+import os
+import robust_put
+import stat
+
+# Implements "Virtual Working Directory"
+# Provides a way of emulating a shared writable directory in Keep based
+# on a "check out, edit, check in, merge" model.
+# At the moment, this only permits adding new files; applications
+# cannot modify or delete existing files.
+
+# Create a symlink tree rooted at target_dir mirroring arv-mounted
+# source_collection. target_dir must be empty, and will be created if it
+# doesn't exist.
+def checkout(source_collection, target_dir, keepmount=None):
+    # create symlinks
+    if keepmount is None:
+        keepmount = os.environ['TASK_KEEPMOUNT']
+
+    if not os.path.exists(target_dir):
+        os.makedirs(target_dir)
+
+    if len(os.listdir(target_dir)) > 0:
+        raise Exception("target_dir must be empty before checkout")
+
+    stem = os.path.join(keepmount, source_collection)
+    for root, dirs, files in os.walk(os.path.join(keepmount, source_collection), topdown=True):
+        rel = root[len(stem)+1:]
+        for d in dirs:
+            os.mkdir(os.path.join(target_dir, rel, d))
+        for f in files:
+            os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
+
+# Delete all symlinks and check in any remaining normal files.
+# If merge == True, merge the manifest with source_collection and return a
+# CollectionReader for the combined collection.
+def checkin(source_collection, target_dir, merge=True):
+    # delete symlinks, commit directory, merge manifests and return combined
+    # collection.
+    for root, dirs, files in os.walk(target_dir):
+        for f in files:
+            s = os.lstat(os.path.join(root, f))
+            if stat.S_ISLNK(s.st_mode):
+                os.unlink(os.path.join(root, f))
+
+    uuid = robust_put.upload(target_dir)
+    if merge:
+        cr1 = arvados.CollectionReader(source_collection)
+        cr2 = arvados.CollectionReader(uuid)
+        combined = arvados.CollectionReader(cr1.manifest_text() + cr2.manifest_text())
+        return combined
+    else:
+        return arvados.CollectionReader(uuid)
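
An illustrative round trip with the new module (the locator and paths are placeholders; assumes
TASK_KEEPMOUNT points at a readable Keep mount):

    import os
    import vwd

    base = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz+0"   # placeholder base collection locator
    workdir = "/tmp/vwd-example"                   # placeholder scratch directory

    # Check out the base collection as symlinks, add one new file, then check the
    # directory back in, merging the new file's manifest with the base collection.
    vwd.checkout(base, workdir)
    with open(os.path.join(workdir, "notes.txt"), "w") as f:
        f.write("file added during the task\n")
    merged = vwd.checkin(base, workdir, merge=True)
    print merged.manifest_text()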
-----------------------------------------------------------------------
hooks/post-receive