From db07eb1957a729f333c14ce3c26b5a819736295b Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Wed, 6 Mar 2024 21:26:59 +0400 Subject: [PATCH] Make current working directory as templated field in BashOperator (#37928) * Make current working directory as templated field in BashOperator * Update airflow/operators/bash.py Co-authored-by: Tzu-ping Chung --------- Co-authored-by: Tzu-ping Chung --- airflow/operators/bash.py | 6 ++++-- tests/operators/test_bash.py | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py index ff8edcba51aa7..f1edb0d46d91c 100644 --- a/airflow/operators/bash.py +++ b/airflow/operators/bash.py @@ -59,8 +59,10 @@ class BashOperator(BaseOperator): :param skip_on_exit_code: If task exits with this exit code, leave the task in ``skipped`` state (default: 99). If set to ``None``, any non-zero exit code will be treated as a failure. - :param cwd: Working directory to execute the command in. + :param cwd: Working directory to execute the command in (templated). If None (default), the command is run in a temporary directory. + To use current DAG folder as the working directory, + you might set template ``{{ dag_run.dag.folder }}``. Airflow will evaluate the exit code of the Bash command. In general, a non-zero exit code will result in task failure and zero will result in task success. @@ -130,7 +132,7 @@ class BashOperator(BaseOperator): """ - template_fields: Sequence[str] = ("bash_command", "env") + template_fields: Sequence[str] = ("bash_command", "env", "cwd") template_fields_renderers = {"bash_command": "bash", "env": "json"} template_ext: Sequence[str] = (".sh", ".bash") ui_color = "#f0ede4" diff --git a/tests/operators/test_bash.py b/tests/operators/test_bash.py index d00477b11837a..7a52790bcb0b7 100644 --- a/tests/operators/test_bash.py +++ b/tests/operators/test_bash.py @@ -20,6 +20,7 @@ import os import signal from datetime import datetime, timedelta +from pathlib import Path from time import sleep from unittest import mock @@ -244,3 +245,22 @@ def test_bash_operator_kill(self, dag_maker): os.kill(proc.pid, signal.SIGTERM) assert False, "BashOperator's subprocess still running after stopping on timeout!" break + + @pytest.mark.db_test + def test_templated_fields(self, create_task_instance_of_operator): + ti = create_task_instance_of_operator( + BashOperator, + # Templated fields + bash_command='echo "{{ dag_run.dag_id }}"', + env={"FOO": "{{ ds }}"}, + cwd="{{ dag_run.dag.folder }}", + # Other parameters + dag_id="test_templated_fields_dag", + task_id="test_templated_fields_task", + execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), + ) + ti.render_templates() + task: BashOperator = ti.task + assert task.bash_command == 'echo "test_templated_fields_dag"' + assert task.env == {"FOO": "2024-02-01"} + assert task.cwd == Path(__file__).absolute().parent.as_posix()