[ARVADOS] created: a437c40e832b7e2038d6c3de8771328ca84491c9

git at public.curoverse.com git at public.curoverse.com
Wed Aug 13 11:06:04 EDT 2014


        at  a437c40e832b7e2038d6c3de8771328ca84491c9 (commit)


commit a437c40e832b7e2038d6c3de8771328ca84491c9
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Aug 13 11:04:52 2014 -0400

    Added virtual working directory "task.vwd" option to run-command.  Added
    "task.cwd" to set the starting directory.  Refactored upload retry logic from
    run-command into robust_put.py and modified decompress-all to use it.  no issue #

diff --git a/crunch_scripts/decompress-all.py b/crunch_scripts/decompress-all.py
index c1e1e82..261c78e 100755
--- a/crunch_scripts/decompress-all.py
+++ b/crunch_scripts/decompress-all.py
@@ -18,6 +18,7 @@ import re
 import subprocess
 import os
 import sys
+import robust_put
 
 arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True,
                                           input_as_path=True)
@@ -51,9 +52,7 @@ m = re.match(r'.*\.(gz|Z|bz2|tgz|tbz|zip|rar|7z|cab|deb|rpm|cpio|gem)$', arvados
 if m != None:
     rc = subprocess.call(["dtrx", "-r", "-n", "-q", arvados.get_task_param_mount('input')])
     if rc == 0:
-        out = arvados.CollectionWriter()
-        out.write_directory_tree(outdir, max_manifest_depth=0)
-        task.set_output(out.finish())
+        task.set_output(robust_put.upload(outdir))
     else:
         sys.exit(rc)
 else:
diff --git a/crunch_scripts/robust_put.py b/crunch_scripts/robust_put.py
new file mode 100644
index 0000000..ce05415
--- /dev/null
+++ b/crunch_scripts/robust_put.py
@@ -0,0 +1,62 @@
+import arvados
+import arvados.commands.put as put
+import logging
+import os
+import time
+
+def machine_progress(bytes_written, bytes_expected):
+    """Format a progress line for put.progress_writer.
+
+    A bytes_expected of None (unknown total) is reported as -1.
+    """
+    return "upload wrote {} total {}\n".format(
+        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
+class Args(object):
+    # Minimal stand-in for arv-put's parsed command-line arguments,
+    # carrying just the attributes passed to put.ResumeCache.make_path().
+    def __init__(self, fn):
+        self.filename = None
+        self.paths = [fn]
+        self.max_manifest_depth = 0
+
+def upload(source_dir):
+    """Upload the directory tree at source_dir to Keep with error recovery.
+
+    Upload state is kept in a put.ResumeCache so that a retry can resume
+    from the last checkpoint.  When TASK_WORK is set in the
+    environment, the checkpoint file is "upload-output-checkpoint" in the
+    current task's tmpdir; otherwise its path comes from
+    put.ResumeCache.make_path().
+
+    Returns the value of the writer's finish() call.  After a failure the
+    delay doubles (2, 4, ... 256 seconds); once it would exceed 256 seconds
+    the last exception is re-raised.  KeyboardInterrupt is re-raised
+    immediately.
+    """
+    source_dir = os.path.abspath(source_dir)
+    if 'TASK_WORK' in os.environ:
+        resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
+    else:
+        resume_cache = put.ResumeCache(put.ResumeCache.make_path(Args(source_dir)))
+    reporter = put.progress_writer(machine_progress)
+    bytes_expected = put.expected_bytes_for([source_dir])
+    backoff = 1
+    while True:
+        try:
+            out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
+            out.do_queued_work()
+            out.write_directory_tree(source_dir, max_manifest_depth=0)
+            return out.finish()
+        except KeyboardInterrupt:
+            logging.critical("caught interrupt signal 2")
+            # Bare raise keeps the original traceback (raise e does not in Python 2).
+            raise
+        except Exception:
+            logging.exception("caught exception:")
+            backoff *= 2
+            if backoff > 256:
+                logging.critical("Too many upload failures, giving up")
+                raise
+            logging.warning("Sleeping for %s seconds before trying again", backoff)
+            time.sleep(backoff)
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 7d77248..7599f5c 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -16,6 +16,8 @@ import traceback
 import pprint
 import multiprocessing
 import logging
+import robust_put
+import vwd
 
 os.umask(0077)
 logging.basicConfig(format="run-command: %(message)s")
@@ -39,12 +41,6 @@ if len(arvados.current_task()['parameters']) > 0:
 
 links = []
 
-def sub_link(v):
-    r = os.path.join(outdir, os.path.basename(v))
-    os.symlink(v, r)
-    links.append(r)
-    return r
-
 def sub_tmpdir(v):
     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
 
@@ -63,7 +59,6 @@ def sub_taskid(v):
 def sub_jobsrc(v):
      return os.environ['CRUNCH_SRC']
 
-subst.default_subs["link "] = sub_link
 subst.default_subs["task.tmpdir"] = sub_tmpdir
 subst.default_subs["task.outdir"] = sub_outdir
 subst.default_subs["job.srcdir"] = sub_jobsrc
@@ -71,10 +66,6 @@ subst.default_subs["node.cores"] = sub_cores
 subst.default_subs["job.uuid"] = sub_jobid
 subst.default_subs["task.uuid"] = sub_taskid
 
-def machine_progress(bytes_written, bytes_expected):
-    return "run-command: wrote {} total {}\n".format(
-        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
-
 class SigHandler(object):
     def __init__(self):
         self.sig = None
@@ -142,7 +133,12 @@ try:
                         'parameters': params
                         }
                     ).execute()
-                arvados.current_task().set_output(None)
+                if "task.vwd" in jobp:
+                    # Base vwd collection will be merged with output fragments from
+                    # the other tasks by crunch.
+                    arvados.current_task().set_output(subst.do_substitution(jobp, jobp["task.vwd"]))
+                else:
+                    arvados.current_task().set_output(None)
                 sys.exit(0)
             else:
                 sys.exit(1)
@@ -151,10 +147,17 @@ try:
 
     cmd = expand_list(taskp, taskp["command"])
 
+    if "task.vwd" in taskp:
+        # Populate output directory with symlinks to files in collection
+        vwd.checkout(subst.do_substitution(taskp, taskp["task.vwd"]), outdir)
+
     if "save.stdout" in taskp:
         stdoutname = subst.do_substitution(taskp, taskp["save.stdout"])
         stdoutfile = open(stdoutname, "wb")
 
+    if "task.cwd" in taskp:
+        os.chdir(subst.do_substitution(taskp, taskp["task.cwd"]))
+
     logging.info("{}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
 
 except Exception as e:
@@ -198,28 +201,21 @@ subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/
 
 logging.info("start writing output to keep")
 
-done = False
-resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
-reporter = put.progress_writer(machine_progress)
-bytes_expected = put.expected_bytes_for(".")
-while not done:
-    try:
-        out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
-        out.do_queued_work()
-        out.write_directory_tree(".", max_manifest_depth=0)
-        outuuid = out.finish()
-        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                             body={
-                                                 'output':outuuid,
-                                                 'success': (rcode == 0),
-                                                 'progress':1.0
-                                             }).execute()
-        done = True
-    except KeyboardInterrupt:
-        logging.critical("terminating on signal 2")
-        sys.exit(2)
-    except Exception as e:
-        logging.exception("caught exception:")
-        time.sleep(5)
+if "task.vwd" in taskp:
+    if "task.foreach" in jobp:
+        # This is a subtask, so don't merge with the original collection, that will happen at the end
+        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
+    else:
+        # Just a single task, so do merge with the original collection
+        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
+else:
+    outcollection = robust_put.upload(outdir)
+
+api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                     body={
+                                         'output': outcollection,
+                                         'success': (rcode == 0),
+                                         'progress':1.0
+                                     }).execute()
 
 sys.exit(rcode)
diff --git a/crunch_scripts/vwd.py b/crunch_scripts/vwd.py
new file mode 100644
index 0000000..ad922bc
--- /dev/null
+++ b/crunch_scripts/vwd.py
@@ -0,0 +1,59 @@
+import arvados
+import os
+import robust_put
+import stat
+
+# Implements "Virtual Working Directory"
+# Provides a way of emulating a shared writable directory in Keep based
+# on a "check out, edit, check in, merge" model.
+# At the moment, this only permits adding new files; applications
+# cannot modify or delete existing files.
+
+# Create a symlink tree rooted at target_dir mirroring arv-mounted
+# source_collection.  target_dir must be empty, and will be created if it
+# doesn't exist.
+def checkout(source_collection, target_dir, keepmount=None):
+    # Default to the Keep mount point given in TASK_KEEPMOUNT.
+    if keepmount is None:
+        keepmount = os.environ['TASK_KEEPMOUNT']
+
+    if not os.path.exists(target_dir):
+        os.makedirs(target_dir)
+
+    # Any entry at all means the directory is not empty.
+    if os.listdir(target_dir):
+        raise Exception("target_dir must be empty before checkout")
+
+    stem = os.path.join(keepmount, source_collection)
+    # topdown=True so each directory is created before its contents.
+    for root, dirs, files in os.walk(stem, topdown=True):
+        # Location of root relative to the collection ("." at the top);
+        # relpath also copes with a trailing separator on stem.
+        rel = os.path.relpath(root, stem)
+        for d in dirs:
+            os.mkdir(os.path.join(target_dir, rel, d))
+        for f in files:
+            os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
+
+# Delete all symlinks and check in any remaining normal files.
+# If merge == True, merge the manifest with source_collection and return a
+# CollectionReader for the combined collection; otherwise return a
+# CollectionReader for just the newly uploaded files.
+def checkin(source_collection, target_dir, merge=True):
+    # Remove symlinks listed as files (e.g. those made by checkout())
+    # so they are not uploaded.
+    for root, dirs, files in os.walk(target_dir):
+        for f in files:
+            path = os.path.join(root, f)
+            if stat.S_ISLNK(os.lstat(path).st_mode):
+                os.unlink(path)
+
+    locator = robust_put.upload(target_dir)
+    if merge:
+        # The merge simply concatenates the two manifest texts.
+        # NOTE(review): same-named streams are not combined; confirm the
+        # resulting manifest is acceptable to consumers.
+        cr1 = arvados.CollectionReader(source_collection)
+        cr2 = arvados.CollectionReader(locator)
+        return arvados.CollectionReader(cr1.manifest_text() + cr2.manifest_text())
+    return arvados.CollectionReader(locator)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list