[ARVADOS] created: 1.2.0-376-g340ab5273
Git user
git at public.curoverse.com
Tue Nov 20 14:37:32 EST 2018
at 340ab527303c7ca556abaa58dc923235499dcdab (commit)
commit 340ab527303c7ca556abaa58dc923235499dcdab
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Nov 20 14:33:53 2018 -0500
14510: Performance fixes
* Add --collection-cache to enable users to work around cache thrashing
* Limit task queue size. Release workflow lock when attempting to
enqueue a task (which now may block).
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 2e1ea50a3..320265afd 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -143,7 +143,8 @@ class ArvCwlRunner(object):
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+ cap=arvargs.collection_cache)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
@@ -220,7 +221,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
+ self.task_queue.add(partial(runnable.run, runtimeContext),
+ self.workflow_eval_lock, self.stop_polling)
def process_submitted(self, container):
with self.workflow_eval_lock:
@@ -230,7 +232,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
+ self.task_queue.add(partial(j.done, record),
+ self.workflow_eval_lock, self.stop_polling)
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
@@ -677,10 +680,14 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
self.start_run(runnable, runtimeContext)
else:
if (self.task_queue.in_flight + len(self.processes)) > 0:
- self.workflow_eval_lock.wait(3)
+ self.workflow_eval_lock.wait(8)
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
+
+ if self.stop_polling.is_set():
+ break
+
loopperf.__enter__()
loopperf.__exit__()
@@ -835,6 +842,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
default=None,
help="Update and commit supplied container request instead of creating a new one (containers API only).")
+ parser.add_argument("--collection-cache", type=int,
+ default=256*1024*1024,
+ help="Collection caches size.")
+
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
default=None)
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index b9fd09807..018172b59 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -11,7 +11,7 @@ logger = logging.getLogger('arvados.cwl-runner')
class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
- self.task_queue = Queue.Queue()
+ self.task_queue = Queue.Queue(maxsize=self.thread_count)
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
@@ -37,13 +37,25 @@ class TaskQueue(object):
with self.lock:
self.in_flight -= 1
- def add(self, task):
+ def add(self, task, unlock, check_done):
with self.lock:
if self.thread_count > 1:
self.in_flight += 1
- self.task_queue.put(task)
else:
task()
+ return
+
+ while True:
+ try:
+ unlock.release()
+ self.task_queue.put(task, block=True, timeout=3)
+ return
+ except Queue.Full:
+ if check_done.is_set():
+ return
+ finally:
+ unlock.acquire()
+
def drain(self):
try:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list