[ARVADOS] created: f6fcf9f51d326fa0a42e9ebdd1343a6a6ef6d8e8

git at public.curoverse.com git at public.curoverse.com
Fri May 1 11:34:10 EDT 2015


        at  f6fcf9f51d326fa0a42e9ebdd1343a6a6ef6d8e8 (commit)


commit f6fcf9f51d326fa0a42e9ebdd1343a6a6ef6d8e8
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 1 10:49:18 2015 -0400

    5787: Copy to the right destination name, add docstring, fix syntax errors

diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py
index 9d5578a..4a97546 100644
--- a/crunch_scripts/crunchutil/vwd.py
+++ b/crunch_scripts/crunchutil/vwd.py
@@ -2,7 +2,7 @@ import arvados
 import os
 import robust_put
 import stat
-import arvados.command.run
+import arvados.commands.run
 
 # Implements "Virtual Working Directory"
 # Provides a way of emulating a shared writable directory in Keep based
@@ -33,13 +33,17 @@ def checkout(source_collection, target_dir, keepmount=None):
         for f in files:
             os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
 
-def is_collection(fn):
-    if os.path.exists
-
-# Delete all symlinks and check in any remaining normal files.
-# If merge == True, merge the manifest with source_collection and return a
-# CollectionReader for the combined collection.
 def checkin(target_dir):
+    """Write files in the target_dir to Keep.
+
+    Symlinks into the keep mount in the output dir are efficiently added to the
+    collection with no data copying.
+
+    Returns a new Collection object, with data flushed but the collection record
+    not saved to the API.
+
+    """
+
     # delete symlinks, commit directory, merge manifests and return combined
     # collection.
 
@@ -56,7 +60,7 @@ def checkin(target_dir):
             if stat.S_ISLNK(s.st_mode):
                 # 1. check if it is a link into a collection
                 real = os.path.split(os.path.realpath(os.path.join(root, f)))
-                (pdh, branch) = arvados.command.run.is_in_collection(real[0], real[1])
+                (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
                 if pdh is not None:
                     # 2. load collection
                     if pdh not in collections:
@@ -65,7 +69,7 @@ def checkin(target_dir):
                                                                                keep_client=outputcollection._my_keep(),
                                                                                num_retries=5)
                     # 3. copy arvfile to new collection
-                    outputcollection.copy(branch, branch, source_collection=collections[pdh])
+                    outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh])
 
             elif stat.S_ISREG(s.st_mode):
                 reldir = root[len(target_dir):]
@@ -76,4 +80,4 @@ def checkin(target_dir):
                             writer.write(dat)
                             dat = reader.read(64*1024)
 
-    return outputcollection.manifest_text()
+    return outputcollection

commit 1281ecab8f2396739ee9232c36796e46cd551426
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri May 1 10:32:01 2015 -0400

    5787: run-command uploading uses new collection API and supports symlinks into Keep

diff --git a/crunch_scripts/crunchutil/vwd.py b/crunch_scripts/crunchutil/vwd.py
index 3d54c9c..9d5578a 100644
--- a/crunch_scripts/crunchutil/vwd.py
+++ b/crunch_scripts/crunchutil/vwd.py
@@ -2,6 +2,7 @@ import arvados
 import os
 import robust_put
 import stat
+import arvados.command.run
 
 # Implements "Virtual Working Directory"
 # Provides a way of emulating a shared writable directory in Keep based
@@ -32,23 +33,47 @@ def checkout(source_collection, target_dir, keepmount=None):
         for f in files:
             os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
 
+def is_collection(fn):
+    if os.path.exists
+
 # Delete all symlinks and check in any remaining normal files.
 # If merge == True, merge the manifest with source_collection and return a
 # CollectionReader for the combined collection.
-def checkin(source_collection, target_dir, merge=True):
+def checkin(target_dir):
     # delete symlinks, commit directory, merge manifests and return combined
     # collection.
+
+    outputcollection = arvados.collection.Collection(num_retries=5)
+
+    if target_dir[-1:] != '/':
+        target_dir += '/'
+
+    collections = {}
+
     for root, dirs, files in os.walk(target_dir):
         for f in files:
             s = os.lstat(os.path.join(root, f))
             if stat.S_ISLNK(s.st_mode):
-                os.unlink(os.path.join(root, f))
-
-    uuid = robust_put.upload(target_dir)
-    if merge:
-        cr1 = arvados.CollectionReader(source_collection)
-        cr2 = arvados.CollectionReader(uuid)
-        combined = arvados.CollectionReader(cr1.manifest_text() + cr2.manifest_text())
-        return combined
-    else:
-        return arvados.CollectionReader(uuid)
+                # 1. check if it is a link into a collection
+                real = os.path.split(os.path.realpath(os.path.join(root, f)))
+                (pdh, branch) = arvados.command.run.is_in_collection(real[0], real[1])
+                if pdh is not None:
+                    # 2. load collection
+                    if pdh not in collections:
+                        collections[pdh] = arvados.collection.CollectionReader(pdh,
+                                                                               api_client=outputcollection._my_api(),
+                                                                               keep_client=outputcollection._my_keep(),
+                                                                               num_retries=5)
+                    # 3. copy arvfile to new collection
+                    outputcollection.copy(branch, branch, source_collection=collections[pdh])
+
+            elif stat.S_ISREG(s.st_mode):
+                reldir = root[len(target_dir):]
+                with outputcollection.open(os.path.join(reldir, f), "wb") as writer:
+                    with open(os.path.join(root, f), "rb") as reader:
+                        dat = reader.read(64*1024)
+                        while dat:
+                            writer.write(dat)
+                            dat = reader.read(64*1024)
+
+    return outputcollection.manifest_text()
diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 1fcdb40..ae2233e 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -61,8 +61,6 @@ else:
     if 'TASK_KEEPMOUNT' not in os.environ:
         os.environ['TASK_KEEPMOUNT'] = '/keep'
 
-links = []
-
 def sub_tmpdir(v):
     return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
 
@@ -415,24 +413,20 @@ signal.signal(signal.SIGINT, signal.SIG_DFL)
 signal.signal(signal.SIGTERM, signal.SIG_DFL)
 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
 
-for l in links:
-    os.unlink(l)
-
 logger.info("the following output files will be saved to keep:")
 
-subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
+subprocess.call(["find", ".", "-printf", "run-command: %12.12s %h/%f\\n"], stdout=sys.stderr, cwd=outdir)
 
 logger.info("start writing output to keep")
 
-if "task.vwd" in taskp:
-    if "task.foreach" in jobp:
-        # This is a subtask, so don't merge with the original collection, that will happen at the end
-        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
-    else:
-        # Just a single task, so do merge with the original collection
-        outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
-else:
-    outcollection = robust_put.upload(outdir, logger)
+if "task.vwd" in taskp and "task.foreach" in jobp:
+    for root, dirs, files in os.walk(outdir):
+        for f in files:
+            s = os.lstat(os.path.join(root, f))
+            if stat.S_ISLNK(s.st_mode):
+                os.unlink(os.path.join(root, f))
+
+outcollection = vwd.checkin(outdir).manifest_text()
 
 # Success if we ran any subprocess, and they all exited 0.
 success = rcode and all(status == 0 for status in rcode.itervalues())

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list