Skip to content

Commit

Permalink
perf(taps): Improved discovery performance for SQL taps (#2793)
Browse files Browse the repository at this point in the history
* perf(taps): Improved discovery performance for SQL taps

* Discover constraints scoped by object type

* Inspect only once per schema

* Try indices instead of UNIQUE constraints

* Extend existing API

* Avoid unnecessarily reflecting stuff

* Backwards-compatible API

* Deprecate `SQLConnector.get_object_names`
  • Loading branch information
edgarrmondragon authored Dec 3, 2024
1 parent b989747 commit d3dcc8e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 64 deletions.
22 changes: 0 additions & 22 deletions samples/sample_tap_bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,6 @@ def get_sqlalchemy_url(self, config: dict) -> str: # noqa: PLR6301
"""Concatenate a SQLAlchemy URL for use in connecting to the source."""
return f"bigquery://{config['project_id']}"

def get_object_names(
    self,
    engine,
    inspected,
    schema_name: str,
) -> list[tuple[str, bool]]:
    """Return discoverable object names.

    BigQuery inspection reports table names in the form
    ``schema_name.table_name``, which later causes the project name to be
    overridden due to specifics in the behavior of sqlalchemy-bigquery.
    Strip the ``schema_name`` prefix from each inspected name here.
    """
    parent_names = super().get_object_names(engine, inspected, schema_name)
    # rpartition leaves a name without a separator untouched, and otherwise
    # keeps only the text after the final "." — same effect as split(".")[-1].
    return [
        (qualified_name.rpartition(".")[-1], is_view)
        for qualified_name, is_view in parent_names
    ]


class BigQueryStream(SQLStream):
"""Stream class for BigQuery streams."""
Expand Down
110 changes: 68 additions & 42 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from functools import lru_cache

import sqlalchemy as sa
from sqlalchemy.engine import reflection

from singer_sdk import typing as th
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
from singer_sdk.exceptions import ConfigValidationError
from singer_sdk.helpers._compat import SingerSDKDeprecationWarning
from singer_sdk.helpers._util import dump_json, load_json
from singer_sdk.helpers.capabilities import TargetLoadMethods

Expand Down Expand Up @@ -847,7 +849,12 @@ def get_schema_names( # noqa: PLR6301
"""
return inspected.get_schema_names()

def get_object_names(
@deprecated(
"This method is deprecated.",
category=SingerSDKDeprecationWarning,
stacklevel=1,
)
def get_object_names( # pragma: no cover
self,
engine: Engine, # noqa: ARG002
inspected: Inspector,
Expand Down Expand Up @@ -877,12 +884,14 @@ def get_object_names(
def discover_catalog_entry(
self,
engine: Engine, # noqa: ARG002
inspected: Inspector,
schema_name: str,
inspected: Inspector, # noqa: ARG002
schema_name: str | None,
table_name: str,
is_view: bool, # noqa: FBT001
*,
reflect_indices: bool = True,
reflected_columns: list[reflection.ReflectedColumn] | None = None,
reflected_pk: reflection.ReflectedPrimaryKeyConstraint | None = None,
reflected_indices: list[reflection.ReflectedIndex] | None = None,
) -> CatalogEntry:
"""Create `CatalogEntry` object for the given table or a view.
Expand All @@ -893,45 +902,47 @@ def discover_catalog_entry(
table_name: Name of the table or a view
is_view: Flag whether this object is a view, returned by `get_object_names`
reflect_indices: Whether to reflect indices
reflected_columns: List of reflected columns
reflected_pk: Reflected primary key
reflected_indices: List of reflected indices
Returns:
`CatalogEntry` object for the given table or a view
"""
# Initialize unique stream name
unique_stream_id = f"{schema_name}-{table_name}"
unique_stream_id = f"{schema_name}-{table_name}" if schema_name else table_name

# Backwards-compatibility
reflected_columns = reflected_columns or []
reflected_indices = reflected_indices or []

# Detect key properties
possible_primary_keys: list[list[str]] = []
pk_def = inspected.get_pk_constraint(table_name, schema=schema_name)
if pk_def and "constrained_columns" in pk_def: # type: ignore[redundant-expr]
possible_primary_keys.append(pk_def["constrained_columns"])
if reflected_pk and "constrained_columns" in reflected_pk:
possible_primary_keys.append(reflected_pk["constrained_columns"])

# An element of the columns list is ``None`` if it's an expression and is
# returned in the ``expressions`` list of the reflected index.
if reflect_indices:
possible_primary_keys.extend(
index_def["column_names"] # type: ignore[misc]
for index_def in inspected.get_indexes(table_name, schema=schema_name)
if index_def.get("unique", False)
)
possible_primary_keys.extend(
index_def["column_names"] # type: ignore[misc]
for index_def in reflected_indices
if index_def.get("unique", False)
)

key_properties = next(iter(possible_primary_keys), None)
key_properties = next(iter(possible_primary_keys), [])

# Initialize columns list
table_schema = th.PropertiesList()
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(column_def["type"])
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
nullable=is_nullable,
required=column_name in key_properties if key_properties else False,
),
properties = [
th.Property(
name=column["name"],
wrapped=th.CustomType(self.to_jsonschema_type(column["type"])),
nullable=column.get("nullable", False),
required=column["name"] in key_properties,
description=column.get("comment"),
)
schema = table_schema.to_dict()
for column in reflected_columns
]
schema = th.PropertiesList(*properties).to_dict()

# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
Expand Down Expand Up @@ -983,25 +994,40 @@ def discover_catalog_entries(
result: list[dict] = []
engine = self._engine
inspected = sa.inspect(engine)
object_kinds = (
(reflection.ObjectKind.TABLE, False),
(reflection.ObjectKind.ANY_VIEW, True),
)
for schema_name in self.get_schema_names(engine, inspected):
if schema_name in exclude_schemas:
continue

# Iterate through each table and view
for table_name, is_view in self.get_object_names(
engine,
inspected,
schema_name,
):
catalog_entry = self.discover_catalog_entry(
engine,
inspected,
schema_name,
table_name,
is_view,
reflect_indices=reflect_indices,
primary_keys = inspected.get_multi_pk_constraint(schema=schema_name)

if reflect_indices:
indices = inspected.get_multi_indexes(schema=schema_name)
else:
indices = {}

for object_kind, is_view in object_kinds:
columns = inspected.get_multi_columns(
schema=schema_name,
kind=object_kind,
)

result.extend(
self.discover_catalog_entry(
engine,
inspected,
schema_name,
table,
is_view,
reflected_columns=columns[schema, table],
reflected_pk=primary_keys.get((schema, table)),
reflected_indices=indices.get((schema, table), []),
).to_dict()
for schema, table in columns
)
result.append(catalog_entry.to_dict())

return result

Expand Down

0 comments on commit d3dcc8e

Please sign in to comment.