From d3dcc8e9a73cf795c438cd6d8294ff02dc704511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Tue, 3 Dec 2024 17:03:30 -0600 Subject: [PATCH] perf(taps): Improved discovery performance for SQL taps (#2793) * perf(taps): Improved discovery performance for SQL taps * Discover constraints scoped by object type * Inspect only once per schema * Try indices instead of UNIQUE constraints * Extend existing API * Avoid unnecessarily reflecting stuff * Backwards-compatible API * Deprecate `SQLConnector.get_object_names` --- samples/sample_tap_bigquery/__init__.py | 22 ----- singer_sdk/connectors/sql.py | 110 +++++++++++++++--------- 2 files changed, 68 insertions(+), 64 deletions(-) diff --git a/samples/sample_tap_bigquery/__init__.py b/samples/sample_tap_bigquery/__init__.py index ad0d3c3cb..a91f372eb 100644 --- a/samples/sample_tap_bigquery/__init__.py +++ b/samples/sample_tap_bigquery/__init__.py @@ -13,28 +13,6 @@ def get_sqlalchemy_url(self, config: dict) -> str: # noqa: PLR6301 """Concatenate a SQLAlchemy URL for use in connecting to the source.""" return f"bigquery://{config['project_id']}" - def get_object_names( - self, - engine, - inspected, - schema_name: str, - ) -> list[tuple[str, bool]]: - """Return discoverable object names.""" - # Bigquery inspections returns table names in the form - # `schema_name.table_name` which later results in the project name - # override due to specifics in behavior of sqlalchemy-bigquery - # - # Let's strip `schema_name` prefix on the inspection - - return [ - (table_name.split(".")[-1], is_view) - for (table_name, is_view) in super().get_object_names( - engine, - inspected, - schema_name, - ) - ] - class BigQueryStream(SQLStream): """Stream class for BigQuery streams.""" diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index f829fe897..3302a249f 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -13,10 +13,12 @@ from functools import lru_cache import sqlalchemy as sa +from sqlalchemy.engine import reflection from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers._compat import SingerSDKDeprecationWarning from singer_sdk.helpers._util import dump_json, load_json from singer_sdk.helpers.capabilities import TargetLoadMethods @@ -847,7 +849,12 @@ def get_schema_names( # noqa: PLR6301 """ return inspected.get_schema_names() - def get_object_names( + @deprecated( + "This method is deprecated.", + category=SingerSDKDeprecationWarning, + stacklevel=1, + ) + def get_object_names( # pragma: no cover self, engine: Engine, # noqa: ARG002 inspected: Inspector, @@ -877,12 +884,14 @@ def get_object_names( def discover_catalog_entry( self, engine: Engine, # noqa: ARG002 - inspected: Inspector, - schema_name: str, + inspected: Inspector, # noqa: ARG002 + schema_name: str | None, table_name: str, is_view: bool, # noqa: FBT001 *, - reflect_indices: bool = True, + reflected_columns: list[reflection.ReflectedColumn] | None = None, + reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None, + reflected_indices: list[reflection.ReflectedIndex] | None = None, ) -> CatalogEntry: """Create `CatalogEntry` object for the given table or a view. @@ -893,45 +902,47 @@ def discover_catalog_entry( table_name: Name of the table or a view is_view: Flag whether this object is a view, returned by `get_object_names` reflect_indices: Whether to reflect indices + reflected_columns: List of reflected columns + reflected_pk: Reflected primary key + reflected_indices: List of reflected indices Returns: `CatalogEntry` object for the given table or a view """ # Initialize unique stream name - unique_stream_id = f"{schema_name}-{table_name}" + unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name + + # Backwards-compatibility + reflected_columns = reflected_columns or [] + reflected_indices = reflected_indices or [] # Detect key properties possible_primary_keys: list[list[str]] = [] - pk_def = inspected.get_pk_constraint(table_name, schema=schema_name) - if pk_def and "constrained_columns" in pk_def: # type: ignore[redundant-expr] - possible_primary_keys.append(pk_def["constrained_columns"]) + if reflected_pk and "constrained_columns" in reflected_pk: + possible_primary_keys.append(reflected_pk["constrained_columns"]) # An element of the columns list is ``None`` if it's an expression and is # returned in the ``expressions`` list of the reflected index. - if reflect_indices: - possible_primary_keys.extend( - index_def["column_names"] # type: ignore[misc] - for index_def in inspected.get_indexes(table_name, schema=schema_name) - if index_def.get("unique", False) - ) + possible_primary_keys.extend( + index_def["column_names"] # type: ignore[misc] + for index_def in reflected_indices + if index_def.get("unique", False) + ) - key_properties = next(iter(possible_primary_keys), None) + key_properties = next(iter(possible_primary_keys), []) # Initialize columns list - table_schema = th.PropertiesList() - for column_def in inspected.get_columns(table_name, schema=schema_name): - column_name = column_def["name"] - is_nullable = column_def.get("nullable", False) - jsonschema_type: dict = self.to_jsonschema_type(column_def["type"]) - table_schema.append( - th.Property( - name=column_name, - wrapped=th.CustomType(jsonschema_type), - nullable=is_nullable, - required=column_name in key_properties if key_properties else False, - ), + properties = [ + th.Property( + name=column["name"], + wrapped=th.CustomType(self.to_jsonschema_type(column["type"])), + nullable=column.get("nullable", False), + required=column["name"] in key_properties, + description=column.get("comment"), ) - schema = table_schema.to_dict() + for column in reflected_columns + ] + schema = th.PropertiesList(*properties).to_dict() # Initialize available replication methods addl_replication_methods: list[str] = [""] # By default an empty list. @@ -983,25 +994,40 @@ def discover_catalog_entries( result: list[dict] = [] engine = self._engine inspected = sa.inspect(engine) + object_kinds = ( + (reflection.ObjectKind.TABLE, False), + (reflection.ObjectKind.ANY_VIEW, True), + ) for schema_name in self.get_schema_names(engine, inspected): if schema_name in exclude_schemas: continue - # Iterate through each table and view - for table_name, is_view in self.get_object_names( - engine, - inspected, - schema_name, - ): - catalog_entry = self.discover_catalog_entry( - engine, - inspected, - schema_name, - table_name, - is_view, - reflect_indices=reflect_indices, + primary_keys = inspected.get_multi_pk_constraint(schema=schema_name) + + if reflect_indices: + indices = inspected.get_multi_indexes(schema=schema_name) + else: + indices = {} + + for object_kind, is_view in object_kinds: + columns = inspected.get_multi_columns( + schema=schema_name, + kind=object_kind, + ) + + result.extend( + self.discover_catalog_entry( + engine, + inspected, + schema_name, + table, + is_view, + reflected_columns=columns[schema, table], + reflected_pk=primary_keys.get((schema, table)), + reflected_indices=indices.get((schema, table), []), + ).to_dict() + for schema, table in columns ) - result.append(catalog_entry.to_dict()) return result