Skip to content

Commit

Permalink
[Issue #1998] optimize the WHERE in load updates (#1999)
Browse files Browse the repository at this point in the history
## Summary
Fixes #1998

## Changes proposed
- Change the `WHERE` in the update query to use the source table
columns.
- Commit to the database after each chunk, not after each table.

## Context for reviewers
Observing the queries on the PostgreSQL RDS and Oracle sides, we saw
that the `WHERE ... IN (...)` was not being pushed down to Oracle. This
is an attempt to fix that.

## Additional information
N/A
  • Loading branch information
jamesbursa authored May 13, 2024
1 parent bb423dc commit cf4721b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
33 changes: 21 additions & 12 deletions api/src/data_migration/load/load_oracle_data_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ def load_data(self) -> None:
"""Load the data for all tables defined in the mapping."""
for table_name in self.foreign_tables:
try:
with self.db_session.begin():
self.load_data_for_table(table_name)
self.load_data_for_table(table_name)
except Exception:
logger.exception("table load error", extra={"table": table_name})

Expand All @@ -95,7 +94,8 @@ def do_insert(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.T
"""Determine new rows by primary key, and copy them into the staging table."""

select_sql = sql.build_select_new_rows_sql(foreign_table, staging_table)
new_ids = self.db_session.execute(select_sql).all()
with self.db_session.begin():
new_ids = self.db_session.execute(select_sql).all()

t0 = time.monotonic()
insert_chunk_count = []
Expand All @@ -105,12 +105,17 @@ def do_insert(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.T
)

# Execute the INSERT.
self.db_session.execute(insert_from_select_sql)
with self.db_session.begin():
self.db_session.execute(insert_from_select_sql)

insert_chunk_count.append(len(batch_of_new_ids))
logger.info(
"insert chunk done",
extra={"count": sum(insert_chunk_count), "total": len(new_ids)},
extra={
"table": foreign_table.name,
"count": sum(insert_chunk_count),
"total": len(new_ids),
},
)

t1 = time.monotonic()
Expand All @@ -130,7 +135,8 @@ def do_update(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.T
"""Find updated rows using last_upd_date, copy them, and reset transformed_at to NULL."""

select_sql = sql.build_select_updated_rows_sql(foreign_table, staging_table)
update_ids = self.db_session.execute(select_sql).all()
with self.db_session.begin():
update_ids = self.db_session.execute(select_sql).all()

t0 = time.monotonic()
update_chunk_count = []
Expand All @@ -139,7 +145,8 @@ def do_update(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.T
foreign_table, staging_table, batch_of_update_ids
).values(transformed_at=None)

self.db_session.execute(update_sql)
with self.db_session.begin():
self.db_session.execute(update_sql)

update_chunk_count.append(len(batch_of_update_ids))
logger.info(
Expand Down Expand Up @@ -171,7 +178,8 @@ def do_mark_deleted(
)

t0 = time.monotonic()
result = self.db_session.execute(update_sql)
with self.db_session.begin():
result = self.db_session.execute(update_sql)
t1 = time.monotonic()
delete_count = result.rowcount

Expand All @@ -184,10 +192,11 @@ def do_mark_deleted(
def log_row_count(self, message: str, *tables: sqlalchemy.Table) -> None:
"""Log the number of rows in each of the tables using SQL COUNT()."""
extra = {}
for table in tables:
count = self.db_session.query(table).count()
extra[f"count.{table.schema}.{table.name}"] = count
self.set_metrics({f"count.{message}.{table.schema}.{table.name}": count})
with self.db_session.begin():
for table in tables:
count = self.db_session.query(table).count()
extra[f"count.{table.schema}.{table.name}"] = count
self.set_metrics({f"count.{message}.{table.schema}.{table.name}": count})
logger.info(f"row count {message}", extra=extra, stacklevel=2)


Expand Down
2 changes: 1 addition & 1 deletion api/src/data_migration/load/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def build_update_sql(
.where(
sqlalchemy.tuple_(*destination_table.primary_key.columns)
== sqlalchemy.tuple_(*source_table.primary_key.columns),
sqlalchemy.tuple_(*destination_table.primary_key.columns).in_(ids),
sqlalchemy.tuple_(*source_table.primary_key.columns).in_(ids),
)
)

Expand Down
2 changes: 1 addition & 1 deletion api/tests/src/data_migration/load/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_build_update_sql(source_table, destination_table):
"last_upd_date=test_source_table.last_upd_date FROM test_source_table "
"WHERE (test_destination_table.id1, test_destination_table.id2) = "
"(test_source_table.id1, test_source_table.id2) AND "
"(test_destination_table.id1, test_destination_table.id2) "
"(test_source_table.id1, test_source_table.id2) "
"IN (__[POSTCOMPILE_param_1])"
)

Expand Down

1 comment on commit cf4721b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage report for ./frontend

Status
Category Percentage Covered / Total
🟢 Statements 84.14% 870/1034
🟡 Branches 65.01% 223/343
🟡 Functions 75.58% 164/217
🟢 Lines 84.18% 809/961

Test suite run success

164 tests passing in 56 suites.

Report generated by 🧪jest coverage report action from cf4721b

Please sign in to comment.