[ARVADOS] updated: 1.1.3-339-g540ecd0
Git user
git at public.curoverse.com
Thu Apr 5 17:10:37 EDT 2018
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 102 +++++++++++++++++++++++-------------
sdk/cwl/arvados_cwl/arvcontainer.py | 12 ++---
sdk/cwl/arvados_cwl/arvjob.py | 11 ++--
sdk/python/arvados/safeapi.py | 6 ++-
4 files changed, 83 insertions(+), 48 deletions(-)
via 540ecd0ae604df1cf02a63515e6e9e8e04e6e64a (commit)
via 69c8df415d721461135331a50e98255a625b12d1 (commit)
via 4647e6d7fb4a33c8896caf7f60f27df52040d45d (commit)
from 25be7a865d12707f5d2afe9300124fe4ef75145d (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 540ecd0ae604df1cf02a63515e6e9e8e04e6e64a
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Apr 5 17:10:25 2018 -0400
13108: Work queue submit wip
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index fbef534..7390155 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -17,6 +17,7 @@ import json
import re
from functools import partial
import pkg_resources # part of setuptools
+import Queue
from cwltool.errors import WorkflowException
import cwltool.main
@@ -82,6 +83,7 @@ class ArvCwlRunner(object):
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
+ self.runnable_queue = Queue.Queue()
if keep_client is not None:
self.keep_client = keep_client
@@ -123,23 +125,29 @@ class ArvCwlRunner(object):
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
def output_callback(self, out, processStatus):
- if processStatus == "success":
- logger.info("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Complete"}).execute(num_retries=self.num_retries)
- else:
- logger.warn("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- self.final_status = processStatus
- self.final_output = out
+ with self.workflow_eval_lock:
+ if processStatus == "success":
+ logger.info("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
+ else:
+ logger.warn("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
+ self.final_output = out
+
+ def runnable_queue_thread(self):
+ while True:
+ runnable, kwargs = self.runnable_queue.get()
+ runnable.run(**kwargs)
def start_run(self, runnable, kwargs):
with self.workflow_eval_lock:
self.in_flight += 1
- runnable.run(**kwargs)
+ self.runnable_queue.put((runnable, kwargs))
def process_submitted(self, container):
with self.workflow_eval_lock:
@@ -151,6 +159,13 @@ class ArvCwlRunner(object):
if uuid in self.processes:
del self.processes[uuid]
+ def wrapped_callback(self, cb, obj, st):
+ with self.workflow_eval_lock:
+ cb(obj, st)
+
+ def get_wrapped_callback(self, cb):
+ return partial(self.wrapped_callback, cb)
+
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.processes and event["event_type"] == "update":
@@ -496,6 +511,8 @@ class ArvCwlRunner(object):
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
+ threading.Thread(target=self.runnable_queue_thread).start()
+
if runnerjob:
jobiter = iter((runnerjob,))
else:
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ab6dddb..eb333d0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -248,6 +248,8 @@ class ArvadosContainer(object):
container_request["name"] = wfrecord["name"]
container_request["properties"]["template_uuid"] = wfuuid
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
try:
response = self.arvrunner.api.container_requests().create(
body=container_request
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 0c35115..e2df831 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -134,6 +134,8 @@ class ArvadosJob(object):
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
try:
with Perf(metrics, "create %s" % self.name):
response = self.arvrunner.api.jobs().create(
commit 69c8df415d721461135331a50e98255a625b12d1
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Apr 5 16:32:19 2018 -0400
13108: Simplify locking, add methods for recording process status
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c0dd9d3..fbef534 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -65,8 +65,8 @@ class ArvCwlRunner(object):
def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
self.api = api_client
self.processes = {}
- self.lock = threading.Lock()
- self.cond = threading.Condition(self.lock)
+ self.in_flight = 0
+ self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
self.uploaded = {}
@@ -136,27 +136,39 @@ class ArvCwlRunner(object):
self.final_status = processStatus
self.final_output = out
+ def start_run(self, runnable, kwargs):
+ with self.workflow_eval_lock:
+ self.in_flight += 1
+ runnable.run(**kwargs)
+
+ def process_submitted(self, container):
+ with self.workflow_eval_lock:
+ self.processes[container.uuid] = container
+ self.in_flight -= 1
+
+ def process_done(self, uuid):
+ with self.workflow_eval_lock:
+ if uuid in self.processes:
+ del self.processes[uuid]
+
def on_message(self, event):
if "object_uuid" in event:
if event["object_uuid"] in self.processes and event["event_type"] == "update":
if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
uuid = event["object_uuid"]
- with self.lock:
+ with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is Running", self.label(j), uuid)
j.running = True
j.update_pipeline_component(event["properties"]["new_attributes"])
elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
uuid = event["object_uuid"]
- try:
- self.cond.acquire()
+ with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
with Perf(metrics, "done %s" % j.name):
j.done(event["properties"]["new_attributes"])
- self.cond.notify()
- finally:
- self.cond.release()
+ self.workflow_eval_lock.notify()
def label(self, obj):
return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -172,8 +184,8 @@ class ArvCwlRunner(object):
self.stop_polling.wait(15)
if self.stop_polling.is_set():
break
- with self.lock:
- keys = self.processes.keys()
+ with self.workflow_eval_lock:
+ keys = list(self.processes.keys())
if not keys:
continue
@@ -198,10 +210,9 @@ class ArvCwlRunner(object):
})
except:
logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
- self.cond.acquire()
- self.processes.clear()
- self.cond.notify()
- self.cond.release()
+ with workflow_eval_lock:
+ self.processes.clear()
+ self.workflow_eval_lock.notify()
finally:
self.stop_polling.set()
@@ -495,10 +506,11 @@ class ArvCwlRunner(object):
**kwargs)
try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
+ self.workflow_eval_lock.acquire()
+ # Holds the lock while this code runs and releases it when
+ # it is safe to do so in self.workflow_eval_lock.wait(),
+ # at which point on_message can update job state and
+ # process output callbacks.
loopperf = Perf(metrics, "jobiter")
loopperf.__enter__()
@@ -510,10 +522,10 @@ class ArvCwlRunner(object):
if runnable:
with Perf(metrics, "run"):
- runnable.run(**kwargs)
+ self.start_run(runnable, kwargs)
else:
- if self.processes:
- self.cond.wait(1)
+ if (self.in_flight + len(self.processes)) > 0:
+ self.workflow_eval_lock.wait(1)
else:
logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
break
@@ -521,7 +533,7 @@ class ArvCwlRunner(object):
loopperf.__exit__()
while self.processes:
- self.cond.wait(1)
+ self.workflow_eval_lock.wait(1)
except UnsupportedRequirement:
raise
@@ -537,7 +549,7 @@ class ArvCwlRunner(object):
self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
- self.cond.release()
+ self.workflow_eval_lock.release()
self.stop_polling.set()
self.polling_thread.join()
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 5c11bab..ab6dddb 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -254,7 +254,7 @@ class ArvadosContainer(object):
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
@@ -315,8 +315,7 @@ class ArvadosContainer(object):
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
class RunnerContainer(Runner):
@@ -446,7 +445,7 @@ class RunnerContainer(Runner):
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
@@ -464,5 +463,4 @@ class RunnerContainer(Runner):
else:
super(RunnerContainer, self).done(container)
finally:
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 88155b5..0c35115 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -150,7 +150,8 @@ class ArvadosJob(object):
find_or_create=enable_reuse
).execute(num_retries=self.arvrunner.num_retries)
- self.arvrunner.processes[response["uuid"]] = self
+ self.uuid = response["uuid"]
+ self.arvrunner.process_submitted(self)
self.update_pipeline_component(response)
@@ -263,8 +264,8 @@ class ArvadosJob(object):
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
+
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
@@ -351,7 +352,7 @@ class RunnerJob(Runner):
return
self.uuid = job["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
if job["state"] in ("Complete", "Failed", "Cancelled"):
self.done(job)
commit 4647e6d7fb4a33c8896caf7f60f27df52040d45d
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Apr 5 15:45:58 2018 -0400
13108: Use ThreadSafeApiCache
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index a0b7172..c0dd9d3 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -745,7 +745,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
try:
if api_client is None:
- api_client=arvados.api('v1', model=OrderedJsonModel())
+ api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+ keep_client = api_client.keep
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
diff --git a/sdk/python/arvados/safeapi.py b/sdk/python/arvados/safeapi.py
index 5c8a836..b12c121 100644
--- a/sdk/python/arvados/safeapi.py
+++ b/sdk/python/arvados/safeapi.py
@@ -20,16 +20,18 @@ class ThreadSafeApiCache(object):
"""
- def __init__(self, apiconfig=None, keep_params={}):
+ def __init__(self, apiconfig=None, keep_params={}, api_params={}):
if apiconfig is None:
apiconfig = config.settings()
self.apiconfig = copy.copy(apiconfig)
+ self.api_params = api_params
self.local = threading.local()
self.keep = keep.KeepClient(api_client=self, **keep_params)
def localapi(self):
if 'api' not in self.local.__dict__:
- self.local.api = arvados.api_from_config('v1', apiconfig=self.apiconfig)
+ self.local.api = arvados.api_from_config('v1', apiconfig=self.apiconfig,
+ **self.api_params)
return self.local.api
def __getattr__(self, name):
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list