diff --git a/examples/dags/dag.yaml b/examples/dags/dag.yaml
new file mode 100644
index 00000000..f3883c1c
--- /dev/null
+++ b/examples/dags/dag.yaml
@@ -0,0 +1,40 @@
+---
+source:
+  name: "dev_ingestion_6"
+  source_type: TTD
+  ingest_schedule: "@daily"
+  external_dag_id: null
+  start_date: "2021-01-01"
+  acceptable_delay_minutes: 5
+  gcp_project: dev-eyereturn-data-warehouse
+  dataset_data_name: ttd
+  connection: google_cloud_default
+  extra_options:
+    ttd:
+      partner_id: "2ui2ov3"
+  notification_emails:
+    - ''
+  owner: owner
+  version: 1
+  landing_zone_options:
+    landing_zone_dataset: staging_zone
+  schema_options:
+    schema_source_type: AUTO
+  location: US
+tables:
+  - table_name: data_elements_performance
+    surrogate_keys: ['Date','Advertiser_ID','Campaign_ID','Ad_Group','Data_Element','metric_3rd_Party_Data_Brand','Tracking_Tag_ID']
+    ingestion_type: INCREMENTAL
+    extra_options:
+      ttd:
+        report_template_id: 1652867
+    dataform_options:
+      environment:
+      schedule:
+      tags:
+  - table_name: basic_stats
+    surrogate_keys: ['Date','Advertiser_ID','Campaign_ID','Ad_Group_ID','Campaign_Flight_ID','Ad_Group_Campaign_Flight_ID','Ad_Format','Creative']
+    ingestion_type: INCREMENTAL
+    extra_options:
+      ttd:
+        report_template_id: 1654646
\ No newline at end of file
diff --git a/gcp_airflow_foundations/base_class/dataform_config.py b/gcp_airflow_foundations/base_class/dataform_config.py
new file mode 100644
index 00000000..1caa6eba
--- /dev/null
+++ b/gcp_airflow_foundations/base_class/dataform_config.py
@@ -0,0 +1,26 @@
+from airflow.exceptions import AirflowException
+
+from dacite import Config
+from dataclasses import dataclass, field
+
+from datetime import datetime
+from typing import List, Optional
+
+
+@dataclass
+class DataformConfig:
+    """
+    Dataform configuration class.
+
+    Attributes:
+        environment: Only production is currently supported.
+        schedule: Schedules can be used to run any section of a dataset at a user-defined frequency.
+        tags: Dataform operation tags.
+
+    Note: the project_id is provided in the Airflow connection for Dataform.
+
+    """
+
+    environment: str
+    schedule: str
+    tags: List[str]
diff --git a/gcp_airflow_foundations/base_class/source_table_config.py b/gcp_airflow_foundations/base_class/source_table_config.py
index 0917bc5c..4bd0468a 100644
--- a/gcp_airflow_foundations/base_class/source_table_config.py
+++ b/gcp_airflow_foundations/base_class/source_table_config.py
@@ -13,7 +13,7 @@
 from gcp_airflow_foundations.enums.ingestion_type import IngestionType
 from gcp_airflow_foundations.enums.hds_table_type import HdsTableType
 from gcp_airflow_foundations.enums.time_partitioning import TimePartitioning
-
+from gcp_airflow_foundations.base_class.dataform_config import DataformConfig
 from gcp_airflow_foundations.base_class.ods_table_config import OdsTableConfig
 from gcp_airflow_foundations.base_class.hds_table_config import HdsTableConfig
 from gcp_airflow_foundations.base_class.facebook_table_config import FacebookTableConfig
@@ -32,6 +32,7 @@ class SourceTableConfig:
         column_mapping : Mapping used to rename columns.
         ods_config : ODS table configuration. See :class:`gcp_airflow_foundations.base_class.ods_table_config.OdsTableConfig`.
         hds_config : HDS table configuration. See :class:`gcp_airflow_foundations.base_class.hds_table_config.HdsTableConfig`.
+        dataform_options: Dataform configuration. See :class:`gcp_airflow_foundations.base_class.dataform_config.DataformConfig`.
         facebook_table_config: Extra options for ingesting data from the Facebook API.
         extra_options: Field for storing additional configuration options.
         version : The Dag version for the table. Can be incremented if logic changes.
@@ -46,6 +47,7 @@ class SourceTableConfig:
     surrogate_keys: List[str]
     column_mapping: Optional[dict]
     hds_config: Optional[HdsTableConfig]
+    dataform_options: Optional[DataformConfig]
     facebook_table_config: Optional[FacebookTableConfig]
     extra_options: dict = field(default_factory=dict)
     ods_config: Optional[OdsTableConfig] = OdsTableConfig(ods_metadata=OdsTableMetadataConfig())
diff --git a/gcp_airflow_foundations/common/gcp/load_builder.py b/gcp_airflow_foundations/common/gcp/load_builder.py
index 81390e2e..b0b09232 100644
--- a/gcp_airflow_foundations/common/gcp/load_builder.py
+++ b/gcp_airflow_foundations/common/gcp/load_builder.py
@@ -5,6 +5,7 @@
 from gcp_airflow_foundations.enums.ingestion_type import IngestionType
 from gcp_airflow_foundations.enums.hds_table_type import HdsTableType
 from gcp_airflow_foundations.operators.gcp.delete_staging_table import BigQueryDeleteStagingTableOperator
+from gcp_airflow_foundations.operators.api.operators.dataform_operator import DataformOperator
 
 import logging
 
@@ -89,7 +90,24 @@ def load_builder(
             dag=dag
         )
 
-    if hds_task_group:
-        preceding_task >> parse_schema >> ods_task_group >> hds_task_group >> delete_staging_table
-    else:
-        preceding_task >> parse_schema >> ods_task_group >> delete_staging_table
+    # dataform operator
+    if table_config.dataform_options is not None:
+        dataform = DataformOperator(
+            task_id=f"{data_source.name}_dataform",
+            dataform_conn_id='dataform_default',
+            environment=table_config.dataform_options.environment,
+            schedule=table_config.dataform_options.schedule,
+            tags=table_config.dataform_options.tags,
+            dag=dag
+        )
+
+        if hds_task_group:
+            preceding_task >> parse_schema >> ods_task_group >> hds_task_group >> delete_staging_table >> dataform
+        else:
+            preceding_task >> parse_schema >> ods_task_group >> delete_staging_table >> dataform
+
+    else:
+        if hds_task_group:
+            preceding_task >> parse_schema >> ods_task_group >> hds_task_group >> delete_staging_table
+        else:
+            preceding_task >> parse_schema >> ods_task_group >> delete_staging_table
\ No newline at end of file
diff --git a/gcp_airflow_foundations/operators/api/operators/dataform_operator.py b/gcp_airflow_foundations/operators/api/operators/dataform_operator.py
index ac8d9ef0..c332169b 100644
--- a/gcp_airflow_foundations/operators/api/operators/dataform_operator.py
+++ b/gcp_airflow_foundations/operators/api/operators/dataform_operator.py
@@ -1,5 +1,5 @@
 from airflow.models.baseoperator import BaseOperator
-from dataform_hook import DataformHook
+from gcp_airflow_foundations.operators.api.hooks.dataform_hook import DataformHook
 from airflow.utils.decorators import apply_defaults
 
 from typing import Any, Optional
diff --git a/gcp_airflow_foundations/source_class/source.py b/gcp_airflow_foundations/source_class/source.py
index 7cc7e7a3..733609dd 100644
--- a/gcp_airflow_foundations/source_class/source.py
+++ b/gcp_airflow_foundations/source_class/source.py
@@ -8,9 +8,9 @@
 
 from gcp_airflow_foundations.enums.schema_source_type import SchemaSourceType
 from gcp_airflow_foundations.common.gcp.load_builder import load_builder
 from gcp_airflow_foundations.source_class.schema_source_config import AutoSchemaSourceConfig, GCSSchemaSourceConfig, BQLandingZoneSchemaSourceConfig
-
 import logging
 
+
 class DagBuilder(ABC):
     """A base DAG builder for creating a list of DAGs for a given source.
@@ -61,9 +61,9 @@ def build_dags(self):
             render_template_as_native_obj=True
         ) as dag:
 
-            load_to_bq_landing = self.get_bq_ingestion_task(dag, table_config)
+            load_to_bq_landing = self.get_bq_ingestion_task(dag, table_config) # load data to BigQuery staging table
 
-            self.get_datastore_ingestion_task(dag, load_to_bq_landing, data_source, table_config)
+            self.get_datastore_ingestion_task(dag, load_to_bq_landing, data_source, table_config) # load data to BigQuery ODS/HDS tables
 
             dags.append(dag)
 