From 3bcf86de571a61f1393d4334bcd2e6e43e148c19 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Mon, 20 Oct 2025 16:44:56 -0700 Subject: [PATCH 01/17] feat(looker): enhance Looker API integration by adding field retrieval and caching for explores - Updated `lookml_model_explore` method to accept an optional `fields` parameter for optimized API calls. - Introduced `get_explore_fields_from_looker_api` method to fetch fields directly from the Looker API, improving performance and reducing unnecessary API calls. - Implemented fallback mechanisms to retrieve fields from view context if API calls fail. - Added logging for better traceability of API interactions and field retrieval processes. --- .../source/looker/looker_lib_wrapper.py | 12 +- .../ingestion/source/looker/view_upstream.py | 278 ++++++++++++++++-- 2 files changed, 264 insertions(+), 26 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py index ef43510611bbcc..53c4137348fc97 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py @@ -228,10 +228,18 @@ def all_lookml_models(self) -> Sequence[LookmlModel]: transport_options=self.transport_options, ) - def lookml_model_explore(self, model: str, explore_name: str) -> LookmlModelExplore: + def lookml_model_explore( + self, + model: str, + explore_name: str, + fields: Optional[List[str]] = None, + ) -> LookmlModelExplore: self.client_stats.explore_calls += 1 return self.client.lookml_model_explore( - model, explore_name, transport_options=self.transport_options + model, + explore_name, + fields=self.__fields_mapper(fields) if fields else None, + transport_options=self.transport_options, ) @lru_cache(maxsize=1000) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 07393ab4cc47ec..ee200edd1aeed3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -6,6 +6,8 @@ from typing import Dict, List, Optional from looker_sdk.sdk.api40.models import ( + LookmlModelExplore, + LookmlModelExploreField, WriteQuery, ) @@ -293,6 +295,36 @@ def create_upstream_column_refs( return upstream_column_refs + @classmethod + @lru_cache(maxsize=1000) + def get_explore_fields_from_looker_api( + cls, looker_client: "LookerAPI", model_name: str, explore_name: str + ) -> LookmlModelExplore: + """ + Get fields from Looker API for the given explore. + Only queries for the fields we need (dimensions, measures, and dimension groups) + to optimize API performance. + + Note: This is a cached method to optimize API performance since a single explore can have multiple views. + When we associate a view with an explore, we try to associate as many views as possible with the same explore to reduce the number of API calls. + + Returns: + LookmlModelExplore: The explore with the fields. + Raises: + Exception: If there is an error getting the explore from the Looker API. 
+ """ + try: + fields_to_query = ["fields"] + explore: LookmlModelExplore = looker_client.lookml_model_explore( + model_name, + explore_name, + fields=fields_to_query, + ) + return explore + except Exception as e: + logger.error(f"Error getting explore from Looker API: {e}") + raise e + class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): """ @@ -359,7 +391,6 @@ def __init__( self._get_upstream_dataset_urn = lru_cache(maxsize=1)( self.__get_upstream_dataset_urn ) - # Initialize the cache # Done to fallback to other implementations if the Looker Query API fails self._get_spr() @@ -473,27 +504,176 @@ def _get_duration_dim_group_field_name(self, dim_group: dict) -> str: prefix = f"{intervals[0]}s" if intervals else "days" return f"{prefix}_{dim_group_name}" - def _get_sql_write_query(self) -> WriteQuery: + def _is_field_from_current_view( + self, field: LookmlModelExploreField, current_view_name: str + ) -> bool: """ - Constructs a WriteQuery object to obtain the SQL representation of the current Looker view. + Check if a field belongs to the current view based on the original_view attribute. - We need to list all the fields for the view to get the SQL representation of the view - this fully resolved SQL for view dimensions and measures. + Args: + field: The field object from the explore + current_view_name: The name of the current view we're processing - The method uses the view_to_explore_map to determine the correct explore name to use in the WriteQuery. - This is crucial because the Looker Query API expects the explore name (not the view name) as the "view" parameter. + Returns: + True if the field belongs to the current view, False otherwise + """ + # Check if the field has an original_view attribute and it matches our current view + if hasattr(field, "view"): + return field.view == current_view_name - Ref: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions + # If no view attribute, we can't determine the source view + # In this case, we'll be conservative and include the field + logger.debug(f"Field {field.name} has no view attribute, including it") + return True + def _get_fields_from_looker_api(self, explore_name: str) -> List[str]: + """ + Get fields from Looker API for the given explore. + Only queries for the fields we need (dimensions, measures, and dimension groups) + to optimize API performance. 
+ + Sample Response: [{ + "align": "left", + "can_filter": true, + "category": "dimension", + "default_filter_value": null, + "description": "", + "enumerations": null, + "field_group_label": null, + "fill_style": null, + "fiscal_month_offset": 0, + "has_allowed_values": false, + "hidden": false, + "is_filter": false, + "is_numeric": false, + "label": "Customer Analysis User Purchase Status", + "label_from_parameter": null, + "label_short": "User Purchase Status", + "map_layer": null, + "name": "customer_analysis.user_purchase_status", + "strict_value_format": false, + "requires_refresh_on_sort": false, + "sortable": true, + "suggestions": null, + "synonyms": [], + "tags": [], + "type": "string", + "user_attribute_filter_types": [ + "string", + "advanced_filter_string" + ], + "value_format": null, + "view": "customer_analysis", + "view_label": "Customer Analysis", + "dynamic": false, + "week_start_day": "monday", + "original_view": "users", + "dimension_group": null, + "error": null, + "field_group_variant": "User Purchase Status", + "measure": false, + "parameter": false, + "primary_key": false, + "project_name": "anush-dev-project", + "scope": "customer_analysis", + "suggest_dimension": "customer_analysis.user_purchase_status", + "suggest_explore": "customer_analysis", + "suggestable": true, + "is_fiscal": false, + "is_timeframe": false, + "can_time_filter": false, + "time_interval": null, + "lookml_link": "/projects/anush-dev-project/files/views%2Fusers.view.lkml?line=52", + "period_over_period_params": null, + "permanent": null, + "source_file": "views/users.view.lkml", + "source_file_path": "anush-dev-project/views/users.view.lkml", + "sql": "CASE\n WHEN ${user_metrics.purchase_count} = 0 THEN 'Prospect'\n WHEN ${user_metrics.purchase_count} = 1 THEN 'New Customer'\n WHEN ${user_metrics.purchase_count} BETWEEN 2 AND 5 THEN 'Happy Customer'\n ELSE 'Loyal Customer'\n END ", + "sql_case": null, + "filters": null, + "times_used": 0 + } + ] Returns: - WriteQuery: The WriteQuery object if fields are found and explore name is available, otherwise None. + List of field names in Looker API format + """ + view_fields: List[str] = [] + # Get the current view name to filter fields + current_view_name = self.view_context.name() - Raises: - ValueError: If the explore name is not found in the view_to_explore_map for the current view. - ValueError: If no fields are found for the view. 
+ try: + logger.debug( + f"Attempting to get explore details from Looker API for explore: {explore_name} and view: {current_view_name}" + ) + # Only query for the fields we need to optimize API performance + explore: LookmlModelExplore = self.get_explore_fields_from_looker_api( + self.looker_client, self.looker_view_id_cache.model_name, explore_name + ) + + if explore and explore.fields: + # Creating a map to de-dup dimension group fields - adding all of them adds to the query length, we dont need all of them for CLL + dimension_group_fields_mapping: Dict[str, str] = {} + # Get dimensions from API + if explore.fields.dimensions: + for dim_field in explore.fields.dimensions: + if dim_field.name and self._is_field_from_current_view( + dim_field, current_view_name + ): + # Skipping adding dimension group fields if already added + if ( + dim_field.dimension_group + and dim_field.dimension_group + in dimension_group_fields_mapping + ): + continue + else: + assert ( + dim_field.dimension_group is not None + ) # HAPPY linter + dimension_group_fields_mapping[ + dim_field.dimension_group + ] = dim_field.name + + view_fields.append(dim_field.name) + logger.debug( + f"Added dimension field from API: {dim_field.name} (dimension_group: {dim_field.dimension_group})" + ) + + # Get measures from API + if explore.fields.measures: + for measure_field in explore.fields.measures: + if measure_field.name and self._is_field_from_current_view( + measure_field, current_view_name + ): + view_fields.append(measure_field.name) + logger.debug( + f"Added measure field from API: {measure_field.name}" + ) + else: + logger.warning( + f"No fields found in explore '{explore_name}' from Looker API, falling back to view context" + ) + + except Exception as e: + logger.warning( + f"Failed to get explore details from Looker API for explore '{explore_name}': {e}. Falling back to view context." + ) + + return view_fields + + def _get_fields_from_view_context(self) -> List[str]: """ + Get fields from view context as fallback. - # Collect all dimension and measure fields for the view. + Returns: + List of field names in Looker API format + """ view_fields: List[str] = [] + + logger.debug( + f"Using view context as fallback for view: {self.view_context.name()}" + ) + # Add dimension fields in the format: . or . 
for field in self.view_context.dimensions() + self.view_context.measures(): field_name = field.get(NAME) @@ -501,33 +681,83 @@ def _get_sql_write_query(self) -> WriteQuery: view_fields.append(self._get_looker_api_field_name(field_name)) for dim_group in self.view_context.dimension_groups(): - dim_group_type: ViewFieldDimensionGroupType = ViewFieldDimensionGroupType( - dim_group.get(VIEW_FIELD_TYPE_ATTRIBUTE) + dim_group_type_str = dim_group.get(VIEW_FIELD_TYPE_ATTRIBUTE) + + logger.debug( + f"Processing dimension group from view context: {dim_group.get(NAME, 'unknown')}, type: {dim_group_type_str}" ) - if dim_group_type == ViewFieldDimensionGroupType.TIME: - view_fields.append( - self._get_looker_api_field_name( - self._get_time_dim_group_field_name(dim_group) - ) + if dim_group_type_str is None: + logger.warning( + f"Dimension group '{dim_group.get(NAME, 'unknown')}' has None type, skipping" ) - elif dim_group_type == ViewFieldDimensionGroupType.DURATION: - view_fields.append( - self._get_looker_api_field_name( - self._get_duration_dim_group_field_name(dim_group) - ) + continue + + try: + dim_group_type: ViewFieldDimensionGroupType = ( + ViewFieldDimensionGroupType(dim_group_type_str) ) + if dim_group_type == ViewFieldDimensionGroupType.TIME: + view_fields.append( + self._get_looker_api_field_name( + self._get_time_dim_group_field_name(dim_group) + ) + ) + elif dim_group_type == ViewFieldDimensionGroupType.DURATION: + view_fields.append( + self._get_looker_api_field_name( + self._get_duration_dim_group_field_name(dim_group) + ) + ) + except Exception as e: + logger.error(f"View-name: {self.view_context.name()}: {e}") + # Continue processing other fields instead of failing completely + continue + + return view_fields + + def _get_sql_write_query(self) -> WriteQuery: + """ + Constructs a WriteQuery object to obtain the SQL representation of the current Looker view. + + This method now uses the Looker API to get explore details directly, providing more comprehensive + field information compared to relying solely on view context. It falls back to view context + if API calls fail. + + The method uses the view_to_explore_map to determine the correct explore name to use in the WriteQuery. + This is crucial because the Looker Query API expects the explore name (not the view name) as the "view" parameter. + + Ref: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions + + Returns: + WriteQuery: The WriteQuery object if fields are found and explore name is available, otherwise None. + + Raises: + ValueError: If no fields are found for the view. + """ + # Use explore name from view_to_explore_map if available # explore_name is always present in the view_to_explore_map because of the check in view_upstream.create_view_upstream explore_name = self.view_to_explore_map.get(self.view_context.name()) assert explore_name # Happy linter + # Try to get fields from Looker API first for more comprehensive information + view_fields = self._get_fields_from_looker_api(explore_name) + + # Fallback to view context if API didn't provide fields or failed + if not view_fields: + view_fields = self._get_fields_from_view_context() + if not view_fields: raise ValueError( f"No fields found for view '{self.view_context.name()}'. Cannot proceed with Looker API for view lineage." ) + logger.debug( + f"Final field list for view '{self.view_context.name()}': {view_fields}" + ) + # Construct and return the WriteQuery object. 
# The 'limit' is set to "1" as the query is only used to obtain SQL, not to fetch data. return WriteQuery( From 6cb48e1d9fe926ba3f1205ff8164d8b14d207673 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Mon, 20 Oct 2025 18:13:27 -0700 Subject: [PATCH 02/17] feat(looker): optimize view-to-explore mapping for API efficiency - Implemented a greedy algorithm to minimize API calls by grouping views with common explores, improving overall efficiency. --- .../ingestion/source/looker/lookml_source.py | 78 ++++++++++++++++--- 1 file changed, 68 insertions(+), 10 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index eca713efc2d1ab..b7c01775afa4c5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -1,7 +1,7 @@ import logging import pathlib import tempfile -from collections import OrderedDict +from collections import OrderedDict, defaultdict from dataclasses import dataclass from datetime import datetime, timezone from typing import Dict, Iterable, List, Optional, Set, Tuple, Union @@ -709,10 +709,14 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: # Value: Tuple(model file name, connection name) view_connection_map: Dict[str, Tuple[str, str]] = {} - # Map of view name to explore name for API-based view lineage - # A view can be referenced by multiple explores, we only need one of the explores to use Looker Query API - # Key: view_name, Value: explore_name - view_to_explore_map: Dict[str, str] = {} + # Map of view name to all possible explores for API-based view lineage + # A view can be referenced by multiple explores, we'll optimize the assignment + # Key: view_name, Value: set of explore_names + view_to_explores: Dict[str, Set[str]] = defaultdict(set) + + # Temporary map to keep track of the views in an explore + # Key: explore_name, Value: set of view_names + explore_to_views: Dict[str, Set[str]] = defaultdict(set) # The ** means "this directory and all subdirectories", and hence should # include all the files we want. 
@@ -789,8 +793,10 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: for view_name in explore.upstream_views: if self.source_config.emit_reachable_views_only: explore_reachable_views.add(view_name.include) - # Build view to explore mapping for API-based view lineage - view_to_explore_map[view_name.include] = explore.name + + # Build view-to-explores mapping efficiently + view_to_explores[view_name.include].add(explore.name) + explore_to_views[explore.name].add(view_name.include) except Exception as e: self.reporter.report_warning( title="Failed to process explores", @@ -888,9 +894,9 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: config=self.source_config, ctx=self.ctx, looker_client=self.looker_client, - view_to_explore_map=view_to_explore_map - if view_to_explore_map - else None, + view_to_explore_map=self._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ), ) except Exception as e: self.reporter.report_warning( @@ -1040,5 +1046,57 @@ def report_skipped_unreachable_views( context=(f"Project: {project}, View File Path: {path}"), ) + def _optimize_views_by_common_explore( + self, + view_to_explores: Dict[str, Set[str]], + explore_to_views: Dict[str, Set[str]], + ) -> Dict[str, str]: + """ + Optimize view-to-explore mapping by grouping views to minimize API calls. + + This uses a greedy algorithm that prioritizes explores that appear in the most views, + maximizing the number of views assigned to the same explore. + + Args: + view_to_explores: Dict mapping view_name -> set of explore_names + explore_to_views: Dict mapping explore_name -> set of view_names + + Returns: + Dict mapping view_name -> explore_name (optimized assignment) + """ + + # Pre-compute explore sizes + explore_sizes = { + explore: len(views) for explore, views in explore_to_views.items() + } + + # Build view-to-explore mapping using dynamic programming approach + view_to_explore: Dict[str, str] = {} + + # For each view, find the explore with maximum size that contains it + for view_name in view_to_explores: + # Get all explores that contain this view from pre-built mapping + candidate_explores = view_to_explores[view_name] + + if candidate_explores: + # Find explore with maximum size using max() with key function + # This assings the view to the explore with the most views that contains it + best_explore = max( + candidate_explores, key=lambda explore: explore_sizes[explore] + ) + view_to_explore[view_name] = best_explore + + # Log optimization results + unique_explores_used = len(set(view_to_explore.values())) + total_views = len(view_to_explore) + total_explores = len(explore_to_views) + + logger.info( + f"View-explore optimization: Using {unique_explores_used}/{total_explores} " + f"explores for {total_views} views (efficiency: {(1 - unique_explores_used / total_explores):.1%} savings)" + ) + + return view_to_explore + def get_report(self): return self.reporter From 16eb0c449d8d7c45d77aedec85fa79c6305a1a06 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Mon, 20 Oct 2025 18:23:32 -0700 Subject: [PATCH 03/17] tests(looker): Added unit tests for the optimization algorithm, covering various scenarios including edge cases and performance with large datasets. 
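Reviewer note: a minimal standalone sketch of the greedy assignment these tests exercise. The helper and the view/explore names below are hypothetical simplifications of _optimize_views_by_common_explore, not part of the patch:

    from typing import Dict, Set

    def optimize(view_to_explores: Dict[str, Set[str]],
                 explore_to_views: Dict[str, Set[str]]) -> Dict[str, str]:
        # Same greedy rule as _optimize_views_by_common_explore: assign each view to
        # the candidate explore that covers the most views, so many views share one
        # explore and the number of lookml_model_explore API calls is minimized.
        explore_sizes = {explore: len(views) for explore, views in explore_to_views.items()}
        return {
            view: max(candidates, key=lambda e: explore_sizes[e])
            for view, candidates in view_to_explores.items()
            if candidates
        }

    # Hypothetical input: view1/view2 could use either explore, but both land on
    # explore_a because it covers three views versus explore_b's two.
    print(optimize(
        {"view1": {"explore_a", "explore_b"}, "view2": {"explore_a", "explore_b"}, "view3": {"explore_a"}},
        {"explore_a": {"view1", "view2", "view3"}, "explore_b": {"view1", "view2"}},
    ))
    # -> {'view1': 'explore_a', 'view2': 'explore_a', 'view3': 'explore_a'}

Only one explore is kept per view because, per the commit message, any single explore that references the view is enough for the Looker Query API to resolve that view's fields.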
--- .../ingestion/source/looker/lookml_source.py | 14 +- .../test_lookml_api_based_view_upstream.py | 265 ++++++++++++++++++ 2 files changed, 275 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index b7c01775afa4c5..e0309da48d5d24 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -1091,10 +1091,16 @@ def _optimize_views_by_common_explore( total_views = len(view_to_explore) total_explores = len(explore_to_views) - logger.info( - f"View-explore optimization: Using {unique_explores_used}/{total_explores} " - f"explores for {total_views} views (efficiency: {(1 - unique_explores_used / total_explores):.1%} savings)" - ) + if total_explores > 0: + efficiency = (1 - unique_explores_used / total_explores) * 100 + logger.info( + f"View-explore optimization: Using {unique_explores_used}/{total_explores} " + f"explores for {total_views} views (efficiency: {efficiency:.1f}% savings)" + ) + else: + logger.info( + f"View-explore optimization: No explores to optimize for {total_views} views" + ) return view_to_explore diff --git a/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py index a00aea29ab626c..4ffb20890c7330 100644 --- a/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py +++ b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py @@ -25,6 +25,7 @@ LookMLSourceConfig, LookMLSourceReport, ) +from datahub.ingestion.source.looker.lookml_source import LookMLSource from datahub.ingestion.source.looker.view_upstream import ( LookerQueryAPIBasedViewUpstream, ) @@ -45,6 +46,21 @@ def create_mock_sql_parsing_result( return mock_spr +def create_mock_lookml_source(): + """Helper function to create a properly mocked LookMLSource.""" + config = MagicMock(spec=LookMLSourceConfig) + config.base_folder = None + config.project_name = "test_project" + config.connection_to_platform_map = {"test_connection": "test_platform"} + config.stateful_ingestion = MagicMock() + config.stateful_ingestion.enabled = False + config.api = None + config.looker_client = None + + ctx = MagicMock(spec=PipelineContext) + return LookMLSource(config, ctx) + + class TestLookMLAPIBasedViewUpstream: """Test suite for LookerQueryAPIBasedViewUpstream functionality.""" @@ -516,3 +532,252 @@ def test_latency_tracking( # Verify that latency was reported (may be called multiple times due to caching) assert mock_reporter.report_looker_query_api_latency.call_count >= 1 + + def test_class_level_cache_for_explore_fields(self, mock_looker_client): + """Test that get_explore_fields_from_looker_api uses class-level cache.""" + + # Mock the LookerAPI response + mock_explore = MagicMock() + mock_explore.name = "test_explore" + mock_explore.fields = [ + MagicMock(name="field1", type="string"), + MagicMock(name="field2", type="number"), + ] + + # Add the lookml_model_explore method to the mock + mock_looker_client.lookml_model_explore = MagicMock(return_value=mock_explore) + + # Call the class method multiple times + result1 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, "test_model", "test_explore" + ) + result2 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, 
"test_model", "test_explore" + ) + + # Verify the method was only called once due to caching + assert mock_looker_client.lookml_model_explore.call_count == 1 + + # Verify results are the same (cached) + assert result1 == result2 + assert result1.name == "test_explore" + assert len(result1.fields) == 2 + + def test_class_level_cache_different_explores(self, mock_looker_client): + """Test that class-level cache works for different explores.""" + + # Mock different explores + mock_explore1 = MagicMock() + mock_explore1.name = "explore1" + mock_explore1.fields = [MagicMock(name="field1", type="string")] + + mock_explore2 = MagicMock() + mock_explore2.name = "explore2" + mock_explore2.fields = [MagicMock(name="field2", type="number")] + + def mock_lookml_model_explore(model, explore, fields=None): + if explore == "explore1": + return mock_explore1 + elif explore == "explore2": + return mock_explore2 + return None + + # Add the lookml_model_explore method to the mock + mock_looker_client.lookml_model_explore = MagicMock( + side_effect=mock_lookml_model_explore + ) + + # Call for different explores + result1 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, "test_model", "explore1" + ) + result2 = LookerQueryAPIBasedViewUpstream.get_explore_fields_from_looker_api( + mock_looker_client, "test_model", "explore2" + ) + + # Verify both calls were made + assert mock_looker_client.lookml_model_explore.call_count == 2 + + # Verify results are different + assert result1.name == "explore1" + assert result2.name == "explore2" + assert result1 != result2 + + def test_view_optimization_algorithm(self): + """Test the _optimize_views_by_common_explore algorithm.""" + source = create_mock_lookml_source() + + # Test case with overlapping views and explores + view_to_explores = { + "view1": {"explore_a", "explore_b"}, + "view2": {"explore_a", "explore_b"}, + "view3": {"explore_a"}, + "view4": {"explore_b"}, + "view5": {"explore_c"}, + "view6": {"explore_c"}, + } + + explore_to_views = { + "explore_a": {"view1", "view2", "view3"}, + "explore_b": {"view1", "view2", "view4"}, + "explore_c": {"view5", "view6"}, + } + + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify all views are present + assert len(result) == len(view_to_explores) + assert set(result.keys()) == set(view_to_explores.keys()) + + # Verify views are assigned to explores + for view, explore in result.items(): + assert explore in explore_to_views + assert view in explore_to_views[explore] + + def test_view_optimization_empty_input(self): + """Test the optimization algorithm with empty input.""" + source = create_mock_lookml_source() + + # Test with empty input + result = source._optimize_views_by_common_explore({}, {}) + + # Verify empty result + assert len(result) == 0 + assert result == {} + + def test_view_optimization_single_view_multiple_explores(self): + """Test optimization when a view can be in multiple explores.""" + source = create_mock_lookml_source() + + # Test case where one view can be in multiple explores + view_to_explores = {"shared_view": {"explore_a", "explore_b", "explore_c"}} + + explore_to_views = { + "explore_a": {"shared_view"}, + "explore_b": {"shared_view"}, + "explore_c": {"shared_view"}, + } + + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify the view is assigned to one of the explores + assert len(result) == 1 + assert "shared_view" in result + assert result["shared_view"] 
in ["explore_a", "explore_b", "explore_c"] + + def test_view_optimization_efficiency(self): + """Test that the optimization algorithm reduces the number of explores used.""" + source = create_mock_lookml_source() + + # Create a scenario where optimization can reduce explores + view_to_explores = { + "view1": {"explore_a", "explore_b"}, + "view2": {"explore_a", "explore_b"}, + "view3": {"explore_a"}, + "view4": {"explore_b"}, + "view5": {"explore_c"}, + "view6": {"explore_c"}, + "view7": {"explore_d"}, + "view8": {"explore_d"}, + } + + explore_to_views = { + "explore_a": {"view1", "view2", "view3"}, + "explore_b": {"view1", "view2", "view4"}, + "explore_c": {"view5", "view6"}, + "explore_d": {"view7", "view8"}, + } + + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify all views are present + assert len(result) == len(view_to_explores) + assert set(result.keys()) == set(view_to_explores.keys()) + + # Verify the algorithm assigns views optimally + unique_explores_used = len(set(result.values())) + total_explores = len(explore_to_views) + + # The algorithm should use fewer or equal explores + assert unique_explores_used <= total_explores + + def test_optimization_algorithm_performance(self): + """Test that the optimization algorithm handles large datasets efficiently.""" + source = create_mock_lookml_source() + + # Create a larger dataset to test performance + view_to_explores = {} + explore_to_views = {} + + # Create 50 views across 10 explores + for i in range(50): + view_name = f"view_{i}" + # Each view can be in 1-3 explores + num_explores = (i % 3) + 1 + explores = set() + + for j in range(num_explores): + explore_name = f"explore_{(i + j) % 10 + 1}" + explores.add(explore_name) + if explore_name not in explore_to_views: + explore_to_views[explore_name] = set() + explore_to_views[explore_name].add(view_name) + + view_to_explores[view_name] = explores + + # Test the optimization + result = source._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + + # Verify all views are present + assert len(result) == len(view_to_explores) + assert set(result.keys()) == set(view_to_explores.keys()) + + # Verify the algorithm completed successfully + unique_explores_used = len(set(result.values())) + assert unique_explores_used > 0 + assert unique_explores_used <= len(explore_to_views) + + def test_optimization_algorithm_edge_cases(self): + """Test edge cases for the optimization algorithm.""" + source = create_mock_lookml_source() + + # Test case 1: All views in one explore + view_to_explores_1 = { + "view1": {"explore_a"}, + "view2": {"explore_a"}, + "view3": {"explore_a"}, + } + explore_to_views_1 = {"explore_a": {"view1", "view2", "view3"}} + + result_1 = source._optimize_views_by_common_explore( + view_to_explores_1, explore_to_views_1 + ) + assert len(result_1) == 3 + assert all(explore == "explore_a" for explore in result_1.values()) + + # Test case 2: Views with no explores + result_2 = source._optimize_views_by_common_explore({}, {}) + assert len(result_2) == 0 + + # Test case 3: Single view, multiple explores + view_to_explores_3 = {"view1": {"explore_a", "explore_b", "explore_c"}} + explore_to_views_3 = { + "explore_a": {"view1"}, + "explore_b": {"view1"}, + "explore_c": {"view1"}, + } + + result_3 = source._optimize_views_by_common_explore( + view_to_explores_3, explore_to_views_3 + ) + assert len(result_3) == 1 + assert "view1" in result_3 + assert result_3["view1"] in ["explore_a", "explore_b", "explore_c"] From 
62d1c9673a21de4bc7a13963743b406626c1be83 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 21 Oct 2025 15:58:40 -0700 Subject: [PATCH 04/17] refactor(looker): optimize view-to-explore mapping and enhance logging --- .../ingestion/source/looker/lookml_source.py | 21 ++++++++++++------- .../ingestion/source/looker/view_upstream.py | 19 ++++++++++++----- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index e0309da48d5d24..7b500b2e828b7c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -794,7 +794,6 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: if self.source_config.emit_reachable_views_only: explore_reachable_views.add(view_name.include) - # Build view-to-explores mapping efficiently view_to_explores[view_name.include].add(explore.name) explore_to_views[explore.name].add(view_name.include) except Exception as e: @@ -810,6 +809,16 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: model.connection, set() ) + view_to_explore_map = {} + if view_to_explores and explore_to_views: + view_to_explore_map = self._optimize_views_by_common_explore( + view_to_explores, explore_to_views + ) + else: + logger.warning( + f"Either view_to_explores: {view_to_explores} or explore_to_views: {explore_to_views} is empty" + ) + project_name = self.get_project_name(model_name) looker_view_id_cache: LookerViewIdCache = LookerViewIdCache( @@ -894,9 +903,7 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]: config=self.source_config, ctx=self.ctx, looker_client=self.looker_client, - view_to_explore_map=self._optimize_views_by_common_explore( - view_to_explores, explore_to_views - ), + view_to_explore_map=view_to_explore_map, ) except Exception as e: self.reporter.report_warning( @@ -1074,10 +1081,7 @@ def _optimize_views_by_common_explore( view_to_explore: Dict[str, str] = {} # For each view, find the explore with maximum size that contains it - for view_name in view_to_explores: - # Get all explores that contain this view from pre-built mapping - candidate_explores = view_to_explores[view_name] - + for view_name, candidate_explores in view_to_explores.items(): if candidate_explores: # Find explore with maximum size using max() with key function # This assings the view to the explore with the most views that contains it @@ -1102,6 +1106,7 @@ def _optimize_views_by_common_explore( f"View-explore optimization: No explores to optimize for {total_views} views" ) + logger.debug(f"Final View-to-explore mapping: {view_to_explore}") return view_to_explore def get_report(self): diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index ee200edd1aeed3..0f293ffa0fb850 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -611,6 +611,9 @@ def _get_fields_from_looker_api(self, explore_name: str) -> List[str]: ) if explore and explore.fields: + logger.debug( + f"Looker API response for explore fields: {explore.fields}" + ) # Creating a map to de-dup dimension group fields - adding all of them adds to the query length, we dont need all of them 
for CLL dimension_group_fields_mapping: Dict[str, str] = {} # Get dimensions from API @@ -654,10 +657,13 @@ def _get_fields_from_looker_api(self, explore_name: str) -> List[str]: f"No fields found in explore '{explore_name}' from Looker API, falling back to view context" ) - except Exception as e: + except Exception: logger.warning( - f"Failed to get explore details from Looker API for explore '{explore_name}': {e}. Falling back to view context." + f"Failed to get explore details from Looker API for explore '{explore_name}'. Current view: {self.view_context.name()} and view_fields: {view_fields}. Falling back to view csontext.", + exc_info=True, ) + # Resetting view_fields to trigger fallback to view context + view_fields = [] return view_fields @@ -682,7 +688,7 @@ def _get_fields_from_view_context(self) -> List[str]: for dim_group in self.view_context.dimension_groups(): dim_group_type_str = dim_group.get(VIEW_FIELD_TYPE_ATTRIBUTE) - + logger.debug( f"Processing dimension group from view context: {dim_group.get(NAME, 'unknown')}, type: {dim_group_type_str}" ) @@ -710,8 +716,11 @@ def _get_fields_from_view_context(self) -> List[str]: self._get_duration_dim_group_field_name(dim_group) ) ) - except Exception as e: - logger.error(f"View-name: {self.view_context.name()}: {e}") + except Exception: + logger.error( + f"Failed to process dimension group for View-name: {self.view_context.name()}", + exc_info=True, + ) # Continue processing other fields instead of failing completely continue From c10434692318e2ccd86a213b1b17fcf84bc80732 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 21 Oct 2025 16:28:56 -0700 Subject: [PATCH 05/17] lint fixes --- .../tests/unit/lookml/test_lookml_api_based_view_upstream.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py index 4ffb20890c7330..de2363c384d4a4 100644 --- a/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py +++ b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py @@ -1,3 +1,4 @@ +from typing import Dict from unittest.mock import MagicMock, patch import pytest @@ -561,6 +562,7 @@ def test_class_level_cache_for_explore_fields(self, mock_looker_client): # Verify results are the same (cached) assert result1 == result2 assert result1.name == "test_explore" + assert result1.fields is not None assert len(result1.fields) == 2 def test_class_level_cache_different_explores(self, mock_looker_client): @@ -713,7 +715,7 @@ def test_optimization_algorithm_performance(self): # Create a larger dataset to test performance view_to_explores = {} - explore_to_views = {} + explore_to_views: Dict[str, set] = {} # Create 50 views across 10 explores for i in range(50): From 41b3dd30829e273b499b34558ee298e982beca2f Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Thu, 23 Oct 2025 13:01:10 -0700 Subject: [PATCH 06/17] refactor(looker): streamline handling of dimension group fields in view upstream processing - Updated logic to ensure only one field per dimension group is added, improving clarity and maintainability of the code. - Removed redundant checks and added comments for better understanding of the dimension group handling process. 
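Reviewer note: a minimal sketch of the dimension-group de-duplication this commit describes. Field is a simplified stand-in for LookmlModelExploreField and the field names are hypothetical, not part of the patch:

    from dataclasses import dataclass
    from typing import Dict, List, Optional

    @dataclass
    class Field:  # simplified stand-in for LookmlModelExploreField
        name: str
        dimension_group: Optional[str] = None

    def dedupe_dimension_groups(dimensions: List[Field]) -> List[str]:
        # Keep every plain dimension, but only the first timeframe of each dimension
        # group; the remaining timeframes add query length without improving CLL.
        representative: Dict[str, str] = {}
        kept: List[str] = []
        for field in dimensions:
            if field.dimension_group:
                if field.dimension_group in representative:
                    continue
                representative[field.dimension_group] = field.name
            kept.append(field.name)
        return kept

    fields = [
        Field("orders.status"),
        Field("orders.created_date", dimension_group="created"),
        Field("orders.created_week", dimension_group="created"),
    ]
    print(dedupe_dimension_groups(fields))
    # -> ['orders.status', 'orders.created_date']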
--- .../ingestion/source/looker/view_upstream.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 0f293ffa0fb850..8df90083110930 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -622,17 +622,15 @@ def _get_fields_from_looker_api(self, explore_name: str) -> List[str]: if dim_field.name and self._is_field_from_current_view( dim_field, current_view_name ): - # Skipping adding dimension group fields if already added - if ( - dim_field.dimension_group - and dim_field.dimension_group - in dimension_group_fields_mapping - ): - continue - else: - assert ( - dim_field.dimension_group is not None - ) # HAPPY linter + # Handle dimension group fields - only add one field per dimension group + if dim_field.dimension_group: + # Skip if this dimension group already has a field + if ( + dim_field.dimension_group + in dimension_group_fields_mapping + ): + continue + # Add this field as the representative for this dimension group dimension_group_fields_mapping[ dim_field.dimension_group ] = dim_field.name From 112e5e36779a136e06bb36b283f012f9e152771e Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Mon, 27 Oct 2025 15:08:36 -0700 Subject: [PATCH 07/17] Improve logging for error handling in view upstream processing --- .../src/datahub/ingestion/source/looker/view_upstream.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 8df90083110930..d02583f2f52d15 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -432,6 +432,9 @@ def __get_spr(self) -> SqlParsingResult: # Check for errors encountered during table extraction. table_error = spr.debug_info.table_error if table_error is not None: + logger.debug( + f"view-name={self.view_context.name()}, table-error={table_error}, sql-query={sql_response}" + ) self.reporter.report_warning( title="Table Level Lineage Extraction Failed", message="Error in parsing derived sql", @@ -444,6 +447,9 @@ def __get_spr(self) -> SqlParsingResult: column_error = spr.debug_info.column_error if column_error is not None: + logger.debug( + f"view-name={self.view_context.name()}, column-error={column_error}, sql-query={sql_response}" + ) self.reporter.report_warning( title="Column Level Lineage Extraction Failed", message="Error in parsing derived sql", @@ -845,6 +851,9 @@ def _execute_query(self, query: WriteQuery) -> str: f"No SQL found in response for view '{self.view_context.name()}'. Response: {sql_response}" ) + logger.debug( + f"view-name={self.view_context.name()}, sql-response={sql_response}" + ) # Extract the SQL string from the response. 
return sql_response From b0ced0fd5eadc6e236c7714e6ce0ebb37f024dfb Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 28 Oct 2025 14:28:09 -0700 Subject: [PATCH 08/17] feat(looker): add field splitting and partial lineage support for large field sets --- .../ingestion/source/looker/lookml_config.py | 14 + .../ingestion/source/looker/view_upstream.py | 379 +++++++++++++----- 2 files changed, 301 insertions(+), 92 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py index c22aa9c011e2f8..c4d28f97206d94 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py @@ -210,6 +210,20 @@ class LookMLSourceConfig( "All if comments are evaluated to true for configured looker_environment value", ) + field_threshold_for_splitting: int = Field( + 100, + description="When the total number of fields returned by Looker API exceeds this threshold, " + "the fields will be split into multiple API calls to avoid SQL parsing failures. " + "This helps provide partial column and table lineage when dealing with large field sets.", + ) + + allow_partial_lineage_results: bool = Field( + True, + description="When enabled, allows partial lineage results to be returned even when some field chunks " + "fail or when there are SQL parsing errors. This provides better resilience for large field sets " + "and ensures some lineage information is available rather than complete failure.", + ) + @validator("connection_to_platform_map", pre=True) def convert_string_to_connection_def(cls, conn_map): # Previous version of config supported strings in connection map. This upconverts strings to ConnectionMap diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index d02583f2f52d15..408d7f45e2fe54 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -45,6 +45,7 @@ from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, + SqlParsingDebugInfo, SqlParsingResult, Urn, create_and_cache_schema_resolver, @@ -344,12 +345,22 @@ class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): methods are provided to convert between Looker API field names and raw field names. - SQL parsing is cached for efficiency, and the class is designed to gracefully fall back if the Looker Query API fails. - All lineage extraction is based on the SQL returned by the Looker API, ensuring accurate and up-to-date lineage. + - **Field Splitting**: When the number of fields exceeds the configured threshold (default: 100), fields are automatically + split into multiple API calls to avoid SQL parsing failures and provide partial lineage for large field sets. Why view_to_explore_map is required: The Looker Query API expects the explore name (not the view name) as the "view" parameter in the WriteQuery. In Looker, a view can be referenced by multiple explores, but the API needs any one of the explores to access the view's fields + Field Splitting Behavior: + When a view has more fields than the `field_threshold_for_splitting` configuration value (default: 100), + the class automatically splits the fields into chunks and makes multiple API calls. 
This helps: + - Avoid SQL parsing failures that occur with very large field sets + - Provide partial column and table lineage even when some field chunks fail + - Improve reliability for views with hundreds of fields + - Maintain performance by processing manageable chunks + Example WriteQuery request (see `_execute_query` for details): { "model": "test_model", @@ -363,7 +374,8 @@ class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): The SQL response is then parsed to extract upstream tables and column-level lineage. For further details, see the method-level docstrings, especially: - - `__get_spr`: SQL parsing and lineage extraction workflow + - `__get_spr`: SQL parsing and lineage extraction workflow with field splitting + - `_get_spr_with_field_splitting`: Field splitting logic for large field sets - `_get_sql_write_query`: WriteQuery construction and field enumeration - `_execute_query`: Looker API invocation and SQL retrieval - this only generates the SQL query, does not execute it - Field name translation: `_get_looker_api_field_name` and `_get_field_name_from_looker_api_field_name` @@ -402,6 +414,9 @@ def __get_spr(self) -> SqlParsingResult: 2. Executing the query via the Looker API to get the SQL. 3. Parsing the SQL to extract lineage information. + If the number of fields exceeds the threshold, fields are split into multiple queries + and the results are combined to provide partial lineage. + Returns: SqlParsingResult if successful, otherwise None. Raises: @@ -412,58 +427,289 @@ def __get_spr(self) -> SqlParsingResult: ValueError: If error in parsing SQL for column lineage. """ try: - # Build the WriteQuery for the current view. - sql_query: WriteQuery = self._get_sql_write_query() - - # Execute the query to get the SQL representation from Looker. - sql_response = self._execute_query(sql_query) - - # Parse the SQL to extract lineage information. - spr = create_lineage_sql_parsed_result( - query=sql_response, - default_schema=self.view_context.view_connection.default_schema, - default_db=self.view_context.view_connection.default_db, - platform=self.view_context.view_connection.platform, - platform_instance=self.view_context.view_connection.platform_instance, - env=self.view_context.view_connection.platform_env or self.config.env, - graph=self.ctx.graph, - ) + # Get fields for the current view + explore_name = self.view_to_explore_map.get(self.view_context.name()) + assert explore_name # Happy linter - # Check for errors encountered during table extraction. - table_error = spr.debug_info.table_error - if table_error is not None: - logger.debug( - f"view-name={self.view_context.name()}, table-error={table_error}, sql-query={sql_response}" + view_fields = self._get_fields_from_looker_api(explore_name) + if not view_fields: + view_fields = self._get_fields_from_view_context() + + if not view_fields: + raise ValueError( + f"No fields found for view '{self.view_context.name()}'. Cannot proceed with Looker API for view lineage." ) - self.reporter.report_warning( - title="Table Level Lineage Extraction Failed", - message="Error in parsing derived sql", - context=f"View-name: {self.view_context.name()}", - exc=table_error, + + if len(view_fields) > self.config.field_threshold_for_splitting: + logger.info( + f"View '{self.view_context.name()}' has {len(view_fields)} fields, " + f"exceeding threshold of {self.config.field_threshold_for_splitting}. Splitting into multiple queries for partial lineage." 
+ ) + return self._get_spr_with_field_splitting(view_fields, explore_name) + else: + # Use the original single-query approach + sql_query: WriteQuery = self._get_sql_write_query_with_fields( + view_fields, explore_name ) + sql_response = self._execute_query(sql_query) + return self._parse_sql_response( + sql_response, + allow_partial=self.config.allow_partial_lineage_results, + ) + + except Exception: + # Reraise the exception to allow higher-level handling. + raise + + def _get_sql_write_query_with_fields( + self, view_fields: List[str], explore_name: str + ) -> WriteQuery: + """ + Constructs a WriteQuery object with the provided fields. + + Args: + view_fields: List of field names in Looker API format + explore_name: The explore name to use in the query + + Returns: + WriteQuery: The WriteQuery object + """ + return WriteQuery( + model=self.looker_view_id_cache.model_name, + view=explore_name, + fields=view_fields, + filters={}, + limit="1", + ) + + def _parse_sql_response( + self, sql_response: str, allow_partial: bool = True + ) -> SqlParsingResult: + """ + Parse SQL response to extract lineage information. + + Args: + sql_response: The SQL string returned by the Looker API + allow_partial: Whether to allow partial results when there are parsing errors + + Returns: + SqlParsingResult: The parsed result with lineage information + """ + spr = create_lineage_sql_parsed_result( + query=sql_response, + default_schema=self.view_context.view_connection.default_schema, + default_db=self.view_context.view_connection.default_db, + platform=self.view_context.view_connection.platform, + platform_instance=self.view_context.view_connection.platform_instance, + env=self.view_context.view_connection.platform_env or self.config.env, + graph=self.ctx.graph, + ) + + # Check for errors encountered during table extraction. + table_error = spr.debug_info.table_error + if table_error is not None: + logger.debug( + f"view-name={self.view_context.name()}, table-error={table_error}, sql-query={sql_response}" + ) + self.reporter.report_warning( + title="Table Level Lineage Extraction Failed", + message="Error in parsing derived sql", + context=f"View-name: {self.view_context.name()}", + exc=table_error, + ) + if not allow_partial: raise ValueError( f"Error in parsing SQL for upstream tables: {table_error}" ) + else: + logger.warning( + f"Allowing partial results despite table parsing error for view '{self.view_context.name()}'" + ) + + column_error = spr.debug_info.column_error + if column_error is not None: + logger.debug( + f"view-name={self.view_context.name()}, column-error={column_error}, sql-query={sql_response}" + ) + self.reporter.report_warning( + title="Column Level Lineage Extraction Failed", + message="Error in parsing derived sql", + context=f"View-name: {self.view_context.name()}", + exc=column_error, + ) + if not allow_partial: + raise ValueError( + f"Error in parsing SQL for column lineage: {column_error}" + ) + else: + logger.warning( + f"Allowing partial results despite column parsing error for view '{self.view_context.name()}'" + ) + + return spr + + def _get_spr_with_field_splitting( + self, view_fields: List[str], explore_name: str + ) -> SqlParsingResult: + """ + Handle field splitting when the number of fields exceeds the threshold. + Splits fields into chunks and combines the SQL parsing results. 
+ + Args: + view_fields: List of all field names + explore_name: The explore name to use in queries + + Returns: + SqlParsingResult: Combined result from multiple queries + """ - column_error = spr.debug_info.column_error - if column_error is not None: + # Split fields into chunks + field_chunks = [ + view_fields[i : i + self.config.field_threshold_for_splitting] + for i in range( + 0, len(view_fields), self.config.field_threshold_for_splitting + ) + ] + + logger.info( + f"Split {len(view_fields)} fields into {len(field_chunks)} chunks for view '{self.view_context.name()}'" + ) + + all_in_tables = set() + all_column_lineage = [] + combined_debug_info = None + successful_queries = 0 + failed_queries = 0 + + for chunk_idx, field_chunk in enumerate(field_chunks): + try: logger.debug( - f"view-name={self.view_context.name()}, column-error={column_error}, sql-query={sql_response}" + f"Processing field chunk {chunk_idx + 1}/{len(field_chunks)} with {len(field_chunk)} fields " + f"for view '{self.view_context.name()}'" ) - self.reporter.report_warning( - title="Column Level Lineage Extraction Failed", - message="Error in parsing derived sql", - context=f"View-name: {self.view_context.name()}", - exc=column_error, + + # Create and execute query for this chunk + sql_query = self._get_sql_write_query_with_fields( + field_chunk, explore_name ) - raise ValueError( - f"Error in parsing SQL for column lineage: {column_error}" + sql_response = self._execute_query(sql_query) + + # Parse the SQL response with partial results allowed + spr = self._parse_sql_response( + sql_response, self.config.allow_partial_lineage_results ) - return spr - except Exception: - # Reraise the exception to allow higher-level handling. - raise + # Combine results + all_in_tables.update(spr.in_tables) + if spr.column_lineage: + all_column_lineage.extend(spr.column_lineage) + + # Use debug info from the first successful query + if combined_debug_info is None: + combined_debug_info = spr.debug_info + + successful_queries += 1 + + except Exception as e: + failed_queries += 1 + logger.warning( + f"Failed to process field chunk {chunk_idx + 1}/{len(field_chunks)} for view '{self.view_context.name()}': {e}" + ) + self.reporter.report_warning( + title="Field Chunk Processing Failed", + message=f"Failed to process field chunk {chunk_idx + 1} with {len(field_chunk)} fields", + context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}/{len(field_chunks)}", + exc=e, + ) + + if self.config.allow_partial_lineage_results: + continue + + # If partial lineage is not allowed, raise the exception + raise + + if not successful_queries or not all_in_tables or not all_column_lineage: + logger.debug(successful_queries, all_in_tables, all_column_lineage) + raise ValueError( + f"All field chunks failed for view '{self.view_context.name()}'. 
" + f"Total chunks: {len(field_chunks)}, Failed: {failed_queries}" + ) + + # Create combined result + combined_result = SqlParsingResult( + in_tables=list(all_in_tables), + out_tables=[], # No output tables for upstream lineage + column_lineage=all_column_lineage if all_column_lineage else None, + debug_info=combined_debug_info or SqlParsingDebugInfo(), + ) + + logger.info( + f"Successfully combined results for view '{self.view_context.name()}': " + f"{len(all_in_tables)} upstream tables, {len(all_column_lineage)} column lineages" + ) + + # Report field splitting statistics + self._report_field_splitting_stats( + total_fields=len(view_fields), + total_chunks=len(field_chunks), + successful_queries=successful_queries, + failed_queries=failed_queries, + upstream_tables=len(all_in_tables), + column_lineages=len(all_column_lineage), + ) + + return combined_result + + def _report_field_splitting_stats( + self, + total_fields: int, + total_chunks: int, + successful_queries: int, + failed_queries: int, + upstream_tables: int, + column_lineages: int, + ) -> None: + """ + Report statistics about field splitting processing. + + Args: + total_fields: Total number of fields processed + total_chunks: Number of field chunks created + successful_queries: Number of successful API queries + failed_queries: Number of failed API queries + upstream_tables: Number of upstream tables found + column_lineages: Number of column lineages found + """ + # Use different reporting levels based on success rate + success_rate = (successful_queries / total_chunks) * 100 + if success_rate == 100: + # All chunks succeeded + self.reporter.report_warning( + title="Field Splitting Statistics - Complete Success", + message=f"Field splitting completed successfully for view '{self.view_context.name()}': " + f"Total fields: {total_fields}, Chunks: {total_chunks}, " + f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", + ) + elif success_rate > 0: + # Partial success + self.reporter.report_warning( + title="Field Splitting Statistics - Partial Success", + message=f"Field splitting completed with partial results for view '{self.view_context.name()}': " + f"Total fields: {total_fields}, Chunks: {total_chunks}, " + f"Successful: {successful_queries}, Failed: {failed_queries}, " + f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", + ) + else: + # Complete failure + self.reporter.report_warning( + title="Field Splitting Statistics - Complete Failure", + message=f"Field splitting failed completely for view '{self.view_context.name()}': " + f"Total fields: {total_fields}, Chunks: {total_chunks}, " + f"All {failed_queries} chunks failed", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", + ) def _get_time_dim_group_field_name(self, dim_group: dict) -> str: """ @@ -730,57 +976,6 @@ def _get_fields_from_view_context(self) -> List[str]: return view_fields - def _get_sql_write_query(self) -> WriteQuery: - """ - Constructs a WriteQuery object to obtain the SQL representation of the current Looker view. - - This method now uses the Looker API to get explore details directly, providing more comprehensive - field information compared to relying solely on view context. It falls back to view context - if API calls fail. 
- - The method uses the view_to_explore_map to determine the correct explore name to use in the WriteQuery. - This is crucial because the Looker Query API expects the explore name (not the view name) as the "view" parameter. - - Ref: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions - - Returns: - WriteQuery: The WriteQuery object if fields are found and explore name is available, otherwise None. - - Raises: - ValueError: If no fields are found for the view. - """ - - # Use explore name from view_to_explore_map if available - # explore_name is always present in the view_to_explore_map because of the check in view_upstream.create_view_upstream - explore_name = self.view_to_explore_map.get(self.view_context.name()) - assert explore_name # Happy linter - - # Try to get fields from Looker API first for more comprehensive information - view_fields = self._get_fields_from_looker_api(explore_name) - - # Fallback to view context if API didn't provide fields or failed - if not view_fields: - view_fields = self._get_fields_from_view_context() - - if not view_fields: - raise ValueError( - f"No fields found for view '{self.view_context.name()}'. Cannot proceed with Looker API for view lineage." - ) - - logger.debug( - f"Final field list for view '{self.view_context.name()}': {view_fields}" - ) - - # Construct and return the WriteQuery object. - # The 'limit' is set to "1" as the query is only used to obtain SQL, not to fetch data. - return WriteQuery( - model=self.looker_view_id_cache.model_name, - view=explore_name, - fields=view_fields, - filters={}, - limit="1", - ) - def _execute_query(self, query: WriteQuery) -> str: """ Executes a Looker SQL query using the Looker API and returns the SQL string. From f525a1cfaf2f38175b652f89e9b5035611a07dd0 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 28 Oct 2025 16:25:41 -0700 Subject: [PATCH 09/17] feat(looker): add method to retrieve Looker API field names and enhance lineage matching --- .../ingestion/source/looker/view_upstream.py | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 408d7f45e2fe54..5c5fa8924cbc2e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -1086,6 +1086,24 @@ def _get_looker_api_field_name(self, field_name: str) -> str: """ return f"{self.view_context.name()}.{field_name}" + def _get_looker_api_field_names(self, field_name: str) -> List[str]: + """ + Translate the field name to the looker api field names + + Example: + pk -> purchases.pk + + When we do chucking of fields, we need to handle cases like: + pk -> purchases_pk + + Note: cases like pk -> purchases_pk_1 are handled with startswith to cover all possible variants + + """ + return [ + self._get_looker_api_field_name(field_name), + f"{self.view_context.name()}_{field_name}", + ] + def _get_field_name_from_looker_api_field_name( self, looker_api_field_name: str ) -> str: @@ -1138,12 +1156,16 @@ def get_upstream_column_ref( f"view-name={self.view_context.name()}, field-name={field_name}, field-type={field_context.raw_field.get(VIEW_FIELD_TYPE_ATTRIBUTE)}" ) - field_api_name = self._get_looker_api_field_name(field_name).lower() + field_api_names = self._get_looker_api_field_names(field_name) + field_api_names = [name.lower() for name in 
field_api_names] upstream_refs: List[ColumnRef] = [] for lineage in spr.column_lineage: - if lineage.downstream.column.lower() == field_api_name: + if any( + lineage.downstream.column.lower().startswith(name) + for name in field_api_names + ): for upstream in lineage.upstreams: upstream_refs.append( ColumnRef(table=upstream.table, column=upstream.column) From 10f3aab0b9b937717a3b021625188e48b2cce228 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 28 Oct 2025 17:40:40 -0700 Subject: [PATCH 10/17] feat(looker): implement individual field fallback for enhanced error handling --- .../ingestion/source/looker/lookml_config.py | 7 + .../ingestion/source/looker/view_upstream.py | 176 ++++++++++++++++-- 2 files changed, 171 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py index c4d28f97206d94..1bc4f3f23c556f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py @@ -224,6 +224,13 @@ class LookMLSourceConfig( "and ensures some lineage information is available rather than complete failure.", ) + enable_individual_field_fallback: bool = Field( + True, + description="When enabled, if a field chunk fails, the system will attempt to process each field " + "individually to maximize information and isolate problematic fields. This helps identify " + "which specific fields are causing issues while still getting lineage for working fields.", + ) + @validator("connection_to_platform_map", pre=True) def convert_string_to_connection_def(cls, conn_map): # Previous version of config supported strings in connection map. This upconverts strings to ConnectionMap diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 5c5fa8924cbc2e..277f1b7d30ab02 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -347,6 +347,8 @@ class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): - All lineage extraction is based on the SQL returned by the Looker API, ensuring accurate and up-to-date lineage. - **Field Splitting**: When the number of fields exceeds the configured threshold (default: 100), fields are automatically split into multiple API calls to avoid SQL parsing failures and provide partial lineage for large field sets. + - **Individual Field Fallback**: When a field chunk fails, the system can attempt to process each field individually + to maximize information and isolate problematic fields, helping identify which specific fields cause issues. Why view_to_explore_map is required: The Looker Query API expects the explore name (not the view name) as the "view" parameter in the WriteQuery. @@ -361,6 +363,14 @@ class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): - Improve reliability for views with hundreds of fields - Maintain performance by processing manageable chunks + Individual Field Fallback: + When `enable_individual_field_fallback` is enabled (default: True), if a field chunk fails, + the system attempts to process each field individually. 
This helps: + - Maximize lineage information by processing working fields even when chunks fail + - Isolate problematic fields that cause SQL parsing or API errors + - Identify specific fields that need attention or exclusion + - Provide detailed reporting about which fields are problematic + Example WriteQuery request (see `_execute_query` for details): { "model": "test_model", @@ -376,6 +386,7 @@ class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream): For further details, see the method-level docstrings, especially: - `__get_spr`: SQL parsing and lineage extraction workflow with field splitting - `_get_spr_with_field_splitting`: Field splitting logic for large field sets + - `_process_individual_fields`: Individual field processing when chunks fail - `_get_sql_write_query`: WriteQuery construction and field enumeration - `_execute_query`: Looker API invocation and SQL retrieval - this only generates the SQL query, does not execute it - Field name translation: `_get_looker_api_field_name` and `_get_field_name_from_looker_api_field_name` @@ -548,6 +559,95 @@ def _parse_sql_response( return spr + def _process_individual_fields( + self, field_chunk: List[str], explore_name: str + ) -> tuple[set, list, int, int]: + """ + Process individual fields when a chunk fails to isolate problematic fields. + + Args: + field_chunk: List of field names that failed as a chunk + explore_name: The explore name to use in queries + + Returns: + Tuple of (successful_tables, successful_column_lineage, successful_count, failed_count) + """ + successful_tables = set() + successful_column_lineage = [] + successful_count = 0 + failed_count = 0 + problematic_fields = [] + + logger.info( + f"Processing {len(field_chunk)} fields individually for view '{self.view_context.name()}' " + f"to isolate problematic fields" + ) + + for field_name in field_chunk: + try: + logger.debug( + f"Processing individual field '{field_name}' for view '{self.view_context.name()}'" + ) + + # Create query with single field + sql_query = self._get_sql_write_query_with_fields( + [field_name], explore_name + ) + sql_response = self._execute_query(sql_query) + + # Parse the SQL response with partial results allowed + spr = self._parse_sql_response( + sql_response, self.config.allow_partial_lineage_results + ) + + # Collect results + successful_tables.update(spr.in_tables) + if spr.column_lineage: + successful_column_lineage.extend(spr.column_lineage) + + successful_count += 1 + logger.debug( + f"Successfully processed individual field '{field_name}' for view '{self.view_context.name()}'" + ) + + except Exception as e: + failed_count += 1 + problematic_fields.append(field_name) + logger.warning( + f"Failed to process individual field '{field_name}' for view '{self.view_context.name()}'': {e}" + ) + self.reporter.report_warning( + title="Individual Field Processing Failed", + message=f"Failed to process individual field '{field_name}'", + context=f"View-name: {self.view_context.name()}, Field: {field_name}", + exc=e, + ) + continue + + if problematic_fields: + logger.warning( + f"Identified {len(problematic_fields)} problematic fields for view '{self.view_context.name()}': " + f"{problematic_fields[:5]}{'...' if len(problematic_fields) > 5 else ''}" + ) + self.reporter.report_warning( + title="Problematic Fields Identified", + message=f"Identified {len(problematic_fields)} problematic fields that cannot be processed: " + f"{', '.join(problematic_fields[:10])}{'...' 
if len(problematic_fields) > 10 else ''}", + context=f"View-name: {self.view_context.name()}, Total problematic: {len(problematic_fields)}", + ) + + logger.info( + f"Individual field processing completed for view '{self.view_context.name()}: " + f"{successful_count} successful, {failed_count} failed" + ) + + return ( + successful_tables, + successful_column_lineage, + successful_count, + failed_count, + ) + def _get_spr_with_field_splitting( self, view_fields: List[str], explore_name: str ) -> SqlParsingResult: @@ -622,14 +722,54 @@ def _get_spr_with_field_splitting( exc=e, ) - if self.config.allow_partial_lineage_results: - continue + # Try individual field processing if enabled + if self.config.enable_individual_field_fallback: + logger.info( + f"Attempting individual field processing for failed chunk {chunk_idx + 1} " + f"with {len(field_chunk)} fields for view '{self.view_context.name()}'" + ) + + try: + ( + individual_tables, + individual_lineage, + individual_success, + individual_failed, + ) = self._process_individual_fields(field_chunk, explore_name) + + # Add individual field results to combined results + all_in_tables.update(individual_tables) + if individual_lineage: + all_column_lineage.extend(individual_lineage) + + # Update counters + successful_queries += individual_success + failed_queries += ( + individual_failed - 1 + ) # Subtract 1 because we already counted the chunk failure + + logger.info( + f"Individual field processing for chunk {chunk_idx + 1} completed: " + f"{individual_success} successful, {individual_failed} failed" + ) - # If partial lineage is not allowed, raise the exception - raise + except Exception as individual_error: + logger.error( + f"Individual field processing also failed for chunk {chunk_idx + 1}: {individual_error}" + ) + if not self.config.allow_partial_lineage_results: + raise + else: + if not self.config.allow_partial_lineage_results: + raise if not successful_queries or not all_in_tables or not all_column_lineage: - logger.debug(successful_queries, all_in_tables, all_column_lineage) + logger.debug( + f"All field chunks failed for view '{self.view_context.name()}'. Total chunks: {len(field_chunks)}, Failed: {failed_queries}", + successful_queries, + all_in_tables, + all_column_lineage, + ) raise ValueError( f"All field chunks failed for view '{self.view_context.name()}'. " f"Total chunks: {len(field_chunks)}, Failed: {failed_queries}" @@ -645,7 +785,8 @@ def _get_spr_with_field_splitting( logger.info( f"Successfully combined results for view '{self.view_context.name()}': " - f"{len(all_in_tables)} upstream tables, {len(all_column_lineage)} column lineages" + f"{len(all_in_tables)} upstream tables, {len(all_column_lineage)} column lineages. " + f"Success rate: {(successful_queries / (successful_queries + failed_queries)) * 100:.1f}%" ) # Report field splitting statistics @@ -656,6 +797,7 @@ def _get_spr_with_field_splitting( failed_queries=failed_queries, upstream_tables=len(all_in_tables), column_lineages=len(all_column_lineage), + individual_field_fallback_enabled=self.config.enable_individual_field_fallback, ) return combined_result @@ -668,6 +810,7 @@ def _report_field_splitting_stats( failed_queries: int, upstream_tables: int, column_lineages: int, + individual_field_fallback_enabled: bool = False, ) -> None: """ Report statistics about field splitting processing. 
@@ -679,16 +822,25 @@ def _report_field_splitting_stats( failed_queries: Number of failed API queries upstream_tables: Number of upstream tables found column_lineages: Number of column lineages found + individual_field_fallback_enabled: Whether individual field fallback was enabled """ # Use different reporting levels based on success rate - success_rate = (successful_queries / total_chunks) * 100 + total_queries = successful_queries + failed_queries + success_rate = ( + (successful_queries / total_queries) * 100 if total_queries > 0 else 0 + ) + + fallback_info = "" + if individual_field_fallback_enabled: + fallback_info = " (Individual field fallback enabled)" + if success_rate == 100: - # All chunks succeeded + # All queries succeeded self.reporter.report_warning( title="Field Splitting Statistics - Complete Success", message=f"Field splitting completed successfully for view '{self.view_context.name()}': " f"Total fields: {total_fields}, Chunks: {total_chunks}, " - f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}", + f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", ) elif success_rate > 0: @@ -697,8 +849,8 @@ def _report_field_splitting_stats( title="Field Splitting Statistics - Partial Success", message=f"Field splitting completed with partial results for view '{self.view_context.name()}': " f"Total fields: {total_fields}, Chunks: {total_chunks}, " - f"Successful: {successful_queries}, Failed: {failed_queries}, " - f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}", + f"Successful queries: {successful_queries}, Failed queries: {failed_queries}, " + f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", ) else: @@ -707,7 +859,7 @@ def _report_field_splitting_stats( title="Field Splitting Statistics - Complete Failure", message=f"Field splitting failed completely for view '{self.view_context.name()}': " f"Total fields: {total_fields}, Chunks: {total_chunks}, " - f"All {failed_queries} chunks failed", + f"All {failed_queries} queries failed{fallback_info}", context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", ) From ea3b6fe311682ae1d97c6513a7a0bbb42c2b76f4 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 28 Oct 2025 18:02:00 -0700 Subject: [PATCH 11/17] feat(looker): enhance SQL response parsing to include error handling --- .../ingestion/source/looker/view_upstream.py | 69 ++++++++++--------- 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 277f1b7d30ab02..c0db609a531492 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -463,10 +463,11 @@ def __get_spr(self) -> SqlParsingResult: view_fields, explore_name ) sql_response = self._execute_query(sql_query) - return self._parse_sql_response( + spr, has_errors = self._parse_sql_response( sql_response, allow_partial=self.config.allow_partial_lineage_results, ) + return spr except Exception: # Reraise the exception to allow higher-level handling. 
@@ -495,7 +496,7 @@ def _get_sql_write_query_with_fields( def _parse_sql_response( self, sql_response: str, allow_partial: bool = True - ) -> SqlParsingResult: + ) -> tuple[SqlParsingResult, bool]: """ Parse SQL response to extract lineage information. @@ -504,7 +505,7 @@ def _parse_sql_response( allow_partial: Whether to allow partial results when there are parsing errors Returns: - SqlParsingResult: The parsed result with lineage information + Tuple of (SqlParsingResult, has_errors) where has_errors indicates if there were parsing issues """ spr = create_lineage_sql_parsed_result( query=sql_response, @@ -516,9 +517,12 @@ def _parse_sql_response( graph=self.ctx.graph, ) + has_errors = False + # Check for errors encountered during table extraction. table_error = spr.debug_info.table_error if table_error is not None: + has_errors = True logger.debug( f"view-name={self.view_context.name()}, table-error={table_error}, sql-query={sql_response}" ) @@ -539,6 +543,7 @@ def _parse_sql_response( column_error = spr.debug_info.column_error if column_error is not None: + has_errors = True logger.debug( f"view-name={self.view_context.name()}, column-error={column_error}, sql-query={sql_response}" ) @@ -557,7 +562,7 @@ def _parse_sql_response( f"Allowing partial results despite column parsing error for view '{self.view_context.name()}'" ) - return spr + return spr, has_errors def _process_individual_fields( self, field_chunk: List[str], explore_name: str @@ -596,7 +601,7 @@ def _process_individual_fields( sql_response = self._execute_query(sql_query) # Parse the SQL response with partial results allowed - spr = self._parse_sql_response( + spr, has_errors = self._parse_sql_response( sql_response, self.config.allow_partial_lineage_results ) @@ -695,7 +700,7 @@ def _get_spr_with_field_splitting( sql_response = self._execute_query(sql_query) # Parse the SQL response with partial results allowed - spr = self._parse_sql_response( + spr, has_errors = self._parse_sql_response( sql_response, self.config.allow_partial_lineage_results ) @@ -708,27 +713,11 @@ def _get_spr_with_field_splitting( if combined_debug_info is None: combined_debug_info = spr.debug_info - successful_queries += 1 - - except Exception as e: - failed_queries += 1 - logger.warning( - f"Failed to process field chunk {chunk_idx + 1}/{len(field_chunks)} for view '{self.view_context.name()}': {e}" - ) - self.reporter.report_warning( - title="Field Chunk Processing Failed", - message=f"Failed to process field chunk {chunk_idx + 1} with {len(field_chunk)} fields", - context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}/{len(field_chunks)}", - exc=e, - ) - - # Try individual field processing if enabled - if self.config.enable_individual_field_fallback: - logger.info( - f"Attempting individual field processing for failed chunk {chunk_idx + 1} " - f"with {len(field_chunk)} fields for view '{self.view_context.name()}'" + # Check if we should trigger individual field fallback due to parsing errors + if has_errors and self.config.enable_individual_field_fallback: + logger.warning( + f"Chunk {chunk_idx + 1} had parsing errors, attempting individual field processing for view '{self.view_context.name()}'" ) - try: ( individual_tables, @@ -744,24 +733,40 @@ def _get_spr_with_field_splitting( # Update counters successful_queries += individual_success - failed_queries += ( - individual_failed - 1 - ) # Subtract 1 because we already counted the chunk failure + failed_queries += individual_failed logger.info( f"Individual field processing for 
chunk {chunk_idx + 1} completed: " f"{individual_success} successful, {individual_failed} failed" ) - except Exception as individual_error: logger.error( f"Individual field processing also failed for chunk {chunk_idx + 1}: {individual_error}" ) + failed_queries += 1 if not self.config.allow_partial_lineage_results: raise else: - if not self.config.allow_partial_lineage_results: - raise + if has_errors: + failed_queries += 1 + if not self.config.allow_partial_lineage_results: + raise ValueError( + f"SQL parsing errors in chunk {chunk_idx + 1}" + ) + else: + successful_queries += 1 + + except Exception as e: + failed_queries += 1 + logger.warning( + f"Failed to process field chunk {chunk_idx + 1}/{len(field_chunks)} for view '{self.view_context.name()}': {e}" + ) + self.reporter.report_warning( + title="Field Chunk Processing Failed", + message=f"Failed to process field chunk {chunk_idx + 1} with {len(field_chunk)} fields", + context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}/{len(field_chunks)}", + exc=e, + ) if not successful_queries or not all_in_tables or not all_column_lineage: logger.debug( From c67f928069d744af67b0394ad53f3e21b8e6dc8a Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 28 Oct 2025 18:18:56 -0700 Subject: [PATCH 12/17] feat(looker): enforce strict SQL response parsing to improve error handling for individual fields --- .../src/datahub/ingestion/source/looker/view_upstream.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index c0db609a531492..5fbd7356e82628 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -600,9 +600,10 @@ def _process_individual_fields( ) sql_response = self._execute_query(sql_query) - # Parse the SQL response with partial results allowed + # Parse the SQL response without partial results to catch parsing errors + # We want to raise exceptions for individual field processing to properly handle failures spr, has_errors = self._parse_sql_response( - sql_response, self.config.allow_partial_lineage_results + sql_response, allow_partial=False ) # Collect results From 272087f771d385724e1d9864e8682b348bde7218 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Tue, 28 Oct 2025 22:59:57 -0700 Subject: [PATCH 13/17] feat(looker): implement parallel processing for field and chunk handling to improve performance and error isolation --- .../ingestion/source/looker/view_upstream.py | 302 +++++++++++------- 1 file changed, 185 insertions(+), 117 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 5fbd7356e82628..32071c6be92e58 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -1,9 +1,10 @@ import logging import re from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from functools import lru_cache -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple from looker_sdk.sdk.api40.models import ( LookmlModelExplore, @@ -564,12 +565,87 @@ def _parse_sql_response( return spr, has_errors + def _process_field_chunk( + self, 
field_chunk: List[str], explore_name: str, chunk_idx: int + ) -> Tuple[SqlParsingResult, bool, int, int]: + """ + Process a single field chunk and return results. + + Returns: + Tuple of (SqlParsingResult, has_errors, successful_queries, failed_queries) + """ + try: + sql_query = self._get_sql_write_query_with_fields(field_chunk, explore_name) + sql_response = self._execute_query(sql_query) + spr, has_errors = self._parse_sql_response( + sql_response, self.config.allow_partial_lineage_results + ) + + # Check if we should trigger individual field fallback due to parsing errors + if has_errors and self.config.enable_individual_field_fallback: + logger.warning( + f"Chunk {chunk_idx + 1} had parsing errors, attempting individual field processing for view '{self.view_context.name()}'" + ) + try: + ( + individual_tables, + individual_lineage, + individual_success, + individual_failed, + ) = self._process_individual_fields(field_chunk, explore_name) + + # Combine individual field results with chunk results + spr.in_tables.extend(list(individual_tables)) + if individual_lineage and spr.column_lineage: + spr.column_lineage.extend(list(individual_lineage)) + elif individual_lineage: + spr.column_lineage = list(individual_lineage) + + return spr, False, individual_success, individual_failed + + except Exception as individual_error: + logger.error( + f"Individual field processing also failed for chunk {chunk_idx + 1}: {individual_error}" + ) + return spr, has_errors, 0, 1 + else: + if has_errors: + return spr, has_errors, 0, 1 + else: + return spr, has_errors, 1, 0 + + except Exception as e: + logger.warning( + f"Failed to process field chunk {chunk_idx + 1} for view '{self.view_context.name()}': {e}" + ) + self.reporter.report_warning( + title="Field Chunk Processing Failed", + message=f"Failed to process field chunk {chunk_idx + 1} with {len(field_chunk)} fields", + context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}", + exc=e, + ) + + return ( + SqlParsingResult( + in_tables=[], + out_tables=[], + column_lineage=None, + debug_info=SqlParsingDebugInfo(), + ), + True, + 0, + 1, + ) + def _process_individual_fields( self, field_chunk: List[str], explore_name: str ) -> tuple[set, list, int, int]: """ Process individual fields when a chunk fails to isolate problematic fields. + This method processes each field individually in parallel to maximize information + extraction and isolate problematic fields that cause SQL parsing or API errors. 
+ Args: field_chunk: List of field names that failed as a chunk explore_name: The explore name to use in queries @@ -584,51 +660,41 @@ def _process_individual_fields( problematic_fields = [] logger.info( - f"Processing {len(field_chunk)} fields individually for view '{self.view_context.name()}' " + f"Processing {len(field_chunk)} fields individually in parallel for view '{self.view_context.name()}' " f"to isolate problematic fields" ) - for field_name in field_chunk: - try: - logger.debug( - f"Processing individual field '{field_name}' for view '{self.view_context.name()}'" - ) - - # Create query with single field - sql_query = self._get_sql_write_query_with_fields( - [field_name], explore_name - ) - sql_response = self._execute_query(sql_query) - - # Parse the SQL response without partial results to catch parsing errors - # We want to raise exceptions for individual field processing to properly handle failures - spr, has_errors = self._parse_sql_response( - sql_response, allow_partial=False - ) - - # Collect results - successful_tables.update(spr.in_tables) - if spr.column_lineage: - successful_column_lineage.extend(spr.column_lineage) + # Process fields in parallel + with ThreadPoolExecutor(max_workers=min(len(field_chunk), 10)) as executor: + # Submit all field processing tasks + future_to_field = { + executor.submit( + self._process_individual_field, field_name, explore_name + ): field_name + for field_name in field_chunk + } - successful_count += 1 - logger.debug( - f"Successfully processed individual field '{field_name}' for view '{self.view_context.name()}'" - ) + # Collect results as they complete + for future in as_completed(future_to_field): + field_name = future_to_field[future] + try: + tables, lineage, success = future.result() + + if success: + successful_tables.update(tables) + if lineage: + successful_column_lineage.extend(lineage) + successful_count += 1 + else: + failed_count += 1 + problematic_fields.append(field_name) - except Exception as e: - failed_count += 1 - problematic_fields.append(field_name) - logger.warning( - f"Failed to process individual field '{field_name}' for view '{self.view_context.name()}'': {e}" - ) - self.reporter.report_warning( - title="Individual Field Processing Failed", - message=f"Failed to process individual field '{field_name}'", - context=f"View-name: {self.view_context.name()}, Field: {field_name}", - exc=e, - ) - continue + except Exception as e: + failed_count += 1 + problematic_fields.append(field_name) + logger.warning( + f"Unexpected error processing individual field '{field_name}' for view '{self.view_context.name()}'': {e}" + ) if problematic_fields: logger.warning( @@ -654,12 +720,55 @@ def _process_individual_fields( failed_count, ) + def _process_individual_field( + self, field_name: str, explore_name: str + ) -> Tuple[set, list, bool]: + """ + Process a single individual field and return results. 
+ + Returns: + Tuple of (tables, lineage, success) + """ + try: + sql_query = self._get_sql_write_query_with_fields( + [field_name], explore_name + ) + sql_response = self._execute_query(sql_query) + + # Parse the SQL response without partial results to catch parsing errors + # We want to raise exceptions for individual field processing to properly handle failures + spr, has_errors = self._parse_sql_response( + sql_response, allow_partial=False + ) + + # Collect results + tables = set(spr.in_tables) if spr.in_tables else set() + lineage = list(spr.column_lineage) if spr.column_lineage else [] + + logger.debug( + f"Successfully processed individual field '{field_name}' for view '{self.view_context.name()}'" + ) + + return tables, lineage, True + + except Exception as e: + logger.warning( + f"Failed to process individual field '{field_name}' for view '{self.view_context.name()}'': {e}" + ) + self.reporter.report_warning( + title="Individual Field Processing Failed", + message=f"Failed to process individual field '{field_name}'", + context=f"View-name: {self.view_context.name()}, Field: {field_name}", + exc=e, + ) + return set(), [], False + def _get_spr_with_field_splitting( self, view_fields: List[str], explore_name: str ) -> SqlParsingResult: """ Handle field splitting when the number of fields exceeds the threshold. - Splits fields into chunks and combines the SQL parsing results. + Splits fields into chunks and processes them in parallel for better performance. Args: view_fields: List of all field names @@ -678,7 +787,8 @@ def _get_spr_with_field_splitting( ] logger.info( - f"Split {len(view_fields)} fields into {len(field_chunks)} chunks for view '{self.view_context.name()}'" + f"Split {len(view_fields)} fields into {len(field_chunks)} chunks for view '{self.view_context.name()}' " + f"and processing them in parallel" ) all_in_tables = set() @@ -687,87 +797,45 @@ def _get_spr_with_field_splitting( successful_queries = 0 failed_queries = 0 - for chunk_idx, field_chunk in enumerate(field_chunks): - try: - logger.debug( - f"Processing field chunk {chunk_idx + 1}/{len(field_chunks)} with {len(field_chunk)} fields " - f"for view '{self.view_context.name()}'" - ) + # Process chunks in parallel + with ThreadPoolExecutor(max_workers=min(len(field_chunks), 10)) as executor: + # Submit all chunk processing tasks + future_to_chunk = { + executor.submit( + self._process_field_chunk, field_chunk, explore_name, chunk_idx + ): chunk_idx + for chunk_idx, field_chunk in enumerate(field_chunks) + } - # Create and execute query for this chunk - sql_query = self._get_sql_write_query_with_fields( - field_chunk, explore_name - ) - sql_response = self._execute_query(sql_query) + # Collect results as they complete + for future in as_completed(future_to_chunk): + chunk_idx = future_to_chunk[future] + try: + spr, has_errors, chunk_success, chunk_failed = future.result() - # Parse the SQL response with partial results allowed - spr, has_errors = self._parse_sql_response( - sql_response, self.config.allow_partial_lineage_results - ) + # Combine results + all_in_tables.update(spr.in_tables) + if spr.column_lineage: + all_column_lineage.extend(spr.column_lineage) - # Combine results - all_in_tables.update(spr.in_tables) - if spr.column_lineage: - all_column_lineage.extend(spr.column_lineage) + # Use debug info from the first successful query + if combined_debug_info is None and not has_errors: + combined_debug_info = spr.debug_info - # Use debug info from the first successful query - if combined_debug_info is None: 
- combined_debug_info = spr.debug_info + # Update counters + successful_queries += chunk_success + failed_queries += chunk_failed - # Check if we should trigger individual field fallback due to parsing errors - if has_errors and self.config.enable_individual_field_fallback: - logger.warning( - f"Chunk {chunk_idx + 1} had parsing errors, attempting individual field processing for view '{self.view_context.name()}'" + logger.debug( + f"Completed processing chunk {chunk_idx + 1}/{len(field_chunks)}: " + f"{chunk_success} successful, {chunk_failed} failed" ) - try: - ( - individual_tables, - individual_lineage, - individual_success, - individual_failed, - ) = self._process_individual_fields(field_chunk, explore_name) - - # Add individual field results to combined results - all_in_tables.update(individual_tables) - if individual_lineage: - all_column_lineage.extend(individual_lineage) - - # Update counters - successful_queries += individual_success - failed_queries += individual_failed - - logger.info( - f"Individual field processing for chunk {chunk_idx + 1} completed: " - f"{individual_success} successful, {individual_failed} failed" - ) - except Exception as individual_error: - logger.error( - f"Individual field processing also failed for chunk {chunk_idx + 1}: {individual_error}" - ) - failed_queries += 1 - if not self.config.allow_partial_lineage_results: - raise - else: - if has_errors: - failed_queries += 1 - if not self.config.allow_partial_lineage_results: - raise ValueError( - f"SQL parsing errors in chunk {chunk_idx + 1}" - ) - else: - successful_queries += 1 - except Exception as e: - failed_queries += 1 - logger.warning( - f"Failed to process field chunk {chunk_idx + 1}/{len(field_chunks)} for view '{self.view_context.name()}': {e}" - ) - self.reporter.report_warning( - title="Field Chunk Processing Failed", - message=f"Failed to process field chunk {chunk_idx + 1} with {len(field_chunk)} fields", - context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}/{len(field_chunks)}", - exc=e, - ) + except Exception as e: + failed_queries += 1 + logger.warning( + f"Unexpected error processing chunk {chunk_idx + 1}/{len(field_chunks)} for view '{self.view_context.name()}': {e}" + ) if not successful_queries or not all_in_tables or not all_column_lineage: logger.debug( From 5ec99e594cfc9a912518961d1fd68f4127fdce07 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Wed, 29 Oct 2025 19:09:43 -0700 Subject: [PATCH 14/17] feat(looker): add configurable max workers for parallel processing to enhance performance --- .../src/datahub/ingestion/source/looker/lookml_config.py | 6 ++++++ .../src/datahub/ingestion/source/looker/view_upstream.py | 8 ++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py index 1bc4f3f23c556f..a4fcaf7167ae5d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py @@ -231,6 +231,12 @@ class LookMLSourceConfig( "which specific fields are causing issues while still getting lineage for working fields.", ) + max_workers_for_parallel_processing: int = Field( + 10, + description="Maximum number of worker threads to use for parallel processing of field chunks and individual fields. " + "Set to 1 to process everything sequentially. 
Higher values can improve performance but may increase memory usage.", + ) + @validator("connection_to_platform_map", pre=True) def convert_string_to_connection_def(cls, conn_map): # Previous version of config supported strings in connection map. This upconverts strings to ConnectionMap diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 32071c6be92e58..05b0c68ddaaad8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -665,7 +665,9 @@ def _process_individual_fields( ) # Process fields in parallel - with ThreadPoolExecutor(max_workers=min(len(field_chunk), 10)) as executor: + with ThreadPoolExecutor( + max_workers=self.config.max_workers_for_parallel_processing + ) as executor: # Submit all field processing tasks future_to_field = { executor.submit( @@ -798,7 +800,9 @@ def _get_spr_with_field_splitting( failed_queries = 0 # Process chunks in parallel - with ThreadPoolExecutor(max_workers=min(len(field_chunks), 10)) as executor: + with ThreadPoolExecutor( + max_workers=self.config.max_workers_for_parallel_processing + ) as executor: # Submit all chunk processing tasks future_to_chunk = { executor.submit( From e8a42be5d6d629e3b0466efa692e7200554f89a3 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Wed, 29 Oct 2025 19:20:15 -0700 Subject: [PATCH 15/17] fix(looker): improve warning messages for field processing errors and success statistics --- .../ingestion/source/looker/view_upstream.py | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 05b0c68ddaaad8..fe4071abe03ffc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -620,8 +620,8 @@ def _process_field_chunk( ) self.reporter.report_warning( title="Field Chunk Processing Failed", - message=f"Failed to process field chunk {chunk_idx + 1} with {len(field_chunk)} fields", - context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}", + message="Failed to process field chunk with fields", + context=f"View-name: {self.view_context.name()}, Chunk: {chunk_idx + 1}, Field count: {len(field_chunk)}", exc=e, ) @@ -701,13 +701,12 @@ def _process_individual_fields( if problematic_fields: logger.warning( f"Identified {len(problematic_fields)} problematic fields for view '{self.view_context.name()}': " - f"{problematic_fields[:5]}{'...' if len(problematic_fields) > 5 else ''}" + f"{problematic_fields}" ) self.reporter.report_warning( title="Problematic Fields Identified", - message=f"Identified {len(problematic_fields)} problematic fields that cannot be processed: " - f"{', '.join(problematic_fields[:10])}{'...' if len(problematic_fields) > 10 else ''}", - context=f"View-name: {self.view_context.name()}, Total problematic: {len(problematic_fields)}", + message="Identified problematic fields that cannot be processed", + context=f"View-name: {self.view_context.name()}, Total problematic: {len(problematic_fields)}, Fields: {', '.join(problematic_fields[:10])}{'...' 
if len(problematic_fields) > 10 else ''}", ) logger.info( @@ -759,7 +758,7 @@ def _process_individual_field( ) self.reporter.report_warning( title="Individual Field Processing Failed", - message=f"Failed to process individual field '{field_name}'", + message="Failed to process individual field", context=f"View-name: {self.view_context.name()}, Field: {field_name}", exc=e, ) @@ -916,29 +915,22 @@ def _report_field_splitting_stats( # All queries succeeded self.reporter.report_warning( title="Field Splitting Statistics - Complete Success", - message=f"Field splitting completed successfully for view '{self.view_context.name()}': " - f"Total fields: {total_fields}, Chunks: {total_chunks}, " - f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", - context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", + message="Field splitting completed successfully", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%, Total fields: {total_fields}, Chunks: {total_chunks}, Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", ) elif success_rate > 0: # Partial success self.reporter.report_warning( title="Field Splitting Statistics - Partial Success", - message=f"Field splitting completed with partial results for view '{self.view_context.name()}': " - f"Total fields: {total_fields}, Chunks: {total_chunks}, " - f"Successful queries: {successful_queries}, Failed queries: {failed_queries}, " - f"Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", - context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", + message="Field splitting completed with partial results", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%, Total fields: {total_fields}, Chunks: {total_chunks}, Successful queries: {successful_queries}, Failed queries: {failed_queries}, Upstream tables: {upstream_tables}, Column lineages: {column_lineages}{fallback_info}", ) else: # Complete failure self.reporter.report_warning( title="Field Splitting Statistics - Complete Failure", - message=f"Field splitting failed completely for view '{self.view_context.name()}': " - f"Total fields: {total_fields}, Chunks: {total_chunks}, " - f"All {failed_queries} queries failed{fallback_info}", - context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%", + message="Field splitting failed completely", + context=f"View-name: {self.view_context.name()}, Success rate: {success_rate:.1f}%, Total fields: {total_fields}, Chunks: {total_chunks}, All {failed_queries} queries failed{fallback_info}", ) def _get_time_dim_group_field_name(self, dim_group: dict) -> str: @@ -1396,11 +1388,19 @@ def get_upstream_column_ref( lineage.downstream.column.lower().startswith(name) for name in field_api_names ): + # Log if there are no upstreams + if not lineage.upstreams: + logger.debug( + f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, lineage-downstream-column={lineage.downstream.column}, no-upstreams" + ) for upstream in lineage.upstreams: upstream_refs.append( ColumnRef(table=upstream.table, column=upstream.column) ) - + else: + logger.debug( + f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, no-match" + ) return _drop_hive_dot_from_upstream(upstream_refs) def create_fields(self) -> List[ViewField]: From 
07b80ebb5a6957180bc95e8c5f1fe1a8f5ad3a98 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Wed, 29 Oct 2025 23:26:04 -0700 Subject: [PATCH 16/17] Updated debug logs --- .../ingestion/source/looker/view_upstream.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index fe4071abe03ffc..3c5dfc44e652cf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -1382,25 +1382,27 @@ def get_upstream_column_ref( field_api_names = [name.lower() for name in field_api_names] upstream_refs: List[ColumnRef] = [] - + no_cll_found = True for lineage in spr.column_lineage: if any( lineage.downstream.column.lower().startswith(name) for name in field_api_names ): + no_cll_found = False # Log if there are no upstreams if not lineage.upstreams: logger.debug( - f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, lineage-downstream-column={lineage.downstream.column}, no-upstreams" + f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, lineage-downstream-column={lineage.downstream.column.lower()}, lineage-downstream={lineage.downstream}, no-upstreams" ) for upstream in lineage.upstreams: upstream_refs.append( ColumnRef(table=upstream.table, column=upstream.column) ) - else: - logger.debug( - f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, no-match" - ) + + if no_cll_found: + logger.debug( + f"view-name={self.view_context.name()}, field-name={field_name}, field-api-names={field_api_names}, lineage-downstream-column={lineage.downstream.column.lower()}, lineage-downstream={lineage.downstream}, no-match" + ) return _drop_hive_dot_from_upstream(upstream_refs) def create_fields(self) -> List[ViewField]: From 2d391dbe833ab018d0559b1c32513c46259d3495 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Thu, 30 Oct 2025 00:09:10 -0700 Subject: [PATCH 17/17] Comparing fields from the looker API vs lkml view context parsing --- .../ingestion/source/looker/view_upstream.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 3c5dfc44e652cf..d774073ab176b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -444,6 +444,19 @@ def __get_spr(self) -> SqlParsingResult: assert explore_name # Happy linter view_fields = self._get_fields_from_looker_api(explore_name) + # TODO: Remove this after debugging + logger.debug( + f"view-name={self.view_context.name()}, total-fields-from-looker-api={len(view_fields)}, total-fields-from-view-context={len(self._get_fields_from_view_context())}" + ) + # Compare diff between view fields, if any, print the diff + # Compute set differences in both directions to identify fields missing from Looker API and fields extra in Looker API + view_context_fields = set(self._get_fields_from_view_context()) + view_fields_set = set(view_fields) + missing_from_looker_api = view_context_fields - view_fields_set + extra_in_looker_api = view_fields_set - view_context_fields + logger.debug( + 
f"view-name={self.view_context.name()}, missing-from-looker-api={missing_from_looker_api}, extra-in-looker-api={extra_in_looker_api}" + ) if not view_fields: view_fields = self._get_fields_from_view_context()