[ARVADOS] updated: 7daef1e224ee6fb8b03cd7b71773c5381c07324e

Git user git at public.curoverse.com
Fri Aug 19 11:13:54 EDT 2016


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py | 50 ++++++++++++++++++++++++++++++++++-------
 1 file changed, 42 insertions(+), 8 deletions(-)

       via  7daef1e224ee6fb8b03cd7b71773c5381c07324e (commit)
       via  e0fc29c78d959d19c3d63d3bfb204b1c444518bd (commit)
      from  d4bb1f8a3c68288f45d2c1fa618c333825fcc8d0 (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 7daef1e224ee6fb8b03cd7b71773c5381c07324e
Merge: d4bb1f8 e0fc29c
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Aug 19 11:13:51 2016 -0400

    Merge branch '9820-cwl-poll-jobs' closes #9820


commit e0fc29c78d959d19c3d63d3bfb204b1c444518bd
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Fri Aug 19 10:50:25 2016 -0400

    9820: Directly poll job or container records that we are interested in. Benefit: puts less load on database than log table polling, and doesn't miss events.

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 27af075..0d0d416 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -17,7 +17,6 @@ import cwltool.main
 import cwltool.workflow
 
 import arvados
-import arvados.events
 import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
@@ -50,6 +49,8 @@ class ArvCwlRunner(object):
         self.num_retries = 4
         self.uuid = None
         self.work_api = work_api
+        self.stop_polling = threading.Event()
+        self.poll_api = None
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -99,6 +100,41 @@ class ArvCwlRunner(object):
                     finally:
                         self.cond.release()
 
+    def poll_states(self):
+        """Poll status of jobs or containers listed in the processes dict.
+
+        Runs in a separate thread; loops until the stop_polling event is set.
+        """
+
+        while True:
+            self.stop_polling.wait(15)  # wait up to 15s between polls; wakes early when the event is set
+            if self.stop_polling.is_set():
+                break
+            with self.lock:  # hold self.lock while reading self.processes
+                keys = self.processes.keys()
+            if not keys:
+                continue  # nothing to poll for this cycle
+            # NOTE(review): `table` is left unbound if work_api is neither value below -- confirm that cannot happen.
+            if self.work_api == "containers":
+                table = self.poll_api.containers()
+            elif self.work_api == "jobs":
+                table = self.poll_api.jobs()
+
+            try:
+                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)  # one list request covering every key
+            except Exception as e:
+                logger.warn("Error checking states on API server: %s", e)
+                continue  # skip this cycle; polling resumes after the next wait
+            # Hand each returned record to on_message, wrapped as an "update" event.
+            for p in proc_states["items"]:
+                self.on_message({
+                    "object_uuid": p["uuid"],
+                    "event_type": "update",
+                    "properties": {
+                        "new_attributes": p
+                    }
+                })
+
     def get_uploaded(self):
         return self.uploaded.copy()
 
@@ -182,12 +218,9 @@ class ArvCwlRunner(object):
             runnerjob.run()
             return runnerjob.uuid
 
-        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
-
-        if self.work_api == "containers":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
-        if self.work_api == "jobs":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        self.poll_api = arvados.api('v1')
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
 
         if runnerjob:
             jobiter = iter((runnerjob,))
@@ -217,7 +250,6 @@ class ArvCwlRunner(object):
             while self.processes:
                 self.cond.wait(1)
 
-            events.close()
         except UnsupportedRequirement:
             raise
         except:
@@ -233,6 +265,8 @@ class ArvCwlRunner(object):
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
+            self.stop_polling.set()
+            self.polling_thread.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list