[ARVADOS] created: e855cd9e08c203ae05f8f64682b642b7d8caa8e1
Git user
git at public.curoverse.com
Fri Aug 19 10:50:33 EDT 2016
at e855cd9e08c203ae05f8f64682b642b7d8caa8e1 (commit)
commit e855cd9e08c203ae05f8f64682b642b7d8caa8e1
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Fri Aug 19 10:50:25 2016 -0400
9820: Directly poll job or container records that we are interested in.
Benefit: puts less load on database than log table polling, and doesn't miss
events.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 27af075..c74f613 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -17,7 +17,6 @@ import cwltool.main
import cwltool.workflow
import arvados
-import arvados.events
import arvados.config
from .arvcontainer import ArvadosContainer, RunnerContainer
@@ -50,6 +49,8 @@ class ArvCwlRunner(object):
self.num_retries = 4
self.uuid = None
self.work_api = work_api
+ self.stop_polling = threading.Event()
+ self.poll_api = None
if self.work_api is None:
# todo: autodetect API to use.
@@ -99,6 +100,41 @@ class ArvCwlRunner(object):
finally:
self.cond.release()
+ def poll_states(self):
+ """Poll status of jobs or containers listed in the processes dict.
+
+ Runs in a separate thread.
+ """
+
+ while True:
+ self.stop_polling.wait(15)
+ if self.stop_polling.is_set():
+ break
+ with self.lock:
+ keys = self.processes.keys()
+ if not keys:
+ continue
+
+ if self.work_api == "containers":
+ table = self.poll_api.containers()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
+
+ try:
+ proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+
def get_uploaded(self):
return self.uploaded.copy()
@@ -182,12 +218,9 @@ class ArvCwlRunner(object):
runnerjob.run()
return runnerjob.uuid
- arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
-
- if self.work_api == "containers":
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
- if self.work_api == "jobs":
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+ self.poll_api = arvados.api('v1')
+ self.polling_thread = threading.Thread(target=self.poll_states)
+ self.polling_thread.start()
if runnerjob:
jobiter = iter((runnerjob,))
@@ -217,7 +250,6 @@ class ArvCwlRunner(object):
while self.processes:
self.cond.wait(1)
- events.close()
except UnsupportedRequirement:
raise
except:
@@ -233,6 +265,8 @@ class ArvCwlRunner(object):
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
self.cond.release()
+ self.stop_polling.set()
+ self.polling_thread.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
diff --git a/services/api/lib/eventbus.rb b/services/api/lib/eventbus.rb
index e7f2bb1..6000c28 100644
--- a/services/api/lib/eventbus.rb
+++ b/services/api/lib/eventbus.rb
@@ -10,6 +10,8 @@ require 'load_param'
require 'set'
require 'thread'
+#ActiveRecord::Base.logger = Logger.new(STDOUT)
+
# Patch in user, last_log_id and filters fields into the Faye::Websocket class.
module Faye
class WebSocket
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list