[ARVADOS] created: 1.1.4-27-g19da21a
Git user
git at public.curoverse.com
Thu Apr 12 22:26:38 EDT 2018
at 19da21ab8e56154d7db15c2643524cb8348a7a8a (commit)
commit 19da21ab8e56154d7db15c2643524cb8348a7a8a
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Apr 12 20:54:57 2018 -0400
13108: crunch_script uses safeapi
Also take the workflow lock in update_pipeline_component to prevent update races.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index e222152..decd692 100644
--- a/sdk/cwl/arvados_cwl/arvjob.py
+++ b/sdk/cwl/arvados_cwl/arvjob.py
@@ -184,27 +184,28 @@ class ArvadosJob(object):
self.output_callback({}, "permanentFail")
def update_pipeline_component(self, record):
- if self.arvrunner.pipeline:
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- with Perf(metrics, "update_pipeline_component %s" % self.name):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
- uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
- if self.arvrunner.uuid:
- try:
- job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
- if job:
- components = job["components"]
- components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(
- uuid=self.arvrunner.uuid,
+ with self.arvrunner.workflow_eval_lock:
+ if self.arvrunner.pipeline:
+ self.arvrunner.pipeline["components"][self.name] = {"job": record}
+ with Perf(metrics, "update_pipeline_component %s" % self.name):
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+ uuid=self.arvrunner.pipeline["uuid"],
body={
- "components": components
+ "components": self.arvrunner.pipeline["components"]
}).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.info("Error adding to components: %s", e)
+ if self.arvrunner.uuid:
+ try:
+ job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+ if job:
+ components = job["components"]
+ components[self.name] = record["uuid"]
+ self.arvrunner.api.jobs().update(
+ uuid=self.arvrunner.uuid,
+ body={
+ "components": components
+ }).execute(num_retries=self.arvrunner.num_retries)
+ except Exception as e:
+ logger.info("Error adding to components: %s", e)
def done(self, record):
try:
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index aaeffea..bf940ec 100644
--- a/sdk/cwl/arvados_cwl/crunch_script.py
+++ b/sdk/cwl/arvados_cwl/crunch_script.py
@@ -97,7 +97,8 @@ def run():
debug = job_order_object["arv:debug"]
del job_order_object["arv:debug"]
- runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
+ runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
+ api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
output_name=output_name, output_tags=output_tags)
make_fs_access = functools.partial(CollectionFsAccess,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 053c995..bf0eb08 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -400,5 +400,4 @@ class Runner(object):
else:
self.arvrunner.output_callback(outputs, processStatus)
finally:
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
commit e4c5f98f696c354638bbba22ee4a1db20a52837c
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Apr 12 16:22:52 2018 -0400
13108: Don't try to install signal handler in background thread
Fix Docker image upload failure due to arv-put trying to install a
signal handler somewhere other than the main thread.
Also install a top-level signal handler to convert SIGTERM to
KeyboardInterrupt.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index c2f43fe..2fa6da7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -19,6 +19,8 @@ from functools import partial
import pkg_resources # part of setuptools
import Queue
import time
+import signal
+import thread
from cwltool.errors import WorkflowException
import cwltool.main
@@ -564,7 +566,7 @@ class ArvCwlRunner(object):
raise
except:
if sys.exc_info()[0] is KeyboardInterrupt:
- logger.error("Interrupted, marking pipeline as failed")
+ logger.error("Interrupted, workflow will be cancelled")
else:
logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
@@ -761,12 +763,16 @@ def add_arv_hints():
"http://arvados.org/cwl#ReuseRequirement"
])
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+ install_sig_handlers=True):
parser = arg_parser()
job_order_object = None
arvargs = parser.parse_args(args)
+ if install_sig_handlers:
+ signal.signal(signal.SIGTERM, lambda x, y: thread.interrupt_main())
+
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index 5483ccb..68573aa 100644
--- a/sdk/cwl/arvados_cwl/arvdocker.py
+++ b/sdk/cwl/arvados_cwl/arvdocker.py
@@ -57,7 +57,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
args.append(image_tag)
logger.info("Uploading Docker image %s:%s", image_name, image_tag)
try:
- arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+ arvados.commands.keepdocker.main(args, stdout=sys.stderr, install_sig_handlers=False)
except SystemExit as e:
if e.code:
raise WorkflowException("keepdocker exited with code %s" % e.code)
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index ff7201a..16fefdb 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -345,7 +345,7 @@ def _uuid2pdh(api, uuid):
select=['portable_data_hash'],
).execute()['items'][0]['portable_data_hash']
-def main(arguments=None, stdout=sys.stdout):
+def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True):
args = arg_parser.parse_args(arguments)
api = arvados.api('v1')
@@ -490,7 +490,8 @@ def main(arguments=None, stdout=sys.stdout):
put_args += ['--name', collection_name]
coll_uuid = arv_put.main(
- put_args + ['--filename', outfile_name, image_file.name], stdout=stdout).strip()
+ put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
+ install_sig_handlers=install_sig_handlers).strip()
# Read the image metadata and make Arvados links from it.
image_file.seek(0)
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 5dde8e5..af8e243 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -993,7 +993,8 @@ def desired_project_uuid(api_client, project_uuid, num_retries):
raise ValueError("Not a valid project UUID: {}".format(project_uuid))
return query.execute(num_retries=num_retries)['uuid']
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+ install_sig_handlers=True):
global api_client
args = parse_arguments(arguments)
@@ -1014,8 +1015,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
- orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
- for sigcode in CAUGHT_SIGNALS}
+ orig_signal_handlers = {}
+ if install_sig_handlers:
+ orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
+ for sigcode in CAUGHT_SIGNALS}
# Determine the name to use
if args.name:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list