1032 Move GCC puller tables to Airflow variable #1035

Merged · 7 commits · Aug 23, 2024
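For context: this PR deletes the hard-coded `pk_dict`, `bigdata_layers`, and `ptc_layers` dictionaries and moves the layer definitions into Airflow Variables, so layers can be added or changed without a code deploy. A minimal sketch of the assumed `gcc_layers` Variable shape, reconstructed from how `get_layers()` and the mapped `pull_layer()` task consume it — the top-level key and the sample `agg` statement are illustrative, not production values:

```python
# Hypothetical sketch of the 'gcc_layers' Airflow Variable. Each entry is a
# [table_name, attributes] pair; layer numbers and the city_ward primary key
# are taken from the removed dictionaries, the rest is illustrative.
gcc_layers = {
    "gcc_pull_layers_bigdata": [
        ["city_ward", {"mapserver": 0, "layer_id": 0, "schema_name": "gis_core",
                       "is_audited": True, "pk": "area_id"}],
        ["centreline", {"mapserver": 0, "layer_id": 2, "schema_name": "gis_core",
                        "is_audited": False,
                        "agg": "SELECT gis.refresh_mat_view_centreline_version_date()"}],
    ]
}
```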
177 changes: 72 additions & 105 deletions dags/gcc_layers_pull.py
@@ -4,11 +4,11 @@
from functools import partial

import pendulum
from psycopg2 import sql
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import get_current_context

try:
repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
sys.path.insert(0, repo_path)
@@ -17,115 +17,82 @@
except:
raise ImportError("Cannot import DAG helper functions.")

# Credentials - to be passed through PythonOperator
# bigdata connection credentials
bigdata_cred = PostgresHook("gcc_bot_bigdata")
# On-prem server connection credentials
ptc_cred = PostgresHook("gcc_bot")

DAG_NAME = 'pull_gcc_layers'

DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])
# the DAG runs at 7 am on the first day of January, April, July, and October
def create_gcc_puller_dag(dag_id, default_args, name, conn_id):
@dag(
dag_id=dag_id,
default_args=default_args,
catchup=False,
tags=['gcc', name],
schedule='0 7 1 */3 *' #'@quarterly'
)
def gcc_layers_dag():

@task()
def get_layers(name):
tables = Variable.get('gcc_layers', deserialize_json=True)
return tables[name]

@task() #add after 2.9.3 upgrade: map_index_template="{{ table_name }}"
def pull_layer(layer, conn_id):
#name mapped task (implement after 2.9.3 upgrade)
#context = get_current_context()
#context["table_name"] = layer[0]
#get db connection
conn = PostgresHook(conn_id).get_conn()

#pull and insert layer
get_layer(
mapserver_n = layer[1].get("mapserver"),
layer_id = layer[1].get("layer_id"),
schema_name = layer[1].get("schema_name"),
is_audited = layer[1].get("is_audited"),
primary_key = layer[1].get("pk"),
con = conn
)

DEFAULT_ARGS = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past': False,
'start_date': pendulum.datetime(2022, 11, 3, tz="America/Toronto"),
'email_on_failure': False,
'retries': 0,
'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True)
}
#refresh mat views as necessary
agg_sql = layer[1].get("agg")
if agg_sql is not None:
with conn.cursor() as cur:
cur.execute(agg_sql)

#-------------------------------------------------------------------------------------------------------
bigdata_layers = {
"city_ward": [0, 0, 'gis_core', True], # VFH Layers
"centreline": [0, 2, 'gis_core', False], # VFH Layers
"ibms_grid": [11, 25, 'gis_core', True], # VFH Layers
"centreline_intersection_point": [0, 19, 'gis_core', False], # VFH Layers
"intersection": [12, 42, 'gis_core', False],
"census_tract": [26, 7, 'gis_core', True],
"neighbourhood_improvement_area": [26, 11, 'gis_core', True],
"priority_neighbourhood_for_investment": [26, 13, 'gis_core', True],
#"bikeway": [2, 2, 'gis', True], #replaced by cycling_infrastructure
"cycling_infrastructure": [2, 49, 'gis', True],
"traffic_camera": [2, 3, 'gis', True],
"permit_parking_area": [2, 11, 'gis', True],
"prai_transit_shelter": [2, 35, 'gis', True],
"traffic_bylaw_point": [2, 38, 'gis', True],
"traffic_bylaw_line": [2, 39, 'gis', True],
"loop_detector": [2, 46, 'gis', True],
"electrical_vehicle_charging_station": [20, 1, 'gis', True],
"day_care_centre": [22, 1, 'gis', True],
"middle_childcare_centre": [22, 2, 'gis', True],
"business_improvement_area": [23, 1, 'gis', True],
"proposed_business_improvement_area": [23, 13, 'gis', True],
"film_permit_all": [23, 9, 'gis', True],
"film_permit_parking_all": [23, 10, 'gis', True],
"hotel": [23, 12, 'gis', True],
"convenience_store": [26, 1, 'gis', True],
"supermarket": [26, 4, 'gis', True],
"place_of_worship": [26, 5, 'gis', True],
"ymca": [26, 6, 'gis', True],
"aboriginal_organization": [26, 45, 'gis', True],
"attraction": [26, 46, 'gis', True],
"dropin": [26, 47, 'gis', True],
"early_years_centre": [26, 48, 'gis', True],
"family_resource_centre": [26, 49, 'gis', True],
"food_bank": [26, 50, 'gis', True],
"longterm_care": [26, 53, 'gis', True],
"parenting_family_literacy": [26, 54, 'gis', True],
"retirement_home": [26, 58, 'gis', True],
"senior_housing": [26, 59, 'gis', True],
"shelter": [26, 61, 'gis', True],
"social_housing": [26, 62, 'gis', True],
"private_road": [27, 13, 'gis', True],
"school": [28, 17, 'gis', True],
"library": [28, 28, 'gis', True],
"pavement_asset": [2, 36, 'gis', True],
}
layers = get_layers(name)
pull_layer.partial(conn_id = conn_id).expand(layer = layers)

generated_dag = gcc_layers_dag()

ptc_layers = {
"city_ward": [0, 0, 'gis', True],
"centreline": [0, 2, 'gis', False],
"intersection": [12, 42, 'gis', False],
"centreline_intersection_point": [0, 19, 'gis', False],
"ibms_grid": [11, 25, 'gis', True],
"ibms_district": [11, 23, 'gis', True],
}
return generated_dag

# the DAG runs at 7 am on the first day of January, April, July, and October
with DAG(
dag_id = DAG_NAME,
catchup=False,
default_args=DEFAULT_ARGS,
schedule='0 7 1 */3 *' #'@quarterly'
) as gcc_layers_dag:
deployment = os.environ.get("DEPLOYMENT", "PROD")
#get puller details from airflow variable
DAGS = Variable.get('gcc_dags', deserialize_json=True)

if deployment == "DEV":
for layer, attributes in bigdata_layers.items():
pull_bigdata_layer = PythonOperator(
task_id = 'bigdata_task_'+ str(layer),
python_callable = get_layer,
op_args = attributes + [bigdata_cred]
)
#identify the appropriate pullers based on deployment
dep = os.environ.get("DEPLOYMENT", "PROD")
filtered_dags = [
key for key, facts in DAGS.items() if dep in facts['deployments']
]

for layer, attributes in ptc_layers.items():
pull_ptc_layer = PythonOperator(
task_id = 'VFH_task_'+ str(layer),
python_callable = get_layer,
op_args = attributes + [ptc_cred]
)
for item in filtered_dags:
DAG_NAME = 'gcc_pull_layers'
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])

if layer in ['centreline', 'intersection']:
sql_refresh_mat_view = sql.SQL("SELECT {function_name}()").format(
function_name=sql.Identifier('gis', f'refresh_mat_view_{layer}_version_date')
)
refresh_mat_view = PythonOperator(
python_callable=lambda:ptc_cred.get_conn().cursor().execute(sql_refresh_mat_view),
task_id=f'refresh_{layer}_version_date',
retries = 0
)
DEFAULT_ARGS = {
'owner': ','.join(DAG_OWNERS),
'depends_on_past': False,
'start_date': pendulum.datetime(2022, 11, 3, tz="America/Toronto"),
'email_on_failure': False,
'retries': 0,
'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True)
}

pull_ptc_layer >> refresh_mat_view
dag_name = f"{DAG_NAME}_{item}"
globals()[dag_name] = (
create_gcc_puller_dag(
dag_id=dag_name,
default_args=DEFAULT_ARGS,
name=dag_name,
conn_id=DAGS[item]['conn'],
)
)
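The closing block above is a DAG factory: it reads the `gcc_dags` Variable, keeps the entries whose `deployments` list contains the current `DEPLOYMENT` environment value, and registers one DAG per entry in `globals()` so Airflow's parser can discover the dynamically generated DAGs. A sketch of the assumed Variable contents — the `"bigdata"` and `"ptc"` keys are hypothetical, while the connection IDs come from the removed hard-coded hooks:

```python
# Assumed contents of the 'gcc_dags' Airflow Variable (keys hypothetical).
gcc_dags = {
    "bigdata": {"conn": "gcc_bot_bigdata", "deployments": ["PROD", "DEV"]},
    "ptc": {"conn": "gcc_bot", "deployments": ["PROD"]},
}
# With DEPLOYMENT=PROD both entries pass the filter, so the factory would
# register DAGs named gcc_pull_layers_bigdata and gcc_pull_layers_ptc.
```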
67 changes: 16 additions & 51 deletions gis/gccview/gcc_puller_functions.py
@@ -16,52 +16,6 @@
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

#-------------------------------------------------------------------------------------------------------
pk_dict = {
"city_ward": "area_id",
"census_tract": "area_id",
"cycling_infrastructure": "objectid",
"neighbourhood_improvement_area": "area_id",
"priority_neighbourhood_for_investment": "area_id",
"ibms_district": "area_id",
"ibms_grid": "area_id",
"bikeway": "centreline_id",
"traffic_camera": "rec_id",
"permit_parking_area": "area_long_code",
"prai_transit_shelter": "id",
"traffic_bylaw_point": "objectid",
"traffic_bylaw_line": "objectid",
"loop_detector": "id",
"electrical_vehicle_charging_station": "id",
"day_care_centre": "loc_id",
"middle_childcare_centre": "id",
"business_improvement_area": "area_id",
"proposed_business_improvement_area": "objectid",
"film_permit_all": "objectid",
"film_permit_parking_all": "objectid",
"hotel": "id",
"convenience_store": "objectid",
"supermarket": "objectid",
"place_of_worship": "objectid",
"ymca": "objectid",
"aboriginal_organization": "id",
"attraction": "objectid",
"dropin": "objectid",
"early_years_centre": "id",
"family_resource_centre": "objectid",
"food_bank": "objectid",
"longterm_care": "id",
"parenting_family_literacy": "id",
"retirement_home": "id",
"senior_housing": "objectid",
"shelter": "objectid",
"social_housing": "objectid",
"private_road": "objectid",
"school": "objectid",
"library": "id",
"pavement_asset": "objectid",
}
#-------------------------------------------------------------------------------------------------------
def mapserver_name(mapserver_n):
"""
Function to return the mapserver name from integer
@@ -596,7 +550,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche
return successful_execution
#-------------------------------------------------------------------------------------------------------
# base main function, also compatible with Airflow
def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None):
def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None):
"""
This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database.

Expand Down Expand Up @@ -639,8 +593,10 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
LOGGER.error("Invalid mapserver and/or layer Id")
return
#--------------------------------
if is_audited:
primary_key = pk_dict.get(output_table)
if is_audited and primary_key is None:
LOGGER.error("Audited tables should have a primary key.")
if not(is_audited) and primary_key is not None:
LOGGER.error("Non-audited tables do not use the primary key.")
#--------------------------------
keep_adding = True
counter = 0
@@ -699,9 +655,11 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con =
, help = 'Name of destination schema')
@click.option('--is-audited', '-a', is_flag=True, show_default=True, default=False,
help = 'Whether the table is supposed to be audited (T) or partitioned (F)')
@click.option('--primary-key', '-pk', type = str, default=None, required = False,
help = 'Primary key. Only include if table is audited.')
@click.option('--con', '-c', type = str, required = True,
help = 'The path to the credential config file')
def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con):
def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con):
"""
This script pulls a GIS layer from GCC servers into the databases of
the Data and Analytics Unit.
@@ -715,7 +673,14 @@ def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con):
dbset = CONFIG['DBSETTINGS']
connection_obj = connect(**dbset)
# get_layer function
get_layer(mapserver, layer_id, schema_name, is_audited, con=connection_obj)
get_layer(
mapserver_n = mapserver,
layer_id = layer_id,
schema_name = schema_name,
is_audited = is_audited,
primary_key = primary_key,
con=connection_obj
)

if __name__ == '__main__':
manual_get_layer()
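With this change, `get_layer()` takes the primary key as an explicit argument instead of looking it up in the deleted `pk_dict`: audited tables must supply one, non-audited tables must not. An illustrative direct call under assumed connection settings (in the CLI these come from the credential config file passed via `--con`):

```python
from psycopg2 import connect

# Hypothetical connection settings, for illustration only.
con = connect(host="localhost", dbname="bigdata", user="gcc_bot")

# city_ward is audited, so its primary key must now be passed explicitly.
get_layer(
    mapserver_n=0,
    layer_id=0,
    schema_name="gis_core",
    is_audited=True,
    primary_key="area_id",
    con=con,
)
```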
9 changes: 5 additions & 4 deletions test/integration/test_dags.py
@@ -25,10 +25,11 @@
'AIRFLOW_VAR_SLACK_MEMBER_ID': '{"var": "value"}',
'AIRFLOW_VAR_SSZ_SPREADSHEET_IDS': "{"+",".join([f'"ssz{year}": "value"' for year in range(2018, datetime.now().year+1)])+"}",
'AIRFLOW_VAR_COLLISIONS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 2)])+"]",
'AIRFLOW_VAR_COUNTS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 3)])+"]",
'AIRFLOW_VAR_HERE_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]",
'AIRFLOW_VAR_REPLICATORS': '{"dag": {"dag_name": "value", "tables": "value", "conn": "value"}}',
'AIRFLOW_VAR_TEST_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]"
'AIRFLOW_VAR_COUNTS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 3)])+"]",
'AIRFLOW_VAR_HERE_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]",
'AIRFLOW_VAR_REPLICATORS': '{"dag": {"dag_name": "value", "tables": "value", "conn": "value"}}',
'AIRFLOW_VAR_TEST_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]",
'AIRFLOW_VAR_GCC_DAGS': '{"dag": {"conn": "value", "deployments": ["value"]}}',
}
SAMPLE_CONN = Connection(
conn_type="sample_type",
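The added `AIRFLOW_VAR_GCC_DAGS` fixture works because Airflow resolves Variables from `AIRFLOW_VAR_*` environment variables before consulting the metadata database, so the DAG-import test can parse the new puller DAGs without one. A minimal illustration:

```python
import os
from airflow.models import Variable

# Same fixture value as the test above; no metadata DB required.
os.environ["AIRFLOW_VAR_GCC_DAGS"] = '{"dag": {"conn": "value", "deployments": ["value"]}}'

dags = Variable.get("gcc_dags", deserialize_json=True)
assert dags["dag"]["conn"] == "value"
```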