[ARVADOS] updated: 1.1.3-341-g5f179cb

Git user git at public.curoverse.com
Fri Apr 6 09:56:45 EDT 2018


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 72 ++++++++++++++++++++++++++++++++---------
 1 file changed, 56 insertions(+), 16 deletions(-)

       via  5f179cb06b6e26c3359fed97d48d13408150f6f1 (commit)
       via  8c81e6c09228a9d7a3e8036624c60367615ddfc6 (commit)
      from  540ecd0ae604df1cf02a63515e6e9e8e04e6e64a (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 5f179cb06b6e26c3359fed97d48d13408150f6f1
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Fri Apr 6 08:44:23 2018 -0400

    13108: Rename runnable_queue -> task_queue
    
    Execute synchronously when parallel_submit_count == 1
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c0e919c..f70fa65 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -86,8 +86,8 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.runnable_queue = Queue.Queue()
-        self.runnable_queue_threads = []
+        self.task_queue = Queue.Queue()
+        self.task_queue_threads = []
         self.parallel_submit_count = parallel_submit_count
         self.poll_interval = 12
 
@@ -145,17 +145,23 @@ class ArvCwlRunner(object):
             self.final_status = processStatus
             self.final_output = out
 
-    def runnable_queue_func(self):
+    def task_queue_func(self):
         while True:
-            task = self.runnable_queue.get()
+            task = self.task_queue.get()
             if task is None:
                 return
             task()
 
+    def task_queue_add(self, task):
+        if self.parallel_submit_count > 1:
+            self.task_queue.put(task)
+        else:
+            task()
+
     def start_run(self, runnable, kwargs):
         with self.workflow_eval_lock:
             self.in_flight += 1
-        self.runnable_queue.put(partial(runnable.run, **kwargs))
+        self.task_queue_add(partial(runnable.run, **kwargs))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -193,7 +199,7 @@ class ArvCwlRunner(object):
                         j.done(event["properties"]["new_attributes"])
                         with self.workflow_eval_lock:
                             self.workflow_eval_lock.notify()
-                    self.runnable_queue.put(done_cb)
+                    self.task_queue_add(done_cb)
 
 
     def label(self, obj):
@@ -530,8 +536,8 @@ class ArvCwlRunner(object):
         self.polling_thread.start()
 
         for r in xrange(0, self.parallel_submit_count):
-            t = threading.Thread(target=self.runnable_queue_func)
-            self.runnable_queue_threads.append(t)
+            t = threading.Thread(target=self.task_queue_func)
+            self.task_queue_threads.append(t)
             t.start()
 
         if runnerjob:
@@ -570,7 +576,7 @@ class ArvCwlRunner(object):
                 loopperf.__enter__()
             loopperf.__exit__()
 
-            while self.processes:
+            while (self.in_flight + len(self.processes)) > 0:
                 self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
@@ -590,15 +596,15 @@ class ArvCwlRunner(object):
             self.workflow_eval_lock.release()
             try:
                 # Drain queue
-                while not self.runnable_queue.empty():
-                    self.runnable_queue.get()
+                while not self.task_queue.empty():
+                    self.task_queue.get()
             except Queue.Empty:
                 pass
             self.stop_polling.set()
             self.polling_thread.join()
-            for t in self.runnable_queue_threads:
-                self.runnable_queue.put(None)
-            for t in self.runnable_queue_threads:
+            for t in self.task_queue_threads:
+                self.task_queue.put(None)
+            for t in self.task_queue_threads:
                 t.join()
 
         if self.final_status == "UnsupportedRequirement":

commit 8c81e6c09228a9d7a3e8036624c60367615ddfc6
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Apr 5 18:20:24 2018 -0400

    13108: Collect outputs in separate threads as well
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7390155..c0e919c 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -18,6 +18,7 @@ import re
 from functools import partial
 import pkg_resources  # part of setuptools
 import Queue
+import time
 
 from cwltool.errors import WorkflowException
 import cwltool.main
@@ -63,7 +64,9 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+    def __init__(self, api_client, work_api=None, keep_client=None,
+                 output_name=None, output_tags=None, num_retries=4,
+                 parallel_submit_count=4):
         self.api = api_client
         self.processes = {}
         self.in_flight = 0
@@ -84,6 +87,9 @@ class ArvCwlRunner(object):
         self.intermediate_output_collections = []
         self.trash_intermediate = False
         self.runnable_queue = Queue.Queue()
+        self.runnable_queue_threads = []
+        self.parallel_submit_count = parallel_submit_count
+        self.poll_interval = 12
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -139,15 +145,17 @@ class ArvCwlRunner(object):
             self.final_status = processStatus
             self.final_output = out
 
-    def runnable_queue_thread(self):
+    def runnable_queue_func(self):
         while True:
-            runnable, kwargs = self.runnable_queue.get()
-            runnable.run(**kwargs)
+            task = self.runnable_queue.get()
+            if task is None:
+                return
+            task()
 
     def start_run(self, runnable, kwargs):
         with self.workflow_eval_lock:
             self.in_flight += 1
-        self.runnable_queue.put((runnable, kwargs))
+        self.runnable_queue.put(partial(runnable.run, **kwargs))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -181,9 +189,12 @@ class ArvCwlRunner(object):
                     with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
-                        with Perf(metrics, "done %s" % j.name):
-                            j.done(event["properties"]["new_attributes"])
-                        self.workflow_eval_lock.notify()
+                    def done_cb():
+                        j.done(event["properties"]["new_attributes"])
+                        with self.workflow_eval_lock:
+                            self.workflow_eval_lock.notify()
+                    self.runnable_queue.put(done_cb)
+
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -195,15 +206,19 @@ class ArvCwlRunner(object):
         """
 
         try:
+            remain_wait = self.poll_interval
             while True:
-                self.stop_polling.wait(15)
+                if remain_wait > 0:
+                    self.stop_polling.wait(remain_wait)
                 if self.stop_polling.is_set():
                     break
                 with self.workflow_eval_lock:
                     keys = list(self.processes.keys())
                 if not keys:
+                    remain_wait = self.poll_interval
                     continue
 
+                begin_poll = time.time()
                 if self.work_api == "containers":
                     table = self.poll_api.container_requests()
                 elif self.work_api == "jobs":
@@ -213,6 +228,7 @@ class ArvCwlRunner(object):
                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                 except Exception as e:
                     logger.warn("Error checking states on API server: %s", e)
+                    remain_wait = self.poll_interval
                     continue
 
                 for p in proc_states["items"]:
@@ -223,9 +239,11 @@ class ArvCwlRunner(object):
                             "new_attributes": p
                         }
                     })
+                finish_poll = time.time()
+                remain_wait = self.poll_interval - (finish_poll - begin_poll)
         except:
-            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            with workflow_eval_lock:
+            logger.exception("Fatal error in state polling thread.")
+            with self.workflow_eval_lock:
                 self.processes.clear()
                 self.workflow_eval_lock.notify()
         finally:
@@ -511,7 +529,10 @@ class ArvCwlRunner(object):
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
-        threading.Thread(target=self.runnable_queue_thread).start()
+        for r in xrange(0, self.parallel_submit_count):
+            t = threading.Thread(target=self.runnable_queue_func)
+            self.runnable_queue_threads.append(t)
+            t.start()
 
         if runnerjob:
             jobiter = iter((runnerjob,))
@@ -542,7 +563,7 @@ class ArvCwlRunner(object):
                         self.start_run(runnable, kwargs)
                 else:
                     if (self.in_flight + len(self.processes)) > 0:
-                        self.workflow_eval_lock.wait(1)
+                        self.workflow_eval_lock.wait(3)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
@@ -550,7 +571,7 @@ class ArvCwlRunner(object):
             loopperf.__exit__()
 
             while self.processes:
-                self.workflow_eval_lock.wait(1)
+                self.workflow_eval_lock.wait(3)
 
         except UnsupportedRequirement:
             raise
@@ -567,8 +588,18 @@ class ArvCwlRunner(object):
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.workflow_eval_lock.release()
+            try:
+                # Drain queue
+                while not self.runnable_queue.empty():
+                    self.runnable_queue.get()
+            except Queue.Empty:
+                pass
             self.stop_polling.set()
             self.polling_thread.join()
+            for t in self.runnable_queue_threads:
+                self.runnable_queue.put(None)
+            for t in self.runnable_queue_threads:
+                t.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -718,6 +749,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         action="store_true", default=False,
                         help=argparse.SUPPRESS)
 
+    parser.add_argument("--parallel-submit-count", type=int,
+                        default=4, help="Submit requests in parallel (default 4)")
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--trash-intermediate", action="store_true",
                         default=False, dest="trash_intermediate",

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list