[ARVADOS] created: 1.1.4-27-g19da21a

Git user git at public.curoverse.com
Thu Apr 12 22:26:38 EDT 2018


        at  19da21ab8e56154d7db15c2643524cb8348a7a8a (commit)


commit 19da21ab8e56154d7db15c2643524cb8348a7a8a
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Apr 12 20:54:57 2018 -0400

    13108: crunch_script uses safeapi
    
    Also take workflow_eval_lock in update_pipeline_component to prevent update races.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index e222152..decd692 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -184,27 +184,28 @@ class ArvadosJob(object):
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
-        if self.arvrunner.pipeline:
-            self.arvrunner.pipeline["components"][self.name] = {"job": record}
-            with Perf(metrics, "update_pipeline_component %s" % self.name):
-                self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
-                    uuid=self.arvrunner.pipeline["uuid"],
-                    body={
-                        "components": self.arvrunner.pipeline["components"]
-                    }).execute(num_retries=self.arvrunner.num_retries)
-        if self.arvrunner.uuid:
-            try:
-                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
-                if job:
-                    components = job["components"]
-                    components[self.name] = record["uuid"]
-                    self.arvrunner.api.jobs().update(
-                        uuid=self.arvrunner.uuid,
+        with self.arvrunner.workflow_eval_lock:
+            if self.arvrunner.pipeline:
+                self.arvrunner.pipeline["components"][self.name] = {"job": record}
+                with Perf(metrics, "update_pipeline_component %s" % self.name):
+                    self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+                        uuid=self.arvrunner.pipeline["uuid"],
                         body={
-                            "components": components
+                            "components": self.arvrunner.pipeline["components"]
                         }).execute(num_retries=self.arvrunner.num_retries)
-            except Exception as e:
-                logger.info("Error adding to components: %s", e)
+            if self.arvrunner.uuid:
+                try:
+                    job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+                    if job:
+                        components = job["components"]
+                        components[self.name] = record["uuid"]
+                        self.arvrunner.api.jobs().update(
+                            uuid=self.arvrunner.uuid,
+                            body={
+                                "components": components
+                            }).execute(num_retries=self.arvrunner.num_retries)
+                except Exception as e:
+                    logger.info("Error adding to components: %s", e)
 
     def done(self, record):
         try:
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index aaeffea..bf940ec 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -97,7 +97,8 @@ def run():
             debug = job_order_object["arv:debug"]
             del job_order_object["arv:debug"]
 
-        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
+        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
+            api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
                                           output_name=output_name, output_tags=output_tags)
 
         make_fs_access = functools.partial(CollectionFsAccess,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 053c995..bf0eb08 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -400,5 +400,4 @@ class Runner(object):
         else:
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
-            if record["uuid"] in self.arvrunner.processes:
-                del self.arvrunner.processes[record["uuid"]]
+            self.arvrunner.process_done(record["uuid"])

commit e4c5f98f696c354638bbba22ee4a1db20a52837c
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Thu Apr 12 16:22:52 2018 -0400

    13108: Don't try to install signal handler in background thread
    
    Fix Docker image upload failure due to arv-put trying to install a
    signal handler somewhere other than the main thread.
    
    Also install a top-level signal handler to convert SIGTERM to
    KeyboardInterrupt.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c2f43fe..2fa6da7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -19,6 +19,8 @@ from functools import partial
 import pkg_resources  # part of setuptools
 import Queue
 import time
+import signal
+import thread
 
 from cwltool.errors import WorkflowException
 import cwltool.main
@@ -564,7 +566,7 @@ class ArvCwlRunner(object):
             raise
         except:
             if sys.exc_info()[0] is KeyboardInterrupt:
-                logger.error("Interrupted, marking pipeline as failed")
+                logger.error("Interrupted, workflow will be cancelled")
             else:
                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
@@ -761,12 +763,16 @@ def add_arv_hints():
         "http://arvados.org/cwl#ReuseRequirement"
     ])
 
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+         install_sig_handlers=True):
     parser = arg_parser()
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
+    if install_sig_handlers:
+        signal.signal(signal.SIGTERM, lambda x, y: thread.interrupt_main())
+
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 5483ccb..68573aa 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -57,7 +57,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
             args.append(image_tag)
             logger.info("Uploading Docker image %s:%s", image_name, image_tag)
             try:
-                arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+                arvados.commands.keepdocker.main(args, stdout=sys.stderr, install_sig_handlers=False)
             except SystemExit as e:
                 if e.code:
                     raise WorkflowException("keepdocker exited with code %s" % e.code)
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index ff7201a..16fefdb 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -345,7 +345,7 @@ def _uuid2pdh(api, uuid):
         select=['portable_data_hash'],
     ).execute()['items'][0]['portable_data_hash']
 
-def main(arguments=None, stdout=sys.stdout):
+def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True):
     args = arg_parser.parse_args(arguments)
     api = arvados.api('v1')
 
@@ -490,7 +490,8 @@ def main(arguments=None, stdout=sys.stdout):
             put_args += ['--name', collection_name]
 
         coll_uuid = arv_put.main(
-            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout).strip()
+            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
+            install_sig_handlers=install_sig_handlers).strip()
 
         # Read the image metadata and make Arvados links from it.
         image_file.seek(0)
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 5dde8e5..af8e243 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -993,7 +993,8 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
     return query.execute(num_retries=num_retries)['uuid']
 
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+         install_sig_handlers=True):
     global api_client
 
     args = parse_arguments(arguments)
@@ -1014,8 +1015,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
-    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
-                            for sigcode in CAUGHT_SIGNALS}
+    orig_signal_handlers = {}
+    if install_sig_handlers:
+        orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+                                for sigcode in CAUGHT_SIGNALS}
 
     # Determine the name to use
     if args.name:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list