[ARVADOS] updated: 1.1.3-344-g6aef698
Git user git at public.curoverse.com
Fri Apr 6 13:06:52 EDT 2018
Summary of changes:
build/build-dev-docker-jobs-image.sh | 6 +--
sdk/cwl/arvados_cwl/__init__.py | 86 +++++++++++++-----------------------
sdk/cwl/arvados_cwl/arvcontainer.py | 24 +++++-----
sdk/cwl/arvados_cwl/arvjob.py | 4 +-
sdk/cwl/arvados_cwl/task_queue.py | 56 +++++++++++++++++++++++
5 files changed, 104 insertions(+), 72 deletions(-)
create mode 100644 sdk/cwl/arvados_cwl/task_queue.py
via 6aef698e8f1f99aa0511afd21db86d4c7cf8b5da (commit)
via 1a3de5eab1062485e6985d43bc7c3e036254efb6 (commit)
via a82b39e1849a67854cb3399a4660b77548edd580 (commit)
from 5f179cb06b6e26c3359fed97d48d13408150f6f1 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 6aef698e8f1f99aa0511afd21db86d4c7cf8b5da
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Apr 6 11:32:03 2018 -0400
13108: Refactor task queue into its own class.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 16f1bf4..7affade 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -40,6 +40,7 @@ from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
from ._version import __version__
from cwltool.pack import pack
@@ -65,11 +66,9 @@ class ArvCwlRunner(object):
"""
def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, num_retries=4,
- thread_count=4):
+ output_name=None, output_tags=None, num_retries=4):
self.api = api_client
self.processes = {}
- self.in_flight = 0
self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
@@ -86,9 +85,7 @@ class ArvCwlRunner(object):
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.task_queue = Queue.Queue()
- self.task_queue_threads = []
- self.thread_count = thread_count
+ self.thread_count = 4
self.poll_interval = 12
if keep_client is not None:
@@ -146,25 +143,9 @@ class ArvCwlRunner(object):
self.final_output = out
self.workflow_eval_lock.notifyAll()
- def task_queue_func(self):
- while True:
- task = self.task_queue.get()
- if task is None:
- return
- task()
- with self.workflow_eval_lock:
- self.in_flight -= 1
-
- def task_queue_add(self, task):
- with self.workflow_eval_lock:
- if self.thread_count > 1:
- self.in_flight += 1
- self.task_queue.put(task)
- else:
- task()
def start_run(self, runnable, kwargs):
- self.task_queue_add(partial(runnable.run, **kwargs))
+ self.task_queue.add(partial(runnable.run, **kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
@@ -197,7 +178,7 @@ class ArvCwlRunner(object):
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
with self.workflow_eval_lock:
j = self.processes[uuid]
- self.task_queue_add(partial(j.done, event["properties"]["new_attributes"]))
+ self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
def label(self, obj):
@@ -405,6 +386,7 @@ class ArvCwlRunner(object):
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
self.secret_store = kwargs.get("secret_store")
+ self.thread_count = kwargs.get("thread_count", 4)
self.trash_intermediate = kwargs["trash_intermediate"]
if self.trash_intermediate and self.work_api != "containers":
@@ -533,10 +515,7 @@ class ArvCwlRunner(object):
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
- for r in xrange(0, self.thread_count):
- t = threading.Thread(target=self.task_queue_func)
- self.task_queue_threads.append(t)
- t.start()
+ self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
if runnerjob:
jobiter = iter((runnerjob,))
@@ -562,11 +541,14 @@ class ArvCwlRunner(object):
if self.stop_polling.is_set():
break
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+
if runnable:
with Perf(metrics, "run"):
self.start_run(runnable, kwargs)
else:
- if (self.in_flight + len(self.processes)) > 0:
+ if (self.task_queue.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
else:
logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
@@ -574,7 +556,9 @@ class ArvCwlRunner(object):
loopperf.__enter__()
loopperf.__exit__()
- while (self.in_flight + len(self.processes)) > 0:
+ while (self.task_queue.in_flight + len(self.processes)) > 0:
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
@@ -592,18 +576,10 @@ class ArvCwlRunner(object):
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.workflow_eval_lock.release()
- try:
- # Drain queue
- while not self.task_queue.empty():
- self.task_queue.get()
- except Queue.Empty:
- pass
+ self.task_queue.drain()
self.stop_polling.set()
self.polling_thread.join()
- for t in self.task_queue_threads:
- self.task_queue.put(None)
- for t in self.task_queue_threads:
- t.join()
+ self.task_queue.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
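With this change, worker failures are no longer silent: the run loop re-raises whatever exception a queue thread recorded in task_queue.error. A standalone sketch of that pattern (illustrative only, not the committed runner code; the failing task and timeout values are made up, and it assumes the arvados_cwl package from this tree is importable):

    import threading
    from arvados_cwl.task_queue import TaskQueue

    lock = threading.Condition(threading.RLock())
    tq = TaskQueue(lock, 2)                # two worker threads

    def failing_task():
        raise RuntimeError("task failed")  # recorded as tq.error by the worker

    tq.add(failing_task)
    try:
        with lock:
            while tq.in_flight > 0:
                if tq.error is not None:
                    raise tq.error         # surface the worker's exception
                lock.wait(1)               # timed wait; the workers do not notify
        if tq.error is not None:
            raise tq.error                 # in case the task finished before we looked
    finally:
        tq.drain()
        tq.join()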
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
new file mode 100644
index 0000000..cc3e86e
--- /dev/null
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -0,0 +1,56 @@
+import Queue
+import threading
+import logging
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+class TaskQueue(object):
+ def __init__(self, lock, thread_count):
+ self.thread_count = thread_count
+ self.task_queue = Queue.Queue()
+ self.task_queue_threads = []
+ self.lock = lock
+ self.in_flight = 0
+ self.error = None
+
+ for r in xrange(0, self.thread_count):
+ t = threading.Thread(target=self.task_queue_func)
+ self.task_queue_threads.append(t)
+ t.start()
+
+ def task_queue_func(self):
+
+ while True:
+ task = self.task_queue.get()
+ if task is None:
+ return
+ try:
+ task()
+ except Exception as e:
+ logger.exception("Unexpected error running task")
+ self.error = e
+
+ with self.lock:
+ self.in_flight -= 1
+
+ def add(self, task):
+ with self.lock:
+ if self.thread_count > 1:
+ self.in_flight += 1
+ self.task_queue.put(task)
+ else:
+ task()
+
+ def drain(self):
+ try:
+ # Drain queue
+ while not self.task_queue.empty():
+ self.task_queue.get()
+ except Queue.Empty:
+ pass
+
+ def join(self):
+ for t in self.task_queue_threads:
+ self.task_queue.put(None)
+ for t in self.task_queue_threads:
+ t.join()
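As a quick reference, a minimal lifecycle example for the new class (mine, not part of the commit; assumes the arvados_cwl package from this tree is importable):

    import threading
    from arvados_cwl.task_queue import TaskQueue

    lock = threading.Condition(threading.RLock())
    tq = TaskQueue(lock, 4)               # starts four worker threads

    results = []
    def make_task(n):
        def task():
            results.append(n * n)         # runs on a worker thread
        return task

    for n in range(10):
        tq.add(make_task(n))              # queued and counted because thread_count > 1

    with lock:
        while tq.in_flight > 0:           # reaches zero once every task has finished
            lock.wait(1)

    tq.drain()                            # nothing left to discard at this point
    tq.join()                             # send sentinels and join the workers
    print(sorted(results))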
commit 1a3de5eab1062485e6985d43bc7c3e036254efb6
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Apr 6 11:21:56 2018 -0400
13108: Fix tracking tasks in flight
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0f916ee..16f1bf4 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -144,6 +144,7 @@ class ArvCwlRunner(object):
body={"state": "Failed"}).execute(num_retries=self.num_retries)
self.final_status = processStatus
self.final_output = out
+ self.workflow_eval_lock.notifyAll()
def task_queue_func(self):
while True:
@@ -151,22 +152,23 @@ class ArvCwlRunner(object):
if task is None:
return
task()
+ with self.workflow_eval_lock:
+ self.in_flight -= 1
def task_queue_add(self, task):
- if self.thread_count > 1:
- self.task_queue.put(task)
- else:
- task()
+ with self.workflow_eval_lock:
+ if self.thread_count > 1:
+ self.in_flight += 1
+ self.task_queue.put(task)
+ else:
+ task()
def start_run(self, runnable, kwargs):
- with self.workflow_eval_lock:
- self.in_flight += 1
self.task_queue_add(partial(runnable.run, **kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
self.processes[container.uuid] = container
- self.in_flight -= 1
def process_done(self, uuid):
with self.workflow_eval_lock:
@@ -176,6 +178,7 @@ class ArvCwlRunner(object):
def wrapped_callback(self, cb, obj, st):
with self.workflow_eval_lock:
cb(obj, st)
+ self.workflow_eval_lock.notifyAll()
def get_wrapped_callback(self, cb):
return partial(self.wrapped_callback, cb)
@@ -183,24 +186,19 @@ class ArvCwlRunner(object):
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.processes and event["event_type"] == "update":
- if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
- uuid = event["object_uuid"]
+ uuid = event["object_uuid"]
+ if event["properties"]["new_attributes"]["state"] == "Running":
with self.workflow_eval_lock:
j = self.processes[uuid]
- logger.info("%s %s is Running", self.label(j), uuid)
- j.running = True
- j.update_pipeline_component(event["properties"]["new_attributes"])
+ if j.running is False:
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
+ logger.info("%s %s is Running", self.label(j), uuid)
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
- uuid = event["object_uuid"]
with self.workflow_eval_lock:
j = self.processes[uuid]
- logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
- def done_cb():
- j.done(event["properties"]["new_attributes"])
- with self.workflow_eval_lock:
- self.workflow_eval_lock.notify()
- self.task_queue_add(done_cb)
-
+ self.task_queue_add(partial(j.done, event["properties"]["new_attributes"]))
+ logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
def label(self, obj):
return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -251,7 +249,7 @@ class ArvCwlRunner(object):
logger.exception("Fatal error in state polling thread.")
with self.workflow_eval_lock:
self.processes.clear()
- self.workflow_eval_lock.notify()
+ self.workflow_eval_lock.notifyAll()
finally:
self.stop_polling.set()
@@ -571,7 +569,7 @@ class ArvCwlRunner(object):
if (self.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
break
loopperf.__enter__()
loopperf.__exit__()
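The notifyAll calls added here are what wake the main loop promptly; without them it only re-checks its predicate when the three-second wait times out. A generic Condition sketch of that idiom (not the runner code itself):

    import threading

    lock = threading.Condition(threading.RLock())
    in_flight = [1]                       # stand-in for self.in_flight

    def worker():
        # ... do the task ...
        with lock:
            in_flight[0] -= 1
            lock.notifyAll()              # wake every waiter so each re-checks its predicate

    t = threading.Thread(target=worker)
    t.start()

    with lock:
        while in_flight[0] > 0:
            lock.wait(3)                  # same timed wait the runner loop uses
    t.join()
    print("all tasks finished")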
commit a82b39e1849a67854cb3399a4660b77548edd580
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Fri Apr 6 10:29:30 2018 -0400
13108: Rename parallel_submit_count to thread_count
Make sure it gets passed through to RunnerContainer.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/build/build-dev-docker-jobs-image.sh b/build/build-dev-docker-jobs-image.sh
index e1e5063..9393c1a 100755
--- a/build/build-dev-docker-jobs-image.sh
+++ b/build/build-dev-docker-jobs-image.sh
@@ -22,10 +22,8 @@ EOF
set -e
if [[ -z "$WORKSPACE" ]] ; then
- echo "$helpmessage"
- echo
- echo "Must set WORKSPACE"
- exit 1
+ export WORKSPACE=$(readlink -f $(dirname $0)/..)
+ echo "Using WORKSPACE $WORKSPACE"
fi
if [[ -z "$ARVADOS_API_HOST" || -z "$ARVADOS_API_TOKEN" ]] ; then
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index f70fa65..0f916ee 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -66,7 +66,7 @@ class ArvCwlRunner(object):
def __init__(self, api_client, work_api=None, keep_client=None,
output_name=None, output_tags=None, num_retries=4,
- parallel_submit_count=4):
+ thread_count=4):
self.api = api_client
self.processes = {}
self.in_flight = 0
@@ -88,7 +88,7 @@ class ArvCwlRunner(object):
self.trash_intermediate = False
self.task_queue = Queue.Queue()
self.task_queue_threads = []
- self.parallel_submit_count = parallel_submit_count
+ self.thread_count = thread_count
self.poll_interval = 12
if keep_client is not None:
@@ -153,7 +153,7 @@ class ArvCwlRunner(object):
task()
def task_queue_add(self, task):
- if self.parallel_submit_count > 1:
+ if self.thread_count > 1:
self.task_queue.put(task)
else:
task()
@@ -528,14 +528,14 @@ class ArvCwlRunner(object):
logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run(wait=kwargs.get("wait"))
+ runnerjob.run(**kwargs)
return (runnerjob.uuid, "success")
self.poll_api = arvados.api('v1')
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
- for r in xrange(0, self.parallel_submit_count):
+ for r in xrange(0, self.thread_count):
t = threading.Thread(target=self.task_queue_func)
self.task_queue_threads.append(t)
t.start()
@@ -755,8 +755,8 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
action="store_true", default=False,
help=argparse.SUPPRESS)
- parser.add_argument("--parallel-submit-count", type=int,
- default=4, help="Submit requests in parallel (default 4)")
+ parser.add_argument("--thread-count", type=int,
+ default=4, help="Number of threads to use for job submit and output collection.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
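After the rename, the option is given on the command line as --thread-count; for example (the workflow and input file names below are placeholders):

    arvados-cwl-runner --api=containers --thread-count=8 workflow.cwl job-input.yml

When the flag is omitted, the default of 4 threads applies.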
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index eb333d0..afcf2db 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -400,7 +400,18 @@ class RunnerContainer(Runner):
# --api=containers means use the containers API
# --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
# --disable-validate because we already validated so don't need to do it again
- command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps", "--disable-validate"]
+ # --eval-timeout is the timeout for javascript invocation
+ # --parallel-task-count is the number of threads to use for job submission
+ # --enable/disable-reuse sets desired job reuse
+ command = ["arvados-cwl-runner",
+ "--local",
+ "--api=containers",
+ "--no-log-timestamps",
+ "--disable-validate",
+ "--eval-timeout=%s" % self.arvrunner.eval_timeout,
+ "--thread-count=%s" % self.arvrunner.thread_count,
+ "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+
if self.output_name:
command.append("--output-name=" + self.output_name)
container_req["output_name"] = self.output_name
@@ -411,11 +422,6 @@ class RunnerContainer(Runner):
if kwargs.get("debug"):
command.append("--debug")
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
-
if self.on_error:
command.append("--on-error=" + self.on_error)
@@ -428,8 +434,6 @@ class RunnerContainer(Runner):
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
- command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
-
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
@@ -437,9 +441,9 @@ class RunnerContainer(Runner):
return container_req
- def run(self, *args, **kwargs):
+ def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
response = self.arvrunner.api.container_requests().create(
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index e2df831..e222152 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -317,8 +317,8 @@ class RunnerJob(Runner):
}
}
- def run(self, *args, **kwargs):
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ def run(self, **kwargs):
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-----------------------------------------------------------------------
hooks/post-receive
--