[ARVADOS] updated: 1.1.3-339-g540ecd0

Git user git at public.curoverse.com
Thu Apr 5 17:10:37 EDT 2018


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py     | 102 +++++++++++++++++++++++-------------
 sdk/cwl/arvados_cwl/arvcontainer.py |  12 ++---
 sdk/cwl/arvados_cwl/arvjob.py       |  11 ++--
 sdk/python/arvados/safeapi.py       |   6 ++-
 4 files changed, 83 insertions(+), 48 deletions(-)

       via  540ecd0ae604df1cf02a63515e6e9e8e04e6e64a (commit)
       via  69c8df415d721461135331a50e98255a625b12d1 (commit)
       via  4647e6d7fb4a33c8896caf7f60f27df52040d45d (commit)
      from  25be7a865d12707f5d2afe9300124fe4ef75145d (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 540ecd0ae604df1cf02a63515e6e9e8e04e6e64a
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Apr 5 17:10:25 2018 -0400

    13108: Work queue submit wip
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index fbef534..7390155 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -17,6 +17,7 @@ import json
 import re
 from functools import partial
 import pkg_resources  # part of setuptools
+import Queue
 
 from cwltool.errors import WorkflowException
 import cwltool.main
@@ -82,6 +83,7 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
+        self.runnable_queue = Queue.Queue()
 
         if keep_client is not None:
             self.keep_client = keep_client
@@ -123,23 +125,29 @@ class ArvCwlRunner(object):
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
-        if processStatus == "success":
-            logger.info("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
-        else:
-            logger.warn("Overall process status is %s", processStatus)
-            if self.pipeline:
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
-        self.final_status = processStatus
-        self.final_output = out
+        with self.workflow_eval_lock:
+            if processStatus == "success":
+                logger.info("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            else:
+                logger.warn("Overall process status is %s", processStatus)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            self.final_status = processStatus
+            self.final_output = out
+
+    def runnable_queue_thread(self):
+        while True:
+            runnable, kwargs = self.runnable_queue.get()
+            runnable.run(**kwargs)
 
     def start_run(self, runnable, kwargs):
         with self.workflow_eval_lock:
             self.in_flight += 1
-        runnable.run(**kwargs)
+        self.runnable_queue.put((runnable, kwargs))
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -151,6 +159,13 @@ class ArvCwlRunner(object):
             if uuid in self.processes:
                 del self.processes[uuid]
 
+    def wrapped_callback(self, cb, obj, st):
+        with self.workflow_eval_lock:
+            cb(obj, st)
+
+    def get_wrapped_callback(self, cb):
+        return partial(self.wrapped_callback, cb)
+
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
@@ -496,6 +511,8 @@ class ArvCwlRunner(object):
         self.polling_thread = threading.Thread(target=self.poll_states)
         self.polling_thread.start()
 
+        threading.Thread(target=self.runnable_queue_thread).start()
+
         if runnerjob:
             jobiter = iter((runnerjob,))
         else:
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ab6dddb..eb333d0 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -248,6 +248,8 @@ class ArvadosContainer(object):
                 container_request["name"] = wfrecord["name"]
             container_request["properties"]["template_uuid"] = wfuuid
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             response = self.arvrunner.api.container_requests().create(
                 body=container_request
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 0c35115..e2df831 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -134,6 +134,8 @@ class ArvadosJob(object):
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
 
+        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
         try:
             with Perf(metrics, "create %s" % self.name):
                 response = self.arvrunner.api.jobs().create(

commit 69c8df415d721461135331a50e98255a625b12d1
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Apr 5 16:32:19 2018 -0400

    13108: Simplify locking, add methods for recording process status
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c0dd9d3..fbef534 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -65,8 +65,8 @@ class ArvCwlRunner(object):
     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
         self.api = api_client
         self.processes = {}
-        self.lock = threading.Lock()
-        self.cond = threading.Condition(self.lock)
+        self.in_flight = 0
+        self.workflow_eval_lock = threading.Condition(threading.RLock())
         self.final_output = None
         self.final_status = None
         self.uploaded = {}
@@ -136,27 +136,39 @@ class ArvCwlRunner(object):
         self.final_status = processStatus
         self.final_output = out
 
+    def start_run(self, runnable, kwargs):
+        with self.workflow_eval_lock:
+            self.in_flight += 1
+        runnable.run(**kwargs)
+
+    def process_submitted(self, container):
+        with self.workflow_eval_lock:
+            self.processes[container.uuid] = container
+            self.in_flight -= 1
+
+    def process_done(self, uuid):
+        with self.workflow_eval_lock:
+            if uuid in self.processes:
+                del self.processes[uuid]
+
     def on_message(self, event):
         if "object_uuid" in event:
             if event["object_uuid"] in self.processes and event["event_type"] == "update":
                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                     uuid = event["object_uuid"]
-                    with self.lock:
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         logger.info("%s %s is Running", self.label(j), uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                     uuid = event["object_uuid"]
-                    try:
-                        self.cond.acquire()
+                    with self.workflow_eval_lock:
                         j = self.processes[uuid]
                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                         with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
-                        self.cond.notify()
-                    finally:
-                        self.cond.release()
+                        self.workflow_eval_lock.notify()
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -172,8 +184,8 @@ class ArvCwlRunner(object):
                 self.stop_polling.wait(15)
                 if self.stop_polling.is_set():
                     break
-                with self.lock:
-                    keys = self.processes.keys()
+                with self.workflow_eval_lock:
+                    keys = list(self.processes.keys())
                 if not keys:
                     continue
 
@@ -198,10 +210,9 @@ class ArvCwlRunner(object):
                     })
         except:
             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
-            self.cond.acquire()
-            self.processes.clear()
-            self.cond.notify()
-            self.cond.release()
+            with workflow_eval_lock:
+                self.processes.clear()
+                self.workflow_eval_lock.notify()
         finally:
             self.stop_polling.set()
 
@@ -495,10 +506,11 @@ class ArvCwlRunner(object):
                                **kwargs)
 
         try:
-            self.cond.acquire()
-            # Will continue to hold the lock for the duration of this code
-            # except when in cond.wait(), at which point on_message can update
-            # job state and process output callbacks.
+            self.workflow_eval_lock.acquire()
+            # Holds the lock while this code runs and releases it when
+            # it is safe to do so in self.workflow_eval_lock.wait(),
+            # at which point on_message can update job state and
+            # process output callbacks.
 
             loopperf = Perf(metrics, "jobiter")
             loopperf.__enter__()
@@ -510,10 +522,10 @@ class ArvCwlRunner(object):
 
                 if runnable:
                     with Perf(metrics, "run"):
-                        runnable.run(**kwargs)
+                        self.start_run(runnable, kwargs)
                 else:
-                    if self.processes:
-                        self.cond.wait(1)
+                    if (self.in_flight + len(self.processes)) > 0:
+                        self.workflow_eval_lock.wait(1)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
@@ -521,7 +533,7 @@ class ArvCwlRunner(object):
             loopperf.__exit__()
 
             while self.processes:
-                self.cond.wait(1)
+                self.workflow_eval_lock.wait(1)
 
         except UnsupportedRequirement:
             raise
@@ -537,7 +549,7 @@ class ArvCwlRunner(object):
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
-            self.cond.release()
+            self.workflow_eval_lock.release()
             self.stop_polling.set()
             self.polling_thread.join()
 
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 5c11bab..ab6dddb 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -254,7 +254,7 @@ class ArvadosContainer(object):
             ).execute(num_retries=self.arvrunner.num_retries)
 
             self.uuid = response["uuid"]
-            self.arvrunner.processes[self.uuid] = self
+            self.arvrunner.process_submitted(self)
 
             if response["state"] == "Final":
                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
@@ -315,8 +315,7 @@ class ArvadosContainer(object):
             processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
 
 
 class RunnerContainer(Runner):
@@ -446,7 +445,7 @@ class RunnerContainer(Runner):
         ).execute(num_retries=self.arvrunner.num_retries)
 
         self.uuid = response["uuid"]
-        self.arvrunner.processes[self.uuid] = self
+        self.arvrunner.process_submitted(self)
 
         logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
 
@@ -464,5 +463,4 @@ class RunnerContainer(Runner):
         else:
             super(RunnerContainer, self).done(container)
         finally:
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 88155b5..0c35115 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -150,7 +150,8 @@ class ArvadosJob(object):
                     find_or_create=enable_reuse
                 ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.processes[response["uuid"]] = self
+            self.uuid = response["uuid"]
+            self.arvrunner.process_submitted(self)
 
             self.update_pipeline_component(response)
 
@@ -263,8 +264,8 @@ class ArvadosJob(object):
                 processStatus = "permanentFail"
         finally:
             self.output_callback(outputs, processStatus)
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])
+
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
@@ -351,7 +352,7 @@ class RunnerJob(Runner):
             return
 
         self.uuid = job["uuid"]
-        self.arvrunner.processes[self.uuid] = self
+        self.arvrunner.process_submitted(self)
 
         if job["state"] in ("Complete", "Failed", "Cancelled"):
             self.done(job)

commit 4647e6d7fb4a33c8896caf7f60f27df52040d45d
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Apr 5 15:45:58 2018 -0400

    13108: Use ThreadSafeApiCache
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index a0b7172..c0dd9d3 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -745,7 +745,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     try:
         if api_client is None:
-            api_client=arvados.api('v1', model=OrderedJsonModel())
+            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+            keep_client = api_client.keep
         if keep_client is None:
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
diff --git a/sdk/python/arvados/safeapi.py b/sdk/python/arvados/safeapi.py
index 5c8a836..b12c121 100644
--- a/sdk/python/arvados/safeapi.py
+++ b/sdk/python/arvados/safeapi.py
@@ -20,16 +20,18 @@ class ThreadSafeApiCache(object):
 
     """
 
-    def __init__(self, apiconfig=None, keep_params={}):
+    def __init__(self, apiconfig=None, keep_params={}, api_params={}):
         if apiconfig is None:
             apiconfig = config.settings()
         self.apiconfig = copy.copy(apiconfig)
+        self.api_params = api_params
         self.local = threading.local()
         self.keep = keep.KeepClient(api_client=self, **keep_params)
 
     def localapi(self):
         if 'api' not in self.local.__dict__:
-            self.local.api = arvados.api_from_config('v1', apiconfig=self.apiconfig)
+            self.local.api = arvados.api_from_config('v1', apiconfig=self.apiconfig,
+                                                     **self.api_params)
         return self.local.api
 
     def __getattr__(self, name):

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list