diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py index ef43510611bbcc..53c4137348fc97 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py @@ -228,10 +228,18 @@ def all_lookml_models(self) -> Sequence[LookmlModel]: transport_options=self.transport_options, ) - def lookml_model_explore(self, model: str, explore_name: str) -> LookmlModelExplore: + def lookml_model_explore( + self, + model: str, + explore_name: str, + fields: Optional[List[str]] = None, + ) -> LookmlModelExplore: self.client_stats.explore_calls += 1 return self.client.lookml_model_explore( - model, explore_name, transport_options=self.transport_options + model, + explore_name, + fields=self.__fields_mapper(fields) if fields else None, + transport_options=self.transport_options, ) @lru_cache(maxsize=1000) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py index c22aa9c011e2f8..a4fcaf7167ae5d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py @@ -210,6 +210,33 @@ class LookMLSourceConfig( "All if comments are evaluated to true for configured looker_environment value", ) + field_threshold_for_splitting: int = Field( + 100, + description="When the total number of fields returned by Looker API exceeds this threshold, " + "the fields will be split into multiple API calls to avoid SQL parsing failures. " + "This helps provide partial column and table lineage when dealing with large field sets.", + ) + + allow_partial_lineage_results: bool = Field( + True, + description="When enabled, allows partial lineage results to be returned even when some field chunks " + "fail or when there are SQL parsing errors. This provides better resilience for large field sets " + "and ensures some lineage information is available rather than complete failure.", + ) + + enable_individual_field_fallback: bool = Field( + True, + description="When enabled, if a field chunk fails, the system will attempt to process each field " + "individually to maximize information and isolate problematic fields. This helps identify " + "which specific fields are causing issues while still getting lineage for working fields.", + ) + + max_workers_for_parallel_processing: int = Field( + 10, + description="Maximum number of worker threads to use for parallel processing of field chunks and individual fields. " + "Set to 1 to process everything sequentially. Higher values can improve performance but may increase memory usage.", + ) + @validator("connection_to_platform_map", pre=True) def convert_string_to_connection_def(cls, conn_map): # Previous version of config supported strings in connection map. 
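For reference, a minimal sketch of how the new lineage-splitting options declared above might be set together. This is illustrative only: the values shown are simply the defaults from the Field definitions, and the rest of the usual `lookml` source configuration (base_folder, api credentials, etc.) is assumed to be provided elsewhere.

```python
# Illustrative overrides for the new options added to LookMLSourceConfig above.
# Merge into an existing "lookml" source config; values shown are the defaults.
lookml_lineage_overrides = {
    "field_threshold_for_splitting": 100,       # split Looker API calls above this many fields
    "allow_partial_lineage_results": True,      # keep partial lineage instead of failing outright
    "enable_individual_field_fallback": True,   # retry a failed chunk field-by-field
    "max_workers_for_parallel_processing": 10,  # thread pool size; 1 = fully sequential
}
```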
This upconverts strings to ConnectionMap diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index eca713efc2d1ab..7b500b2e828b7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -1,7 +1,7 @@ import logging import pathlib import tempfile -from collections import OrderedDict +from collections import OrderedDict, defaultdict from dataclasses import dataclass from datetime import datetime, timezone from typing import Dict, Iterable, List, Optional, Set, Tuple, Union @@ -709,10 +709,14 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: # Value: Tuple(model file name, connection name) view_connection_map: Dict[str, Tuple[str, str]] = {} - # Map of view name to explore name for API-based view lineage - # A view can be referenced by multiple explores, we only need one of the explores to use Looker Query API - # Key: view_name, Value: explore_name - view_to_explore_map: Dict[str, str] = {} + # Map of view name to all possible explores for API-based view lineage + # A view can be referenced by multiple explores, we'll optimize the assignment + # Key: view_name, Value: set of explore_names + view_to_explores: Dict[str, Set[str]] = defaultdict(set) + + # Temporary map to keep track of the views in an explore + # Key: explore_name, Value: set of view_names + explore_to_views: Dict[str, Set[str]] = defaultdict(set) # The ** means "this directory and all subdirectories", and hence should # include all the files we want. @@ -789,8 +793,9 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: for view_name in explore.upstream_views: if self.source_config.emit_reachable_views_only: explore_reachable_views.add(view_name.include) - # Build view to explore mapping for API-based view lineage - view_to_explore_map[view_name.include] = explore.name + + view_to_explores[view_name.include].add(explore.name) + explore_to_views[explore.name].add(view_name.include) except Exception as e: self.reporter.report_warning( title="Failed to process explores", @@ -804,6 +809,16 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: model.connection, set() ) + view_to_explore_map = {} + if view_to_explores and explore_to_views: + view_to_explore_map = self._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + else: + logger.warning( + f"Either view_to_explores: {view_to_explores} or explore_to_views: {explore_to_views} is empty" + ) + project_name = self.get_project_name(model_name) looker_view_id_cache: LookerViewIdCache = LookerViewIdCache( @@ -888,9 +903,7 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: config=self.source_config, ctx=self.ctx, looker_client=self.looker_client, - view_to_explore_map=view_to_explore_map - if view_to_explore_map - else None, + view_to_explore_map=view_to_explore_map, ) except Exception as e: self.reporter.report_warning( @@ -1040,5 +1053,61 @@ def report_skipped_unreachable_views( context=(f"Project: {project}, View File Path: {path}"), ) + def _optimize_views_by_common_explore( + self, + view_to_explores: Dict[str, Set[str]], + explore_to_views: Dict[str, Set[str]], + ) -> Dict[str, str]: + """ + Optimize view-to-explore mapping by grouping views to minimize API calls. 
+
+        This uses a greedy algorithm that prioritizes explores that appear in the most views,
+        maximizing the number of views assigned to the same explore.
+
+        Args:
+            view_to_explores: Dict mapping view_name -> set of explore_names
+            explore_to_views: Dict mapping explore_name -> set of view_names
+
+        Returns:
+            Dict mapping view_name -> explore_name (optimized assignment)
+        """
+
+        # Pre-compute explore sizes
+        explore_sizes = {
+            explore: len(views) for explore, views in explore_to_views.items()
+        }
+
+        # Build the view-to-explore mapping using a greedy approach
+        view_to_explore: Dict[str, str] = {}
+
+        # For each view, find the explore with maximum size that contains it
+        for view_name, candidate_explores in view_to_explores.items():
+            if candidate_explores:
+                # Find explore with maximum size using max() with key function
+                # This assigns the view to the largest explore that contains it
+                best_explore = max(
+                    candidate_explores, key=lambda explore: explore_sizes[explore]
+                )
+                view_to_explore[view_name] = best_explore
+
+        # Log optimization results
+        unique_explores_used = len(set(view_to_explore.values()))
+        total_views = len(view_to_explore)
+        total_explores = len(explore_to_views)
+
+        if total_explores > 0:
+            efficiency = (1 - unique_explores_used / total_explores) * 100
+            logger.info(
+                f"View-explore optimization: Using {unique_explores_used}/{total_explores} "
+                f"explores for {total_views} views (efficiency: {efficiency:.1f}% savings)"
+            )
+        else:
+            logger.info(
+                f"View-explore optimization: No explores to optimize for {total_views} views"
+            )
+
+        logger.debug(f"Final view-to-explore mapping: {view_to_explore}")
+        return view_to_explore
+
     def get_report(self):
         return self.reporter
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py
index 07393ab4cc47ec..d774073ab176b4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py
@@ -1,11 +1,14 @@
 import logging
 import re
 from abc import ABC, abstractmethod
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime
 from functools import lru_cache
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Tuple
 
 from looker_sdk.sdk.api40.models import (
+    LookmlModelExplore,
+    LookmlModelExploreField,
     WriteQuery,
 )
 
@@ -43,6 +46,7 @@
 from datahub.sql_parsing.sqlglot_lineage import (
     ColumnLineageInfo,
     ColumnRef,
+    SqlParsingDebugInfo,
     SqlParsingResult,
     Urn,
     create_and_cache_schema_resolver,
@@ -293,6 +297,36 @@ def create_upstream_column_refs(
 
         return upstream_column_refs
 
+    @classmethod
+    @lru_cache(maxsize=1000)
+    def get_explore_fields_from_looker_api(
+        cls, looker_client: "LookerAPI", model_name: str, explore_name: str
+    ) -> LookmlModelExplore:
+        """
+        Get fields from the Looker API for the given explore.
+        Only queries for the fields we need (dimensions, measures, and dimension groups)
+        to optimize API performance.
+
+        Note: This method is cached since a single explore can serve multiple views.
+        When associating views with explores, we map as many views as possible to the
+        same explore to reduce the number of API calls.
+
+        Returns:
+            LookmlModelExplore: The explore with the fields.
+        Raises:
+            Exception: If there is an error getting the explore from the Looker API.
+ """ + try: + fields_to_query = ["fields"] + explore: LookmlModelExplore = looker_client.lookml_model_explore( + model_name, + explore_name, + fields=fields_to_query, + ) + return explore + except Exception as e: + logger.error(f"Error getting explore from Looker API: {e}") + raise e + class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): """ @@ -312,12 +346,32 @@ class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): methods are provided to convert between Looker API field names and raw field names. - SQL parsing is cached for efficiency, and the class is designed to gracefully fall back if the Looker Query API fails. - All lineage extraction is based on the SQL returned by the Looker API, ensuring accurate and up-to-date lineage. + - **Field Splitting**: When the number of fields exceeds the configured threshold (default: 100), fields are automatically + split into multiple API calls to avoid SQL parsing failures and provide partial lineage for large field sets. + - **Individual Field Fallback**: When a field chunk fails, the system can attempt to process each field individually + to maximize information and isolate problematic fields, helping identify which specific fields cause issues. Why view_to_explore_map is required: The Looker Query API expects the explore name (not the view name) as the "view" parameter in the WriteQuery. In Looker, a view can be referenced by multiple explores, but the API needs any one of the explores to access the view's fields + Field Splitting Behavior: + When a view has more fields than the `field_threshold_for_splitting` configuration value (default: 100), + the class automatically splits the fields into chunks and makes multiple API calls. This helps: + - Avoid SQL parsing failures that occur with very large field sets + - Provide partial column and table lineage even when some field chunks fail + - Improve reliability for views with hundreds of fields + - Maintain performance by processing manageable chunks + + Individual Field Fallback: + When `enable_individual_field_fallback` is enabled (default: True), if a field chunk fails, + the system attempts to process each field individually. This helps: + - Maximize lineage information by processing working fields even when chunks fail + - Isolate problematic fields that cause SQL parsing or API errors + - Identify specific fields that need attention or exclusion + - Provide detailed reporting about which fields are problematic + Example WriteQuery request (see `_execute_query` for details): { "model": "test_model", @@ -331,7 +385,9 @@ class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): The SQL response is then parsed to extract upstream tables and column-level lineage. 
For further details, see the method-level docstrings, especially: - - `__get_spr`: SQL parsing and lineage extraction workflow + - `__get_spr`: SQL parsing and lineage extraction workflow with field splitting + - `_get_spr_with_field_splitting`: Field splitting logic for large field sets + - `_process_individual_fields`: Individual field processing when chunks fail - `_get_sql_write_query`: WriteQuery construction and field enumeration - `_execute_query`: Looker API invocation and SQL retrieval - this only generates the SQL query, does not execute it - Field name translation: `_get_looker_api_field_name` and `_get_field_name_from_looker_api_field_name` @@ -359,7 +415,6 @@ def __init__( self._get_upstream_dataset_urn = lru_cache(maxsize=1)( self.__get_upstream_dataset_urn ) - # Initialize the cache # Done to fallback to other implementations if the Looker Query API fails self._get_spr() @@ -371,6 +426,9 @@ def __get_spr(self) -> SqlParsingResult: 2. Executing the query via the Looker API to get the SQL. 3. Parsing the SQL to extract lineage information. + If the number of fields exceeds the threshold, fields are split into multiple queries + and the results are combined to provide partial lineage. + Returns: SqlParsingResult if successful, otherwise None. Raises: @@ -381,53 +439,513 @@ def __get_spr(self) -> SqlParsingResult: ValueError: If error in parsing SQL for column lineage. """ try: - # Build the WriteQuery for the current view. - sql_query: WriteQuery = self._get_sql_write_query() + # Get fields for the current view + explore_name = self.view_to_explore_map.get(self.view_context.name()) + assert explore_name # Happy linter - # Execute the query to get the SQL representation from Looker. - sql_response = self._execute_query(sql_query) - - # Parse the SQL to extract lineage information. - spr = create_lineage_sql_parsed_result( - query=sql_response, - default_schema=self.view_context.view_connection.default_schema, - default_db=self.view_context.view_connection.default_db, - platform=self.view_context.view_connection.platform, - platform_instance=self.view_context.view_connection.platform_instance, - env=self.view_context.view_connection.platform_env or self.config.env, - graph=self.ctx.graph, + view_fields = self._get_fields_from_looker_api(explore_name) + # TODO: Remove this after debugging + logger.debug( + f"view-name={self.view_context.name()}, total-fields-from-looker-api={len(view_fields)}, total-fields-from-view-context={len(self._get_fields_from_view_context())}" ) + # Compare diff between view fields, if any, print the diff + # Compute set differences in both directions to identify fields missing from Looker API and fields extra in Looker API + view_context_fields = set(self._get_fields_from_view_context()) + view_fields_set = set(view_fields) + missing_from_looker_api = view_context_fields - view_fields_set + extra_in_looker_api = view_fields_set - view_context_fields + logger.debug( + f"view-name={self.view_context.name()}, missing-from-looker-api={missing_from_looker_api}, extra-in-looker-api={extra_in_looker_api}" + ) + if not view_fields: + view_fields = self._get_fields_from_view_context() - # Check for errors encountered during table extraction. 
- table_error = spr.debug_info.table_error - if table_error is not None: - self.reporter.report_warning( - title="Table Level Lineage Extraction Failed", - message="Error in parsing derived sql", - context=f"View-name: {self.view_context.name()}", - exc=table_error, - ) + if not view_fields: raise ValueError( - f"Error in parsing SQL for upstream tables: {table_error}" + f"No fields found for view '{self.view_context.name()}'. Cannot proceed with Looker API for view lineage." ) - column_error = spr.debug_info.column_error - if column_error is not None: - self.reporter.report_warning( - title="Column Level Lineage Extraction Failed", - message="Error in parsing derived sql", - context=f"View-name: {self.view_context.name()}", - exc=column_error, + if len(view_fields) > self.config.field_threshold_for_splitting: + logger.info( + f"View '{self.view_context.name()}' has {len(view_fields)} fields, " + f"exceeding threshold of {self.config.field_threshold_for_splitting}. Splitting into multiple queries for partial lineage." ) - raise ValueError( - f"Error in parsing SQL for column lineage: {column_error}" + return self._get_spr_with_field_splitting(view_fields, explore_name) + else: + # Use the original single-query approach + sql_query: WriteQuery = self._get_sql_write_query_with_fields( + view_fields, explore_name + ) + sql_response = self._execute_query(sql_query) + spr, has_errors = self._parse_sql_response( + sql_response, + allow_partial=self.config.allow_partial_lineage_results, ) + return spr - return spr except Exception: # Reraise the exception to allow higher-level handling. raise + def _get_sql_write_query_with_fields( + self, view_fields: List[str], explore_name: str + ) -> WriteQuery: + """ + Constructs a WriteQuery object with the provided fields. + + Args: + view_fields: List of field names in Looker API format + explore_name: The explore name to use in the query + + Returns: + WriteQuery: The WriteQuery object + """ + return WriteQuery( + model=self.looker_view_id_cache.model_name, + view=explore_name, + fields=view_fields, + filters={}, + limit="1", + ) + + def _parse_sql_response( + self, sql_response: str, allow_partial: bool = True + ) -> tuple[SqlParsingResult, bool]: + """ + Parse SQL response to extract lineage information. + + Args: + sql_response: The SQL string returned by the Looker API + allow_partial: Whether to allow partial results when there are parsing errors + + Returns: + Tuple of (SqlParsingResult, has_errors) where has_errors indicates if there were parsing issues + """ + spr = create_lineage_sql_parsed_result( + query=sql_response, + default_schema=self.view_context.view_connection.default_schema, + default_db=self.view_context.view_connection.default_db, + platform=self.view_context.view_connection.platform, + platform_instance=self.view_context.view_connection.platform_instance, + env=self.view_context.view_connection.platform_env or self.config.env, + graph=self.ctx.graph, + ) + + has_errors = False + + # Check for errors encountered during table extraction. 
+ table_error = spr.debug_info.table_error + if table_error is not None: + has_errors = True + logger.debug( + f"view-name={self.view_context.name()}, table-error={table_error}, sql-query={sql_response}" + ) + self.reporter.report_warning( + title="Table Level Lineage Extraction Failed", + message="Error in parsing derived sql", + context=f"View-name: {self.view_context.name()}", + exc=table_error, + ) + if not allow_partial: + raise ValueError( + f"Error in parsing SQL for upstream tables: {table_error}" + ) + else: + logger.warning( + f"Allowing partial results despite table parsing error for view '{self.view_context.name()}'" + ) + + column_error = spr.debug_info.column_error + if column_error is not None: + has_errors = True + logger.debug( + f"view-name={self.view_context.name()}, column-error={column_error}, sql-query={sql_response}" + ) + self.reporter.report_warning( + title="Column Level Lineage Extraction Failed", + message="Error in parsing derived sql", + context=f"View-name: {self.view_context.name()}", + exc=column_error, + ) + if not allow_partial: + raise ValueError( + f"Error in parsing SQL for column lineage: {column_error}" + ) + else: + logger.warning( + f"Allowing partial results despite column parsing error for view '{self.view_context.name()}'" + ) + + return spr, has_errors + + def _process_field_chunk( + self, field_chunk: List[str], explore_name: str, chunk_idx: int + ) -> Tuple[SqlParsingResult, bool, int, int]: + """ + Process a single field chunk and return results. + + Returns: + Tuple of (SqlParsingResult, has_errors, successful_queries, failed_queries) + """ + try: + sql_query = self._get_sql_write_query_with_fields(field_chunk, explore_name) + sql_response = self._execute_query(sql_query) + spr, has_errors = self._parse_sql_response( + sql_response, self.config.allow_partial_lineage_results + ) + + # Check if we should trigger individual field fallback due to parsing errors + if has_errors and self.config.enable_individual_field_fallback: + logger.warning( + f"Chunk {chunk_idx + 1} had parsing errors, attempting individual field processing for view '{self.view_context.name()}'" + ) + try: + ( + individual_tables, + individual_lineage, + individual_success, + individual_failed, + ) = self._process_individual_fields(field_chunk, explore_name) + + # Combine individual field results with chunk results + spr.in_tables.extend(list(individual_tables)) + if individual_lineage and spr.column_lineage: + spr.column_lineage.extend(list(individual_lineage)) + elif individual_lineage: + spr.column_lineage = list(individual_lineage) + + return spr, False, individual_success, individual_failed + + except Exception as individual_error: + logger.error( + f"Individual field processing also failed for chunk {chunk_idx + 1}: {individual_error}" + ) + return spr, has_errors, 0, 1 + else: + if has_errors: + return spr, has_errors, 0, 1 + else: + return spr, has_errors, 1, 0 + + except Exception as e: + logger.warning( + f"Failed to process field chunk {chunk_idx + 1} for view '{self.view_context.name()}': {e}" + ) + self.reporter.report_warning( + title="Field Chunk Processing Failed", + message="Failed to process field chunk with fields", + context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}, Field count: {len(field_chunk)}", + exc=e, + ) + + return ( + SqlParsingResult( + in_tables=[], + out_tables=[], + column_lineage=None, + debug_info=SqlParsingDebugInfo(), + ), + True, + 0, + 1, + ) + + def _process_individual_fields( + self, field_chunk: List[str], 
explore_name: str
+    ) -> tuple[set, list, int, int]:
+        """
+        Process fields individually when a chunk fails, in order to isolate problematic fields.
+
+        This method processes each field individually in parallel to maximize information
+        extraction and isolate problematic fields that cause SQL parsing or API errors.
+
+        Args:
+            field_chunk: List of field names that failed as a chunk
+            explore_name: The explore name to use in queries
+
+        Returns:
+            Tuple of (successful_tables, successful_column_lineage, successful_count, failed_count)
+        """
+        successful_tables = set()
+        successful_column_lineage = []
+        successful_count = 0
+        failed_count = 0
+        problematic_fields = []
+
+        logger.info(
+            f"Processing {len(field_chunk)} fields individually in parallel for view '{self.view_context.name()}' "
+            f"to isolate problematic fields"
+        )
+
+        # Process fields in parallel
+        with ThreadPoolExecutor(
+            max_workers=self.config.max_workers_for_parallel_processing
+        ) as executor:
+            # Submit all field processing tasks
+            future_to_field = {
+                executor.submit(
+                    self._process_individual_field, field_name, explore_name
+                ): field_name
+                for field_name in field_chunk
+            }
+
+            # Collect results as they complete
+            for future in as_completed(future_to_field):
+                field_name = future_to_field[future]
+                try:
+                    tables, lineage, success = future.result()
+
+                    if success:
+                        successful_tables.update(tables)
+                        if lineage:
+                            successful_column_lineage.extend(lineage)
+                        successful_count += 1
+                    else:
+                        failed_count += 1
+                        problematic_fields.append(field_name)
+
+                except Exception as e:
+                    failed_count += 1
+                    problematic_fields.append(field_name)
+                    logger.warning(
+                        f"Unexpected error processing individual field '{field_name}' for view '{self.view_context.name()}': {e}"
+                    )
+
+        if problematic_fields:
+            logger.warning(
+                f"Identified {len(problematic_fields)} problematic fields for view '{self.view_context.name()}': "
+                f"{problematic_fields}"
+            )
+            self.reporter.report_warning(
+                title="Problematic Fields Identified",
+                message="Identified problematic fields that cannot be processed",
+                context=f"View-name: {self.view_context.name()}, Total problematic: {len(problematic_fields)}, Fields: {', '.join(problematic_fields[:10])}{'...' if len(problematic_fields) > 10 else ''}",
+            )
+
+        logger.info(
+            f"Individual field processing completed for view '{self.view_context.name()}': "
+            f"{successful_count} successful, {failed_count} failed"
+        )
+
+        return (
+            successful_tables,
+            successful_column_lineage,
+            successful_count,
+            failed_count,
+        )
+
+    def _process_individual_field(
+        self, field_name: str, explore_name: str
+    ) -> Tuple[set, list, bool]:
+        """
+        Process a single individual field and return results.
+ + Returns: + Tuple of (tables, lineage, success) + """ + try: + sql_query = self._get_sql_write_query_with_fields( + [field_name], explore_name + ) + sql_response = self._execute_query(sql_query) + + # Parse the SQL response without partial results to catch parsing errors + # We want to raise exceptions for individual field processing to properly handle failures + spr, has_errors = self._parse_sql_response( + sql_response, allow_partial=False + ) + + # Collect results + tables = set(spr.in_tables) if spr.in_tables else set() + lineage = list(spr.column_lineage) if spr.column_lineage else [] + + logger.debug( + f"Successfully processed individual field '{field_name}' for view '{self.view_context.name()}'" + ) + + return tables, lineage, True + + except Exception as e: + logger.warning( + f"Failed to process individual field '{field_name}' for view '{self.view_context.name()}'': {e}" + ) + self.reporter.report_warning( + title="Individual Field Processing Failed", + message="Failed to process individual field", + context=f"View-name: {self.view_context.name()}, Field: {field_name}", + exc=e, + ) + return set(), [], False + + def _get_spr_with_field_splitting( + self, view_fields: List[str], explore_name: str + ) -> SqlParsingResult: + """ + Handle field splitting when the number of fields exceeds the threshold. + Splits fields into chunks and processes them in parallel for better performance. + + Args: + view_fields: List of all field names + explore_name: The explore name to use in queries + + Returns: + SqlParsingResult: Combined result from multiple queries + """ + + # Split fields into chunks + field_chunks = [ + view_fields[i : i + self.config.field_threshold_for_splitting] + for i in range( + 0, len(view_fields), self.config.field_threshold_for_splitting + ) + ] + + logger.info( + f"Split {len(view_fields)} fields into {len(field_chunks)} chunks for view '{self.view_context.name()}' " + f"and processing them in parallel" + ) + + all_in_tables = set() + all_column_lineage = [] + combined_debug_info = None + successful_queries = 0 + failed_queries = 0 + + # Process chunks in parallel + with ThreadPoolExecutor( + max_workers=self.config.max_workers_for_parallel_processing + ) as executor: + # Submit all chunk processing tasks + future_to_chunk = { + executor.submit( + self._process_field_chunk, field_chunk, explore_name, chunk_idx + ): chunk_idx + for chunk_idx, field_chunk in enumerate(field_chunks) + } + + # Collect results as they complete + for future in as_completed(future_to_chunk): + chunk_idx = future_to_chunk[future] + try: + spr, has_errors, chunk_success, chunk_failed = future.result() + + # Combine results + all_in_tables.update(spr.in_tables) + if spr.column_lineage: + all_column_lineage.extend(spr.column_lineage) + + # Use debug info from the first successful query + if combined_debug_info is None and not has_errors: + combined_debug_info = spr.debug_info + + # Update counters + successful_queries += chunk_success + failed_queries += chunk_failed + + logger.debug( + f"Completed processing chunk {chunk_idx + 1}/{len(field_chunks)}: " + f"{chunk_success} successful, {chunk_failed} failed" + ) + + except Exception as e: + failed_queries += 1 + logger.warning( + f"Unexpected error processing chunk {chunk_idx + 1}/{len(field_chunks)} for view '{self.view_context.name()}': {e}" + ) + + if not successful_queries or not all_in_tables or not all_column_lineage: + logger.debug( + f"All field chunks failed for view '{self.view_context.name()}'. 
Total chunks: {len(field_chunks)}, Failed: {failed_queries}", + successful_queries, + all_in_tables, + all_column_lineage, + ) + raise ValueError( + f"All field chunks failed for view '{self.view_context.name()}'. " + f"Total chunks: {len(field_chunks)}, Failed: {failed_queries}" + ) + + # Create combined result + combined_result = SqlParsingResult( + in_tables=list(all_in_tables), + out_tables=[], # No output tables for upstream lineage + column_lineage=all_column_lineage if all_column_lineage else None, + debug_info=combined_debug_info or SqlParsingDebugInfo(), + ) + + logger.info( + f"Successfully combined results for view '{self.view_context.name()}': " + f"{len(all_in_tables)} upstream tables, {len(all_column_lineage)} column lineages. " + f"Success rate: {(successful_queries / (successful_queries + failed_queries)) * 100:.1f}%" + ) + + # Report field splitting statistics + self._report_field_splitting_stats( + total_fields=len(view_fields), + total_chunks=len(field_chunks), + successful_queries=successful_queries, + failed_queries=failed_queries, + upstream_tables=len(all_in_tables), + column_lineages=len(all_column_lineage), + individual_field_fallback_enabled=self.config.enable_individual_field_fallback, + ) + + return combined_result + + def _report_field_splitting_stats( + self, + total_fields: int, + total_chunks: int, + successful_queries: int, + failed_queries: int, + upstream_tables: int, + column_lineages: int, + individual_field_fallback_enabled: bool = False, + ) -> None: + """ + Report statistics about field splitting processing. + + Args: + total_fields: Total number of fields processed + total_chunks: Number of field chunks created + successful_queries: Number of successful API queries + failed_queries: Number of failed API queries + upstream_tables: Number of upstream tables found + column_lineages: Number of column lineages found + individual_field_fallback_enabled: Whether individual field fallback was enabled + """ + # Use different reporting levels based on success rate + total_queries = successful_queries + failed_queries + success_rate = ( + (successful_queries / total_queries) * 100 if total_queries > 0 else 0 + ) + + fallback_info = "" + if individual_field_fallback_enabled: + fallback_info = " (Individual field fallback enabled)" + + if success_rate == 100: + # All queries succeeded + self.reporter.report_warning( + title="Field Splitting Statistics - Complete Success", + message="Field splitting completed successfully", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%, Total fields: {total_fields}, Chunks: {total_chunks}, Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", + ) + elif success_rate > 0: + # Partial success + self.reporter.report_warning( + title="Field Splitting Statistics - Partial Success", + message="Field splitting completed with partial results", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%, Total fields: {total_fields}, Chunks: {total_chunks}, Successful queries: {successful_queries}, Failed queries: {failed_queries}, Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", + ) + else: + # Complete failure + self.reporter.report_warning( + title="Field Splitting Statistics - Complete Failure", + message="Field splitting failed completely", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%, Total fields: {total_fields}, Chunks: {total_chunks}, All {failed_queries} 
queries failed{fallback_info}", + ) + def _get_time_dim_group_field_name(self, dim_group: dict) -> str: """ Time dimension groups must be referenced by their individual timeframes suffix. @@ -473,27 +991,180 @@ def _get_duration_dim_group_field_name(self, dim_group: dict) -> str: prefix = f"{intervals[0]}s" if intervals else "days" return f"{prefix}_{dim_group_name}" - def _get_sql_write_query(self) -> WriteQuery: + def _is_field_from_current_view( + self, field: LookmlModelExploreField, current_view_name: str + ) -> bool: """ - Constructs a WriteQuery object to obtain the SQL representation of the current Looker view. + Check if a field belongs to the current view based on the original_view attribute. - We need to list all the fields for the view to get the SQL representation of the view - this fully resolved SQL for view dimensions and measures. + Args: + field: The field object from the explore + current_view_name: The name of the current view we're processing - The method uses the view_to_explore_map to determine the correct explore name to use in the WriteQuery. - This is crucial because the Looker Query API expects the explore name (not the view name) as the "view" parameter. + Returns: + True if the field belongs to the current view, False otherwise + """ + # Check if the field has an original_view attribute and it matches our current view + if hasattr(field, "view"): + return field.view == current_view_name - Ref: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions + # If no view attribute, we can't determine the source view + # In this case, we'll be conservative and include the field + logger.debug(f"Field {field.name} has no view attribute, including it") + return True + def _get_fields_from_looker_api(self, explore_name: str) -> List[str]: + """ + Get fields from Looker API for the given explore. + Only queries for the fields we need (dimensions, measures, and dimension groups) + to optimize API performance. 
+ + Sample Response: [{ + "align": "left", + "can_filter": true, + "category": "dimension", + "default_filter_value": null, + "description": "", + "enumerations": null, + "field_group_label": null, + "fill_style": null, + "fiscal_month_offset": 0, + "has_allowed_values": false, + "hidden": false, + "is_filter": false, + "is_numeric": false, + "label": "Customer Analysis User Purchase Status", + "label_from_parameter": null, + "label_short": "User Purchase Status", + "map_layer": null, + "name": "customer_analysis.user_purchase_status", + "strict_value_format": false, + "requires_refresh_on_sort": false, + "sortable": true, + "suggestions": null, + "synonyms": [], + "tags": [], + "type": "string", + "user_attribute_filter_types": [ + "string", + "advanced_filter_string" + ], + "value_format": null, + "view": "customer_analysis", + "view_label": "Customer Analysis", + "dynamic": false, + "week_start_day": "monday", + "original_view": "users", + "dimension_group": null, + "error": null, + "field_group_variant": "User Purchase Status", + "measure": false, + "parameter": false, + "primary_key": false, + "project_name": "anush-dev-project", + "scope": "customer_analysis", + "suggest_dimension": "customer_analysis.user_purchase_status", + "suggest_explore": "customer_analysis", + "suggestable": true, + "is_fiscal": false, + "is_timeframe": false, + "can_time_filter": false, + "time_interval": null, + "lookml_link": "/projects/anush-dev-project/files/views%2Fusers.view.lkml?line=52", + "period_over_period_params": null, + "permanent": null, + "source_file": "views/users.view.lkml", + "source_file_path": "anush-dev-project/views/users.view.lkml", + "sql": "CASE\n WHEN ${user_metrics.purchase_count} = 0 THEN 'Prospect'\n WHEN ${user_metrics.purchase_count} = 1 THEN 'New Customer'\n WHEN ${user_metrics.purchase_count} BETWEEN 2 AND 5 THEN 'Happy Customer'\n ELSE 'Loyal Customer'\n END ", + "sql_case": null, + "filters": null, + "times_used": 0 + } + ] Returns: - WriteQuery: The WriteQuery object if fields are found and explore name is available, otherwise None. + List of field names in Looker API format + """ + view_fields: List[str] = [] + # Get the current view name to filter fields + current_view_name = self.view_context.name() - Raises: - ValueError: If the explore name is not found in the view_to_explore_map for the current view. - ValueError: If no fields are found for the view. 
+ try: + logger.debug( + f"Attempting to get explore details from Looker API for explore: {explore_name} and view: {current_view_name}" + ) + # Only query for the fields we need to optimize API performance + explore: LookmlModelExplore = self.get_explore_fields_from_looker_api( + self.looker_client, self.looker_view_id_cache.model_name, explore_name + ) + + if explore and explore.fields: + logger.debug( + f"Looker API response for explore fields: {explore.fields}" + ) + # Creating a map to de-dup dimension group fields - adding all of them adds to the query length, we dont need all of them for CLL + dimension_group_fields_mapping: Dict[str, str] = {} + # Get dimensions from API + if explore.fields.dimensions: + for dim_field in explore.fields.dimensions: + if dim_field.name and self._is_field_from_current_view( + dim_field, current_view_name + ): + # Handle dimension group fields - only add one field per dimension group + if dim_field.dimension_group: + # Skip if this dimension group already has a field + if ( + dim_field.dimension_group + in dimension_group_fields_mapping + ): + continue + # Add this field as the representative for this dimension group + dimension_group_fields_mapping[ + dim_field.dimension_group + ] = dim_field.name + + view_fields.append(dim_field.name) + logger.debug( + f"Added dimension field from API: {dim_field.name} (dimension_group: {dim_field.dimension_group})" + ) + + # Get measures from API + if explore.fields.measures: + for measure_field in explore.fields.measures: + if measure_field.name and self._is_field_from_current_view( + measure_field, current_view_name + ): + view_fields.append(measure_field.name) + logger.debug( + f"Added measure field from API: {measure_field.name}" + ) + else: + logger.warning( + f"No fields found in explore '{explore_name}' from Looker API, falling back to view context" + ) + + except Exception: + logger.warning( + f"Failed to get explore details from Looker API for explore '{explore_name}'. Current view: {self.view_context.name()} and view_fields: {view_fields}. Falling back to view csontext.", + exc_info=True, + ) + # Resetting view_fields to trigger fallback to view context + view_fields = [] + + return view_fields + + def _get_fields_from_view_context(self) -> List[str]: """ + Get fields from view context as fallback. - # Collect all dimension and measure fields for the view. + Returns: + List of field names in Looker API format + """ view_fields: List[str] = [] + + logger.debug( + f"Using view context as fallback for view: {self.view_context.name()}" + ) + # Add dimension fields in the format: . or . 
for field in self.view_context.dimensions() + self.view_context.measures(): field_name = field.get(NAME) @@ -501,42 +1172,44 @@ def _get_sql_write_query(self) -> WriteQuery: view_fields.append(self._get_looker_api_field_name(field_name)) for dim_group in self.view_context.dimension_groups(): - dim_group_type: ViewFieldDimensionGroupType = ViewFieldDimensionGroupType( - dim_group.get(VIEW_FIELD_TYPE_ATTRIBUTE) + dim_group_type_str = dim_group.get(VIEW_FIELD_TYPE_ATTRIBUTE) + + logger.debug( + f"Processing dimension group from view context: {dim_group.get(NAME, 'unknown')}, type: {dim_group_type_str}" ) - if dim_group_type == ViewFieldDimensionGroupType.TIME: - view_fields.append( - self._get_looker_api_field_name( - self._get_time_dim_group_field_name(dim_group) - ) - ) - elif dim_group_type == ViewFieldDimensionGroupType.DURATION: - view_fields.append( - self._get_looker_api_field_name( - self._get_duration_dim_group_field_name(dim_group) - ) + if dim_group_type_str is None: + logger.warning( + f"Dimension group '{dim_group.get(NAME, 'unknown')}' has None type, skipping" ) + continue - # Use explore name from view_to_explore_map if available - # explore_name is always present in the view_to_explore_map because of the check in view_upstream.create_view_upstream - explore_name = self.view_to_explore_map.get(self.view_context.name()) - assert explore_name # Happy linter + try: + dim_group_type: ViewFieldDimensionGroupType = ( + ViewFieldDimensionGroupType(dim_group_type_str) + ) - if not view_fields: - raise ValueError( - f"No fields found for view '{self.view_context.name()}'. Cannot proceed with Looker API for view lineage." - ) + if dim_group_type == ViewFieldDimensionGroupType.TIME: + view_fields.append( + self._get_looker_api_field_name( + self._get_time_dim_group_field_name(dim_group) + ) + ) + elif dim_group_type == ViewFieldDimensionGroupType.DURATION: + view_fields.append( + self._get_looker_api_field_name( + self._get_duration_dim_group_field_name(dim_group) + ) + ) + except Exception: + logger.error( + f"Failed to process dimension group for View-name: {self.view_context.name()}", + exc_info=True, + ) + # Continue processing other fields instead of failing completely + continue - # Construct and return the WriteQuery object. - # The 'limit' is set to "1" as the query is only used to obtain SQL, not to fetch data. - return WriteQuery( - model=self.looker_view_id_cache.model_name, - view=explore_name, - fields=view_fields, - filters={}, - limit="1", - ) + return view_fields def _execute_query(self, query: WriteQuery) -> str: """ @@ -608,6 +1281,9 @@ def _execute_query(self, query: WriteQuery) -> str: f"No SQL found in response for view '{self.view_context.name()}'. Response: {sql_response}" ) + logger.debug( + f"view-name={self.view_context.name()}, sql-response={sql_response}" + ) # Extract the SQL string from the response. 
return sql_response @@ -645,6 +1321,24 @@ def _get_looker_api_field_name(self, field_name: str) -> str: """ return f"{self.view_context.name()}.{field_name}" + def _get_looker_api_field_names(self, field_name: str) -> List[str]: + """ + Translate the field name to the looker api field names + + Example: + pk -> purchases.pk + + When we do chucking of fields, we need to handle cases like: + pk -> purchases_pk + + Note: cases like pk -> purchases_pk_1 are handled with startswith to cover all possible variants + + """ + return [ + self._get_looker_api_field_name(field_name), + f"{self.view_context.name()}_{field_name}", + ] + def _get_field_name_from_looker_api_field_name( self, looker_api_field_name: str ) -> str: @@ -697,17 +1391,31 @@ def get_upstream_column_ref( f"view-name={self.view_context.name()}, field-name={field_name}, field-type={field_context.raw_field.get(VIEW_FIELD_TYPE_ATTRIBUTE)}" ) - field_api_name = self._get_looker_api_field_name(field_name).lower() + field_api_names = self._get_looker_api_field_names(field_name) + field_api_names = [name.lower() for name in field_api_names] upstream_refs: List[ColumnRef] = [] - + no_cll_found = True for lineage in spr.column_lineage: - if lineage.downstream.column.lower() == field_api_name: + if any( + lineage.downstream.column.lower().startswith(name) + for name in field_api_names + ): + no_cll_found = False + # Log if there are no upstreams + if not lineage.upstreams: + logger.debug( + f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, lineage-downstream-column={lineage.downstream.column.lower()}, lineage-downstream={lineage.downstream}, no-upstreams" + ) for upstream in lineage.upstreams: upstream_refs.append( ColumnRef(table=upstream.table, column=upstream.column) ) + if no_cll_found: + logger.debug( + f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, lineage-downstream-column={lineage.downstream.column.lower()}, lineage-downstream={lineage.downstream}, no-match" + ) return _drop_hive_dot_from_upstream(upstream_refs) def create_fields(self) -> List[ViewField]: diff --git a/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py index a00aea29ab626c..de2363c384d4a4 100644 --- a/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py +++ b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py @@ -1,3 +1,4 @@ +from typing import Dict from unittest.mock import MagicMock, patch import pytest @@ -25,6 +26,7 @@ LookMLSourceConfig, LookMLSourceReport, ) +from datahub.ingestion.source.looker.lookml_source import LookMLSource from datahub.ingestion.source.looker.view_upstream import ( LookerQueryAPIBasedViewUpstream, ) @@ -45,6 +47,21 @@ def create_mock_sql_parsing_result( return mock_spr +def create_mock_lookml_source(): + """Helper function to create a properly mocked LookMLSource.""" + config = MagicMock(spec=LookMLSourceConfig) + config.base_folder = None + config.project_name = "test_project" + config.connection_to_platform_map = {"test_connection": "test_platform"} + config.stateful_ingestion = MagicMock() + config.stateful_ingestion.enabled = False + config.api = None + config.looker_client = None + + ctx = MagicMock(spec=PipelineContext) + return LookMLSource(config, ctx) + + class TestLookMLAPIBasedViewUpstream: """Test suite for LookerQueryAPIBasedViewUpstream functionality.""" @@ -516,3 +533,253 @@ 
def test_latency_tracking( # Verify that latency was reported (may be called multiple times due to caching) assert mock_reporter.report_looker_query_api_latency.call_count >= 1 + + def test_class_level_cache_for_explore_fields(self, mock_looker_client): + """Test that get_explore_fields_from_looker_api uses class-level cache.""" + + # Mock the LookerAPI response + mock_explore = MagicMock() + mock_explore.name = "test_explore" + mock_explore.fields = [ + MagicMock(name="field1", type="string"), + MagicMock(name="field2", type="number"), + ] + + # Add the lookml_model_explore method to the mock + mock_looker_client.lookml_model_explore = MagicMock(return_value=mock_explore) + + # Call the class method multiple times + result1 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, "test_model", "test_explore" + ) + result2 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, "test_model", "test_explore" + ) + + # Verify the method was only called once due to caching + assert mock_looker_client.lookml_model_explore.call_count == 1 + + # Verify results are the same (cached) + assert result1 == result2 + assert result1.name == "test_explore" + assert result1.fields is not None + assert len(result1.fields) == 2 + + def test_class_level_cache_different_explores(self, mock_looker_client): + """Test that class-level cache works for different explores.""" + + # Mock different explores + mock_explore1 = MagicMock() + mock_explore1.name = "explore1" + mock_explore1.fields = [MagicMock(name="field1", type="string")] + + mock_explore2 = MagicMock() + mock_explore2.name = "explore2" + mock_explore2.fields = [MagicMock(name="field2", type="number")] + + def mock_lookml_model_explore(model, explore, fields=None): + if explore == "explore1": + return mock_explore1 + elif explore == "explore2": + return mock_explore2 + return None + + # Add the lookml_model_explore method to the mock + mock_looker_client.lookml_model_explore = MagicMock( + side_effect=mock_lookml_model_explore + ) + + # Call for different explores + result1 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, "test_model", "explore1" + ) + result2 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, "test_model", "explore2" + ) + + # Verify both calls were made + assert mock_looker_client.lookml_model_explore.call_count == 2 + + # Verify results are different + assert result1.name == "explore1" + assert result2.name == "explore2" + assert result1 != result2 + + def test_view_optimization_algorithm(self): + """Test the _optimize_views_by_common_explore algorithm.""" + source = create_mock_lookml_source() + + # Test case with overlapping views and explores + view_to_explores = { + "view1": {"explore_a", "explore_b"}, + "view2": {"explore_a", "explore_b"}, + "view3": {"explore_a"}, + "view4": {"explore_b"}, + "view5": {"explore_c"}, + "view6": {"explore_c"}, + } + + explore_to_views = { + "explore_a": {"view1", "view2", "view3"}, + "explore_b": {"view1", "view2", "view4"}, + "explore_c": {"view5", "view6"}, + } + + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify all views are present + assert len(result) == len(view_to_explores) + assert set(result.keys()) == set(view_to_explores.keys()) + + # Verify views are assigned to explores + for view, explore in result.items(): + assert explore in explore_to_views + assert view in 
explore_to_views[explore] + + def test_view_optimization_empty_input(self): + """Test the optimization algorithm with empty input.""" + source = create_mock_lookml_source() + + # Test with empty input + result = source._optimize_views_by_common_explore({}, {}) + + # Verify empty result + assert len(result) == 0 + assert result == {} + + def test_view_optimization_single_view_multiple_explores(self): + """Test optimization when a view can be in multiple explores.""" + source = create_mock_lookml_source() + + # Test case where one view can be in multiple explores + view_to_explores = {"shared_view": {"explore_a", "explore_b", "explore_c"}} + + explore_to_views = { + "explore_a": {"shared_view"}, + "explore_b": {"shared_view"}, + "explore_c": {"shared_view"}, + } + + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify the view is assigned to one of the explores + assert len(result) == 1 + assert "shared_view" in result + assert result["shared_view"] in ["explore_a", "explore_b", "explore_c"] + + def test_view_optimization_efficiency(self): + """Test that the optimization algorithm reduces the number of explores used.""" + source = create_mock_lookml_source() + + # Create a scenario where optimization can reduce explores + view_to_explores = { + "view1": {"explore_a", "explore_b"}, + "view2": {"explore_a", "explore_b"}, + "view3": {"explore_a"}, + "view4": {"explore_b"}, + "view5": {"explore_c"}, + "view6": {"explore_c"}, + "view7": {"explore_d"}, + "view8": {"explore_d"}, + } + + explore_to_views = { + "explore_a": {"view1", "view2", "view3"}, + "explore_b": {"view1", "view2", "view4"}, + "explore_c": {"view5", "view6"}, + "explore_d": {"view7", "view8"}, + } + + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify all views are present + assert len(result) == len(view_to_explores) + assert set(result.keys()) == set(view_to_explores.keys()) + + # Verify the algorithm assigns views optimally + unique_explores_used = len(set(result.values())) + total_explores = len(explore_to_views) + + # The algorithm should use fewer or equal explores + assert unique_explores_used <= total_explores + + def test_optimization_algorithm_performance(self): + """Test that the optimization algorithm handles large datasets efficiently.""" + source = create_mock_lookml_source() + + # Create a larger dataset to test performance + view_to_explores = {} + explore_to_views: Dict[str, set] = {} + + # Create 50 views across 10 explores + for i in range(50): + view_name = f"view_{i}" + # Each view can be in 1-3 explores + num_explores = (i % 3) + 1 + explores = set() + + for j in range(num_explores): + explore_name = f"explore_{(i + j) % 10 + 1}" + explores.add(explore_name) + if explore_name not in explore_to_views: + explore_to_views[explore_name] = set() + explore_to_views[explore_name].add(view_name) + + view_to_explores[view_name] = explores + + # Test the optimization + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify all views are present + assert len(result) == len(view_to_explores) + assert set(result.keys()) == set(view_to_explores.keys()) + + # Verify the algorithm completed successfully + unique_explores_used = len(set(result.values())) + assert unique_explores_used > 0 + assert unique_explores_used <= len(explore_to_views) + + def test_optimization_algorithm_edge_cases(self): + """Test edge cases for the optimization algorithm.""" + source = 
create_mock_lookml_source() + + # Test case 1: All views in one explore + view_to_explores_1 = { + "view1": {"explore_a"}, + "view2": {"explore_a"}, + "view3": {"explore_a"}, + } + explore_to_views_1 = {"explore_a": {"view1", "view2", "view3"}} + + result_1 = source._optimize_views_by_common_explore( + view_to_explores_1, explore_to_views_1 + ) + assert len(result_1) == 3 + assert all(explore == "explore_a" for explore in result_1.values()) + + # Test case 2: Views with no explores + result_2 = source._optimize_views_by_common_explore({}, {}) + assert len(result_2) == 0 + + # Test case 3: Single view, multiple explores + view_to_explores_3 = {"view1": {"explore_a", "explore_b", "explore_c"}} + explore_to_views_3 = { + "explore_a": {"view1"}, + "explore_b": {"view1"}, + "explore_c": {"view1"}, + } + + result_3 = source._optimize_views_by_common_explore( + view_to_explores_3, explore_to_views_3 + ) + assert len(result_3) == 1 + assert "view1" in result_3 + assert result_3["view1"] in ["explore_a", "explore_b", "explore_c"]
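As a worked illustration of the greedy assignment exercised by the tests above, the rule implemented in `_optimize_views_by_common_explore` is restated standalone below (assuming an explore's "size" means the number of views it reaches). The sample view and explore names are made up.

```python
from typing import Dict, Set


def greedy_assignment(
    view_to_explores: Dict[str, Set[str]], explore_to_views: Dict[str, Set[str]]
) -> Dict[str, str]:
    # Assign each view to its largest candidate explore (the one reaching the most
    # views), mirroring the greedy rule used by the source to minimize API calls.
    explore_sizes = {explore: len(views) for explore, views in explore_to_views.items()}
    return {
        view: max(candidates, key=lambda e: explore_sizes[e])
        for view, candidates in view_to_explores.items()
        if candidates
    }


# Example: explore_a reaches three views while explore_b reaches two, so every view
# that can use explore_a is assigned to it and only two explores are queried overall.
view_to_explores = {
    "orders": {"explore_a", "explore_b"},
    "users": {"explore_a"},
    "payments": {"explore_a"},
    "refunds": {"explore_b"},
}
explore_to_views = {
    "explore_a": {"orders", "users", "payments"},
    "explore_b": {"orders", "refunds"},
}
assert greedy_assignment(view_to_explores, explore_to_views) == {
    "orders": "explore_a",
    "users": "explore_a",
    "payments": "explore_a",
    "refunds": "explore_b",
}
```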
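The `_get_looker_api_field_names` / `get_upstream_column_ref` changes in the diff match parsed downstream columns against both the `view.field` and `view_field` spellings using a prefix check. A minimal standalone sketch of that matching rule, with made-up column names:

```python
from typing import List


def looker_api_field_names(view_name: str, field_name: str) -> List[str]:
    # Both spellings can show up in the generated SQL, especially after field chunking.
    return [f"{view_name}.{field_name}", f"{view_name}_{field_name}"]


def matches_field(downstream_column: str, view_name: str, field_name: str) -> bool:
    # startswith also covers suffixed variants such as "purchases_pk_1".
    candidates = [name.lower() for name in looker_api_field_names(view_name, field_name)]
    return any(downstream_column.lower().startswith(name) for name in candidates)


assert matches_field("purchases.pk", "purchases", "pk")
assert matches_field("PURCHASES_PK_1", "purchases", "pk")
assert not matches_field("orders.pk", "purchases", "pk")
```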