Skip to content

Commit 84f1e81

Browse files
sachin-arora-cashfreearorasachin9
authored andcommitted
Added unit test for the additional config and addressed comments
1 parent 6ad3bc4 commit 84f1e81

File tree

4 files changed

+19
-2
lines changed

4 files changed

+19
-2
lines changed

chart/values.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2607,7 +2607,6 @@ config:
26072607
celery:
26082608
flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}'
26092609
worker_concurrency: 16
2610-
extra_celery_config: '{}'
26112610
scheduler:
26122611
standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}'
26132612
# statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0

providers/src/airflow/providers/celery/executors/default_celery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def _broker_supports_visibility_timeout(url):
6969
log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.")
7070
result_backend = f'db+{conf.get("database", "SQL_ALCHEMY_CONN")}'
7171

72-
extra_celery_config: dict = conf.getjson("celery", "extra_celery_config", fallback={}) if conf.has_option("celery", "extra_celery_config") else {}
72+
extra_celery_config: dict = conf.getjson("celery", "extra_celery_config", fallback={})
7373

7474
DEFAULT_CELERY_CONFIG = {
7575
"accept_content": ["json"],

providers/src/airflow/providers/celery/provider.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,13 @@ config:
329329
type: string
330330
example: ~
331331
default: "False"
332+
extra_celery_config:
333+
description: |
334+
Extra celery configs to include in the celery worker
335+
version_added: ~
336+
type: string
337+
example: ~
338+
default: "{}"
332339
celery_broker_transport_options:
333340
description: |
334341
This section is for specifying options which can be passed to the

providers/tests/celery/executors/test_celery_executor.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,3 +399,14 @@ def test_celery_task_acks_late_loaded_from_string():
399399
# reload celery conf to apply the new config
400400
importlib.reload(default_celery)
401401
assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False
402+
403+
404+
@conf_vars({("celery", "extra_celery_config"): '{"worker_max_tasks_per_child": 10}'})
405+
def test_celery_extra_celery_config_loaded_from_string():
406+
import importlib
407+
408+
# reload celery conf to apply the new config
409+
importlib.reload(default_celery)
410+
assert default_celery.DEFAULT_CELERY_CONFIG["extra_celery_config"] == {
411+
"worker_max_tasks_per_child": 10
412+
}

0 commit comments

Comments
 (0)