Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,18 @@ def all_lookml_models(self) -> Sequence[LookmlModel]:
transport_options=self.transport_options,
)

def lookml_model_explore(self, model: str, explore_name: str) -> LookmlModelExplore:
def lookml_model_explore(
    self,
    model: str,
    explore_name: str,
    fields: Optional[List[str]] = None,
) -> LookmlModelExplore:
    """Fetch a LookML explore from the Looker API.

    Args:
        model: Name of the LookML model containing the explore.
        explore_name: Name of the explore to fetch.
        fields: Optional list of field names to restrict the response to;
            when omitted, the API returns its default field set.

    Returns:
        The LookmlModelExplore returned by the Looker SDK client.
    """
    # Count every explore call for client-side usage stats.
    self.client_stats.explore_calls += 1
    # Only map/forward the fields filter when the caller supplied one.
    requested_fields = self.__fields_mapper(fields) if fields else None
    return self.client.lookml_model_explore(
        model,
        explore_name,
        fields=requested_fields,
        transport_options=self.transport_options,
    )

@lru_cache(maxsize=1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,27 @@ class LookMLSourceConfig(
"All if comments are evaluated to true for configured looker_environment value",
)

# Chunking threshold for Looker API field requests: above this count, field
# lists are fetched in multiple smaller API calls (helps avoid SQL-parsing
# failures on very large field sets while still yielding partial lineage).
field_threshold_for_splitting: int = Field(
    100,
    description="When the total number of fields returned by Looker API exceeds this threshold, "
    "the fields will be split into multiple API calls to avoid SQL parsing failures. "
    "This helps provide partial column and table lineage when dealing with large field sets.",
)

# Resilience switch: when True, a failed chunk or SQL-parse error does not
# abort lineage extraction — whatever lineage succeeded is still emitted.
allow_partial_lineage_results: bool = Field(
    True,
    description="When enabled, allows partial lineage results to be returned even when some field chunks "
    "fail or when there are SQL parsing errors. This provides better resilience for large field sets "
    "and ensures some lineage information is available rather than complete failure.",
)

# Fallback switch: when a whole chunk fails, retry its fields one at a time
# so that a single bad field does not discard lineage for the rest.
enable_individual_field_fallback: bool = Field(
    True,
    description="When enabled, if a field chunk fails, the system will attempt to process each field "
    "individually to maximize information and isolate problematic fields. This helps identify "
    "which specific fields are causing issues while still getting lineage for working fields.",
)

@validator("connection_to_platform_map", pre=True)
def convert_string_to_connection_def(cls, conn_map):
# Previous version of config supported strings in connection map. This upconverts strings to ConnectionMap
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import pathlib
import tempfile
from collections import OrderedDict
from collections import OrderedDict, defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
Expand Down Expand Up @@ -709,10 +709,14 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
# Value: Tuple(model file name, connection name)
view_connection_map: Dict[str, Tuple[str, str]] = {}

# Map of view name to explore name for API-based view lineage
# A view can be referenced by multiple explores, we only need one of the explores to use Looker Query API
# Key: view_name, Value: explore_name
view_to_explore_map: Dict[str, str] = {}
# Map of view name to all possible explores for API-based view lineage
# A view can be referenced by multiple explores, we'll optimize the assignment
# Key: view_name, Value: set of explore_names
view_to_explores: Dict[str, Set[str]] = defaultdict(set)

# Temporary map to keep track of the views in an explore
# Key: explore_name, Value: set of view_names
explore_to_views: Dict[str, Set[str]] = defaultdict(set)

# The ** means "this directory and all subdirectories", and hence should
# include all the files we want.
Expand Down Expand Up @@ -789,8 +793,9 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
for view_name in explore.upstream_views:
if self.source_config.emit_reachable_views_only:
explore_reachable_views.add(view_name.include)
# Build view to explore mapping for API-based view lineage
view_to_explore_map[view_name.include] = explore.name

view_to_explores[view_name.include].add(explore.name)
explore_to_views[explore.name].add(view_name.include)
except Exception as e:
self.reporter.report_warning(
title="Failed to process explores",
Expand All @@ -804,6 +809,16 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
model.connection, set()
)

view_to_explore_map = {}
if view_to_explores and explore_to_views:
view_to_explore_map = self._optimize_views_by_common_explore(
view_to_explores, explore_to_views
)
else:
logger.warning(
f"Either view_to_explores: {view_to_explores} or explore_to_views: {explore_to_views} is empty"
)

project_name = self.get_project_name(model_name)

looker_view_id_cache: LookerViewIdCache = LookerViewIdCache(
Expand Down Expand Up @@ -888,9 +903,7 @@ def get_internal_workunits(self) -> Iterable[Union[MetadataWorkUnit, Entity]]:
config=self.source_config,
ctx=self.ctx,
looker_client=self.looker_client,
view_to_explore_map=view_to_explore_map
if view_to_explore_map
else None,
view_to_explore_map=view_to_explore_map,
)
except Exception as e:
self.reporter.report_warning(
Expand Down Expand Up @@ -1040,5 +1053,61 @@ def report_skipped_unreachable_views(
context=(f"Project: {project}, View File Path: {path}"),
)

def _optimize_views_by_common_explore(
    self,
    view_to_explores: Dict[str, Set[str]],
    explore_to_views: Dict[str, Set[str]],
) -> Dict[str, str]:
    """
    Optimize view-to-explore mapping by grouping views to minimize API calls.

    Uses a greedy algorithm: each view is assigned to the largest explore
    (by number of views it references) among that view's candidate explores,
    so as many views as possible share the same explore.

    Args:
        view_to_explores: Dict mapping view_name -> set of explore_names
        explore_to_views: Dict mapping explore_name -> set of view_names

    Returns:
        Dict mapping view_name -> explore_name (optimized assignment)
    """

    # Pre-compute explore sizes so the max() key below is O(1) per candidate.
    explore_sizes = {
        explore: len(views) for explore, views in explore_to_views.items()
    }

    view_to_explore: Dict[str, str] = {}

    # For each view, greedily pick the largest candidate explore.
    for view_name, candidate_explores in view_to_explores.items():
        if not candidate_explores:
            continue
        # This assigns the view to the explore (among those containing it)
        # that covers the most views. .get(..., 0) guards against a candidate
        # explore missing from explore_to_views, which would raise KeyError.
        best_explore = max(
            candidate_explores,
            key=lambda explore: explore_sizes.get(explore, 0),
        )
        view_to_explore[view_name] = best_explore

    # Log optimization results (lazy %-style args avoid formatting cost
    # when the log level is disabled).
    unique_explores_used = len(set(view_to_explore.values()))
    total_views = len(view_to_explore)
    total_explores = len(explore_to_views)

    if total_explores > 0:
        efficiency = (1 - unique_explores_used / total_explores) * 100
        logger.info(
            "View-explore optimization: Using %d/%d explores for %d views "
            "(efficiency: %.1f%% savings)",
            unique_explores_used,
            total_explores,
            total_views,
            efficiency,
        )
    else:
        logger.info(
            "View-explore optimization: No explores to optimize for %d views",
            total_views,
        )

    logger.debug("Final View-to-explore mapping: %s", view_to_explore)
    return view_to_explore

def get_report(self):
    """Return the reporter that accumulates this source's ingestion report."""
    return self.reporter
Loading
Loading