Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions examples/dags/dag.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
---
# Example ingestion DAG configuration for a The Trade Desk (TTD) source.
source:
  name: "dev_ingestion_6"
  source_type: TTD
  ingest_schedule: "@daily"          # Airflow cron preset
  external_dag_id: null              # no upstream DAG dependency
  start_date: "2021-01-01"
  acceptable_delay_minutes: 5
  gcp_project: dev-eyereturn-data-warehouse
  dataset_data_name: ttd             # target BigQuery dataset for ODS/HDS tables
  connection: google_cloud_default
  extra_options:
    ttd:
      partner_id: "2ui2ov3"
  notification_emails:
    - ''                             # TODO: replace placeholder with a real alert address
  owner: owner
  version: 1                         # bump when DAG logic changes to force a new DAG id
  landing_zone_options:
    landing_zone_dataset: staging_zone
  schema_options:
    schema_source_type: AUTO         # infer schema automatically from the landing data
  location: US
tables:
  - table_name: data_elements_performance
    surrogate_keys: ['Date','Advertiser_ID','Campaign_ID','Ad_Group','Data_Element','metric_3rd_Party_Data_Brand','Tracking_Tag_ID']
    ingestion_type: INCREMENTAL
    extra_options:
      ttd:
        report_template_id: 1652867
    # Empty values below parse as YAML null; DataformConfig must therefore
    # accept None for these fields (see base_class/dataform_config.py).
    dataform_options:
      environment:
      schedule:
      tags:
  - table_name: basic_stats
    surrogate_keys: ['Date','Advertiser_ID','Campaign_ID','Ad_Group_ID','Campaign_Flight_ID','Ad_Group_Campaign_Flight_ID','Ad_Format','Creative']
    ingestion_type: INCREMENTAL
    extra_options:
      ttd:
        report_template_id: 1654646
26 changes: 26 additions & 0 deletions gcp_airflow_foundations/base_class/dataform_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from airflow.exceptions import AirflowException

from dacite import Config
from dataclasses import dataclass, field

from datetime import datetime
from typing import List, Optional


@dataclass
class DataformConfig:
    """
    Dataform configuration class.

    Attributes:
        environment: Dataform environment to run against. Only production is
            currently supported.
        schedule: Schedule used to run a section of a dataset at a
            user-defined frequency.
        tags: Dataform operation tags.

    Note: the project_id is provided in the Airflow connection for Dataform.

    All fields default to ``None`` because table configs may declare
    ``dataform_options`` with empty values (see examples/dags/dag.yaml, where
    the empty YAML scalars parse as null); with required ``str`` fields,
    dacite would raise a type error on such configs.
    """

    environment: Optional[str] = None
    schedule: Optional[str] = None
    tags: Optional[List[str]] = None
4 changes: 3 additions & 1 deletion gcp_airflow_foundations/base_class/source_table_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from gcp_airflow_foundations.enums.ingestion_type import IngestionType
from gcp_airflow_foundations.enums.hds_table_type import HdsTableType
from gcp_airflow_foundations.enums.time_partitioning import TimePartitioning

from gcp_airflow_foundations.base_class.dataform_config import DataformConfig
from gcp_airflow_foundations.base_class.ods_table_config import OdsTableConfig
from gcp_airflow_foundations.base_class.hds_table_config import HdsTableConfig
from gcp_airflow_foundations.base_class.facebook_table_config import FacebookTableConfig
Expand All @@ -32,6 +32,7 @@ class SourceTableConfig:
column_mapping : Mapping used to rename columns.
ods_config : ODS table configuration. See :class:`gcp_airflow_foundations.base_class.ods_table_config.OdsTableConfig`.
hds_config : HDS table configuration. See :class:`gcp_airflow_foundations.base_class.hds_table_config.HdsTableConfig`.
dataform_options: Dataform configuration. See :class:`gcp_airflow_foundations.base_class.dataform_config.DataformConfig`.
facebook_table_config: Extra options for ingesting data from the Facebook API.
extra_options: Field for storing additional configuration options.
version : The Dag version for the table. Can be incremented if logic changes.
Expand All @@ -46,6 +47,7 @@ class SourceTableConfig:
surrogate_keys: List[str]
column_mapping: Optional[dict]
hds_config: Optional[HdsTableConfig]
dataform_options: Optional[DataformConfig]
facebook_table_config: Optional[FacebookTableConfig]
extra_options: dict = field(default_factory=dict)
ods_config: Optional[OdsTableConfig] = OdsTableConfig(ods_metadata=OdsTableMetadataConfig())
Expand Down
24 changes: 21 additions & 3 deletions gcp_airflow_foundations/common/gcp/load_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from gcp_airflow_foundations.enums.ingestion_type import IngestionType
from gcp_airflow_foundations.enums.hds_table_type import HdsTableType
from gcp_airflow_foundations.operators.gcp.delete_staging_table import BigQueryDeleteStagingTableOperator
from gcp_airflow_foundations.operators.api.operators.dataform_operator import DataformOperator
import logging


Expand Down Expand Up @@ -89,7 +90,24 @@ def load_builder(
dag=dag
)

if hds_task_group:
preceding_task >> parse_schema >> ods_task_group >> hds_task_group >> delete_staging_table
# dataform operator
if table_config.dataform_options is not None:
dataform = DataformOperator(
task_id=f"{data_source.name}_dataform",
dataform_conn_id='dataform_default',
environment=table_config.dataform_options.environment,
schedule=table_config.dataform_options.schedule,
tags=table_config.dataform_options.tags,
dag=dag
)

if hds_task_group:
preceding_task >> parse_schema >> ods_task_group >> hds_task_group >> delete_staging_table >> dataform
else:
preceding_task >> parse_schema >> ods_task_group >> delete_staging_table >> dataform

else:
preceding_task >> parse_schema >> ods_task_group >> delete_staging_table
if hds_task_group:
preceding_task >> parse_schema >> ods_task_group >> hds_task_group >> delete_staging_table
else:
preceding_task >> parse_schema >> ods_task_group >> delete_staging_table
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from airflow.models.baseoperator import BaseOperator
from dataform_hook import DataformHook
from gcp_airflow_foundations.operators.api.hooks.dataform_hook import DataformHook
from airflow.utils.decorators import apply_defaults
from typing import Any, Optional

Expand Down
6 changes: 3 additions & 3 deletions gcp_airflow_foundations/source_class/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from gcp_airflow_foundations.enums.schema_source_type import SchemaSourceType
from gcp_airflow_foundations.common.gcp.load_builder import load_builder
from gcp_airflow_foundations.source_class.schema_source_config import AutoSchemaSourceConfig, GCSSchemaSourceConfig, BQLandingZoneSchemaSourceConfig

import logging


class DagBuilder(ABC):
"""A base DAG builder for creating a list of DAGs for a given source.

Expand Down Expand Up @@ -61,9 +61,9 @@ def build_dags(self):
render_template_as_native_obj=True
) as dag:

load_to_bq_landing = self.get_bq_ingestion_task(dag, table_config)
load_to_bq_landing = self.get_bq_ingestion_task(dag, table_config) # load data to BigQuery staging table

self.get_datastore_ingestion_task(dag, load_to_bq_landing, data_source, table_config)
self.get_datastore_ingestion_task(dag, load_to_bq_landing, data_source, table_config) # load data to BigQuery ODS/HDS tables

dags.append(dag)

Expand Down