Skip to content

Commit

Permalink
Use csv file for sync_table_with_pandas schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
VincentAntoine committed Jan 22, 2025
1 parent 2d61248 commit 792c8f1
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"source_database","query_filepath","schema","table_name","backend","geom_col","destination_database","destination_table","ddl_script_path","post_processing_script_path","final_table","cron_string"
"monitorenv_remote",,"public","amp_cacem","geopandas","geom","monitorenv","amp_cacem_tmp","monitorenv/create_amp_cacem_tmp.sql","post_process_amp_cacem.sql","amp_cacem",
"monitorenv_remote","monitorenv_remote/analytics_actions.sql",,,"pandas","geom","monitorenv","analytics_actions","monitorenv/create_analytics_actions.sql",,,
"monitorfish_remote","monitorfish_remote/analytics_controls_full_data.sql",,,"pandas","geom","monitorfish","analytics_controls_full_data","monitorfish/create_analytics_controls_full_data.sql",,,
"monitorenv_remote",,"public","amp_cacem","geopandas","geom","monitorenv","amp_cacem_tmp","monitorenv/create_amp_cacem_tmp.sql","post_process_amp_cacem.sql","amp_cacem","31 1 * * *"
"monitorenv_remote","monitorenv_remote/analytics_actions.sql",,,"pandas","geom","monitorenv","analytics_actions","monitorenv/create_analytics_actions.sql",,,"30 * * * *"
"monitorfish_remote","monitorfish_remote/analytics_controls_full_data.sql",,,"pandas","geom","monitorfish","analytics_controls_full_data","monitorfish/create_analytics_controls_full_data.sql",,,"32 1 * * *"
47 changes: 26 additions & 21 deletions forklift/forklift/pipeline/flows_config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from copy import deepcopy

import pandas as pd
from dotenv import dotenv_values
from prefect.run_configs.docker import DockerRun
from prefect.schedules import CronSchedule, Schedule, clocks
Expand All @@ -10,30 +11,33 @@
FLOWS_LOCATION,
FORKLIFT_DOCKER_IMAGE,
FORKLIFT_VERSION,
LIBRARY_LOCATION,
ROOT_DIRECTORY,
)
from forklift.pipeline.flows import clean_flow_runs, reset_proxy_pg_database, sync_table
from forklift.pipeline.flows import (
clean_flow_runs,
reset_proxy_pg_database,
sync_table,
sync_table_with_pandas,
)


def make_cron_clock_from_run_param_series(s: pd.Series) -> clocks.CronClock:
    """Build a Prefect ``CronClock`` from one row of scheduled-run parameters.

    The series must contain a ``cron_string`` entry; every other non-null
    entry becomes a flow parameter default for that clock.  Null (NaN)
    entries are dropped, so optional CSV columns left empty simply do not
    appear in ``parameter_defaults``.
    """
    run_params = {key: value for key, value in s.items() if pd.notna(value)}
    schedule_expr = run_params.pop("cron_string")
    return clocks.CronClock(cron=schedule_expr, parameter_defaults=run_params)


################################ Define flow schedules ################################
def get_flows_to_register():
clean_flow_runs_flow = deepcopy(clean_flow_runs.flow)
reset_proxy_pg_database_flow = deepcopy(reset_proxy_pg_database.flow)
sync_table_flow = deepcopy(sync_table.flow)
sync_table_with_pandas_flow = deepcopy(sync_table_with_pandas.flow)

clean_flow_runs_flow.schedule = CronSchedule("8,18,28,38,48,58 * * * *")
sync_table_flow.schedule = Schedule(
clocks=[
clocks.CronClock(
"25 4 * * *",
parameter_defaults={
"source_database": "monitorfish_proxy",
"source_table": "analytics_controls_full_data",
"destination_database": "monitorfish",
"destination_table": "analytics_controls_full_data",
"ddl_script_path": "monitorfish/create_analytics_controls_full_data.sql",
},
),
clocks.CronClock(
"30 4 * * *",
parameter_defaults={
Expand All @@ -44,16 +48,16 @@ def get_flows_to_register():
"order_by": "year",
},
),
clocks.CronClock(
"35 * * * *",
parameter_defaults={
"source_database": "monitorenv_proxy",
"source_table": "analytics_actions",
"destination_database": "monitorenv",
"destination_table": "analytics_actions",
"ddl_script_path": "monitorenv/create_analytics_actions.sql",
},
),
]
)

scheduled_runs = pd.read_csv(
LIBRARY_LOCATION / "pipeline/flow_schedules/sync_table_with_pandas.csv"
)
sync_table_with_pandas_flow.schedule = Schedule(
clocks=[
make_cron_clock_from_run_param_series(s=run[1])
for run in scheduled_runs.iterrows()
]
)

Expand All @@ -62,6 +66,7 @@ def get_flows_to_register():
clean_flow_runs_flow,
reset_proxy_pg_database_flow,
sync_table_flow,
sync_table_with_pandas_flow,
]

############################## Define flows' storage ##############################
Expand Down

0 comments on commit 792c8f1

Please sign in to comment.