@@ -49,6 +49,7 @@
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantLineageRunSkipHandler,
    RedundantQueriesRunSkipHandler,
    RedundantUsageRunSkipHandler,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -145,7 +146,10 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
        redundant_lineage_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = (
            None
        )
        if self.config.enable_stateful_lineage_ingestion:
        if (
            self.config.enable_stateful_lineage_ingestion
            and not self.config.use_queries_v2
        ):
            redundant_lineage_run_skip_handler = RedundantLineageRunSkipHandler(
                source=self,
                config=self.config,
@@ -296,6 +300,17 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
            ):
                return

            redundant_queries_run_skip_handler: Optional[
                RedundantQueriesRunSkipHandler
            ] = None
            if self.config.enable_stateful_time_window:
                redundant_queries_run_skip_handler = RedundantQueriesRunSkipHandler(
                    source=self,
                    config=self.config,
                    pipeline_name=self.ctx.pipeline_name,
                    run_id=self.ctx.run_id,
                )

            with (
                self.report.new_stage(f"*: {QUERIES_EXTRACTION}"),
                BigQueryQueriesExtractor(
@@ -315,6 +330,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                    structured_report=self.report,
                    filters=self.filters,
                    identifiers=self.identifiers,
                    redundant_run_skip_handler=redundant_queries_run_skip_handler,
                    schema_resolver=self.sql_parser_schema_resolver,
                    discovered_tables=self.bq_schema_extractor.table_refs,
                ) as queries_extractor,
@@ -25,6 +25,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulLineageConfigMixin,
    StatefulProfilingConfigMixin,
    StatefulTimeWindowConfigMixin,
    StatefulUsageConfigMixin,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
@@ -271,6 +272,7 @@ class BigQueryV2Config(
    SQLCommonConfig,
    StatefulUsageConfigMixin,
    StatefulLineageConfigMixin,
    StatefulTimeWindowConfigMixin,
    StatefulProfilingConfigMixin,
    ClassificationSourceConfigMixin,
):
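
The StatefulTimeWindowConfigMixin itself is added elsewhere in this PR and is not shown in this diff; presumably it contributes the enable_stateful_time_window flag used further down. A rough sketch of what such a mixin might look like (the base class, default, and description text are assumptions; only the flag name comes from this diff):

from pydantic import BaseModel, Field


# Assumed shape only; the real mixin is defined elsewhere in the PR.
class StatefulTimeWindowConfigMixin(BaseModel):
    enable_stateful_time_window: bool = Field(
        default=False,
        description="Enable stateful ingestion for the unified time-window "
        "(lineage + usage + operations + queries) extraction used by queries v2.",
    )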
@@ -527,6 +529,20 @@ def validate_upstream_lineage_in_report(cls, v: bool, values: Dict) -> bool:

        return v

    @root_validator(pre=False, skip_on_failure=True)
    def validate_queries_v2_stateful_ingestion(cls, values: Dict) -> Dict:
Comment on lines +532 to +533 (Contributor):
FYI I'm replacing these v1 validators in #15057

        if values.get("use_queries_v2"):
            if values.get("enable_stateful_lineage_ingestion") or values.get(
                "enable_stateful_usage_ingestion"
            ):
                logger.warning(
                    "enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion are deprecated "
                    "when using use_queries_v2=True. These configs only work with the legacy (non-queries v2) extraction path. "
                    "For queries v2, use enable_stateful_time_window instead to enable stateful ingestion "
                    "for the unified time window extraction (lineage + usage + operations + queries)."
Comment on lines +539 to +542 (Contributor):
additionally, docs for enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion should be updated to mention that they only work with use_queries_v2=False

                )
        return values
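
For clarity, a minimal sketch of the source config keys this validator steers users toward (illustrative only; connection settings and every other required field are omitted):

# Illustrative combination for queries v2; every key below appears in this diff.
queries_v2_stateful_config = {
    "use_queries_v2": True,
    # Preferred for queries v2: a single checkpoint covering the unified time
    # window (lineage + usage + operations + queries).
    "enable_stateful_time_window": True,
    # Legacy flags: with use_queries_v2=True they only trigger the warning above.
    # "enable_stateful_lineage_ingestion": True,
    # "enable_stateful_usage_ingestion": True,
}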

    def get_table_pattern(self, pattern: List[str]) -> str:
        return "|".join(pattern) if pattern else ""

@@ -36,6 +36,9 @@
    BigQueryFilter,
    BigQueryIdentifierBuilder,
)
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantQueriesRunSkipHandler,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.metadata.urns import CorpUserUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
@@ -135,6 +138,7 @@ def __init__(
        structured_report: SourceReport,
        filters: BigQueryFilter,
        identifiers: BigQueryIdentifierBuilder,
        redundant_run_skip_handler: Optional[RedundantQueriesRunSkipHandler] = None,
        graph: Optional[DataHubGraph] = None,
        schema_resolver: Optional[SchemaResolver] = None,
        discovered_tables: Optional[Collection[str]] = None,
@@ -158,6 +162,9 @@ def __init__(
        )

        self.structured_report = structured_report
        self.redundant_run_skip_handler = redundant_run_skip_handler

        self.start_time, self.end_time = self._get_time_window()
Comment (Contributor):
if we are not tracking them already, we should log or add to the report these effective start/end time

Reply (Contributor Author):
We log it

        self.aggregator = SqlParsingAggregator(
            platform=self.identifiers.platform,
@@ -172,8 +179,8 @@ def __init__(
            generate_query_usage_statistics=self.config.include_query_usage_statistics,
            usage_config=BaseUsageConfig(
                bucket_duration=self.config.window.bucket_duration,
                start_time=self.config.window.start_time,
                end_time=self.config.window.end_time,
                start_time=self.start_time,
                end_time=self.end_time,
                user_email_pattern=self.config.user_email_pattern,
                top_n_queries=self.config.top_n_queries,
            ),
@@ -199,6 +206,34 @@ def local_temp_path(self) -> pathlib.Path:
        logger.info(f"Using local temp path: {path}")
        return path

    def _get_time_window(self) -> tuple[datetime, datetime]:
        if self.redundant_run_skip_handler:
            start_time, end_time = (
                self.redundant_run_skip_handler.suggest_run_time_window(
                    self.config.window.start_time,
                    self.config.window.end_time,
                )
            )
        else:
            start_time = self.config.window.start_time
            end_time = self.config.window.end_time

        # Usage statistics are aggregated per bucket (typically per day).
        # To ensure accurate aggregated metrics, we need to align the start_time
        # to the beginning of a bucket so that we include complete bucket periods.
        if self.config.include_usage_statistics:
            start_time = get_time_bucket(start_time, self.config.window.bucket_duration)

        return start_time, end_time
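
To make the bucket alignment concrete, a small illustration (values are invented; get_time_bucket and BucketDuration are the helpers imported from datahub.configuration.time_window_config in this diff, and the DAY behavior shown is an assumption based on their usual semantics):

# Illustration only: with daily buckets, a mid-day start is pulled back to midnight
# so the first usage bucket covers a complete day rather than a partial one.
from datetime import datetime, timezone

from datahub.configuration.time_window_config import BucketDuration, get_time_bucket

suggested_start = datetime(2025, 1, 15, 14, 30, tzinfo=timezone.utc)
aligned_start = get_time_bucket(suggested_start, BucketDuration.DAY)
# aligned_start is expected to be 2025-01-15 00:00:00+00:00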

    def _update_state(self) -> None:
        if self.redundant_run_skip_handler:
            self.redundant_run_skip_handler.update_state(
                self.config.window.start_time,
                self.config.window.end_time,
                self.config.window.bucket_duration,
            )

    def is_temp_table(self, name: str) -> bool:
        try:
            table = BigqueryTableIdentifier.from_string_name(name)
@@ -299,6 +334,8 @@ def get_workunits_internal(
        shared_connection.close()
        audit_log_file.unlink(missing_ok=True)

        self._update_state()

    def deduplicate_queries(
        self, queries: FileBackedList[ObservedQuery]
    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
@@ -355,8 +392,8 @@ def fetch_region_query_log(
        query_log_query = _build_enriched_query_log_query(
            project_id=project.id,
            region=region,
            start_time=self.config.window.start_time,
            end_time=self.config.window.end_time,
            start_time=self.start_time,
            end_time=self.end_time,
        )

        logger.info(f"Fetching query log from BQ Project {project.id} for {region}")
@@ -31,6 +31,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulLineageConfigMixin,
    StatefulProfilingConfigMixin,
    StatefulTimeWindowConfigMixin,
    StatefulUsageConfigMixin,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
@@ -199,6 +200,7 @@ class SnowflakeV2Config(
    SnowflakeUsageConfig,
    StatefulLineageConfigMixin,
    StatefulUsageConfigMixin,
    StatefulTimeWindowConfigMixin,
    StatefulProfilingConfigMixin,
    ClassificationSourceConfigMixin,
    IncrementalPropertiesConfigMixin,
@@ -477,6 +479,20 @@ def validate_shares(

        return shares

    @root_validator(pre=False, skip_on_failure=True)
    def validate_queries_v2_stateful_ingestion(cls, values: Dict) -> Dict:
Comment on lines +482 to +483 (sgomezvillamor, Contributor, Oct 21, 2025):
FYI I'm replacing these v1 validators in #15057

        if values.get("use_queries_v2"):
            if values.get("enable_stateful_lineage_ingestion") or values.get(
                "enable_stateful_usage_ingestion"
            ):
                logger.warning(
                    "enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion are deprecated "
                    "when using use_queries_v2=True. These configs only work with the legacy (non-queries v2) extraction path. "
                    "For queries v2, use enable_stateful_time_window instead to enable stateful ingestion "
                    "for the unified time window extraction (lineage + usage + operations + queries)."
Comment on lines +489 to +492 (Contributor):
additionally, docs for enable_stateful_lineage_ingestion and enable_stateful_usage_ingestion should be updated to mention that they only work with use_queries_v2=False

                )
        return values

    def outbounds(self) -> Dict[str, Set[DatabaseId]]:
        """
        Returns mapping of
@@ -17,6 +17,7 @@
from datahub.configuration.time_window_config import (
    BaseTimeWindowConfig,
    BucketDuration,
    get_time_bucket,
)
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext
@@ -50,6 +51,9 @@
    StoredProcLineageReport,
    StoredProcLineageTracker,
)
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantQueriesRunSkipHandler,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.metadata.urns import CorpUserUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
@@ -180,6 +184,7 @@ def __init__(
        structured_report: SourceReport,
        filters: SnowflakeFilter,
        identifiers: SnowflakeIdentifierBuilder,
        redundant_run_skip_handler: Optional[RedundantQueriesRunSkipHandler] = None,
        graph: Optional[DataHubGraph] = None,
        schema_resolver: Optional[SchemaResolver] = None,
        discovered_tables: Optional[List[str]] = None,
@@ -191,9 +196,13 @@ def __init__(
        self.filters = filters
        self.identifiers = identifiers
        self.discovered_tables = set(discovered_tables) if discovered_tables else None
        self.redundant_run_skip_handler = redundant_run_skip_handler

        self._structured_report = structured_report

        # Adjust time window based on stateful ingestion state
        self.start_time, self.end_time = self._get_time_window()

        # The exit stack helps ensure that we close all the resources we open.
        self._exit_stack = contextlib.ExitStack()

@@ -211,8 +220,8 @@ def __init__(
            generate_query_usage_statistics=self.config.include_query_usage_statistics,
            usage_config=BaseUsageConfig(
                bucket_duration=self.config.window.bucket_duration,
                start_time=self.config.window.start_time,
                end_time=self.config.window.end_time,
                start_time=self.start_time,
                end_time=self.end_time,
                user_email_pattern=self.config.user_email_pattern,
                # TODO make the rest of the fields configurable
            ),
@@ -228,6 +237,34 @@
    def structured_reporter(self) -> SourceReport:
        return self._structured_report

    def _get_time_window(self) -> tuple[datetime, datetime]:
        if self.redundant_run_skip_handler:
            start_time, end_time = (
                self.redundant_run_skip_handler.suggest_run_time_window(
                    self.config.window.start_time,
                    self.config.window.end_time,
                )
            )
        else:
            start_time = self.config.window.start_time
            end_time = self.config.window.end_time

        # Usage statistics are aggregated per bucket (typically per day).
        # To ensure accurate aggregated metrics, we need to align the start_time
        # to the beginning of a bucket so that we include complete bucket periods.
        if self.config.include_usage_statistics:
            start_time = get_time_bucket(start_time, self.config.window.bucket_duration)

        return start_time, end_time

    def _update_state(self) -> None:
        if self.redundant_run_skip_handler:
            self.redundant_run_skip_handler.update_state(
                self.config.window.start_time,
                self.config.window.end_time,
                self.config.window.bucket_duration,
            )

    @functools.cached_property
    def local_temp_path(self) -> pathlib.Path:
        if self.config.local_temp_path:
@@ -355,6 +392,9 @@ def get_workunits_internal(
        with self.report.aggregator_generate_timer:
            yield from auto_workunit(self.aggregator.gen_metadata())

        # Update the stateful ingestion state after successful extraction
        self._update_state()

    def fetch_users(self) -> UsersMapping:
        users: UsersMapping = dict()
        with self.structured_reporter.report_exc("Error fetching users from Snowflake"):
@@ -378,8 +418,8 @@ def fetch_copy_history(self) -> Iterable[KnownLineageMapping]:
        # Derived from _populate_external_lineage_from_copy_history.

        query: str = SnowflakeQuery.copy_lineage_history(
            start_time_millis=int(self.config.window.start_time.timestamp() * 1000),
            end_time_millis=int(self.config.window.end_time.timestamp() * 1000),
            start_time_millis=int(self.start_time.timestamp() * 1000),
            end_time_millis=int(self.end_time.timestamp() * 1000),
            downstreams_deny_pattern=self.config.temporary_tables_pattern,
        )

@@ -414,8 +454,8 @@ def fetch_query_log(
        Union[PreparsedQuery, TableRename, TableSwap, ObservedQuery, StoredProcCall]
    ]:
        query_log_query = QueryLogQueryBuilder(
            start_time=self.config.window.start_time,
            end_time=self.config.window.end_time,
            start_time=self.start_time,
            end_time=self.end_time,
            bucket_duration=self.config.window.bucket_duration,
            deny_usernames=self.config.pushdown_deny_usernames,
            allow_usernames=self.config.pushdown_allow_usernames,
@@ -73,6 +73,7 @@
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
    RedundantLineageRunSkipHandler,
    RedundantQueriesRunSkipHandler,
    RedundantUsageRunSkipHandler,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -207,7 +208,7 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
        )
        self.report.sql_aggregator = self.aggregator.report

        if self.config.include_table_lineage:
        if self.config.include_table_lineage and not self.config.use_queries_v2:
            redundant_lineage_run_skip_handler: Optional[
                RedundantLineageRunSkipHandler
            ] = None
@@ -589,6 +590,17 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
            with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"):
                schema_resolver = self.aggregator._schema_resolver

                redundant_queries_run_skip_handler: Optional[
                    RedundantQueriesRunSkipHandler
                ] = None
                if self.config.enable_stateful_time_window:
                    redundant_queries_run_skip_handler = RedundantQueriesRunSkipHandler(
                        source=self,
                        config=self.config,
                        pipeline_name=self.ctx.pipeline_name,
                        run_id=self.ctx.run_id,
                    )

                queries_extractor = SnowflakeQueriesExtractor(
                    connection=self.connection,
                    # TODO: this should be its own section in main recipe
@@ -614,6 +626,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                    structured_report=self.report,
                    filters=self.filters,
                    identifiers=self.identifiers,
                    redundant_run_skip_handler=redundant_queries_run_skip_handler,
                    schema_resolver=schema_resolver,
                    discovered_tables=self.discovered_datasets,
                    graph=self.ctx.graph,
@@ -244,3 +244,24 @@ def update_state(
            cur_state.begin_timestamp_millis = datetime_to_ts_millis(start_time)
            cur_state.end_timestamp_millis = datetime_to_ts_millis(end_time)
            cur_state.bucket_duration = bucket_duration


class RedundantQueriesRunSkipHandler(RedundantRunSkipHandler):
    """
    Handler for stateful ingestion of queries v2 extraction.
    Manages the time window for audit log extraction that combines
    lineage, usage, operations, and queries.
    """

    def get_job_name_suffix(self):
        return "_audit_window"

    def update_state(
        self, start_time: datetime, end_time: datetime, bucket_duration: BucketDuration
    ) -> None:
        cur_checkpoint = self.get_current_checkpoint()
        if cur_checkpoint:
            cur_state = cast(BaseTimeWindowCheckpointState, cur_checkpoint.state)
            cur_state.begin_timestamp_millis = datetime_to_ts_millis(start_time)
            cur_state.end_timestamp_millis = datetime_to_ts_millis(end_time)
            cur_state.bucket_duration = bucket_duration
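
To show how the pieces above are meant to interact, a small sketch of the stateful flow (the handler construction and the two calls mirror this diff; the orchestration function and the extraction stub are illustrative):

# Sketch of the stateful time-window flow; names of the helper function and the
# extraction stub are illustrative, the handler API calls come from this diff.
from datetime import datetime

from datahub.configuration.time_window_config import BucketDuration


def extract_audit_log(start: datetime, end: datetime) -> None:
    ...  # stand-in for the actual queries v2 extraction


def run_with_time_window_state(
    handler: RedundantQueriesRunSkipHandler,
    window_start: datetime,
    window_end: datetime,
    bucket_duration: BucketDuration,
) -> None:
    # 1. Before extraction: trim the configured window down to the part not already
    #    covered by the previous successful run's checkpoint.
    start, end = handler.suggest_run_time_window(window_start, window_end)

    extract_audit_log(start, end)

    # 2. After a successful run: persist the full configured window so the next run
    #    can skip or shrink the overlap.
    handler.update_state(window_start, window_end, bucket_duration)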