DbApiHook.insert_rows unnecessarily restarting connections #40608
-
Apache Airflow Provider(s): common-sql
Versions of Apache Airflow Providers:
Apache Airflow version: 2.9.2
Operating System: macOS Sonoma 14.5 (Docker host)
Deployment: Docker-Compose
Deployment details: I'm using the official Airflow
What happened: The database connection is restarted multiple times during a single
What you think should happen instead:
How to reproduce:

Creating a temporary test project:

mkdir /tmp/airflow/
cd /tmp/airflow/
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'

Add the following mysql db to the docker-compose file:

  mysql:
    image: mysql:latest
    environment:
      MYSQL_DATABASE: 'db'
      MYSQL_ROOT_PASSWORD: 'airflow'
    ports:
      - '3306:3306'

Run the docker compose:

docker compose up -d

Add the following connections to Airflow using

airflow connections add postgres_default --conn-uri postgresql://airflow:airflow@postgres
airflow connections add mysql_default --conn-uri mysql://root:airflow@mysql/db

Then open a python shell and execute the following scripts:

from airflow.providers.postgres.hooks.postgres import PostgresHook
pg = PostgresHook()
pg.run("CREATE TABLE IF NOT EXISTS t (a int)")
pg.insert_rows(
table="t",
rows=[[i] for i in range(10_000)],
target_fields="a",
)

And for MySQL:

from airflow.providers.mysql.hooks.mysql import MySqlHook
mysql = MySqlHook()
mysql.run("CREATE TABLE IF NOT EXISTS t (a int)")
mysql.insert_rows(
table="t",
rows=[[i] for i in range(100)],
target_fields="a",
)

Both scripts open multiple connections to the database while inserting, instead of maintaining just one. Postgres seems to recreate the connection every 1000 inserts; MySQL does it after every insert.

Postgres:

>>> pg.insert_rows(
... table="t",
... rows=[[i] for i in range(10_000)],
... target_fields="a",
... )
[2024-07-04T15:08:13.940+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.942+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:13.996+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:13.997+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.043+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.044+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.090+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.091+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.145+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.146+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.200+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.201+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.245+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.246+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.290+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.291+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.341+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.342+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.393+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.394+0000] {base.py:84} INFO - Using connection ID 'postgres_default' for task execution.
[2024-07-04T15:08:14.441+0000] {sql.py:598} INFO - Loaded 1000 rows into t so far
[2024-07-04T15:08:14.441+0000] {sql.py:611} INFO - Done loading. Loaded a total of 10000 rows into t

MySQL:

>>> mysql.insert_rows(
... table="t",
... rows=[[i] for i in range(100)],
... target_fields="a",
... )
[2024-07-04T15:08:54.551+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.554+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.555+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.556+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.557+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.558+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
...
[2024-07-04T15:08:54.616+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.617+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.618+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.619+0000] {base.py:84} INFO - Using connection ID 'mysql_default' for task execution.
[2024-07-04T15:08:54.620+0000] {sql.py:611} INFO - Done loading. Loaded a total of 100 rows into t

Anything else: No response
Are you willing to submit PR?
Code of Conduct
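
One way to test the claim above is to count method calls instead of log lines. The following is a minimal sketch, not Airflow code: `count_calls` and `FakeHook` are hypothetical names introduced for illustration, and the split between a settings lookup (`get_connection`) and an actual database connect (`get_conn`) in the fake class is an assumption that should be checked against the real hook source.

```python
from contextlib import contextmanager
from unittest.mock import patch


@contextmanager
def count_calls(obj, method_name):
    """Temporarily wrap obj.<method_name> and yield a spy exposing .call_count."""
    original = getattr(obj, method_name)
    with patch.object(obj, method_name, wraps=original) as spy:
        yield spy


class FakeHook:
    """Hypothetical stand-in for a database hook (assumption, not Airflow code)."""

    def get_connection(self, conn_id):
        # Pretend lookup of connection settings; a real hook might log here.
        return {"conn_id": conn_id}

    def get_conn(self):
        # Pretend to open a real database connection.
        return object()

    def insert_rows(self, rows):
        conn = self.get_conn()  # one database connection for the whole call
        for _ in rows:
            self.get_connection("fake_default")  # settings lookup once per row
        return conn


hook = FakeHook()
with count_calls(hook, "get_conn") as opened, \
        count_calls(hook, "get_connection") as looked_up:
    hook.insert_rows(range(100))

print("database connections opened:", opened.call_count)
print("connection settings lookups:", looked_up.call_count)
```

With a real hook, the same helper could wrap `pg.get_conn` around the `pg.insert_rows(...)` call from the reproduction. A count of one would suggest that the repeated log lines reflect lookups and not reconnects.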
Replies: 3 comments 3 replies
-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.
-
The fact that something "uses" a connection does not mean that the connection is restarted. Have you actually looked at the code and checked what is going on there? I suggest you look at the insert_rows implementation. It handles various scenarios: one is when the driver supports executemany and another is when it does not. The second is far less efficient, of course, but there is not much we can do about it. Please look at the code and see if you can pinpoint an actual error. I am not saying there is no error, but from your original description it is not at all clear whether there is one. Please elaborate if you think otherwise. Converting it to a discussion.
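
To make the two scenarios concrete, here is a minimal sketch of a batched path and a row-by-row path that share a single connection. It uses the standard-library `sqlite3` driver so it runs anywhere. `insert_rows_sketch`, `chunked`, and the `executemany` and `commit_every` parameters are illustrative assumptions, not the actual `DbApiHook.insert_rows` implementation.

```python
import sqlite3
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


def insert_rows_sketch(conn, table, rows, executemany=False, commit_every=1000):
    """Insert single-column rows over one connection; return the row count."""
    sql = f"INSERT INTO {table} VALUES (?)"
    cur = conn.cursor()
    total = 0
    if executemany:
        # Batched path: one driver call and one commit per chunk.
        for chunk in chunked(rows, commit_every):
            cur.executemany(sql, chunk)
            conn.commit()
            total += len(chunk)
    else:
        # Row-by-row path: one driver call per row, periodic commits.
        for total, row in enumerate(rows, 1):
            cur.execute(sql, row)
            if total % commit_every == 0:
                conn.commit()
        conn.commit()
    cur.close()
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a int)")
batched = insert_rows_sketch(conn, "t", [[i] for i in range(2500)], executemany=True)
one_by_one = insert_rows_sketch(conn, "t", [[i] for i in range(100)])
count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
print(batched, one_by_one, count)
```

In this sketch the row-by-row path makes many more driver calls, yet neither path opens a second connection. Whether the real hook behaves the same way has to be confirmed by reading its source.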
-
Discussion continued here #40609