[ARVADOS] updated: d23fba84b810714ec2ab41a1501a39a5c665dcd4

git at public.curoverse.com git at public.curoverse.com
Tue Jul 8 14:57:20 EDT 2014


Summary of changes:
 crunch_scripts/run-command | 94 ++++++++++++++++++++++++++++++++--------------
 1 file changed, 66 insertions(+), 28 deletions(-)

       via  d23fba84b810714ec2ab41a1501a39a5c665dcd4 (commit)
       via  110bbb02892911796b2f0e6a7c3562cc0fe6e5d5 (commit)
       via  377dfa814e50bb021ddf2aead4bf08144d74f7da (commit)
       via  22a4724b136c749b8545de0c6dbd451030dfc43b (commit)
       via  1d387fdfc68ba8c07ab42aa104ab3a8369e7d0ec (commit)
       via  870c8d93da0a1532c752bd5feda5a91e1621e40d (commit)
       via  980e13c7716fd72eca84070d83e8586bdb22eda4 (commit)
       via  0e765d1f39a9355df4b8cdb54bfa60cb3fbe78ed (commit)
       via  744e90e697a085661d4d0b6b1ec33642199fcd43 (commit)
       via  96df15a015600cb2cf55f5a2791cf42371b2feed (commit)
       via  8445c610f4a03ac775a17ae20ac219fbbe63a6ab (commit)
       via  d0fe5e9f95b45580d1d234a5adbe34705c7be1ff (commit)
      from  4b2ab09e3ee91cb63ae42a21d0efb004c053af8a (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email, so we list those
revisions in full below.


commit d23fba84b810714ec2ab41a1501a39a5c665dcd4
Merge: 4b2ab09 110bbb0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:57:14 2014 -0400

    Merge branch 'resumable-output-upload'. Adds signal handling to the run-command
    wrapper, plus error recovery when uploading job output.  refs #2342


commit 110bbb02892911796b2f0e6a7c3562cc0fe6e5d5
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:51:18 2014 -0400

    Actually use the resume cache

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 6359ee6..e6ec889 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -49,7 +49,7 @@ subst.default_subs["node.cores"] = sub_cores
 rcode = 1
 
 def machine_progress(bytes_written, bytes_expected):
-    return "run-command: {} written {} total\n".format(
+    return "run-command: wrote {} total {}\n".format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
 
 class SigHandler(object):
@@ -113,7 +113,7 @@ reporter = put.progress_writer(machine_progress)
 bytes_expected = put.expected_bytes_for(".")
 while not done:
     try:
-        out = put.ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+        out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
         out.do_queued_work()
         out.write_directory_tree(".", max_manifest_depth=0)
         outuuid = out.finish()

commit 377dfa814e50bb021ddf2aead4bf08144d74f7da
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:29:29 2014 -0400

    Turns out sys.exit runs finally: blocks (it raises SystemExit), so move the
    output upload out of the finally: clause.

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 8341904..6359ee6 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -59,7 +59,6 @@ class SigHandler(object):
     def send_signal(self, sp, signum):
         sp.send_signal(signum)
         self.sig = signum
-        print("We get signal! %s" % signum)
 
 try:
     cmd = []
@@ -85,55 +84,51 @@ try:
     # wait for process to complete.
     rcode = sp.wait()
 
-    print("Sig is %s" % sig)
-    print("Sig.sig is %s" % sig.sig)
-
     if sig.sig != None:
         print("run-command: terminating on signal %s" % sig.sig)
-        sys.exit(rcode)
+        sys.exit(2)
     else:
         print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
 
 except Exception as e:
     print("run-command: caught exception: {}".format(e))
 
-finally:
-    # restore default signal handlers.
-    signal.signal(signal.SIGINT, signal.SIG_DFL)
-    signal.signal(signal.SIGTERM, signal.SIG_DFL)
-    signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-
-    for l in links:
-        os.unlink(l)
-
-    print("run-command: the follow output files will be saved to keep:")
-
-    subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"])
-
-    print("run-command: start writing output to keep")
-
-    done = False
-    resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
-    reporter = put.progress_writer(machine_progress)
-    bytes_expected = put.expected_bytes_for(".")
-    while not done:
-        try:
-            out = put.ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
-            out.do_queued_work()
-            out.write_directory_tree(".", max_manifest_depth=0)
-            outuuid = out.finish()
-            api.job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                                 body={
-                                                     'output':outuuid,
-                                                     'success': (rcode == 0),
-                                                     'progress':1.0
-                                                 }).execute()
-            done = True
-        except KeyboardInterrupt:
-            print("run-command: terminating on signal SIGINT")
-            done = True
-        except Exception as e:
-            print("run-command: caught exception: {}".format(e))
-            time.sleep(5)
+# restore default signal handlers.
+signal.signal(signal.SIGINT, signal.SIG_DFL)
+signal.signal(signal.SIGTERM, signal.SIG_DFL)
+signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+
+for l in links:
+    os.unlink(l)
+
+print("run-command: the follow output files will be saved to keep:")
+
+subprocess.call(["find", ".", "-type", "f", "-printf", "run-command: %12.12s %h/%f\\n"])
+
+print("run-command: start writing output to keep")
+
+done = False
+resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
+reporter = put.progress_writer(machine_progress)
+bytes_expected = put.expected_bytes_for(".")
+while not done:
+    try:
+        out = put.ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+        out.do_queued_work()
+        out.write_directory_tree(".", max_manifest_depth=0)
+        outuuid = out.finish()
+        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+                                             body={
+                                                 'output':outuuid,
+                                                 'success': (rcode == 0),
+                                                 'progress':1.0
+                                             }).execute()
+        done = True
+    except KeyboardInterrupt:
+        print("run-command: terminating on signal 2")
+        sys.exit(2)
+    except Exception as e:
+        print("run-command: caught exception: {}".format(e))
+        time.sleep(5)
 
 sys.exit(rcode)

commit 22a4724b136c749b8545de0c6dbd451030dfc43b
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:24:32 2014 -0400

    Add debugging

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 7a65f53..8341904 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -59,6 +59,7 @@ class SigHandler(object):
     def send_signal(self, sp, signum):
         sp.send_signal(signum)
         self.sig = signum
+        print("We get signal! %s" % signum)
 
 try:
     cmd = []
@@ -84,6 +85,9 @@ try:
     # wait for process to complete.
     rcode = sp.wait()
 
+    print("Sig is %s" % sig)
+    print("Sig.sig is %s" % sig.sig)
+
     if sig.sig != None:
         print("run-command: terminating on signal %s" % sig.sig)
         sys.exit(rcode)

commit 1d387fdfc68ba8c07ab42aa104ab3a8369e7d0ec
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:21:05 2014 -0400

    Fix wrong number of arguments to SigHandler.send_signal

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index ab8433d..7a65f53 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -56,7 +56,7 @@ class SigHandler(object):
     def __init__(self):
         self.sig = None
 
-    def send_signal(self, sp, signum, sig):
+    def send_signal(self, sp, signum):
         sp.send_signal(signum)
         self.sig = signum
 

commit 870c8d93da0a1532c752bd5feda5a91e1621e40d
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:19:45 2014 -0400

    SigHandler object

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 2eba372..ab8433d 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -52,9 +52,13 @@ def machine_progress(bytes_written, bytes_expected):
     return "run-command: {} written {} total\n".format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
 
-def send_signal(sp, signum, sig):
-    sp.send_signal(signum)
-    sig.append(signum)
+class SigHandler(object):
+    def __init__(self):
+        self.sig = None
+
+    def send_signal(self, sp, signum, sig):
+        sp.send_signal(signum)
+        self.sig = signum
 
 try:
     cmd = []
@@ -70,18 +74,18 @@ try:
     print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
 
     sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
-    sig = []
+    sig = SigHandler()
 
     # forward signals to the process.
-    signal.signal(signal.SIGINT, lambda signum, frame: send_signal(sp, signum, sig))
-    signal.signal(signal.SIGTERM, lambda signum, frame: send_signal(sp, signum, sig))
-    signal.signal(signal.SIGQUIT, lambda signum, frame: send_signal(sp, signum, sig))
+    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
+    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
+    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
 
     # wait for process to complete.
     rcode = sp.wait()
 
-    if len(sig) > 0:
-        print("run-command: terminating on signal %s" % sig[0])
+    if sig.sig != None:
+        print("run-command: terminating on signal %s" % sig.sig)
         sys.exit(rcode)
     else:
         print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

commit 980e13c7716fd72eca84070d83e8586bdb22eda4
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:16:21 2014 -0400

    Still messing with signals.

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 7a65f53..2eba372 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -52,13 +52,9 @@ def machine_progress(bytes_written, bytes_expected):
     return "run-command: {} written {} total\n".format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
 
-class SigHandler(object):
-    def __init__(self):
-        self.sig = None
-
-    def send_signal(self, sp, signum):
-        sp.send_signal(signum)
-        self.sig = signum
+def send_signal(sp, signum, sig):
+    sp.send_signal(signum)
+    sig.append(signum)
 
 try:
     cmd = []
@@ -74,18 +70,18 @@ try:
     print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
 
     sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
-    sig = SigHandler()
+    sig = []
 
     # forward signals to the process.
-    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
-    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
-    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
+    signal.signal(signal.SIGINT, lambda signum, frame: send_signal(sp, signum, sig))
+    signal.signal(signal.SIGTERM, lambda signum, frame: send_signal(sp, signum, sig))
+    signal.signal(signal.SIGQUIT, lambda signum, frame: send_signal(sp, signum, sig))
 
     # wait for process to complete.
     rcode = sp.wait()
 
-    if sig.sig != None:
-        print("run-command: terminating on signal %s" % sig.sig)
+    if len(sig) > 0:
+        print("run-command: terminating on signal %s" % sig[0])
         sys.exit(rcode)
     else:
         print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

commit 0e765d1f39a9355df4b8cdb54bfa60cb3fbe78ed
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:08:28 2014 -0400

    Fix wrong number of parameters

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 71a9f96..7a65f53 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -56,7 +56,7 @@ class SigHandler(object):
     def __init__(self):
         self.sig = None
 
-    def send_signal(self, sp, signum, sig):
+    def send_signal(self, sp, signum):
         sp.send_signal(signum)
         self.sig = signum
 
@@ -84,10 +84,8 @@ try:
     # wait for process to complete.
     rcode = sp.wait()
 
-    print("sig is %s" % sig.sig)
-
-    if len(sig) > 0:
-        print("run-command: terminating on signal %s" % sig[0])
+    if sig.sig != None:
+        print("run-command: terminating on signal %s" % sig.sig)
         sys.exit(rcode)
     else:
         print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))

commit 744e90e697a085661d4d0b6b1ec33642199fcd43
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:06:15 2014 -0400

    Record signals a different way

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 2eba372..71a9f96 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -52,9 +52,13 @@ def machine_progress(bytes_written, bytes_expected):
     return "run-command: {} written {} total\n".format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
 
-def send_signal(sp, signum, sig):
-    sp.send_signal(signum)
-    sig.append(signum)
+class SigHandler(object):
+    def __init__(self):
+        self.sig = None
+
+    def send_signal(self, sp, signum, sig):
+        sp.send_signal(signum)
+        self.sig = signum
 
 try:
     cmd = []
@@ -70,16 +74,18 @@ try:
     print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
 
     sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
-    sig = []
+    sig = SigHandler()
 
     # forward signals to the process.
-    signal.signal(signal.SIGINT, lambda signum, frame: send_signal(sp, signum, sig))
-    signal.signal(signal.SIGTERM, lambda signum, frame: send_signal(sp, signum, sig))
-    signal.signal(signal.SIGQUIT, lambda signum, frame: send_signal(sp, signum, sig))
+    signal.signal(signal.SIGINT, lambda signum, frame: sig.send_signal(sp, signum))
+    signal.signal(signal.SIGTERM, lambda signum, frame: sig.send_signal(sp, signum))
+    signal.signal(signal.SIGQUIT, lambda signum, frame: sig.send_signal(sp, signum))
 
     # wait for process to complete.
     rcode = sp.wait()
 
+    print("sig is %s" % sig.sig)
+
     if len(sig) > 0:
         print("run-command: terminating on signal %s" % sig[0])
         sys.exit(rcode)

commit 96df15a015600cb2cf55f5a2791cf42371b2feed
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 14:02:26 2014 -0400

    More work on signal handling

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 520f516..2eba372 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -52,6 +52,10 @@ def machine_progress(bytes_written, bytes_expected):
     return "run-command: {} written {} total\n".format(
         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
 
+def send_signal(sp, signum, sig):
+    sp.send_signal(signum)
+    sig.append(signum)
+
 try:
     cmd = []
     for c in p["command"]:
@@ -66,16 +70,21 @@ try:
     print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
 
     sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
+    sig = []
 
     # forward signals to the process.
-    signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
-    signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
-    signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+    signal.signal(signal.SIGINT, lambda signum, frame: send_signal(sp, signum, sig))
+    signal.signal(signal.SIGTERM, lambda signum, frame: send_signal(sp, signum, sig))
+    signal.signal(signal.SIGQUIT, lambda signum, frame: send_signal(sp, signum, sig))
 
     # wait for process to complete.
     rcode = sp.wait()
 
-    print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
+    if len(sig) > 0:
+        print("run-command: terminating on signal %s" % sig[0])
+        sys.exit(rcode)
+    else:
+        print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
 
 except Exception as e:
     print("run-command: caught exception: {}".format(e))
@@ -112,6 +121,9 @@ finally:
                                                      'progress':1.0
                                                  }).execute()
             done = True
+        except KeyboardInterrupt:
+            print("run-command: terminating on signal SIGINT")
+            done = True
         except Exception as e:
             print("run-command: caught exception: {}".format(e))
             time.sleep(5)

commit 8445c610f4a03ac775a17ae20ac219fbbe63a6ab
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 13:50:58 2014 -0400

    Forward signals to process.

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 5cb17dc..520f516 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -9,6 +9,7 @@ import shutil
 import subst
 import time
 import arvados.commands.put as put
+import signal
 
 os.umask(0077)
 
@@ -64,7 +65,15 @@ try:
 
     print("run-command: {}{}".format(' '.join(cmd), (" > " + stdoutname) if stdoutname != None else ""))
 
-    rcode = subprocess.call(cmd, stdout=stdoutfile)
+    sp = subprocess.Popen(cmd, shell=False, stdout=stdoutfile)
+
+    # forward signals to the process.
+    signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+    signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+    signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+    # wait for process to complete.
+    rcode = sp.wait()
 
     print("run-command: completed with exit code %i (%s)" % (rcode, "success" if rcode == 0 else "failed"))
 
@@ -72,6 +81,11 @@ except Exception as e:
     print("run-command: caught exception: {}".format(e))
 
 finally:
+    # restore default signal handlers.
+    signal.signal(signal.SIGINT, signal.SIG_DFL)
+    signal.signal(signal.SIGTERM, signal.SIG_DFL)
+    signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+
     for l in links:
         os.unlink(l)
 

commit d0fe5e9f95b45580d1d234a5adbe34705c7be1ff
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 8 11:12:40 2014 -0400

    Use resumable upload writing output in run-command

diff --git a/crunch_scripts/run-command b/crunch_scripts/run-command
index 268c038..5cb17dc 100755
--- a/crunch_scripts/run-command
+++ b/crunch_scripts/run-command
@@ -8,6 +8,7 @@ import sys
 import shutil
 import subst
 import time
+import arvados.commands.put as put
 
 os.umask(0077)
 
@@ -46,6 +47,10 @@ subst.default_subs["node.cores"] = sub_cores
 
 rcode = 1
 
+def machine_progress(bytes_written, bytes_expected):
+    return "run-command: {} written {} total\n".format(
+        bytes_written, -1 if (bytes_expected is None) else bytes_expected)
+
 try:
     cmd = []
     for c in p["command"]:
@@ -77,9 +82,13 @@ finally:
     print("run-command: start writing output to keep")
 
     done = False
+    resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
+    reporter = put.progress_writer(machine_progress)
+    bytes_expected = put.expected_bytes_for(".")
     while not done:
         try:
-            out = arvados.CollectionWriter()
+            out = put.ArvPutCollectionWriter(resume_cache, reporter, bytes_expected)
+            out.do_queued_work()
             out.write_directory_tree(".", max_manifest_depth=0)
             outuuid = out.finish()
             api.job_tasks().update(uuid=arvados.current_task()['uuid'],

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list