[ARVADOS] updated: 1.1.3-347-g138fef8

Git user git at public.curoverse.com
Tue Apr 10 14:19:47 EDT 2018


Summary of changes:
 sdk/cwl/arvados_cwl/__init__.py           | 40 ++++++++++++-------------
 sdk/cwl/arvados_cwl/task_queue.py         |  4 +--
 sdk/cwl/tests/test_tq.py                  | 50 +++++++++++++++++++++++++++++++
 sdk/python/arvados/commands/keepdocker.py |  7 ++---
 4 files changed, 74 insertions(+), 27 deletions(-)
 create mode 100644 sdk/cwl/tests/test_tq.py

       via  138fef8ee97f3cbd335434ad6acd26771fd0b762 (commit)
      from  fa645932e008bc03bd4906b4e4b795f22ed78fd3 (commit)

The revisions listed above that are new to this repository have
not appeared in any other notification email, so they are listed
in full below.


commit 138fef8ee97f3cbd335434ad6acd26771fd0b762
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Apr 10 14:19:05 2018 -0400

    13108: Add test for taskqueue
    
    Also tighten up code in a few places.
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 7affade..c2f43fe 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -66,7 +66,8 @@ class ArvCwlRunner(object):
     """
 
     def __init__(self, api_client, work_api=None, keep_client=None,
-                 output_name=None, output_tags=None, num_retries=4):
+                 output_name=None, output_tags=None, num_retries=4,
+                 thread_count=4):
         self.api = api_client
         self.processes = {}
         self.workflow_eval_lock = threading.Condition(threading.RLock())
@@ -85,7 +86,7 @@ class ArvCwlRunner(object):
         self.intermediate_output_ttl = 0
         self.intermediate_output_collections = []
         self.trash_intermediate = False
-        self.thread_count = 4
+        self.thread_count = thread_count
         self.poll_interval = 12
 
         if keep_client is not None:
@@ -165,21 +166,20 @@ class ArvCwlRunner(object):
         return partial(self.wrapped_callback, cb)
 
     def on_message(self, event):
-        if "object_uuid" in event:
-            if event["object_uuid"] in self.processes and event["event_type"] == "update":
-                uuid = event["object_uuid"]
-                if event["properties"]["new_attributes"]["state"] == "Running":
-                    with self.workflow_eval_lock:
-                        j = self.processes[uuid]
-                        if j.running is False:
-                            j.running = True
-                            j.update_pipeline_component(event["properties"]["new_attributes"])
-                            logger.info("%s %s is Running", self.label(j), uuid)
-                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
-                    with self.workflow_eval_lock:
-                        j = self.processes[uuid]
-                    self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
-                    logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
+        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+            uuid = event["object_uuid"]
+            if event["properties"]["new_attributes"]["state"] == "Running":
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                    if j.running is False:
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                        logger.info("%s %s is Running", self.label(j), uuid)
+            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+                with self.workflow_eval_lock:
+                    j = self.processes[uuid]
+                self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+                logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
 
     def label(self, obj):
         return "[%s %s]" % (self.work_api[0:-1], obj.name)
@@ -386,7 +386,6 @@ class ArvCwlRunner(object):
                                                                  collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
         self.secret_store = kwargs.get("secret_store")
-        self.thread_count = kwargs.get("thread_count", 4)
 
         self.trash_intermediate = kwargs["trash_intermediate"]
         if self.trash_intermediate and self.work_api != "containers":
@@ -551,7 +550,7 @@ class ArvCwlRunner(object):
                     if (self.task_queue.in_flight + len(self.processes)) > 0:
                         self.workflow_eval_lock.wait(3)
                     else:
-                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pendingjobs.")
+                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
                 loopperf.__enter__()
             loopperf.__exit__()
@@ -794,7 +793,8 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                               num_retries=4, output_name=arvargs.output_name,
-                              output_tags=arvargs.output_tags)
+                              output_tags=arvargs.output_tags,
+                              thread_count=arvargs.thread_count)
     except Exception as e:
         logger.error(e)
         return 1
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index 7efb08a..b9fd098 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -31,7 +31,7 @@ class TaskQueue(object):
                 try:
                     task()
                 except Exception as e:
-                    logger.exception("Unexpected error running task")
+                    logger.exception("Unhandled exception running task")
                     self.error = e
 
                 with self.lock:
@@ -49,7 +49,7 @@ class TaskQueue(object):
         try:
             # Drain queue
             while not self.task_queue.empty():
-                self.task_queue.get()
+                self.task_queue.get(True, .1)
         except Queue.Empty:
             pass
 
diff --git a/sdk/cwl/tests/test_tq.py b/sdk/cwl/tests/test_tq.py
new file mode 100644
index 0000000..2afbe0c
--- /dev/null
+++ b/sdk/cwl/tests/test_tq.py
@@ -0,0 +1,50 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+import threading
+
+from arvados_cwl.task_queue import TaskQueue
+
+def success_task():
+    pass
+
+def fail_task():
+    raise Exception("Testing error handling")
+
+class TestTaskQueue(unittest.TestCase):
+    def test_tq(self):
+        tq = TaskQueue(threading.Lock(), 2)
+
+        self.assertIsNone(tq.error)
+
+        tq.add(success_task)
+        tq.add(success_task)
+        tq.add(success_task)
+        tq.add(success_task)
+
+        tq.join()
+
+        self.assertIsNone(tq.error)
+
+
+    def test_tq_error(self):
+        tq = TaskQueue(threading.Lock(), 2)
+
+        self.assertIsNone(tq.error)
+
+        tq.add(success_task)
+        tq.add(success_task)
+        tq.add(fail_task)
+        tq.add(success_task)
+
+        tq.join()
+
+        self.assertIsNotNone(tq.error)
diff --git a/sdk/python/arvados/commands/keepdocker.py b/sdk/python/arvados/commands/keepdocker.py
index 6097b5d..ff7201a 100644
--- a/sdk/python/arvados/commands/keepdocker.py
+++ b/sdk/python/arvados/commands/keepdocker.py
@@ -422,11 +422,8 @@ def main(arguments=None, stdout=sys.stdout):
             # Check if this image is already in Arvados.
 
             # Project where everything should be owned
-            if args.project_uuid:
-                parent_project_uuid = args.project_uuid
-            else:
-                parent_project_uuid = api.users().current().execute(
-                    num_retries=args.retries)['uuid']
+            parent_project_uuid = args.project_uuid or api.users().current().execute(
+                num_retries=args.retries)['uuid']
 
             # Find image hash tags
             existing_links = _get_docker_links(

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list