[ARVADOS] created: 1.2.0-376-g340ab5273

Git user git at public.curoverse.com
Tue Nov 20 14:37:32 EST 2018


        at  340ab527303c7ca556abaa58dc923235499dcdab (commit)


commit 340ab527303c7ca556abaa58dc923235499dcdab
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Nov 20 14:33:53 2018 -0500

    14510: Performance fixes
    
    * Add --collection-cache to enable users to work around cache thrashing
    
    * Limit task queue size.  Release workflow lock when attempting to
      enqueue a task (which now may block).
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 2e1ea50a3..320265afd 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -143,7 +143,8 @@ class ArvCwlRunner(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+                                                cap=arvargs.collection_cache)
 
         self.fetcher_constructor = partial(CollectionFetcher,
                                            api_client=self.api,
@@ -220,7 +221,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
 
     def start_run(self, runnable, runtimeContext):
-        self.task_queue.add(partial(runnable.run, runtimeContext))
+        self.task_queue.add(partial(runnable.run, runtimeContext),
+                            self.workflow_eval_lock, self.stop_polling)
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -230,7 +232,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         with self.workflow_eval_lock:
             j = self.processes[uuid]
             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
-            self.task_queue.add(partial(j.done, record))
+            self.task_queue.add(partial(j.done, record),
+                                self.workflow_eval_lock, self.stop_polling)
             del self.processes[uuid]
 
     def runtime_status_update(self, kind, message, detail=None):
@@ -677,10 +680,14 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                         self.start_run(runnable, runtimeContext)
                 else:
                     if (self.task_queue.in_flight + len(self.processes)) > 0:
-                        self.workflow_eval_lock.wait(3)
+                        self.workflow_eval_lock.wait(8)
                     else:
                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
+
+                if self.stop_polling.is_set():
+                    break
+
                 loopperf.__enter__()
             loopperf.__exit__()
 
@@ -835,6 +842,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=None,
                         help="Update and commit supplied container request instead of creating a new one (containers API only).")
 
+    parser.add_argument("--collection-cache", type=int,
+                        default=256*1024*1024,
+                        help="Collection caches size.")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index b9fd09807..018172b59 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -11,7 +11,7 @@ logger = logging.getLogger('arvados.cwl-runner')
 class TaskQueue(object):
     def __init__(self, lock, thread_count):
         self.thread_count = thread_count
-        self.task_queue = Queue.Queue()
+        self.task_queue = Queue.Queue(maxsize=self.thread_count)
         self.task_queue_threads = []
         self.lock = lock
         self.in_flight = 0
@@ -37,13 +37,25 @@ class TaskQueue(object):
                 with self.lock:
                     self.in_flight -= 1
 
-    def add(self, task):
+    def add(self, task, unlock, check_done):
         with self.lock:
             if self.thread_count > 1:
                 self.in_flight += 1
-                self.task_queue.put(task)
             else:
                 task()
+                return
+
+        while True:
+            try:
+                unlock.release()
+                self.task_queue.put(task, block=True, timeout=3)
+                return
+            except Queue.Full:
+                if check_done.is_set():
+                    return
+            finally:
+                unlock.acquire()
+
 
     def drain(self):
         try:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list