diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index 4126c5a41..17174cf8c 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -4,11 +4,11 @@ from functools import partial import pendulum -from psycopg2 import sql -from airflow import DAG -from airflow.operators.python_operator import PythonOperator +from airflow.decorators import dag, task from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook +from airflow.operators.python import get_current_context + try: repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) sys.path.insert(0, repo_path) @@ -17,115 +17,82 @@ except: raise ImportError("Cannot import DAG helper functions.") -# Credentials - to be passed through PythonOperator -# bigdata connection credentials -bigdata_cred = PostgresHook("gcc_bot_bigdata") -# On-prem server connection credentials -ptc_cred = PostgresHook("gcc_bot") - -DAG_NAME = 'pull_gcc_layers' -DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) +# the DAG runs at 7 am on the first day of January, April, July, and October +def create_gcc_puller_dag(dag_id, default_args, name, conn_id): + @dag( + dag_id=dag_id, + default_args=default_args, + catchup=False, + tags=['gcc', name], + schedule='0 7 1 */3 *' #'@quarterly' + ) + def gcc_layers_dag(): + + @task() + def get_layers(name): + tables = Variable.get('gcc_layers', deserialize_json=True) + return tables[name] + + @task() #add after 2.9.3 upgrade: map_index_template="{{ table_name }}" + def pull_layer(layer, conn_id): + #name mapped task (implement after 2.9.3 upgrade) + #context = get_current_context() + #context["table_name"] = layer[0] + #get db connection + conn = PostgresHook(conn_id).get_conn() + + #pull and insert layer + get_layer( + mapserver_n = layer[1].get("mapserver"), + layer_id = layer[1].get("layer_id"), + schema_name = layer[1].get("schema_name"), + is_audited = layer[1].get("is_audited"), 
+ primary_key = layer[1].get("pk"), + con = conn + ) -DEFAULT_ARGS = { - 'owner': ','.join(DAG_OWNERS), - 'depends_on_past': False, - 'start_date': pendulum.datetime(2022, 11, 3, tz="America/Toronto"), - 'email_on_failure': False, - 'retries': 0, - 'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) -} + #refresh mat views as necessary + agg_sql = layer[1].get("agg") + if agg_sql is not None: + with conn.cursor() as cur: + cur.execute(agg_sql) -#------------------------------------------------------------------------------------------------------- -bigdata_layers = { - "city_ward": [0, 0, 'gis_core', True], # VFH Layers - "centreline": [0, 2, 'gis_core', False], # VFH Layers - "ibms_grid": [11, 25, 'gis_core', True], # VFH Layers - "centreline_intersection_point": [0, 19, 'gis_core', False], # VFH Layers - "intersection": [12, 42, 'gis_core', False], - "census_tract": [26, 7, 'gis_core', True], - "neighbourhood_improvement_area": [26, 11, 'gis_core', True], - "priority_neighbourhood_for_investment": [26, 13, 'gis_core', True], - #"bikeway": [2, 2, 'gis', True], #replaced by cycling_infrastructure - "cycling_infrastructure": [2, 49, 'gis', True], - "traffic_camera": [2, 3, 'gis', True], - "permit_parking_area": [2, 11, 'gis', True], - "prai_transit_shelter": [2, 35, 'gis', True], - "traffic_bylaw_point": [2, 38, 'gis', True], - "traffic_bylaw_line": [2, 39, 'gis', True], - "loop_detector": [2, 46, 'gis', True], - "electrical_vehicle_charging_station": [20, 1, 'gis', True], - "day_care_centre": [22, 1, 'gis', True], - "middle_childcare_centre": [22, 2, 'gis', True], - "business_improvement_area": [23, 1, 'gis', True], - "proposed_business_improvement_area": [23, 13, 'gis', True], - "film_permit_all": [23, 9, 'gis', True], - "film_permit_parking_all": [23, 10, 'gis', True], - "hotel": [23, 12, 'gis', True], - "convenience_store": [26, 1, 'gis', True], - "supermarket": [26, 4, 'gis', True], - "place_of_worship": [26, 5, 'gis', True], - "ymca": [26, 6, 
'gis', True], - "aboriginal_organization": [26, 45, 'gis', True], - "attraction": [26, 46, 'gis', True], - "dropin": [26, 47, 'gis', True], - "early_years_centre": [26, 48, 'gis', True], - "family_resource_centre": [26, 49, 'gis', True], - "food_bank": [26, 50, 'gis', True], - "longterm_care": [26, 53, 'gis', True], - "parenting_family_literacy": [26, 54, 'gis', True], - "retirement_home": [26, 58, 'gis', True], - "senior_housing": [26, 59, 'gis', True], - "shelter": [26, 61, 'gis', True], - "social_housing": [26, 62, 'gis', True], - "private_road": [27, 13, 'gis', True], - "school": [28, 17, 'gis', True], - "library": [28, 28, 'gis', True], - "pavement_asset": [2, 36, 'gis', True], -} + layers = get_layers(name) + pull_layer.partial(conn_id = conn_id).expand(layer = layers) + generated_dag = gcc_layers_dag() -ptc_layers = { - "city_ward": [0, 0, 'gis', True], - "centreline": [0, 2, 'gis', False], - "intersection": [12, 42, 'gis', False], - "centreline_intersection_point": [0, 19, 'gis', False], - "ibms_grid": [11, 25, 'gis', True], - "ibms_district": [11, 23, 'gis', True], -} + return generated_dag -# the DAG runs at 7 am on the first day of January, April, July, and October -with DAG( - dag_id = DAG_NAME, - catchup=False, - default_args=DEFAULT_ARGS, - schedule='0 7 1 */3 *' #'@quarterly' -) as gcc_layers_dag: - deployment = os.environ.get("DEPLOYMENT", "PROD") +#get puller details from airflow variable +DAGS = Variable.get('gcc_dags', deserialize_json=True) - if deployment == "DEV": - for layer, attributes in bigdata_layers.items(): - pull_bigdata_layer = PythonOperator( - task_id = 'bigdata_task_'+ str(layer), - python_callable = get_layer, - op_args = attributes + [bigdata_cred] - ) +#identify the appropriate pullers based on deployment +dep = os.environ.get("DEPLOYMENT", "PROD") +filtered_dags = [ + key for key, facts in DAGS.items() if dep in facts['deployments'] +] - for layer, attributes in ptc_layers.items(): - pull_ptc_layer = PythonOperator( - task_id = 
'VFH_task_'+ str(layer),
-                python_callable = get_layer,
-                op_args = attributes + [ptc_cred]
-            )
+for item in filtered_dags:
+    DAG_NAME = 'gcc_pull_layers'
+    DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])
-
-            if layer in ['centreline', 'intersection']:
-                sql_refresh_mat_view = sql.SQL("SELECT {function_name}()").format(
-                    function_name=sql.Identifier('gis', f'refresh_mat_view_{layer}_version_date')
-                )
-                refresh_mat_view = PythonOperator(
-                    python_callable=lambda:ptc_cred.get_conn().cursor().execute(sql_refresh_mat_view),
-                    task_id=f'refresh_{layer}_version_date',
-                    retries = 0
-                )
+    DEFAULT_ARGS = {
+        'owner': ','.join(DAG_OWNERS),
+        'depends_on_past': False,
+        'start_date': pendulum.datetime(2022, 11, 3, tz="America/Toronto"),
+        'email_on_failure': False,
+        'retries': 0,
+        'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True)
+    }
-
-    pull_ptc_layer >> refresh_mat_view
\ No newline at end of file
+    dag_name = f"{DAG_NAME}_{item}"
+    globals()[dag_name] = (
+        create_gcc_puller_dag(
+            dag_id=dag_name,
+            default_args=DEFAULT_ARGS,
+            name=item,
+            conn_id=DAGS[item]['conn'],
+        )
+    )
diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py
index 9aadd339b..38d1698f7 100644
--- a/gis/gccview/gcc_puller_functions.py
+++ b/gis/gccview/gcc_puller_functions.py
@@ -16,52 +16,6 @@
 LOGGER = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO)
 
-#-------------------------------------------------------------------------------------------------------
-pk_dict = {
-    "city_ward": "area_id",
-    "census_tract": "area_id",
-    "cycling_infrastructure": "objectid",
-    "neighbourhood_improvement_area": "area_id",
-    "priority_neighbourhood_for_investment": "area_id",
-    "ibms_district": "area_id",
-    "ibms_grid": "area_id",
-    "bikeway": "centreline_id",
-    "traffic_camera": "rec_id",
-    "permit_parking_area": "area_long_code",
-    "prai_transit_shelter": "id",
-    "traffic_bylaw_point": "objectid",
-    
"traffic_bylaw_line": "objectid", - "loop_detector": "id", - "electrical_vehicle_charging_station": "id", - "day_care_centre": "loc_id", - "middle_childcare_centre": "id", - "business_improvement_area": "area_id", - "proposed_business_improvement_area": "objectid", - "film_permit_all": "objectid", - "film_permit_parking_all": "objectid", - "hotel": "id", - "convenience_store": "objectid", - "supermarket": "objectid", - "place_of_worship": "objectid", - "ymca": "objectid", - "aboriginal_organization": "id", - "attraction": "objectid", - "dropin": "objectid", - "early_years_centre": "id", - "family_resource_centre": "objectid", - "food_bank": "objectid", - "longterm_care": "id", - "parenting_family_literacy": "id", - "retirement_home": "id", - "senior_housing": "objectid", - "shelter": "objectid", - "social_housing": "objectid", - "private_road": "objectid", - "school": "objectid", - "library": "id", - "pavement_asset": "objectid", -} -#------------------------------------------------------------------------------------------------------- def mapserver_name(mapserver_n): """ Function to return the mapserver name from integer @@ -596,7 +550,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche return successful_execution #------------------------------------------------------------------------------------------------------- # base main function, also compatible with Airflow -def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None): +def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None): """ This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database. 
@@ -639,8 +593,10 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = LOGGER.error("Invalid mapserver and/or layer Id") return #-------------------------------- - if is_audited: - primary_key = pk_dict.get(output_table) + if is_audited and primary_key is None: + LOGGER.error("Audited tables should have a primary key.") + if not(is_audited) and primary_key is not None: + LOGGER.error("Non-audited tables do not use the primary key.") #-------------------------------- keep_adding = True counter = 0 @@ -699,9 +655,11 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = , help = 'Name of destination schema') @click.option('--is-audited', '-a', is_flag=True, show_default=True, default=False, help = 'Whether the table is supposed to be audited (T) or partitioned (F)') +@click.option('--primary-key', '-pk', type = str, default=None, required = False, + help = 'Primary key. Only include if table is audited.') @click.option('--con', '-c', type = str, required = True, help = 'The path to the credential config file') -def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con): +def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con): """ This script pulls a GIS layer from GCC servers into the databases of the Data and Analytics Unit. 
@@ -715,7 +673,14 @@ def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con): dbset = CONFIG['DBSETTINGS'] connection_obj = connect(**dbset) # get_layer function - get_layer(mapserver, layer_id, schema_name, is_audited, con=connection_obj) + get_layer( + mapserver_n = mapserver, + layer_id = layer_id, + schema_name = schema_name, + is_audited = is_audited, + primary_key = primary_key, + con=connection_obj + ) if __name__ == '__main__': manual_get_layer() diff --git a/test/integration/test_dags.py b/test/integration/test_dags.py index 2f38610a7..2cdbeae20 100644 --- a/test/integration/test_dags.py +++ b/test/integration/test_dags.py @@ -25,10 +25,11 @@ 'AIRFLOW_VAR_SLACK_MEMBER_ID': '{"var": "value"}', 'AIRFLOW_VAR_SSZ_SPREADSHEET_IDS': "{"+",".join([f'"ssz{year}": "value"' for year in range(2018, datetime.now().year+1)])+"}", 'AIRFLOW_VAR_COLLISIONS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 2)])+"]", - 'AIRFLOW_VAR_COUNTS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 3)])+"]", - 'AIRFLOW_VAR_HERE_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]", - 'AIRFLOW_VAR_REPLICATORS': '{"dag": {"dag_name": "value", "tables": "value", "conn": "value"}}', - 'AIRFLOW_VAR_TEST_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]" + 'AIRFLOW_VAR_COUNTS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 3)])+"]", + 'AIRFLOW_VAR_HERE_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]", + 'AIRFLOW_VAR_REPLICATORS': '{"dag": {"dag_name": "value", "tables": "value", "conn": "value"}}', + 'AIRFLOW_VAR_TEST_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]", + 'AIRFLOW_VAR_GCC_DAGS': '{"dag": {"conn": "value", "deployments": ["value"]}}', } SAMPLE_CONN = Connection( conn_type="sample_type",