[ARVADOS] updated: 1.1.4-27-g19da21a

Git user git at public.curoverse.com
Thu Apr 12 20:58:51 EDT 2018


Summary of changes:
 sdk/cwl/arvados_cwl/arvjob.py        | 39 ++++++++++++++++++------------------
 sdk/cwl/arvados_cwl/crunch_script.py |  3 ++-
 sdk/cwl/arvados_cwl/runner.py        |  3 +--
 3 files changed, 23 insertions(+), 22 deletions(-)

       via  19da21ab8e56154d7db15c2643524cb8348a7a8a (commit)
      from  e4c5f98f696c354638bbba22ee4a1db20a52837c (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 19da21ab8e56154d7db15c2643524cb8348a7a8a
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Apr 12 20:54:57 2018 -0400

    13108: crunch_script uses safeapi
    
    Also take workflow lock on update_pipeline_component to prevent update races.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index e222152..decd692 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -184,27 +184,28 @@ class ArvadosJob(object):
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
-        if self.arvrunner.pipeline:
-            self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            with Perf(metrics, "update_pipeline_component %s" % self.name):
-                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
-                    uuid=self.arvrunner.pipeline["uuid"],
-                    body={
-                        "components": self.arvrunner.pipeline["components"]
-                    }).execute(num_retries=self.arvrunner.num_retries)
-        if self.arvrunner.uuid:
-            try:
-                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
-                if job:
-                    components = job["components"]
-                    components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(
-                        uuid=self.arvrunner.uuid,
+        with self.arvrunner.workflow_eval_lock:
+            if self.arvrunner.pipeline:
+                self.arvrunner.pipeline["components"][self.name] = {"job": record}
+                with Perf(metrics, "update_pipeline_component %s" % self.name):
+                    self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+                        uuid=self.arvrunner.pipeline["uuid"],
                         body={
-                            "components": components
+                            "components": self.arvrunner.pipeline["components"]
                         }).execute(num_retries=self.arvrunner.num_retries)
-            except Exception as e:
-                logger.info("Error adding to components: %s", e)
+            if self.arvrunner.uuid:
+                try:
+                    job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+                    if job:
+                        components = job["components"]
+                        components[self.name] = record["uuid"]
+                        self.arvrunner.api.jobs().update(
+                            uuid=self.arvrunner.uuid,
+                            body={
+                                "components": components
+                            }).execute(num_retries=self.arvrunner.num_retries)
+                except Exception as e:
+                    logger.info("Error adding to components: %s", e)
 
     def done(self, record):
         try:
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index aaeffea..bf940ec 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -97,7 +97,8 @@ def run():
             debug = job_order_object["arv:debug"]
             del job_order_object["arv:debug"]
 
-        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
+        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
+            api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
                                           output_name=output_name, output_tags=output_tags)
 
         make_fs_access = functools.partial(CollectionFsAccess,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 053c995..bf0eb08 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -400,5 +400,4 @@ class Runner(object):
         else:
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list