[ARVADOS] created: 94810160b78dde4735b02edc8d5426ef4b5a9cc0
Git user
git at public.curoverse.com
Fri Aug 19 11:09:06 EDT 2016
at 94810160b78dde4735b02edc8d5426ef4b5a9cc0 (commit)
commit 94810160b78dde4735b02edc8d5426ef4b5a9cc0
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Aug 19 10:50:25 2016 -0400
9820: Directly poll job or container records that we are interested in. Benefit: puts less load on database than log table polling, and doesn't miss events.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 27af075..c74f613 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -17,7 +17,6 @@ import cwltool.main
import cwltool.workflow
import arvados
-import arvados.events
import arvados.config
from .arvcontainer import ArvadosContainer, RunnerContainer
@@ -50,6 +49,8 @@ class ArvCwlRunner(object):
self.num_retries = 4
self.uuid = None
self.work_api = work_api
+ self.stop_polling = threading.Event()
+ self.poll_api = None
if self.work_api is None:
# todo: autodetect API to use.
@@ -99,6 +100,41 @@ class ArvCwlRunner(object):
finally:
self.cond.release()
+ def poll_states(self):
+ """Poll status of jobs or containers listed in the processes dict.
+
+ Runs in a separate thread.
+ """
+
+ while True:
+ self.stop_polling.wait(15)
+ if self.stop_polling.is_set():
+ break
+ with self.lock:
+ keys = self.processes.keys()
+ if not keys:
+ continue
+
+ if self.work_api == "containers":
+ table = self.poll_api.containers()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
+
+ try:
+ proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+
def get_uploaded(self):
return self.uploaded.copy()
@@ -182,12 +218,9 @@ class ArvCwlRunner(object):
runnerjob.run()
return runnerjob.uuid
- arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
-
- if self.work_api == "containers":
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
- if self.work_api == "jobs":
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+ self.poll_api = arvados.api('v1')
+ self.polling_thread = threading.Thread(target=self.poll_states)
+ self.polling_thread.start()
if runnerjob:
jobiter = iter((runnerjob,))
@@ -217,7 +250,6 @@ class ArvCwlRunner(object):
while self.processes:
self.cond.wait(1)
- events.close()
except UnsupportedRequirement:
raise
except:
@@ -233,6 +265,8 @@ class ArvCwlRunner(object):
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.cond.release()
+ self.stop_polling.set()
+ self.polling_thread.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list