[ARVADOS] updated: 1.1.3-341-g5f179cb
Git user
git at public.curoverse.com
Fri Apr 6 09:56:45 EDT 2018
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 72 ++++++++++++++++++++++++++++++++---------
1 file changed, 56 insertions(+), 16 deletions(-)
via 5f179cb06b6e26c3359fed97d48d13408150f6f1 (commit)
via 8c81e6c09228a9d7a3e8036624c60367615ddfc6 (commit)
from 540ecd0ae604df1cf02a63515e6e9e8e04e6e64a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 5f179cb06b6e26c3359fed97d48d13408150f6f1
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Apr 6 08:44:23 2018 -0400
13108: Rename runnable_queue -> task_queue
Execute synchronously when parallel_submit_count == 1
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c0e919c..f70fa65 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -86,8 +86,8 @@ class ArvCwlRunner(object):
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.runnable_queue = Queue.Queue()
- self.runnable_queue_threads = []
+ self.task_queue = Queue.Queue()
+ self.task_queue_threads = []
self.parallel_submit_count = parallel_submit_count
self.poll_interval = 12
@@ -145,17 +145,23 @@ class ArvCwlRunner(object):
self.final_status = processStatus
self.final_output = out
- def runnable_queue_func(self):
+ def task_queue_func(self):
while True:
- task = self.runnable_queue.get()
+ task = self.task_queue.get()
if task is None:
return
task()
+ def task_queue_add(self, task):
+ if self.parallel_submit_count > 1:
+ self.task_queue.put(task)
+ else:
+ task()
+
def start_run(self, runnable, kwargs):
with self.workflow_eval_lock:
self.in_flight += 1
- self.runnable_queue.put(partial(runnable.run, **kwargs))
+ self.task_queue_add(partial(runnable.run, **kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
@@ -193,7 +199,7 @@ class ArvCwlRunner(object):
j.done(event["properties"]["new_attributes"])
with self.workflow_eval_lock:
self.workflow_eval_lock.notify()
- self.runnable_queue.put(done_cb)
+ self.task_queue_add(done_cb)
def label(self, obj):
@@ -530,8 +536,8 @@ class ArvCwlRunner(object):
self.polling_thread.start()
for r in xrange(0, self.parallel_submit_count):
- t = threading.Thread(target=self.runnable_queue_func)
- self.runnable_queue_threads.append(t)
+ t = threading.Thread(target=self.task_queue_func)
+ self.task_queue_threads.append(t)
t.start()
if runnerjob:
@@ -570,7 +576,7 @@ class ArvCwlRunner(object):
loopperf.__enter__()
loopperf.__exit__()
- while self.processes:
+ while (self.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
@@ -590,15 +596,15 @@ class ArvCwlRunner(object):
self.workflow_eval_lock.release()
try:
# Drain queue
- while not self.runnable_queue.empty():
- self.runnable_queue.get()
+ while not self.task_queue.empty():
+ self.task_queue.get()
except Queue.Empty:
pass
self.stop_polling.set()
self.polling_thread.join()
- for t in self.runnable_queue_threads:
- self.runnable_queue.put(None)
- for t in self.runnable_queue_threads:
+ for t in self.task_queue_threads:
+ self.task_queue.put(None)
+ for t in self.task_queue_threads:
t.join()
if self.final_status == "UnsupportedRequirement":
commit 8c81e6c09228a9d7a3e8036624c60367615ddfc6
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Apr 5 18:20:24 2018 -0400
13108: Collect outputs in separate threads as well
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7390155..c0e919c 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -18,6 +18,7 @@ import re
from functools import partial
import pkg_resources # part of setuptools
import Queue
+import time
from cwltool.errors import WorkflowException
import cwltool.main
@@ -63,7 +64,9 @@ class ArvCwlRunner(object):
"""
- def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+ def __init__(self, api_client, work_api=None, keep_client=None,
+ output_name=None, output_tags=None, num_retries=4,
+ parallel_submit_count=4):
self.api = api_client
self.processes = {}
self.in_flight = 0
@@ -84,6 +87,9 @@ class ArvCwlRunner(object):
self.intermediate_output_collections = []
self.trash_intermediate = False
self.runnable_queue = Queue.Queue()
+ self.runnable_queue_threads = []
+ self.parallel_submit_count = parallel_submit_count
+ self.poll_interval = 12
if keep_client is not None:
self.keep_client = keep_client
@@ -139,15 +145,17 @@ class ArvCwlRunner(object):
self.final_status = processStatus
self.final_output = out
- def runnable_queue_thread(self):
+ def runnable_queue_func(self):
while True:
- runnable, kwargs = self.runnable_queue.get()
- runnable.run(**kwargs)
+ task = self.runnable_queue.get()
+ if task is None:
+ return
+ task()
def start_run(self, runnable, kwargs):
with self.workflow_eval_lock:
self.in_flight += 1
- self.runnable_queue.put((runnable, kwargs))
+ self.runnable_queue.put(partial(runnable.run, **kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
@@ -181,9 +189,12 @@ class ArvCwlRunner(object):
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
- with Perf(metrics, "done %s" % j.name):
- j.done(event["properties"]["new_attributes"])
- self.workflow_eval_lock.notify()
+ def done_cb():
+ j.done(event["properties"]["new_attributes"])
+ with self.workflow_eval_lock:
+ self.workflow_eval_lock.notify()
+ self.runnable_queue.put(done_cb)
+
def label(self, obj):
return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -195,15 +206,19 @@ class ArvCwlRunner(object):
"""
try:
+ remain_wait = self.poll_interval
while True:
- self.stop_polling.wait(15)
+ if remain_wait > 0:
+ self.stop_polling.wait(remain_wait)
if self.stop_polling.is_set():
break
with self.workflow_eval_lock:
keys = list(self.processes.keys())
if not keys:
+ remain_wait = self.poll_interval
continue
+ begin_poll = time.time()
if self.work_api == "containers":
table = self.poll_api.container_requests()
elif self.work_api == "jobs":
@@ -213,6 +228,7 @@ class ArvCwlRunner(object):
proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
except Exception as e:
logger.warn("Error checking states on API server: %s", e)
+ remain_wait = self.poll_interval
continue
for p in proc_states["items"]:
@@ -223,9 +239,11 @@ class ArvCwlRunner(object):
"new_attributes": p
}
})
+ finish_poll = time.time()
+ remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
- logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
- with workflow_eval_lock:
+ logger.exception("Fatal error in state polling thread.")
+ with self.workflow_eval_lock:
self.processes.clear()
self.workflow_eval_lock.notify()
finally:
@@ -511,7 +529,10 @@ class ArvCwlRunner(object):
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
- threading.Thread(target=self.runnable_queue_thread).start()
+ for r in xrange(0, self.parallel_submit_count):
+ t = threading.Thread(target=self.runnable_queue_func)
+ self.runnable_queue_threads.append(t)
+ t.start()
if runnerjob:
jobiter = iter((runnerjob,))
@@ -542,7 +563,7 @@ class ArvCwlRunner(object):
self.start_run(runnable, kwargs)
else:
if (self.in_flight + len(self.processes)) > 0:
- self.workflow_eval_lock.wait(1)
+ self.workflow_eval_lock.wait(3)
else:
logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
break
@@ -550,7 +571,7 @@ class ArvCwlRunner(object):
loopperf.__exit__()
while self.processes:
- self.workflow_eval_lock.wait(1)
+ self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
raise
@@ -567,8 +588,18 @@ class ArvCwlRunner(object):
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
+ try:
+ # Drain queue
+ while not self.runnable_queue.empty():
+ self.runnable_queue.get()
+ except Queue.Empty:
+ pass
self.stop_polling.set()
self.polling_thread.join()
+ for t in self.runnable_queue_threads:
+ self.runnable_queue.put(None)
+ for t in self.runnable_queue_threads:
+ t.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
@@ -718,6 +749,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
action="store_true", default=False,
help=argparse.SUPPRESS)
+ parser.add_argument("--parallel-submit-count", type=int,
+ default=4, help="Submit requests in parallel (default 4)")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
default=False, dest="trash_intermediate",
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list