[ARVADOS] created: 1.2.0-452-g8d8847e07
Git user
git at public.curoverse.com
Tue Nov 27 18:13:49 EST 2018
at 8d8847e070f588b5d85ac2d7123fd929b4d417cd (commit)
commit 8d8847e070f588b5d85ac2d7123fd929b4d417cd
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Nov 27 18:13:30 2018 -0500
14510: Estimate collection cache size wip
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index caf954fcb..29f1582aa 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -29,7 +29,7 @@ from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
@@ -37,7 +37,7 @@ from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums
logger = logging.getLogger('arvados.cwl-runner')
@@ -593,6 +593,17 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
raise Exception("--priority must be in the range 1..1000.")
+ visited = set()
+ estimated_size = [0]
+ def estimate_collection_cache(obj):
+ if obj.get("location", "").startswith("keep:"):
+ m = pdh_size.match(obj["location"][5:])
+ if m and m.group(1) not in visited:
+ visited.add(m.group(1))
+ estimated_size[0] += int(m.group(2))
+ visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
+ logger.info("Estimated size %s", estimated_size)
+
runnerjob = None
if runtimeContext.submit:
# Submit a runner job to run the workflow for us.
diff --git a/sdk/cwl/arvados_cwl/fsaccess.py b/sdk/cwl/arvados_cwl/fsaccess.py
index 598126812..c3713b21e 100644
--- a/sdk/cwl/arvados_cwl/fsaccess.py
+++ b/sdk/cwl/arvados_cwl/fsaccess.py
@@ -28,6 +28,8 @@ from schema_salad.ref_resolver import DefaultFetcher
logger = logging.getLogger('arvados.cwl-runner')
+pdh_size = re.compile(r'([0-9a-f]{32})\+(\d+)(\+\S+)*')
+
class CollectionCache(object):
def __init__(self, api_client, keep_client, num_retries,
cap=256*1024*1024,
@@ -41,20 +43,26 @@ class CollectionCache(object):
self.cap = cap
self.min_entries = min_entries
- def cap_cache(self):
- if self.total > self.cap:
- # ordered list iterates from oldest to newest
- for pdh, v in self.collections.items():
- if self.total < self.cap or len(self.collections) < self.min_entries:
- break
- # cut it loose
- logger.debug("Evicting collection reader %s from cache", pdh)
- del self.collections[pdh]
- self.total -= v[1]
+ def set_cap(self, cap):
+ self.cap = cap
+
+ def cap_cache(self, required):
+ # ordered dict iterates from oldest to newest
+ for pdh, v in self.collections.items():
+ available = self.cap - self.total
+ if available >= required or len(self.collections) < self.min_entries:
+ return
+ # cut it loose
+ logger.debug("Evicting collection reader %s from cache", pdh)
+ del self.collections[pdh]
+ self.total -= v[1]
def get(self, pdh):
with self.lock:
if pdh not in self.collections:
+ m = pdh_size.match(pdh)
+ if m:
+ self.cap_cache(int(m.group(2)) * 128)
logger.debug("Creating collection reader for %s", pdh)
cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
keep_client=self.keep_client,
@@ -62,7 +70,6 @@ class CollectionCache(object):
sz = len(cr.manifest_text()) * 128
self.collections[pdh] = (cr, sz)
self.total += sz
- self.cap_cache()
else:
cr, sz = self.collections[pdh]
# bump it to the back
commit 42cb6a6d7679c5dc90adc14da57bb5691930e0f0
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Nov 27 17:26:07 2018 -0500
14510: Update tests/docs for --collection-cache-size
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/doc/user/cwl/cwl-extensions.html.textile.liquid b/doc/user/cwl/cwl-extensions.html.textile.liquid
index f2dd937d9..d62002237 100644
--- a/doc/user/cwl/cwl-extensions.html.textile.liquid
+++ b/doc/user/cwl/cwl-extensions.html.textile.liquid
@@ -138,7 +138,8 @@ table(table table-bordered table-condensed).
|_. Field |_. Type |_. Description |
|ramMin|int|RAM, in mebibytes, to reserve for the arvados-cwl-runner process. Default 1 GiB|
|coresMin|int|Number of cores to reserve to the arvados-cwl-runner process. Default 1 core.|
-|keep_cache|int|RAM, in mebibytes, to reserve for caching keep collection metadata. Default 256 MiB|
+|keep_cache|int|Size of collection metadata cache for the workflow runner, in MiB. Default 256 MiB. Will be added on to the RAM request when determining node size to request.|
+
h2(#clustertarget). arv:ClusterTarget
Specify which Arvados cluster should execute a container or subworkflow, and the parent project for the container request.
diff --git a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
index 902b1ffba..dce1bd4d0 100644
--- a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
+++ b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml
@@ -233,6 +233,13 @@ $graph:
type: int?
doc: Minimum cores allocated to cwl-runner
jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+ keep_cache:
+ type: int?
+ doc: |
+ Size of collection metadata cache for the workflow runner, in
+ MiB. Default 256 MiB. Will be added on to the RAM request
+ when determining node size to request.
+ jsonldPredicate: "http://arvados.org/cwl#RuntimeConstraints/keep_cache"
- name: ClusterTarget
type: record
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 3589fad27..caf954fcb 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -95,6 +95,7 @@ class ArvCwlExecutor(object):
arvargs.output_name = None
arvargs.output_tags = None
arvargs.thread_count = 1
+ arvargs.collection_cache_size = None
self.api = api_client
self.processes = {}
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index 018172b59..1c233fac0 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -23,36 +23,36 @@ class TaskQueue(object):
t.start()
def task_queue_func(self):
+ while True:
+ task = self.task_queue.get()
+ if task is None:
+ return
+ try:
+ task()
+ except Exception as e:
+ logger.exception("Unhandled exception running task")
+ self.error = e
- while True:
- task = self.task_queue.get()
- if task is None:
- return
- try:
- task()
- except Exception as e:
- logger.exception("Unhandled exception running task")
- self.error = e
-
- with self.lock:
- self.in_flight -= 1
+ with self.lock:
+ self.in_flight -= 1
def add(self, task, unlock, check_done):
- with self.lock:
- if self.thread_count > 1:
+ if self.thread_count > 1:
+ with self.lock:
self.in_flight += 1
- else:
- task()
- return
+ else:
+ task()
+ return
while True:
try:
unlock.release()
+ if check_done.is_set():
+ return
self.task_queue.put(task, block=True, timeout=3)
return
except Queue.Full:
- if check_done.is_set():
- return
+ pass
finally:
unlock.acquire()
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index bf2791d72..a7a21e709 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -274,7 +274,7 @@ def stubs(func):
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': '999999999999999999999999999999d3+99',
@@ -283,7 +283,7 @@ def stubs(func):
'runtime_constraints': {
'API': True,
'vcpus': 1,
- 'ram': 1024*1024*1024
+ 'ram': (1024+256)*1024*1024
},
'use_existing': True,
'properties': {},
@@ -559,7 +559,8 @@ class TestSubmit(unittest.TestCase):
'arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--disable-reuse', '--debug', '--on-error=continue',
+ '--disable-reuse', "--collection-cache-size=256",
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
@@ -584,7 +585,7 @@ class TestSubmit(unittest.TestCase):
'arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--disable-reuse', '--debug', '--on-error=continue',
+ '--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container["name"] = "submit_wf_no_reuse.cwl"
@@ -621,7 +622,8 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse', '--debug', '--on-error=stop',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--debug', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
@@ -647,7 +649,7 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse',
+ '--enable-reuse', "--collection-cache-size=256",
"--output-name="+output_name, '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["output_name"] = output_name
@@ -673,7 +675,7 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse', "--debug",
+ '--enable-reuse', "--collection-cache-size=256", "--debug",
"--storage-classes=foo", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -740,7 +742,8 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256", '--debug',
+ '--on-error=continue',
"--intermediate-output-ttl=3600",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -765,7 +768,8 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--debug', '--on-error=continue',
"--trash-intermediate",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -792,7 +796,7 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse',
+ '--enable-reuse', "--collection-cache-size=256",
"--output-tags="+output_tags, '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -814,7 +818,7 @@ class TestSubmit(unittest.TestCase):
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["runtime_constraints"]["ram"] = 2048*1024*1024
+ expect_container["runtime_constraints"]["ram"] = (2048+256)*1024*1024
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
@@ -877,13 +881,13 @@ class TestSubmit(unittest.TestCase):
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'API': True,
'vcpus': 1,
- 'ram': 1073741824
+ 'ram': 1342177280
},
'use_existing': True,
'properties': {},
@@ -999,13 +1003,13 @@ class TestSubmit(unittest.TestCase):
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'API': True,
'vcpus': 1,
- 'ram': 1073741824
+ 'ram': 1342177280
},
'use_existing': True,
'properties': {
@@ -1059,7 +1063,8 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
"--eval-timeout=20", "--thread-count=4",
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256", '--debug',
+ '--on-error=continue',
'--project-uuid='+project_uuid,
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -1085,7 +1090,34 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=60.0', '--thread-count=4',
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--debug', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+ @stubs
+ def test_submit_container_collection_cache(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--collection-cache-size=500",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=60.0', '--thread-count=4',
+ '--enable-reuse', "--collection-cache-size=500",
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
@@ -1111,7 +1143,8 @@ class TestSubmit(unittest.TestCase):
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=20',
- '--enable-reuse', '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
@@ -1197,7 +1230,7 @@ class TestSubmit(unittest.TestCase):
expect_container["runtime_constraints"] = {
"API": True,
"vcpus": 2,
- "ram": 2000 * 2**20
+ "ram": (2000+512) * 2**20
}
expect_container["name"] = "submit_wf_runner_resources.cwl"
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
@@ -1211,6 +1244,11 @@ class TestSubmit(unittest.TestCase):
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = {
"arv": "http://arvados.org/cwl#",
}
+ expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
@@ -1280,6 +1318,7 @@ class TestSubmit(unittest.TestCase):
"--eval-timeout=20",
'--thread-count=4',
"--enable-reuse",
+ "--collection-cache-size=256",
'--debug',
"--on-error=continue",
"/var/lib/cwl/workflow.json#main",
@@ -1407,7 +1446,7 @@ class TestSubmit(unittest.TestCase):
"properties": {},
"runtime_constraints": {
"API": True,
- "ram": 1073741824,
+ "ram": 1342177280,
"vcpus": 1
},
"secret_mounts": {
diff --git a/sdk/cwl/tests/test_tq.py b/sdk/cwl/tests/test_tq.py
index 2afbe0cff..a09489065 100644
--- a/sdk/cwl/tests/test_tq.py
+++ b/sdk/cwl/tests/test_tq.py
@@ -22,29 +22,37 @@ def fail_task():
class TestTaskQueue(unittest.TestCase):
def test_tq(self):
tq = TaskQueue(threading.Lock(), 2)
+ try:
+ self.assertIsNone(tq.error)
- self.assertIsNone(tq.error)
-
- tq.add(success_task)
- tq.add(success_task)
- tq.add(success_task)
- tq.add(success_task)
+ unlock = threading.Lock()
+ unlock.acquire()
+ check_done = threading.Event()
- tq.join()
+ tq.add(success_task, unlock, check_done)
+ tq.add(success_task, unlock, check_done)
+ tq.add(success_task, unlock, check_done)
+ tq.add(success_task, unlock, check_done)
+ finally:
+ tq.join()
self.assertIsNone(tq.error)
def test_tq_error(self):
tq = TaskQueue(threading.Lock(), 2)
-
- self.assertIsNone(tq.error)
-
- tq.add(success_task)
- tq.add(success_task)
- tq.add(fail_task)
- tq.add(success_task)
-
- tq.join()
+ try:
+ self.assertIsNone(tq.error)
+
+ unlock = threading.Lock()
+ unlock.acquire()
+ check_done = threading.Event()
+
+ tq.add(success_task, unlock, check_done)
+ tq.add(success_task, unlock, check_done)
+ tq.add(fail_task, unlock, check_done)
+ tq.add(success_task, unlock, check_done)
+ finally:
+ tq.join()
self.assertIsNotNone(tq.error)
commit eb58fd945645f5a670c761f7046b10885941167e
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Nov 27 15:53:07 2018 -0500
14510: Setting collection cache wip
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/apps/workbench/app/controllers/work_units_controller.rb b/apps/workbench/app/controllers/work_units_controller.rb
index 8527b4d48..767762c81 100644
--- a/apps/workbench/app/controllers/work_units_controller.rb
+++ b/apps/workbench/app/controllers/work_units_controller.rb
@@ -85,12 +85,6 @@ class WorkUnitsController < ApplicationController
attrs['state'] = "Uncommitted"
# required
- attrs['command'] = ["arvados-cwl-runner",
- "--local",
- "--api=containers",
- "--project-uuid=#{params['work_unit']['owner_uuid']}",
- "/var/lib/cwl/workflow.json#main",
- "/var/lib/cwl/cwl.input.json"]
attrs['container_image'] = "arvados/jobs"
attrs['cwd'] = "/var/spool/cwl"
attrs['output_path'] = "/var/spool/cwl"
@@ -102,6 +96,7 @@ class WorkUnitsController < ApplicationController
"API" => true
}
+ keep_cache = 256
input_defaults = {}
if wf_json
main = get_cwl_main(wf_json)
@@ -119,11 +114,22 @@ class WorkUnitsController < ApplicationController
if hint[:ramMin]
runtime_constraints["ram"] = hint[:ramMin] * 1024 * 1024
end
+ if hint[:keep_cache]
+ keep_cache = hint[:keep_cache]
+ end
end
end
end
end
+ attrs['command'] = ["arvados-cwl-runner",
+ "--local",
+ "--api=containers",
+ "--project-uuid=#{params['work_unit']['owner_uuid']}",
+ "--collection-keep-cache=#{keep_cache}",
+ "/var/lib/cwl/workflow.json#main",
+ "/var/lib/cwl/cwl.input.json"]
+
# mounts
mounts = {
"/var/lib/cwl/cwl.input.json" => {
diff --git a/doc/user/cwl/cwl-extensions.html.textile.liquid b/doc/user/cwl/cwl-extensions.html.textile.liquid
index 7abc794e1..f2dd937d9 100644
--- a/doc/user/cwl/cwl-extensions.html.textile.liquid
+++ b/doc/user/cwl/cwl-extensions.html.textile.liquid
@@ -43,6 +43,7 @@ hints:
arv:WorkflowRunnerResources:
ramMin: 2048
coresMin: 2
+ keep_cache: 512
arv:ClusterTarget:
cluster_id: clsr1
project_uuid: clsr1-j7d0g-qxc4jcji7n4lafx
@@ -137,7 +138,7 @@ table(table table-bordered table-condensed).
|_. Field |_. Type |_. Description |
|ramMin|int|RAM, in mebibytes, to reserve for the arvados-cwl-runner process. Default 1 GiB|
|coresMin|int|Number of cores to reserve to the arvados-cwl-runner process. Default 1 core.|
-
+|keep_cache|int|RAM, in mebibytes, to reserve for caching keep collection metadata. Default 256 MiB|
h2(#clustertarget). arv:ClusterTarget
Specify which Arvados cluster should execute a container or subworkflow, and the parent project for the container request.
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index ce22219d7..225741f94 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -159,9 +159,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
default=None,
metavar="CLUSTER_ID")
- parser.add_argument("--collection-cache", type=int,
- default=256*1024*1024,
- help="Collection caches size.")
+ parser.add_argument("--collection-cache-size", type=int,
+ default=None,
+ help="Collection cache size (in MiB, default 256).")
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index f1ae65fc0..4c49a449b 100644
--- a/sdk/cwl/arvados_cwl/arvcontainer.py
+++ b/sdk/cwl/arvados_cwl/arvcontainer.py
@@ -407,7 +407,7 @@ class RunnerContainer(Runner):
"secret_mounts": secret_mounts,
"runtime_constraints": {
"vcpus": math.ceil(self.submit_runner_cores),
- "ram": math.ceil(1024*1024 * self.submit_runner_ram),
+ "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
"API": True
},
"use_existing": self.enable_reuse,
@@ -441,6 +441,7 @@ class RunnerContainer(Runner):
# --eval-timeout is the timeout for javascript invocation
# --parallel-task-count is the number of threads to use for job submission
# --enable/disable-reuse sets desired job reuse
+ # --collection-cache-size sets aside memory to store collections
command = ["arvados-cwl-runner",
"--local",
"--api=containers",
@@ -448,7 +449,8 @@ class RunnerContainer(Runner):
"--disable-validate",
"--eval-timeout=%s" % self.arvrunner.eval_timeout,
"--thread-count=%s" % self.arvrunner.thread_count,
- "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+ "--enable-reuse" if self.enable_reuse else "--disable-reuse",
+ "--collection-cache-size=%s" % self.collection_cache_size]
if self.output_name:
command.append("--output-name=" + self.output_name)
diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py
index 7831e1cfd..8cfe22ad7 100644
--- a/sdk/cwl/arvados_cwl/context.py
+++ b/sdk/cwl/arvados_cwl/context.py
@@ -34,6 +34,7 @@ class ArvRuntimeContext(RuntimeContext):
self.submit_runner_cluster = None
self.cluster_target_id = 0
self.always_submit_runner = False
+ self.collection_cache_size = 256
super(ArvRuntimeContext, self).__init__(kwargs)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index ff8ff6ff8..3589fad27 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -122,8 +122,13 @@ class ArvCwlExecutor(object):
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+ if arvargs.collection_cache_size:
+ collection_cache_size = arvargs.collection_cache_size*1024*1024
+ else:
+ collection_cache_size = 256*1024*1024
+
self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
- cap=arvargs.collection_cache)
+ cap=collection_cache_size)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
@@ -607,7 +612,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
merged_map=merged_map,
priority=runtimeContext.priority,
- secret_store=self.secret_store)
+ secret_store=self.secret_store,
+ collection_cache_size=runtimeContext.collection_cache_size)
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
self.output_name,
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index a846f2b00..c1a98e745 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -364,7 +364,7 @@ class Runner(object):
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
intermediate_output_ttl=0, merged_map=None,
- priority=None, secret_store=None):
+ priority=None, secret_store=None, collection_cache_size=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
@@ -389,6 +389,7 @@ class Runner(object):
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # defaut 1 GiB
+ self.collection_cache_size = 256
runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
if runner_resource_req:
@@ -396,11 +397,17 @@ class Runner(object):
self.submit_runner_cores = runner_resource_req["coresMin"]
if runner_resource_req.get("ramMin"):
self.submit_runner_ram = runner_resource_req["ramMin"]
+ if runner_resource_req.get("keep_cache"):
+ self.collection_cache_size = runner_resource_req["keep_cache"]
if submit_runner_ram:
# Command line / initializer overrides default and/or spec from workflow
self.submit_runner_ram = submit_runner_ram
+ if collection_cache_size:
+ # Command line / initializer overrides default and/or spec from workflow
+ self.collection_cache_size = collection_cache_size
+
if self.submit_runner_ram <= 0:
raise Exception("Value of submit-runner-ram must be greater than zero")
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 1b892a983..bf2791d72 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -1204,7 +1204,8 @@ class TestSubmit(unittest.TestCase):
{
"class": "http://arvados.org/cwl#WorkflowRunnerResources",
"coresMin": 2,
- "ramMin": 2000
+ "ramMin": 2000,
+ "keep_cache": 512
}
]
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = {
diff --git a/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl b/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl
index 9e2712194..814cd07ab 100644
--- a/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl
+++ b/sdk/cwl/tests/wf/submit_wf_runner_resources.cwl
@@ -15,6 +15,7 @@ hints:
arv:WorkflowRunnerResources:
ramMin: 2000
coresMin: 2
+ keep_cache: 512
inputs:
- id: x
type: File
commit 00bb1461d14cfc02e6ec2c74d622b7b6b716e775
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Tue Nov 20 14:33:53 2018 -0500
14510: Performance fixes
* Add --collection-cache to enable users to work around cache thrashing
* Limit task queue size. Release workflow lock when attempting to
enqueue a task (which now may block).
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 9b814f534..ce22219d7 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -159,6 +159,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
default=None,
metavar="CLUSTER_ID")
+ parser.add_argument("--collection-cache", type=int,
+ default=256*1024*1024,
+ help="Collection caches size.")
+
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
default=None)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 6cac70926..ff8ff6ff8 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -122,7 +122,8 @@ class ArvCwlExecutor(object):
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
- self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
+ cap=arvargs.collection_cache)
self.fetcher_constructor = partial(CollectionFetcher,
api_client=self.api,
@@ -206,7 +207,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
def start_run(self, runnable, runtimeContext):
- self.task_queue.add(partial(runnable.run, runtimeContext))
+ self.task_queue.add(partial(runnable.run, runtimeContext),
+ self.workflow_eval_lock, self.stop_polling)
def process_submitted(self, container):
with self.workflow_eval_lock:
@@ -216,7 +218,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
with self.workflow_eval_lock:
j = self.processes[uuid]
logger.info("%s %s is %s", self.label(j), uuid, record["state"])
- self.task_queue.add(partial(j.done, record))
+ self.task_queue.add(partial(j.done, record),
+ self.workflow_eval_lock, self.stop_polling)
del self.processes[uuid]
def runtime_status_update(self, kind, message, detail=None):
@@ -676,6 +679,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
else:
logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
+
+ if self.stop_polling.is_set():
+ break
+
loopperf.__enter__()
loopperf.__exit__()
diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py
index b9fd09807..018172b59 100644
--- a/sdk/cwl/arvados_cwl/task_queue.py
+++ b/sdk/cwl/arvados_cwl/task_queue.py
@@ -11,7 +11,7 @@ logger = logging.getLogger('arvados.cwl-runner')
class TaskQueue(object):
def __init__(self, lock, thread_count):
self.thread_count = thread_count
- self.task_queue = Queue.Queue()
+ self.task_queue = Queue.Queue(maxsize=self.thread_count)
self.task_queue_threads = []
self.lock = lock
self.in_flight = 0
@@ -37,13 +37,25 @@ class TaskQueue(object):
with self.lock:
self.in_flight -= 1
- def add(self, task):
+ def add(self, task, unlock, check_done):
with self.lock:
if self.thread_count > 1:
self.in_flight += 1
- self.task_queue.put(task)
else:
task()
+ return
+
+ while True:
+ try:
+ unlock.release()
+ self.task_queue.put(task, block=True, timeout=3)
+ return
+ except Queue.Full:
+ if check_done.is_set():
+ return
+ finally:
+ unlock.acquire()
+
def drain(self):
try:
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list