[ARVADOS] created: 1.2.0-452-g8d8847e07

Git user git at public.curoverse.com
Tue Nov 27 18:13:49 EST 2018


        at  8d8847e070f588b5d85ac2d7123fd929b4d417cd (commit)


commit 8d8847e070f588b5d85ac2d7123fd929b4d417cd
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Nov 27 18:13:30 2018 -0500

    14510: Estimate collection cache size wip
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index caf954fcb..29f1582aa 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -29,7 +29,7 @@ from .arvjob import RunnerJob, RunnerTemplate
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool, validate_cluster_target
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from .task_queue import TaskQueue
@@ -37,7 +37,7 @@ from .context import ArvLoadingContext, ArvRuntimeContext
 from ._version import __version__
 
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
 from cwltool.command_line_tool import compute_checksums
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -593,6 +593,17 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
             raise Exception("--priority must be in the range 1..1000.")
 
+        visited = set()
+        estimated_size = [0]
+        def estimate_collection_cache(obj):
+            if obj.get("location", "").startswith("keep:"):
+                m = pdh_size.match(obj["location"][5:])
+                if m and m.group(1) not in visited:
+                    visited.add(m.group(1))
+                    estimated_size[0] += int(m.group(2))
+        visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
+        logger.info("Estimated size %s", estimated_size)
+
         runnerjob = None
         if runtimeContext.submit:
             # Submit a runner job to run the workflow for us.
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 598126812..c3713b21e 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -28,6 +28,8 @@ from schema_salad.ref_resolver import DefaultFetcher
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+pdh_size = re.compile(r'([0-9a-f]{32})\+(\d+)(\+\S+)*')
+
 class CollectionCache(object):
     def __init__(self, api_client, keep_client, num_retries,
                  cap=256*1024*1024,
@@ -41,20 +43,26 @@ class CollectionCache(object):
         self.cap = cap
         self.min_entries = min_entries
 
-    def cap_cache(self):
-        if self.total > self.cap:
-            # ordered list iterates from oldest to newest
-            for pdh, v in self.collections.items():
-                if self.total < self.cap or len(self.collections) < self.min_entries:
-                    break
-                # cut it loose
-                logger.debug("Evicting collection reader %s from cache", pdh)
-                del self.collections[pdh]
-                self.total -= v[1]
+    def set_cap(self, cap):
+        self.cap = cap
+
+    def cap_cache(self, required):
+        # ordered dict iterates from oldest to newest
+        for pdh, v in self.collections.items():
+            available = self.cap - self.total
+            if available >= required or len(self.collections) < self.min_entries:
+                return
+            # cut it loose
+            logger.debug("Evicting collection reader %s from cache", pdh)
+            del self.collections[pdh]
+            self.total -= v[1]
 
     def get(self, pdh):
         with self.lock:
             if pdh not in self.collections:
+                m = pdh_size.match(pdh)
+                if m:
+                    self.cap_cache(int(m.group(2)) * 128)
                 logger.debug("Creating collection reader for %s", pdh)
                 cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
                                                          keep_client=self.keep_client,
@@ -62,7 +70,6 @@ class CollectionCache(object):
                 sz = len(cr.manifest_text()) * 128
                 self.collections[pdh] = (cr, sz)
                 self.total += sz
-                self.cap_cache()
             else:
                 cr, sz = self.collections[pdh]
                 # bump it to the back

commit 42cb6a6d7679c5dc90adc14da57bb5691930e0f0
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Nov 27 17:26:07 2018 -0500

    14510: Update tests/docs for --collection-cache-size
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/doc/user/cwl/cwl-extensions.html.textile.liquid b/doc/user/cwl/cwl-extensions.html.textile.liquid
index f2dd937d9..d62002237 100644
--- a/doc/user/cwl/cwl-extensions.html.textile.liquid
+++ b/doc/user/cwl/cwl-extensions.html.textile.liquid
@@ -138,7 +138,8 @@ table(table table-bordered table-condensed).
 |_. Field |_. Type |_. Description |
 |ramMin|int|RAM, in mebibytes, to reserve for the arvados-cwl-runner process. Default 1 GiB|
 |coresMin|int|Number of cores to reserve to the arvados-cwl-runner process. Default 1 core.|
-|keep_cache|int|RAM, in mebibytes, to reserve for caching keep collection metadata. Default 256 MiB|
+|keep_cache|int|Size of collection metadata cache for the workflow runner, in MiB.  Default 256 MiB.  Will be added on to the RAM request when determining node size to request.|
+
 h2(#clustertarget). arv:ClusterTarget
 
 Specify which Arvados cluster should execute a container or subworkflow, and the parent project for the container request.
diff --git a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
index 902b1ffba..dce1bd4d0 100644
--- a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
+++ b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
@@ -233,6 +233,13 @@ $graph:
       type: int?
       doc: Minimum cores allocated to cwl-runner
       jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+    keep_cache:
+      type: int?
+      doc: |
+        Size of collection metadata cache for the workflow runner, in
+        MiB.  Default 256 MiB.  Will be added on to the RAM request
+        when determining node size to request.
+      jsonldPredicate: "http://arvados.org/cwl#RuntimeConstraints/keep_cache"
 
 - name: ClusterTarget
   type: record
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 3589fad27..caf954fcb 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -95,6 +95,7 @@ class ArvCwlExecutor(object):
             arvargs.output_name = None
             arvargs.output_tags = None
             arvargs.thread_count = 1
+            arvargs.collection_cache_size = None
 
         self.api = api_client
         self.processes = {}
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index 018172b59..1c233fac0 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -23,36 +23,36 @@ class TaskQueue(object):
             t.start()
 
     def task_queue_func(self):
+        while True:
+            task = self.task_queue.get()
+            if task is None:
+                return
+            try:
+                task()
+            except Exception as e:
+                logger.exception("Unhandled exception running task")
+                self.error = e
 
-            while True:
-                task = self.task_queue.get()
-                if task is None:
-                    return
-                try:
-                    task()
-                except Exception as e:
-                    logger.exception("Unhandled exception running task")
-                    self.error = e
-
-                with self.lock:
-                    self.in_flight -= 1
+            with self.lock:
+                self.in_flight -= 1
 
     def add(self, task, unlock, check_done):
-        with self.lock:
-            if self.thread_count > 1:
+        if self.thread_count > 1:
+            with self.lock:
                 self.in_flight += 1
-            else:
-                task()
-                return
+        else:
+            task()
+            return
 
         while True:
             try:
                 unlock.release()
+                if check_done.is_set():
+                    return
                 self.task_queue.put(task, block=True, timeout=3)
                 return
             except Queue.Full:
-                if check_done.is_set():
-                    return
+                pass
             finally:
                 unlock.acquire()
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index bf2791d72..a7a21e709 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -274,7 +274,7 @@ def stubs(func):
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
-                        '--enable-reuse', '--debug', '--on-error=continue',
+                        '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': '999999999999999999999999999999d3+99',
@@ -283,7 +283,7 @@ def stubs(func):
             'runtime_constraints': {
                 'API': True,
                 'vcpus': 1,
-                'ram': 1024*1024*1024
+                'ram': (1024+256)*1024*1024
             },
             'use_existing': True,
             'properties': {},
@@ -559,7 +559,8 @@ class TestSubmit(unittest.TestCase):
             'arvados-cwl-runner', '--local', '--api=containers',
             '--no-log-timestamps', '--disable-validate',
             '--eval-timeout=20', '--thread-count=4',
-            '--disable-reuse', '--debug', '--on-error=continue',
+            '--disable-reuse', "--collection-cache-size=256",
+            '--debug', '--on-error=continue',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
 
@@ -584,7 +585,7 @@ class TestSubmit(unittest.TestCase):
             'arvados-cwl-runner', '--local', '--api=containers',
             '--no-log-timestamps', '--disable-validate',
             '--eval-timeout=20', '--thread-count=4',
-            '--disable-reuse', '--debug', '--on-error=continue',
+            '--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
         expect_container["name"] = "submit_wf_no_reuse.cwl"
@@ -621,7 +622,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=stop',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=stop',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -647,7 +649,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse',
+                                       '--enable-reuse', "--collection-cache-size=256",
                                        "--output-name="+output_name, '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["output_name"] = output_name
@@ -673,7 +675,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', "--debug",
+                                       '--enable-reuse', "--collection-cache-size=256", "--debug",
                                        "--storage-classes=foo", '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -740,7 +742,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256", '--debug',
+                                       '--on-error=continue',
                                        "--intermediate-output-ttl=3600",
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -765,7 +768,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=continue',
                                        "--trash-intermediate",
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -792,7 +796,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse',
+                                       '--enable-reuse', "--collection-cache-size=256",
                                        "--output-tags="+output_tags, '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -814,7 +818,7 @@ class TestSubmit(unittest.TestCase):
             logging.exception("")
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
-        expect_container["runtime_constraints"]["ram"] = 2048*1024*1024
+        expect_container["runtime_constraints"]["ram"] = (2048+256)*1024*1024
 
         stubs.api.container_requests().create.assert_called_with(
             body=JsonDiffMatcher(expect_container))
@@ -877,13 +881,13 @@ class TestSubmit(unittest.TestCase):
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
-                        '--enable-reuse', '--debug', '--on-error=continue',
+                        '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,
                 'vcpus': 1,
-                'ram': 1073741824
+                'ram': 1342177280
             },
             'use_existing': True,
             'properties': {},
@@ -999,13 +1003,13 @@ class TestSubmit(unittest.TestCase):
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
-                        '--enable-reuse', '--debug', '--on-error=continue',
+                        '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,
                 'vcpus': 1,
-                'ram': 1073741824
+                'ram': 1342177280
             },
             'use_existing': True,
             'properties': {
@@ -1059,7 +1063,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        "--eval-timeout=20", "--thread-count=4",
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256", '--debug',
+                                       '--on-error=continue',
                                        '--project-uuid='+project_uuid,
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -1085,7 +1090,34 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=60.0', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=continue',
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+    @stubs
+    def test_submit_container_collection_cache(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--collection-cache-size=500",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+                                       '--no-log-timestamps', '--disable-validate',
+                                       '--eval-timeout=60.0', '--thread-count=4',
+                                       '--enable-reuse', "--collection-cache-size=500",
+                                       '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -1111,7 +1143,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=20',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -1197,7 +1230,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["runtime_constraints"] = {
             "API": True,
             "vcpus": 2,
-            "ram": 2000 * 2**20
+            "ram": (2000+512) * 2**20
         }
         expect_container["name"] = "submit_wf_runner_resources.cwl"
         expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
@@ -1211,6 +1244,11 @@ class TestSubmit(unittest.TestCase):
         expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = {
             "arv": "http://arvados.org/cwl#",
         }
+        expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
+                        '--no-log-timestamps', '--disable-validate',
+                        '--eval-timeout=20', '--thread-count=4',
+                        '--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue',
+                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
             body=JsonDiffMatcher(expect_container))
@@ -1280,6 +1318,7 @@ class TestSubmit(unittest.TestCase):
                 "--eval-timeout=20",
                 '--thread-count=4',
                 "--enable-reuse",
+                "--collection-cache-size=256",
                 '--debug',
                 "--on-error=continue",
                 "/var/lib/cwl/workflow.json#main",
@@ -1407,7 +1446,7 @@ class TestSubmit(unittest.TestCase):
             "properties": {},
             "runtime_constraints": {
                 "API": True,
-                "ram": 1073741824,
+                "ram": 1342177280,
                 "vcpus": 1
             },
             "secret_mounts": {
diff --git a/sdk/cwl/tests/test_tq.py b/sdk/cwl/tests/test_tq.py
index 2afbe0cff..a09489065 100644
--- a/sdk/cwl/tests/test_tq.py
+++ b/sdk/cwl/tests/test_tq.py
@@ -22,29 +22,37 @@ def fail_task():
 class TestTaskQueue(unittest.TestCase):
     def test_tq(self):
         tq = TaskQueue(threading.Lock(), 2)
+        try:
+            self.assertIsNone(tq.error)
 
-        self.assertIsNone(tq.error)
-
-        tq.add(success_task)
-        tq.add(success_task)
-        tq.add(success_task)
-        tq.add(success_task)
+            unlock = threading.Lock()
+            unlock.acquire()
+            check_done = threading.Event()
 
-        tq.join()
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+        finally:
+            tq.join()
 
         self.assertIsNone(tq.error)
 
 
     def test_tq_error(self):
         tq = TaskQueue(threading.Lock(), 2)
-
-        self.assertIsNone(tq.error)
-
-        tq.add(success_task)
-        tq.add(success_task)
-        tq.add(fail_task)
-        tq.add(success_task)
-
-        tq.join()
+        try:
+            self.assertIsNone(tq.error)
+
+            unlock = threading.Lock()
+            unlock.acquire()
+            check_done = threading.Event()
+
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+            tq.add(fail_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+        finally:
+            tq.join()
 
         self.assertIsNotNone(tq.error)

commit eb58fd945645f5a670c761f7046b10885941167e
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Nov 27 15:53:07 2018 -0500

    14510: Setting collection cache wip
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/apps/workbench/app/controllers/work_units_controller.rb b/apps/workbench/app/controllers/work_units_controller.rb
index 8527b4d48..767762c81 100644
--- a/apps/workbench/app/controllers/work_units_controller.rb
+++ b/apps/workbench/app/controllers/work_units_controller.rb
@@ -85,12 +85,6 @@ class WorkUnitsController < ApplicationController
       attrs['state'] = "Uncommitted"
 
       # required
-      attrs['command'] = ["arvados-cwl-runner",
-                          "--local",
-                          "--api=containers",
-                          "--project-uuid=#{params['work_unit']['owner_uuid']}",
-                          "/var/lib/cwl/workflow.json#main",
-                          "/var/lib/cwl/cwl.input.json"]
       attrs['container_image'] = "arvados/jobs"
       attrs['cwd'] = "/var/spool/cwl"
       attrs['output_path'] = "/var/spool/cwl"
@@ -102,6 +96,7 @@ class WorkUnitsController < ApplicationController
         "API" => true
       }
 
+      keep_cache = 256
       input_defaults = {}
       if wf_json
         main = get_cwl_main(wf_json)
@@ -119,11 +114,22 @@ class WorkUnitsController < ApplicationController
               if hint[:ramMin]
                 runtime_constraints["ram"] = hint[:ramMin] * 1024 * 1024
               end
+              if hint[:keep_cache]
+                keep_cache = hint[:keep_cache]
+              end
             end
           end
         end
       end
 
+      attrs['command'] = ["arvados-cwl-runner",
+                          "--local",
+                          "--api=containers",
+                          "--project-uuid=#{params['work_unit']['owner_uuid']}",
+                          "--collection-keep-cache=#{keep_cache}",
+                          "/var/lib/cwl/workflow.json#main",
+                          "/var/lib/cwl/cwl.input.json"]
+
       # mounts
       mounts = {
         "/var/lib/cwl/cwl.input.json" => {
diff --git a/doc/user/cwl/cwl-extensions.html.textile.liquid b/doc/user/cwl/cwl-extensions.html.textile.liquid
index 7abc794e1..f2dd937d9 100644
--- a/doc/user/cwl/cwl-extensions.html.textile.liquid
+++ b/doc/user/cwl/cwl-extensions.html.textile.liquid
@@ -43,6 +43,7 @@ hints:
   arv:WorkflowRunnerResources:
     ramMin: 2048
     coresMin: 2
+    keep_cache: 512
   arv:ClusterTarget:
     cluster_id: clsr1
     project_uuid: clsr1-j7d0g-qxc4jcji7n4lafx
@@ -137,7 +138,7 @@ table(table table-bordered table-condensed).
 |_. Field |_. Type |_. Description |
 |ramMin|int|RAM, in mebibytes, to reserve for the arvados-cwl-runner process. Default 1 GiB|
 |coresMin|int|Number of cores to reserve to the arvados-cwl-runner process. Default 1 core.|
-
+|keep_cache|int|RAM, in mebibytes, to reserve for caching keep collection metadata. Default 256 MiB|
 h2(#clustertarget). arv:ClusterTarget
 
 Specify which Arvados cluster should execute a container or subworkflow, and the parent project for the container request.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index ce22219d7..225741f94 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -159,9 +159,9 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                          default=None,
                          metavar="CLUSTER_ID")
 
-    parser.add_argument("--collection-cache", type=int,
-                        default=256*1024*1024,
-                        help="Collection caches size.")
+    parser.add_argument("--collection-cache-size", type=int,
+                        default=None,
+                        help="Collection cache size (in MiB, default 256).")
 
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index f1ae65fc0..4c49a449b 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -407,7 +407,7 @@ class RunnerContainer(Runner):
             "secret_mounts": secret_mounts,
             "runtime_constraints": {
                 "vcpus": math.ceil(self.submit_runner_cores),
-                "ram": math.ceil(1024*1024 * self.submit_runner_ram),
+                "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
             "use_existing": self.enable_reuse,
@@ -441,6 +441,7 @@ class RunnerContainer(Runner):
         # --eval-timeout is the timeout for javascript invocation
         # --parallel-task-count is the number of threads to use for job submission
         # --enable/disable-reuse sets desired job reuse
+        # --collection-cache-size sets aside memory to store collections
         command = ["arvados-cwl-runner",
                    "--local",
                    "--api=containers",
@@ -448,7 +449,8 @@ class RunnerContainer(Runner):
                    "--disable-validate",
                    "--eval-timeout=%s" % self.arvrunner.eval_timeout,
                    "--thread-count=%s" % self.arvrunner.thread_count,
-                   "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+                   "--enable-reuse" if self.enable_reuse else "--disable-reuse",
+                   "--collection-cache-size=%s" % self.collection_cache_size]
 
         if self.output_name:
             command.append("--output-name=" + self.output_name)
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 7831e1cfd..8cfe22ad7 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -34,6 +34,7 @@ class ArvRuntimeContext(RuntimeContext):
         self.submit_runner_cluster = None
         self.cluster_target_id = 0
         self.always_submit_runner = False
+        self.collection_cache_size = 256
 
         super(ArvRuntimeContext, self).__init__(kwargs)
 
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index ff8ff6ff8..3589fad27 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -122,8 +122,13 @@ class ArvCwlExecutor(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
+        if arvargs.collection_cache_size:
+            collection_cache_size = arvargs.collection_cache_size*1024*1024
+        else:
+            collection_cache_size = 256*1024*1024
+
         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
-                                                cap=arvargs.collection_cache)
+                                                cap=collection_cache_size)
 
         self.fetcher_constructor = partial(CollectionFetcher,
                                            api_client=self.api,
@@ -607,7 +612,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                                 merged_map=merged_map,
                                                 priority=runtimeContext.priority,
-                                                secret_store=self.secret_store)
+                                                secret_store=self.secret_store,
+                                                collection_cache_size=runtimeContext.collection_cache_size)
             elif self.work_api == "jobs":
                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
                                       self.output_name,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index a846f2b00..c1a98e745 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -364,7 +364,7 @@ class Runner(object):
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
-                 priority=None, secret_store=None):
+                 priority=None, secret_store=None, collection_cache_size=None):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
@@ -389,6 +389,7 @@ class Runner(object):
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
+        self.collection_cache_size = 256
 
         runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
         if runner_resource_req:
@@ -396,11 +397,17 @@ class Runner(object):
                 self.submit_runner_cores = runner_resource_req["coresMin"]
             if runner_resource_req.get("ramMin"):
                 self.submit_runner_ram = runner_resource_req["ramMin"]
+            if runner_resource_req.get("keep_cache"):
+                self.collection_cache_size = runner_resource_req["keep_cache"]
 
         if submit_runner_ram:
             # Command line / initializer overrides default and/or spec from workflow
             self.submit_runner_ram = submit_runner_ram
 
+        if collection_cache_size:
+            # Command line / initializer overrides default and/or spec from workflow
+            self.collection_cache_size = collection_cache_size
+
         if self.submit_runner_ram <= 0:
             raise Exception("Value of submit-runner-ram must be greater than zero")
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 1b892a983..bf2791d72 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -1204,7 +1204,8 @@ class TestSubmit(unittest.TestCase):
             {
                 "class": "http://arvados.org/cwl#WorkflowRunnerResources",
                 "coresMin": 2,
-                "ramMin": 2000
+                "ramMin": 2000,
+                "keep_cache": 512
             }
         ]
         expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = {
diff --git a/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl b/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl
index 9e2712194..814cd07ab 100644
--- a/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl
+++ b/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl
@@ -15,6 +15,7 @@ hints:
   arv:WorkflowRunnerResources:
     ramMin: 2000
     coresMin: 2
+    keep_cache: 512
 inputs:
   - id: x
     type: File

commit 00bb1461d14cfc02e6ec2c74d622b7b6b716e775
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date:   Tue Nov 20 14:33:53 2018 -0500

    14510: Perfomance fixes
    
    * Add --collection-cache to enable users to workaround cache thrashing
    
    * Limit task queue size.  Release workflow lock when attempting to
      enqueue a task (which now may block).
    
    Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 9b814f534..ce22219d7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -159,6 +159,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                          default=None,
                          metavar="CLUSTER_ID")
 
+    parser.add_argument("--collection-cache", type=int,
+                        default=256*1024*1024,
+                        help="Collection caches size.")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 6cac70926..ff8ff6ff8 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -122,7 +122,8 @@ class ArvCwlExecutor(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+                                                cap=arvargs.collection_cache)
 
         self.fetcher_constructor = partial(CollectionFetcher,
                                            api_client=self.api,
@@ -206,7 +207,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
 
     def start_run(self, runnable, runtimeContext):
-        self.task_queue.add(partial(runnable.run, runtimeContext))
+        self.task_queue.add(partial(runnable.run, runtimeContext),
+                            self.workflow_eval_lock, self.stop_polling)
 
     def process_submitted(self, container):
         with self.workflow_eval_lock:
@@ -216,7 +218,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         with self.workflow_eval_lock:
             j = self.processes[uuid]
             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
-            self.task_queue.add(partial(j.done, record))
+            self.task_queue.add(partial(j.done, record),
+                                self.workflow_eval_lock, self.stop_polling)
             del self.processes[uuid]
 
     def runtime_status_update(self, kind, message, detail=None):
@@ -676,6 +679,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     else:
                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                         break
+
+                if self.stop_polling.is_set():
+                    break
+
                 loopperf.__enter__()
             loopperf.__exit__()
 
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index b9fd09807..018172b59 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -11,7 +11,7 @@ logger = logging.getLogger('arvados.cwl-runner')
 class TaskQueue(object):
     def __init__(self, lock, thread_count):
         self.thread_count = thread_count
-        self.task_queue = Queue.Queue()
+        self.task_queue = Queue.Queue(maxsize=self.thread_count)
         self.task_queue_threads = []
         self.lock = lock
         self.in_flight = 0
@@ -37,13 +37,25 @@ class TaskQueue(object):
                 with self.lock:
                     self.in_flight -= 1
 
-    def add(self, task):
+    def add(self, task, unlock, check_done):
         with self.lock:
             if self.thread_count > 1:
                 self.in_flight += 1
-                self.task_queue.put(task)
             else:
                 task()
+                return
+
+        while True:
+            try:
+                unlock.release()
+                self.task_queue.put(task, block=True, timeout=3)
+                return
+            except Queue.Full:
+                if check_done.is_set():
+                    return
+            finally:
+                unlock.acquire()
+
 
     def drain(self):
         try:

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list