From ad0bcd23cb48751c82b393162b97bd63253d48c5 Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Wed, 7 Aug 2024 16:51:01 -0400 Subject: [PATCH 1/7] #1032 failed first attempt --- dags/gcc_layers_pull.py | 140 ++++++++++++++-------------------------- 1 file changed, 47 insertions(+), 93 deletions(-) diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index 4126c5a41..a5d82dbbd 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -5,25 +5,21 @@ import pendulum from psycopg2 import sql -from airflow import DAG -from airflow.operators.python_operator import PythonOperator +from airflow.decorators import dag, task, task_group from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook +from airflow.operators.python import get_current_context + try: repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) sys.path.insert(0, repo_path) from dags.dag_functions import task_fail_slack_alert + from dags.common_tasks import get_variable from gis.gccview.gcc_puller_functions import get_layer except: raise ImportError("Cannot import DAG helper functions.") -# Credentials - to be passed through PythonOperator -# bigdata connection credentials -bigdata_cred = PostgresHook("gcc_bot_bigdata") -# On-prem server connection credentials -ptc_cred = PostgresHook("gcc_bot") - -DAG_NAME = 'pull_gcc_layers' +DAG_NAME = 'gcc_pull_layers' DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) @@ -36,96 +32,54 @@ 'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) } -#------------------------------------------------------------------------------------------------------- -bigdata_layers = { - "city_ward": [0, 0, 'gis_core', True], # VFH Layers - "centreline": [0, 2, 'gis_core', False], # VFH Layers - "ibms_grid": [11, 25, 'gis_core', True], # VFH Layers - "centreline_intersection_point": [0, 19, 'gis_core', 
False], # VFH Layers - "intersection": [12, 42, 'gis_core', False], - "census_tract": [26, 7, 'gis_core', True], - "neighbourhood_improvement_area": [26, 11, 'gis_core', True], - "priority_neighbourhood_for_investment": [26, 13, 'gis_core', True], - #"bikeway": [2, 2, 'gis', True], #replaced by cycling_infrastructure - "cycling_infrastructure": [2, 49, 'gis', True], - "traffic_camera": [2, 3, 'gis', True], - "permit_parking_area": [2, 11, 'gis', True], - "prai_transit_shelter": [2, 35, 'gis', True], - "traffic_bylaw_point": [2, 38, 'gis', True], - "traffic_bylaw_line": [2, 39, 'gis', True], - "loop_detector": [2, 46, 'gis', True], - "electrical_vehicle_charging_station": [20, 1, 'gis', True], - "day_care_centre": [22, 1, 'gis', True], - "middle_childcare_centre": [22, 2, 'gis', True], - "business_improvement_area": [23, 1, 'gis', True], - "proposed_business_improvement_area": [23, 13, 'gis', True], - "film_permit_all": [23, 9, 'gis', True], - "film_permit_parking_all": [23, 10, 'gis', True], - "hotel": [23, 12, 'gis', True], - "convenience_store": [26, 1, 'gis', True], - "supermarket": [26, 4, 'gis', True], - "place_of_worship": [26, 5, 'gis', True], - "ymca": [26, 6, 'gis', True], - "aboriginal_organization": [26, 45, 'gis', True], - "attraction": [26, 46, 'gis', True], - "dropin": [26, 47, 'gis', True], - "early_years_centre": [26, 48, 'gis', True], - "family_resource_centre": [26, 49, 'gis', True], - "food_bank": [26, 50, 'gis', True], - "longterm_care": [26, 53, 'gis', True], - "parenting_family_literacy": [26, 54, 'gis', True], - "retirement_home": [26, 58, 'gis', True], - "senior_housing": [26, 59, 'gis', True], - "shelter": [26, 61, 'gis', True], - "social_housing": [26, 62, 'gis', True], - "private_road": [27, 13, 'gis', True], - "school": [28, 17, 'gis', True], - "library": [28, 28, 'gis', True], - "pavement_asset": [2, 36, 'gis', True], -} - - -ptc_layers = { - "city_ward": [0, 0, 'gis', True], - "centreline": [0, 2, 'gis', False], - "intersection": [12, 
42, 'gis', False], - "centreline_intersection_point": [0, 19, 'gis', False], - "ibms_grid": [11, 25, 'gis', True], - "ibms_district": [11, 23, 'gis', True], -} - # the DAG runs at 7 am on the first day of January, April, July, and October -with DAG( +@dag( dag_id = DAG_NAME, catchup=False, default_args=DEFAULT_ARGS, schedule='0 7 1 */3 *' #'@quarterly' -) as gcc_layers_dag: - deployment = os.environ.get("DEPLOYMENT", "PROD") +) +def gcc_layers_dag(): + + @task() + def get_pullers(): + dep = os.environ.get("DEPLOYMENT", "PROD") + pullers = Variable.get('gcc_pullers', deserialize_json=True) + + #return the appropriate pullers + return [(puller, facts['conn']) for puller, facts in pullers.items() if dep in facts['deployments']] + + @task_group() + def pull_and_agg_layers(puller_details): + @task() + def get_conn_id(puller_details) -> str: + return puller_details[1] - if deployment == "DEV": - for layer, attributes in bigdata_layers.items(): - pull_bigdata_layer = PythonOperator( - task_id = 'bigdata_task_'+ str(layer), - python_callable = get_layer, - op_args = attributes + [bigdata_cred] - ) + @task() + def get_layers(puller: str): + tables = Variable.get('gcc_layers', deserialize_json=True) + return tables[puller] + + @task(map_index_template="{{ table_name }}") + def pull_layer(layer, conn_id): + #name mapped task + context = get_current_context() + context["table_name"] = tables[tbl_index] + conn = PostgresHook(conn_id).get_conn() + get_layer(layer, layer.items(), conn) + + #if layer in ['centreline', 'intersection']: + # @task + # def agg_layer(): + # sql_refresh_mat_view = sql.SQL("SELECT {function_name}()").format( + # function_name=sql.Identifier('gis', f'refresh_mat_view_{layer}_version_date') + # ) + # with con.cursor() as cur: + # cur.execute(sql_refresh_mat_view) - for layer, attributes in ptc_layers.items(): - pull_ptc_layer = PythonOperator( - task_id = 'VFH_task_'+ str(layer), - python_callable = get_layer, - op_args = attributes + [ptc_cred] - ) + 
pull_layer.partial(conn_id = get_conn_id(puller_details)).expand(layer = get_layers(puller_details))() #>> agg_layer() - if layer in ['centreline', 'intersection']: - sql_refresh_mat_view = sql.SQL("SELECT {function_name}()").format( - function_name=sql.Identifier('gis', f'refresh_mat_view_{layer}_version_date') - ) - refresh_mat_view = PythonOperator( - python_callable=lambda:ptc_cred.get_conn().cursor().execute(sql_refresh_mat_view), - task_id=f'refresh_{layer}_version_date', - retries = 0 - ) + for puller in get_pullers(): + pull_and_agg_layers(puller_details = puller) - pull_ptc_layer >> refresh_mat_view \ No newline at end of file +gcc_layers_dag() \ No newline at end of file From af3ac4f03a0fe5752ebaa4731fd204e43465c4ce Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 9 Aug 2024 09:55:03 -0400 Subject: [PATCH 2/7] #1032 dynamic dag generation --- dags/gcc_layers_pull.py | 124 ++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 56 deletions(-) diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index a5d82dbbd..a4985c078 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -4,7 +4,6 @@ from functools import partial import pendulum -from psycopg2 import sql from airflow.decorators import dag, task, task_group from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook @@ -14,72 +13,85 @@ repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) sys.path.insert(0, repo_path) from dags.dag_functions import task_fail_slack_alert - from dags.common_tasks import get_variable from gis.gccview.gcc_puller_functions import get_layer except: raise ImportError("Cannot import DAG helper functions.") -DAG_NAME = 'gcc_pull_layers' -DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) +# the DAG runs at 7 am on the first day of January, April, July, and October +def 
create_gcc_puller_dag(dag_id, default_args, name, conn_id): + @dag( + dag_id=dag_id, + default_args=default_args, + catchup=False, + tags=['gcc', name], + schedule='0 7 1 */3 *' #'@quarterly' + ) + def gcc_layers_dag(): -DEFAULT_ARGS = { - 'owner': ','.join(DAG_OWNERS), - 'depends_on_past': False, - 'start_date': pendulum.datetime(2022, 11, 3, tz="America/Toronto"), - 'email_on_failure': False, - 'retries': 0, - 'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) -} + @task() + def get_layers(name): + tables = Variable.get('gcc_layers', deserialize_json=True) + return tables #[name] + + @task_group() + def pull_and_agg_layers(layer, conn_id): + + @task() #map_index_template="{{ table_name }}" + def pull_layer(layer, conn_id): + #name mapped task + #context = get_current_context() + #context["table_name"] = layer + print(layer) + print(conn_id) + #conn = PostgresHook(conn_id).get_conn() + #get_layer(layer, layer.items(), conn) + + #if layer in ['centreline', 'intersection']: + # @task + # def agg_layer(): + # sql_refresh_mat_view = sql.SQL("SELECT {function_name}()").format( + # function_name=sql.Identifier('gis', f'refresh_mat_view_{layer}_version_date') + # ) + # with con.cursor() as cur: + # cur.execute(sql_refresh_mat_view) -# the DAG runs at 7 am on the first day of January, April, July, and October -@dag( - dag_id = DAG_NAME, - catchup=False, - default_args=DEFAULT_ARGS, - schedule='0 7 1 */3 *' #'@quarterly' -) -def gcc_layers_dag(): + pull_layer(layer, conn_id) - @task() - def get_pullers(): - dep = os.environ.get("DEPLOYMENT", "PROD") - pullers = Variable.get('gcc_pullers', deserialize_json=True) + layers = get_layers(name) + #pull_and_agg_layers.override(group_id = f"{name}_pull").partial(conn_id = conn_id).expand(layer = layers) + generated_dag = gcc_layers_dag() - #return the appropriate pullers - return [(puller, facts['conn']) for puller, facts in pullers.items() if dep in facts['deployments']] + return generated_dag - @task_group() - 
def pull_and_agg_layers(puller_details): - @task() - def get_conn_id(puller_details) -> str: - return puller_details[1] +#get puller details from airflow variable +PULLERS = Variable.get('gcc_pullers', deserialize_json=True) - @task() - def get_layers(puller: str): - tables = Variable.get('gcc_layers', deserialize_json=True) - return tables[puller] - - @task(map_index_template="{{ table_name }}") - def pull_layer(layer, conn_id): - #name mapped task - context = get_current_context() - context["table_name"] = tables[tbl_index] - conn = PostgresHook(conn_id).get_conn() - get_layer(layer, layer.items(), conn) - - #if layer in ['centreline', 'intersection']: - # @task - # def agg_layer(): - # sql_refresh_mat_view = sql.SQL("SELECT {function_name}()").format( - # function_name=sql.Identifier('gis', f'refresh_mat_view_{layer}_version_date') - # ) - # with con.cursor() as cur: - # cur.execute(sql_refresh_mat_view) +#identify the appropriate pullers based on deployment +dep = os.environ.get("DEPLOYMENT", "PROD") +filtered_pullers = [ + key for key, facts in PULLERS.items() if dep in facts['deployments'] +] - pull_layer.partial(conn_id = get_conn_id(puller_details)).expand(layer = get_layers(puller_details))() #>> agg_layer() +for puller in filtered_pullers: + DAG_NAME = 'gcc_pull_layers' + DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) - for puller in get_pullers(): - pull_and_agg_layers(puller_details = puller) + DEFAULT_ARGS = { + 'owner': ','.join(DAG_OWNERS), + 'depends_on_past': False, + 'start_date': pendulum.datetime(2022, 11, 3, tz="America/Toronto"), + 'email_on_failure': False, + 'retries': 0, + #'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) + } -gcc_layers_dag() \ No newline at end of file + dag_name = f"{DAG_NAME}_{puller}" + globals()[dag_name] = ( + create_gcc_puller_dag( + dag_id=dag_name, + default_args=DEFAULT_ARGS, + name=puller, + conn_id=PULLERS[puller]['conn'], + ) + ) \ No newline at 
end of file From 3291d86b702c352bff5a310037ad51d821f8a7cd Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 9 Aug 2024 12:31:54 -0400 Subject: [PATCH 3/7] #1032 move primary key to airflow var --- dags/gcc_layers_pull.py | 55 +++++++++++------------ gis/gccview/gcc_puller_functions.py | 67 +++++++---------------------- 2 files changed, 44 insertions(+), 78 deletions(-) diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index a4985c078..af1b649a5 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -19,7 +19,7 @@ # the DAG runs at 7 am on the first day of January, April, July, and October -def create_gcc_puller_dag(dag_id, default_args, name, conn_id): +def create_gcc_puller_dag(dag_id, default_args, name, conn_id): @dag( dag_id=dag_id, default_args=default_args, @@ -32,34 +32,35 @@ def gcc_layers_dag(): @task() def get_layers(name): tables = Variable.get('gcc_layers', deserialize_json=True) - return tables #[name] - - @task_group() - def pull_and_agg_layers(layer, conn_id): - - @task() #map_index_template="{{ table_name }}" - def pull_layer(layer, conn_id): - #name mapped task - #context = get_current_context() - #context["table_name"] = layer - print(layer) - print(conn_id) - #conn = PostgresHook(conn_id).get_conn() - #get_layer(layer, layer.items(), conn) - - #if layer in ['centreline', 'intersection']: - # @task - # def agg_layer(): - # sql_refresh_mat_view = sql.SQL("SELECT {function_name}()").format( - # function_name=sql.Identifier('gis', f'refresh_mat_view_{layer}_version_date') - # ) - # with con.cursor() as cur: - # cur.execute(sql_refresh_mat_view) + return tables[name] + + @task(map_index_template="{{ table_name }}") + def pull_layer(layer, conn_id): + #name mapped task + context = get_current_context() + context["table_name"] = layer[0] + #get db connection + conn = PostgresHook(conn_id).get_conn() - pull_layer(layer, conn_id) + #pull and insert layer + get_layer( + 
mapserver_n = layer[1].get("mapserver"), + layer_id = layer[1].get("layer_id"), + schema_name = layer[1].get("schema_name"), + is_audited = layer[1].get("is_audited"), + pk = layer[1].get("pk"), + con = conn + ) + + #refresh mat views as necessary + agg_sql = layer[1].get("agg") + if agg_sql is not None: + with conn.cursor() as cur: + cur.execute(agg_sql) layers = get_layers(name) - #pull_and_agg_layers.override(group_id = f"{name}_pull").partial(conn_id = conn_id).expand(layer = layers) + pull_layer.partial(conn_id = conn_id).expand(layer = layers) + generated_dag = gcc_layers_dag() return generated_dag @@ -94,4 +95,4 @@ def pull_layer(layer, conn_id): name=puller, conn_id=PULLERS[puller]['conn'], ) - ) \ No newline at end of file + ) diff --git a/gis/gccview/gcc_puller_functions.py b/gis/gccview/gcc_puller_functions.py index 9aadd339b..38d1698f7 100644 --- a/gis/gccview/gcc_puller_functions.py +++ b/gis/gccview/gcc_puller_functions.py @@ -16,52 +16,6 @@ LOGGER = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) -#------------------------------------------------------------------------------------------------------- -pk_dict = { - "city_ward": "area_id", - "census_tract": "area_id", - "cycling_infrastructure": "objectid", - "neighbourhood_improvement_area": "area_id", - "priority_neighbourhood_for_investment": "area_id", - "ibms_district": "area_id", - "ibms_grid": "area_id", - "bikeway": "centreline_id", - "traffic_camera": "rec_id", - "permit_parking_area": "area_long_code", - "prai_transit_shelter": "id", - "traffic_bylaw_point": "objectid", - "traffic_bylaw_line": "objectid", - "loop_detector": "id", - "electrical_vehicle_charging_station": "id", - "day_care_centre": "loc_id", - "middle_childcare_centre": "id", - "business_improvement_area": "area_id", - "proposed_business_improvement_area": "objectid", - "film_permit_all": "objectid", - "film_permit_parking_all": "objectid", - "hotel": "id", - "convenience_store": "objectid", - 
"supermarket": "objectid", - "place_of_worship": "objectid", - "ymca": "objectid", - "aboriginal_organization": "id", - "attraction": "objectid", - "dropin": "objectid", - "early_years_centre": "id", - "family_resource_centre": "objectid", - "food_bank": "objectid", - "longterm_care": "id", - "parenting_family_literacy": "id", - "retirement_home": "id", - "senior_housing": "objectid", - "shelter": "objectid", - "social_housing": "objectid", - "private_road": "objectid", - "school": "objectid", - "library": "id", - "pavement_asset": "objectid", -} -#------------------------------------------------------------------------------------------------------- def mapserver_name(mapserver_n): """ Function to return the mapserver name from integer @@ -596,7 +550,7 @@ def update_table(output_table, insert_column, excluded_column, primary_key, sche return successful_execution #------------------------------------------------------------------------------------------------------- # base main function, also compatible with Airflow -def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None): +def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = None, primary_key = None): """ This function calls to the GCCview rest API and inserts the outputs to the output table in the postgres database. 
@@ -639,8 +593,10 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = LOGGER.error("Invalid mapserver and/or layer Id") return #-------------------------------- - if is_audited: - primary_key = pk_dict.get(output_table) + if is_audited and primary_key is None: + LOGGER.error("Audited tables should have a primary key.") + if not(is_audited) and primary_key is not None: + LOGGER.error("Non-audited tables do not use the primary key.") #-------------------------------- keep_adding = True counter = 0 @@ -699,9 +655,11 @@ def get_layer(mapserver_n, layer_id, schema_name, is_audited, cred = None, con = , help = 'Name of destination schema') @click.option('--is-audited', '-a', is_flag=True, show_default=True, default=False, help = 'Whether the table is supposed to be audited (T) or partitioned (F)') +@click.option('--primary-key', '-pk', type = str, default=None, required = False, + help = 'Primary key. Only include if table is audited.') @click.option('--con', '-c', type = str, required = True, help = 'The path to the credential config file') -def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con): +def manual_get_layer(mapserver, layer_id, schema_name, is_audited, primary_key, con): """ This script pulls a GIS layer from GCC servers into the databases of the Data and Analytics Unit. 
@@ -715,7 +673,14 @@ def manual_get_layer(mapserver, layer_id, schema_name, is_audited, con): dbset = CONFIG['DBSETTINGS'] connection_obj = connect(**dbset) # get_layer function - get_layer(mapserver, layer_id, schema_name, is_audited, con=connection_obj) + get_layer( + mapserver_n = mapserver, + layer_id = layer_id, + schema_name = schema_name, + is_audited = is_audited, + primary_key = primary_key, + con=connection_obj + ) if __name__ == '__main__': manual_get_layer() From 73194094deada68b5a40d3a39c8af0fe99ca5b07 Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 9 Aug 2024 13:43:27 -0400 Subject: [PATCH 4/7] #1032 revert commented out sections for PR --- dags/gcc_layers_pull.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index af1b649a5..01d601432 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -4,7 +4,7 @@ from functools import partial import pendulum -from airflow.decorators import dag, task, task_group +from airflow.decorators import dag, task from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook from airflow.operators.python import get_current_context @@ -34,21 +34,21 @@ def get_layers(name): tables = Variable.get('gcc_layers', deserialize_json=True) return tables[name] - @task(map_index_template="{{ table_name }}") + @task() #add after 2.9.3 upgrade: map_index_template="{{ table_name }}" def pull_layer(layer, conn_id): - #name mapped task - context = get_current_context() - context["table_name"] = layer[0] + #name mapped task (implement after 2.9.3 upgrade) + #context = get_current_context() + #context["table_name"] = layer[0] #get db connection conn = PostgresHook(conn_id).get_conn() - + #pull and insert layer get_layer( mapserver_n = layer[1].get("mapserver"), layer_id = layer[1].get("layer_id"), schema_name = layer[1].get("schema_name"), is_audited = 
layer[1].get("is_audited"), - pk = layer[1].get("pk"), + primary_key = layer[1].get("pk"), con = conn ) @@ -56,6 +56,7 @@ def pull_layer(layer, conn_id): agg_sql = layer[1].get("agg") if agg_sql is not None: with conn.cursor() as cur: + print(agg_sql) cur.execute(agg_sql) layers = get_layers(name) @@ -84,7 +85,7 @@ def pull_layer(layer, conn_id): 'start_date': pendulum.datetime(2022, 11, 3, tz="America/Toronto"), 'email_on_failure': False, 'retries': 0, - #'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) + 'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) } dag_name = f"{DAG_NAME}_{puller}" From 19bb95518487379b5ddbe2de238daabd9fd2fdea Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 9 Aug 2024 14:56:56 -0400 Subject: [PATCH 5/7] #1032 add gcc_dags var to test_dags --- dags/gcc_layers_pull.py | 14 +++++++------- test/integration/test_dags.py | 9 +++++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index 01d601432..7ed7f5b0c 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -67,15 +67,15 @@ def pull_layer(layer, conn_id): return generated_dag #get puller details from airflow variable -PULLERS = Variable.get('gcc_pullers', deserialize_json=True) +DAGS = Variable.get('gcc_dags', deserialize_json=True) #identify the appropriate pullers based on deployment dep = os.environ.get("DEPLOYMENT", "PROD") -filtered_pullers = [ - key for key, facts in PULLERS.items() if dep in facts['deployments'] +filtered_dags = [ + key for key, facts in DAGS.items() if dep in facts['deployments'] ] -for puller in filtered_pullers: +for dag in filtered_dags: DAG_NAME = 'gcc_pull_layers' DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) @@ -88,12 +88,12 @@ def pull_layer(layer, conn_id): 'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) } - dag_name = 
f"{DAG_NAME}_{puller}" + dag_name = f"{DAG_NAME}_{dag}" globals()[dag_name] = ( create_gcc_puller_dag( dag_id=dag_name, default_args=DEFAULT_ARGS, - name=puller, - conn_id=PULLERS[puller]['conn'], + name=dag, + conn_id=DAGS[dag]['conn'], ) ) diff --git a/test/integration/test_dags.py b/test/integration/test_dags.py index 2f38610a7..2cdbeae20 100644 --- a/test/integration/test_dags.py +++ b/test/integration/test_dags.py @@ -25,10 +25,11 @@ 'AIRFLOW_VAR_SLACK_MEMBER_ID': '{"var": "value"}', 'AIRFLOW_VAR_SSZ_SPREADSHEET_IDS': "{"+",".join([f'"ssz{year}": "value"' for year in range(2018, datetime.now().year+1)])+"}", 'AIRFLOW_VAR_COLLISIONS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 2)])+"]", - 'AIRFLOW_VAR_COUNTS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 3)])+"]", - 'AIRFLOW_VAR_HERE_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]", - 'AIRFLOW_VAR_REPLICATORS': '{"dag": {"dag_name": "value", "tables": "value", "conn": "value"}}', - 'AIRFLOW_VAR_TEST_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]" + 'AIRFLOW_VAR_COUNTS_TABLES': "["+",".join([f'["src_schema.table_{i}", "dst_schema.table_{i}"]' for i in range(0, 3)])+"]", + 'AIRFLOW_VAR_HERE_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]", + 'AIRFLOW_VAR_REPLICATORS': '{"dag": {"dag_name": "value", "tables": "value", "conn": "value"}}', + 'AIRFLOW_VAR_TEST_DAG_TRIGGERS': "["+",".join([f'"dag_{i}"' for i in range(0, 3)])+"]", + 'AIRFLOW_VAR_GCC_DAGS': '{"dag": {"conn": "value", "deployments": ["value"]}}', } SAMPLE_CONN = Connection( conn_type="sample_type", From bd7968dba965658fd1a97a1d81237845a7f75895 Mon Sep 17 00:00:00 2001 From: Gabe Wolofsky <80077912+gabrielwol@users.noreply.github.com> Date: Wed, 21 Aug 2024 17:05:46 -0400 Subject: [PATCH 6/7] #1032 don't use keyword as iterator --- dags/gcc_layers_pull.py | 10 +++++----- 1 file changed, 5 insertions(+), 
5 deletions(-) diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index 7ed7f5b0c..fd8ea4f87 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -28,7 +28,7 @@ def create_gcc_puller_dag(dag_id, default_args, name, conn_id): schedule='0 7 1 */3 *' #'@quarterly' ) def gcc_layers_dag(): - + @task() def get_layers(name): tables = Variable.get('gcc_layers', deserialize_json=True) @@ -75,7 +75,7 @@ def pull_layer(layer, conn_id): key for key, facts in DAGS.items() if dep in facts['deployments'] ] -for dag in filtered_dags: +for item in filtered_dags: DAG_NAME = 'gcc_pull_layers' DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"]) @@ -88,12 +88,12 @@ def pull_layer(layer, conn_id): 'on_failure_callback': partial(task_fail_slack_alert, use_proxy=True) } - dag_name = f"{DAG_NAME}_{dag}" + dag_name = f"{DAG_NAME}_{item}" globals()[dag_name] = ( create_gcc_puller_dag( dag_id=dag_name, default_args=DEFAULT_ARGS, - name=dag, - conn_id=DAGS[dag]['conn'], + name=dag_name, + conn_id=DAGS[item]['conn'], ) ) From e181e5db73b3a47a07f2a531b3833a3dc09fae28 Mon Sep 17 00:00:00 2001 From: gabrielwol <80077912+gabrielwol@users.noreply.github.com> Date: Fri, 23 Aug 2024 09:25:23 -0400 Subject: [PATCH 7/7] #1032 remove unnecessary print --- dags/gcc_layers_pull.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dags/gcc_layers_pull.py b/dags/gcc_layers_pull.py index fd8ea4f87..17174cf8c 100644 --- a/dags/gcc_layers_pull.py +++ b/dags/gcc_layers_pull.py @@ -56,7 +56,6 @@ def pull_layer(layer, conn_id): agg_sql = layer[1].get("agg") if agg_sql is not None: with conn.cursor() as cur: - print(agg_sql) cur.execute(agg_sql) layers = get_layers(name)