Update sync_table_with_pandas flow and tests
VincentAntoine committed Jan 22, 2025
1 parent 058a313 commit 2819192
Showing 5 changed files with 167 additions and 45 deletions.
4 changes: 4 additions & 0 deletions forklift/forklift/pipeline/flow_schedules/sync_table_with_pandas.csv
@@ -0,0 +1,4 @@
"source_database","query_filepath","schema","table_name","backend","geom_col","destination_database","destination_table","ddl_script_path","post_processing_script_path","final_table","cron_string"
"monitorenv_remote",,"public","amp_cacem","geopandas","geom","monitorenv","amp_cacem_tmp","monitorenv/create_amp_cacem_tmp.sql","post_process_amp_cacem.sql","amp_cacem",
"monitorenv_remote","monitorenv_remote/analytics_actions.sql",,,"pandas","geom","monitorenv","analytics_actions","monitorenv/create_analytics_actions.sql",,,
"monitorfish_remote","monitorfish_remote/analytics_controls_full_data.sql",,,"pandas","geom","monitorfish","analytics_controls_full_data","monitorfish/create_analytics_controls_full_data.sql",,,
99 changes: 92 additions & 7 deletions forklift/forklift/pipeline/flows/sync_table_with_pandas.py
@@ -1,40 +1,111 @@
from pathlib import Path
from typing import Optional

import pandas as pd
import prefect
from prefect import Flow, Parameter, case, task

from forklift.pipeline.helpers.generic import extract
from forklift.pipeline.shared_tasks.control_flow import check_flow_not_running
from forklift.pipeline.helpers.generic import extract, read_table
from forklift.pipeline.shared_tasks.control_flow import (
check_flow_not_running,
parameter_is_given,
)
from forklift.pipeline.shared_tasks.generic import (
create_database_if_not_exists,
drop_table_if_exists,
load_df_to_data_warehouse,
run_data_flow_script,
run_ddl_script,
)


@task(checkpoint=False)
def extract_df(source_database: str, query_filepath: str) -> pd.DataFrame:
return extract(db_name=source_database, query_filepath=query_filepath)
def extract_df(
source_database: str,
    query_filepath: Optional[str] = None,
    schema: Optional[str] = None,
    table_name: Optional[str] = None,
backend: str = "pandas",
geom_col: str = "geom",
crs: Optional[int] = None,
) -> pd.DataFrame:
logger = prefect.context.get("logger")

if query_filepath:
assert isinstance(query_filepath, str)
try:
assert table_name is None and schema is None
except AssertionError:
logger.error(
(
"A `table_name` or a `schema` cannot be supplied "
"at the same time as a `query_filepath`."
)
)
raise

return extract(
db_name=source_database,
query_filepath=query_filepath,
backend=backend,
geom_col=geom_col,
crs=crs,
)

else:
try:
assert isinstance(table_name, str) and isinstance(schema, str)
except AssertionError:
logger.error(
(
"A `table_name` or a `schema` must be supplied "
"if no `query_filepath` if given."
)
)
raise

res = read_table(
db=source_database,
schema=schema,
table_name=table_name,
backend=backend,
geom_col=geom_col,
crs=crs,
)
return res
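
# Illustrative usage of extract_df outside a Flow run (not part of this
# module): the task takes either a `query_filepath` OR a (`schema`,
# `table_name`) pair, never both. A Prefect task can be exercised directly
# with `.run()`:
#
#     # Extract with a SQL query file:
#     df = extract_df.run(
#         source_database="monitorenv_remote",
#         query_filepath="monitorenv_remote/analytics_actions.sql",
#     )
#
#     # Extract a whole table, reading geometries with geopandas:
#     gdf = extract_df.run(
#         source_database="monitorenv_remote",
#         schema="public",
#         table_name="amp_cacem",
#         backend="geopandas",
#         geom_col="geom",
#         crs=4326,
#     )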


with Flow("Sync table with pandas") as flow:
flow_not_running = check_flow_not_running()
with case(flow_not_running, True):
source_database = Parameter("source_database")
query_filepath = Parameter("query_filepath", default=None)
schema = Parameter("schema", default=None)
table_name = Parameter("table_name", default=None)
backend = Parameter("backend", default="pandas")
geom_col = Parameter("geom_col", default="geom")
crs = Parameter("crs", default=4326)
destination_database = Parameter("destination_database")
destination_table = Parameter("destination_table")
ddl_script_path = Parameter("ddl_script_path", default=None)
post_processing_script_path = Parameter(
"post_processing_script_path", default=None
)
final_table = Parameter("final_table")

df = extract_df(
source_database=source_database,
query_filepath=query_filepath,
schema=schema,
table_name=table_name,
backend=backend,
geom_col=geom_col,
crs=crs,
)

create_database = create_database_if_not_exists(destination_database)

drop_table = drop_table_if_exists(
drop_intermediate_table = drop_table_if_exists(
destination_database,
destination_table,
upstream_tasks=[create_database],
@@ -43,14 +114,28 @@ def extract_df(source_database: str, query_filepath: str) -> pd.DataFrame:
ddl_script_path,
database=destination_database,
table=destination_table,
upstream_tasks=[drop_table],
upstream_tasks=[drop_intermediate_table],
)
load_df_to_data_warehouse(
loaded_df = load_df_to_data_warehouse(
df,
destination_database=destination_database,
destination_table=destination_table,
upstream_tasks=[created_table],
)

post_processing_needed = parameter_is_given(post_processing_script_path, str)

with case(post_processing_needed, True):
drop_final_table = drop_table_if_exists(
database=destination_database, table=final_table
)
post_processing = run_data_flow_script(
post_processing_script_path, upstream_tasks=[drop_final_table]
)
drop_table_if_exists(
database=destination_database,
table=destination_table,
upstream_tasks=[post_processing],
)

flow.file_name = Path(__file__).name
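
End to end, the flow chains extract → create destination database → drop intermediate table → run DDL → load the dataframe and, when a post-processing script is given, builds the final table and drops the intermediate one. A hedged local-run sketch for the amp_cacem schedule row above (the "monitorenv_remote" connection is assumed to be configured):

from forklift.pipeline.flows.sync_table_with_pandas import flow

state = flow.run(
    source_database="monitorenv_remote",
    query_filepath=None,
    schema="public",
    table_name="amp_cacem",
    backend="geopandas",
    geom_col="geom",
    destination_database="monitorenv",
    destination_table="amp_cacem_tmp",
    ddl_script_path="monitorenv/create_amp_cacem_tmp.sql",
    post_processing_script_path="post_process_amp_cacem.sql",
    final_table="amp_cacem",
)
assert state.is_successful()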
6 changes: 6 additions & 0 deletions post_process_amp_cacem.sql
@@ -0,0 +1,6 @@
CREATE TABLE monitorenv.amp_cacem ENGINE MergeTree ORDER BY id AS
SELECT
* EXCEPT(geom),
readWKTMultiPolygon(geom) AS geom
FROM monitorenv.amp_cacem_tmp
ORDER BY id
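
This post-processing script is the second half of the intermediate-table pattern: the flow first loads geometries as WKT strings into monitorenv.amp_cacem_tmp, readWKTMultiPolygon() parses them into a proper MultiPolygon column when building monitorenv.amp_cacem, and the flow then drops the intermediate table. A small verification sketch, assuming the same data-warehouse client the tests use:

from forklift.db_engines import create_datawarehouse_client

client = create_datawarehouse_client()

# After post-processing, geom should be a MultiPolygon, not a raw WKT string.
geom_type = client.query_df(
    "SELECT toTypeName(geom) AS geom_type FROM monitorenv.amp_cacem LIMIT 1"
)
print(geom_type)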
12 changes: 12 additions & 0 deletions monitorenv/create_amp_cacem_tmp.sql
@@ -0,0 +1,12 @@
CREATE TABLE {database:Identifier}.{table:Identifier} (
id Int32,
geom String,
mpa_oriname String,
des_desigfr String,
row_hash Nullable(String),
mpa_type Nullable(String),
ref_reg Nullable(String),
url_legicem Nullable(String)
)
ENGINE MergeTree
ORDER BY mpa_oriname
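
The DDL relies on ClickHouse server-side parameter binding ({database:Identifier}, {table:Identifier}) so the same script can create the intermediate table wherever the flow parameters point. A sketch of executing such a script with clickhouse_connect, mirroring how the tests bind Identifier parameters (run_ddl_script's exact behaviour may differ, and the script path below is illustrative):

from pathlib import Path

from forklift.db_engines import create_datawarehouse_client

client = create_datawarehouse_client()
ddl = Path("monitorenv/create_amp_cacem_tmp.sql").read_text()
client.command(
    ddl, parameters={"database": "monitorenv", "table": "amp_cacem_tmp"}
)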
Tests for the sync_table_with_pandas flow
@@ -1,43 +1,45 @@
import pandas as pd
import pytest

from forklift.config import LIBRARY_LOCATION
from forklift.db_engines import create_datawarehouse_client
from forklift.pipeline.flows.sync_table_with_pandas import flow
from tests.mocks import mock_check_flow_not_running

flow.replace(flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running)

scheduled_runs = pd.read_csv(
LIBRARY_LOCATION / "pipeline/flow_schedules/sync_table_with_pandas.csv"
).drop(columns=["cron_string"])
parameters = ",".join(scheduled_runs.columns)
try:
assert parameters == (
"source_database,query_filepath,schema,table_name,backend,geom_col,"
"destination_database,destination_table,ddl_script_path,"
"post_processing_script_path,final_table"
)
except AssertionError:
raise ValueError("Test fixtures non coherent with CSV columns")

parameter_values = [
tuple(r[1].where(r[1].notnull(), None)) for r in scheduled_runs.iterrows()
]


@pytest.mark.parametrize(
(
"source_database,"
"query_filepath,"
"destination_database,"
"destination_table,"
"ddl_script_path,"
),
[
(
"monitorenv_remote",
"monitorenv_remote/analytics_actions.sql",
"monitorenv",
"analytics_actions",
"monitorenv/create_analytics_actions.sql",
),
(
"monitorfish_remote",
"monitorfish_remote/analytics_controls_full_data.sql",
"monitorfish",
"analytics_controls_full_data",
"monitorfish/create_analytics_controls_full_data.sql",
),
],
)
def test_sync_table(
@pytest.mark.parametrize(parameters, parameter_values)
def test_sync_table_with_pandas(
add_monitorenv_proxy_database,
source_database,
query_filepath,
schema,
table_name,
backend,
geom_col,
destination_database,
destination_table,
ddl_script_path,
post_processing_script_path,
final_table,
):
print(
f"Testing syncing of {destination_database}.{destination_table} from {source_database}."
@@ -47,33 +49,46 @@ def test_sync_table(
state = flow.run(
source_database=source_database,
query_filepath=query_filepath,
schema=schema,
table_name=table_name,
backend=backend,
geom_col=geom_col,
destination_database=destination_database,
destination_table=destination_table,
ddl_script_path=ddl_script_path,
post_processing_script_path=post_processing_script_path,
final_table=final_table,
)

assert state.is_successful()

    # Check that the data is loaded into the expected table and that no other table is left behind
expected_table = final_table or destination_table
tables_in_db = client.query_df(
"SHOW TABLES FROM {database:Identifier}",
parameters={
"database": destination_database,
},
)

pd.testing.assert_frame_equal(
tables_in_db, pd.DataFrame({"name": [expected_table]}), check_dtype=False
)

df = client.query_df(
(
"SELECT * FROM "
"{destination_database:Identifier}.{destination_table:Identifier}"
),
("SELECT * FROM " "{database:Identifier}.{table:Identifier}"),
parameters={
"destination_database": destination_database,
"destination_table": destination_table,
"database": destination_database,
"table": expected_table,
},
)

assert len(df) > 0

client.command(
(
"DROP TABLE "
"{destination_database:Identifier}.{destination_table:Identifier}"
),
("DROP TABLE " "{database:Identifier}.{table:Identifier}"),
parameters={
"destination_database": destination_database,
"destination_table": destination_table,
"database": destination_database,
"table": expected_table,
},
)
