[ARVADOS] updated: 1.1.3-347-g138fef8
Git user
git at public.curoverse.com
Tue Apr 10 14:19:47 EDT 2018
Summary of changes:
sdk/cwl/arvados_cwl/__init__.py | 40 ++++++++++++-------------
sdk/cwl/arvados_cwl/task_queue.py | 4 +--
sdk/cwl/tests/test_tq.py | 50 +++++++++++++++++++++++++++++++
sdk/python/arvados/commands/keepdocker.py | 7 ++---
4 files changed, 74 insertions(+), 27 deletions(-)
create mode 100644 sdk/cwl/tests/test_tq.py
via 138fef8ee97f3cbd335434ad6acd26771fd0b762 (commit)
from fa645932e008bc03bd4906b4e4b795f22ed78fd3 (commit)
Those revisions listed above that are new to this repository have
not appeared in any other notification email, so we list those
revisions in full below.
commit 138fef8ee97f3cbd335434ad6acd26771fd0b762
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Apr 10 14:19:05 2018 -0400
13108: Add test for taskqueue
Also tighten up code in a few places.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7affade..c2f43fe 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -66,7 +66,8 @@ class ArvCwlRunner(object):
"""
def __init__(self, api_client, work_api=None, keep_client=None,
- output_name=None, output_tags=None, num_retries=4):
+ output_name=None, output_tags=None, num_retries=4,
+ thread_count=4):
self.api = api_client
self.processes = {}
self.workflow_eval_lock = threading.Condition(threading.RLock())
@@ -85,7 +86,7 @@ class ArvCwlRunner(object):
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
- self.thread_count = 4
+ self.thread_count = thread_count
self.poll_interval = 12
if keep_client is not None:
@@ -165,21 +166,20 @@ class ArvCwlRunner(object):
return partial(self.wrapped_callback, cb)
def on_message(self, event):
- if "object_uuid" in event:
- if event["object_uuid"] in self.processes and event["event_type"] == "update":
- uuid = event["object_uuid"]
- if event["properties"]["new_attributes"]["state"] == "Running":
- with self.workflow_eval_lock:
- j = self.processes[uuid]
- if j.running is False:
- j.running = True
- j.update_pipeline_component(event["properties"]["new_attributes"])
- logger.info("%s %s is Running", self.label(j), uuid)
- elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
- with self.workflow_eval_lock:
- j = self.processes[uuid]
- self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
- logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
+ if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+ uuid = event["object_uuid"]
+ if event["properties"]["new_attributes"]["state"] == "Running":
+ with self.workflow_eval_lock:
+ j = self.processes[uuid]
+ if j.running is False:
+ j.running = True
+ j.update_pipeline_component(event["properties"]["new_attributes"])
+ logger.info("%s %s is Running", self.label(j), uuid)
+ elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+ with self.workflow_eval_lock:
+ j = self.processes[uuid]
+ self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+ logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
def label(self, obj):
return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -386,7 +386,6 @@ class ArvCwlRunner(object):
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
self.secret_store = kwargs.get("secret_store")
- self.thread_count = kwargs.get("thread_count", 4)
self.trash_intermediate = kwargs["trash_intermediate"]
if self.trash_intermediate and self.work_api != "containers":
@@ -551,7 +550,7 @@ class ArvCwlRunner(object):
if (self.task_queue.in_flight + len(self.processes)) > 0:
self.workflow_eval_lock.wait(3)
else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
+ logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
loopperf.__enter__()
loopperf.__exit__()
@@ -794,7 +793,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
num_retries=4, output_name=arvargs.output_name,
- output_tags=arvargs.output_tags)
+ output_tags=arvargs.output_tags,
+ thread_count=arvargs.thread_count)
except Exception as e:
logger.error(e)
return 1
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index 7efb08a..b9fd098 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -31,7 +31,7 @@ class TaskQueue(object):
try:
task()
except Exception as e:
- logger.exception("Unexpected error running task")
+ logger.exception("Unhandled exception running task")
self.error = e
with self.lock:
@@ -49,7 +49,7 @@ class TaskQueue(object):
try:
# Drain queue
while not self.task_queue.empty():
- self.task_queue.get()
+ self.task_queue.get(True, .1)
except Queue.Empty:
pass
diff --git a/sdk/cwl/tests/test_tq.py b/sdk/cwl/tests/test_tq.py
new file mode 100644
index 0000000..2afbe0c
--- /dev/null
+++ b/sdk/cwl/tests/test_tq.py
@@ -0,0 +1,50 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+import threading
+
+from arvados_cwl.task_queue import TaskQueue
+
+def success_task():
+ pass
+
+def fail_task():
+ raise Exception("Testing error handling")
+
+class TestTaskQueue(unittest.TestCase):
+ def test_tq(self):
+ tq = TaskQueue(threading.Lock(), 2)
+
+ self.assertIsNone(tq.error)
+
+ tq.add(success_task)
+ tq.add(success_task)
+ tq.add(success_task)
+ tq.add(success_task)
+
+ tq.join()
+
+ self.assertIsNone(tq.error)
+
+
+ def test_tq_error(self):
+ tq = TaskQueue(threading.Lock(), 2)
+
+ self.assertIsNone(tq.error)
+
+ tq.add(success_task)
+ tq.add(success_task)
+ tq.add(fail_task)
+ tq.add(success_task)
+
+ tq.join()
+
+ self.assertIsNotNone(tq.error)
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index 6097b5d..ff7201a 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -422,11 +422,8 @@ def main(arguments=None, stdout=sys.stdout):
# Check if this image is already in Arvados.
# Project where everything should be owned
- if args.project_uuid:
- parent_project_uuid = args.project_uuid
- else:
- parent_project_uuid = api.users().current().execute(
- num_retries=args.retries)['uuid']
+ parent_project_uuid = args.project_uuid or api.users().current().execute(
+ num_retries=args.retries)['uuid']
# Find image hash tags
existing_links = _get_docker_links(
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list