-
Notifications
You must be signed in to change notification settings - Fork 3.3k
feat(ingest/snowflake,bigquery): Stateful time window ingestion for queries v2 with bucket alignment #15040
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/snowflake,bigquery): Stateful time window ingestion for queries v2 with bucket alignment #15040
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| from datahub.ingestion.source.state.stateful_ingestion_base import ( | ||
| StatefulLineageConfigMixin, | ||
| StatefulProfilingConfigMixin, | ||
| StatefulTimeWindowConfigMixin, | ||
| StatefulUsageConfigMixin, | ||
| ) | ||
| from datahub.ingestion.source.usage.usage_common import BaseUsageConfig | ||
|
|
@@ -271,6 +272,7 @@ class BigQueryV2Config( | |
| SQLCommonConfig, | ||
| StatefulUsageConfigMixin, | ||
| StatefulLineageConfigMixin, | ||
| StatefulTimeWindowConfigMixin, | ||
| StatefulProfilingConfigMixin, | ||
| ClassificationSourceConfigMixin, | ||
| ): | ||
|
|
@@ -527,6 +529,20 @@ def validate_upstream_lineage_in_report(cls, v: bool, values: Dict) -> bool: | |
|
|
||
| return v | ||
|
|
||
| @root_validator(pre=False, skip_on_failure=True) | ||
| def validate_queries_v2_stateful_ingestion(cls, values: Dict) -> Dict: | ||
| if values.get("use_queries_v2"): | ||
| if values.get("enable_stateful_lineage_ingestion") or values.get( | ||
| "enable_stateful_usage_ingestion" | ||
| ): | ||
| logger.warning( | ||
| "enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion are deprecated " | ||
| "when using use_queries_v2=True. These configs only work with the legacy (non-queries v2) extraction path. " | ||
| "For queries v2, use enable_stateful_time_window instead to enable stateful ingestion " | ||
| "for the unified time window extraction (lineage + usage + operations + queries)." | ||
|
Comment on lines
+539
to
+542
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Additionally, docs for enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion should be updated to mention that they only work with use_queries_v2=False |
||
| ) | ||
| return values | ||
|
|
||
| def get_table_pattern(self, pattern: List[str]) -> str: | ||
| return "|".join(pattern) if pattern else "" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,6 +36,9 @@ | |
| BigQueryFilter, | ||
| BigQueryIdentifierBuilder, | ||
| ) | ||
| from datahub.ingestion.source.state.redundant_run_skip_handler import ( | ||
| RedundantQueriesRunSkipHandler, | ||
| ) | ||
| from datahub.ingestion.source.usage.usage_common import BaseUsageConfig | ||
| from datahub.metadata.urns import CorpUserUrn | ||
| from datahub.sql_parsing.schema_resolver import SchemaResolver | ||
|
|
@@ -135,6 +138,7 @@ def __init__( | |
| structured_report: SourceReport, | ||
| filters: BigQueryFilter, | ||
| identifiers: BigQueryIdentifierBuilder, | ||
| redundant_run_skip_handler: Optional[RedundantQueriesRunSkipHandler] = None, | ||
| graph: Optional[DataHubGraph] = None, | ||
| schema_resolver: Optional[SchemaResolver] = None, | ||
| discovered_tables: Optional[Collection[str]] = None, | ||
|
|
@@ -158,6 +162,9 @@ def __init__( | |
| ) | ||
|
|
||
| self.structured_report = structured_report | ||
| self.redundant_run_skip_handler = redundant_run_skip_handler | ||
|
|
||
| self.start_time, self.end_time = self._get_time_window() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we are not tracking them already, we should log or add to the report these effective start/end times
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We log it |
||
|
|
||
| self.aggregator = SqlParsingAggregator( | ||
| platform=self.identifiers.platform, | ||
|
|
@@ -172,8 +179,8 @@ def __init__( | |
| generate_query_usage_statistics=self.config.include_query_usage_statistics, | ||
| usage_config=BaseUsageConfig( | ||
| bucket_duration=self.config.window.bucket_duration, | ||
| start_time=self.config.window.start_time, | ||
| end_time=self.config.window.end_time, | ||
| start_time=self.start_time, | ||
| end_time=self.end_time, | ||
| user_email_pattern=self.config.user_email_pattern, | ||
| top_n_queries=self.config.top_n_queries, | ||
| ), | ||
|
|
@@ -199,6 +206,34 @@ def local_temp_path(self) -> pathlib.Path: | |
| logger.info(f"Using local temp path: {path}") | ||
| return path | ||
|
|
||
| def _get_time_window(self) -> tuple[datetime, datetime]: | ||
| if self.redundant_run_skip_handler: | ||
| start_time, end_time = ( | ||
| self.redundant_run_skip_handler.suggest_run_time_window( | ||
| self.config.window.start_time, | ||
| self.config.window.end_time, | ||
| ) | ||
| ) | ||
| else: | ||
| start_time = self.config.window.start_time | ||
| end_time = self.config.window.end_time | ||
|
|
||
| # Usage statistics are aggregated per bucket (typically per day). | ||
| # To ensure accurate aggregated metrics, we need to align the start_time | ||
| # to the beginning of a bucket so that we include complete bucket periods. | ||
| if self.config.include_usage_statistics: | ||
| start_time = get_time_bucket(start_time, self.config.window.bucket_duration) | ||
|
|
||
| return start_time, end_time | ||
|
|
||
| def _update_state(self) -> None: | ||
| if self.redundant_run_skip_handler: | ||
| self.redundant_run_skip_handler.update_state( | ||
| self.config.window.start_time, | ||
| self.config.window.end_time, | ||
| self.config.window.bucket_duration, | ||
| ) | ||
|
|
||
| def is_temp_table(self, name: str) -> bool: | ||
| try: | ||
| table = BigqueryTableIdentifier.from_string_name(name) | ||
|
|
@@ -299,6 +334,8 @@ def get_workunits_internal( | |
| shared_connection.close() | ||
| audit_log_file.unlink(missing_ok=True) | ||
|
|
||
| self._update_state() | ||
|
|
||
| def deduplicate_queries( | ||
| self, queries: FileBackedList[ObservedQuery] | ||
| ) -> FileBackedDict[Dict[int, ObservedQuery]]: | ||
|
|
@@ -355,8 +392,8 @@ def fetch_region_query_log( | |
| query_log_query = _build_enriched_query_log_query( | ||
| project_id=project.id, | ||
| region=region, | ||
| start_time=self.config.window.start_time, | ||
| end_time=self.config.window.end_time, | ||
| start_time=self.start_time, | ||
| end_time=self.end_time, | ||
| ) | ||
|
|
||
| logger.info(f"Fetching query log from BQ Project {project.id} for {region}") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ | |
| from datahub.ingestion.source.state.stateful_ingestion_base import ( | ||
| StatefulLineageConfigMixin, | ||
| StatefulProfilingConfigMixin, | ||
| StatefulTimeWindowConfigMixin, | ||
| StatefulUsageConfigMixin, | ||
| ) | ||
| from datahub.ingestion.source.usage.usage_common import BaseUsageConfig | ||
|
|
@@ -199,6 +200,7 @@ class SnowflakeV2Config( | |
| SnowflakeUsageConfig, | ||
| StatefulLineageConfigMixin, | ||
| StatefulUsageConfigMixin, | ||
| StatefulTimeWindowConfigMixin, | ||
| StatefulProfilingConfigMixin, | ||
| ClassificationSourceConfigMixin, | ||
| IncrementalPropertiesConfigMixin, | ||
|
|
@@ -477,6 +479,20 @@ def validate_shares( | |
|
|
||
| return shares | ||
|
|
||
| @root_validator(pre=False, skip_on_failure=True) | ||
| def validate_queries_v2_stateful_ingestion(cls, values: Dict) -> Dict: | ||
|
Comment on lines
+482
to
+483
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FYI I'm replacing these v1 validators in #15057 |
||
| if values.get("use_queries_v2"): | ||
| if values.get("enable_stateful_lineage_ingestion") or values.get( | ||
| "enable_stateful_usage_ingestion" | ||
| ): | ||
| logger.warning( | ||
| "enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion are deprecated " | ||
| "when using use_queries_v2=True. These configs only work with the legacy (non-queries v2) extraction path. " | ||
| "For queries v2, use enable_stateful_time_window instead to enable stateful ingestion " | ||
| "for the unified time window extraction (lineage + usage + operations + queries)." | ||
|
Comment on lines
+489
to
+492
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Additionally, docs for enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion should be updated to mention that they only work with use_queries_v2=False |
||
| ) | ||
| return values | ||
|
|
||
| def outbounds(self) -> Dict[str, Set[DatabaseId]]: | ||
| """ | ||
| Returns mapping of | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI I'm replacing these v1 validators in #15057