Skip to content

Commit f3e016f

Browse files
authored
feat: Filter schemas based on user-provided config (#218)
Makes use of config filter_schemas to only pull specified schemas, increasing performance and allowing users to sidestep permissions issues for schemas they don't have access to. Closes #215
1 parent 640c002 commit f3e016f

File tree

4 files changed

+51
-1
lines changed

4 files changed

+51
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
2323
| password | False | None | Password used to authenticate. Note if sqlalchemy_url is set this will be ignored. |
2424
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
2525
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
26+
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
2627
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
2728
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an SSH tunnel (also known as a bastion server); see the other ssh_tunnel.* properties for more details. |
2829
| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion server; this is the host we'll connect to via SSH. |

tap_postgres/client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from singer_sdk import SQLConnector, SQLStream
1313
from singer_sdk import typing as th
1414
from singer_sdk.helpers._typing import TypeConformanceLevel
15+
from sqlalchemy.engine import Engine
16+
from sqlalchemy.engine.reflection import Inspector
1517

1618
if TYPE_CHECKING:
1719
from sqlalchemy.dialects import postgresql
@@ -165,6 +167,20 @@ def sdk_typing_object(
165167

166168
return sqltype_lookup["string"] # safe failover to str
167169

170+
def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
    """Return the schema names to discover, honouring ``filter_schemas``.

    When the ``filter_schemas`` config option holds a non-empty value, that
    value is returned as-is and the parent implementation is not called.
    Otherwise the lookup is delegated to the parent class.

    Args:
        engine: SQLAlchemy engine
        inspected: SQLAlchemy inspector instance for engine

    Returns:
        List of schema names
    """
    # ``.get`` plus a truthiness test treats a missing key, an explicit null
    # and an empty list identically; ``len(None)`` would raise TypeError.
    filter_schemas = self.config.get("filter_schemas")
    if filter_schemas:
        return filter_schemas
    return super().get_schema_names(engine, inspected)
183+
168184

169185
class PostgresStream(SQLStream):
170186
"""Stream class for Postgres streams."""

tap_postgres/tap.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,15 @@ def __init__(
128128
"Example postgresql://[username]:[password]@localhost:5432/[db_name]"
129129
),
130130
),
131+
th.Property(
132+
"filter_schemas",
133+
th.ArrayType(th.StringType),
134+
description=(
135+
"If an array of schema names is provided, the tap will only process "
136+
"the specified Postgres schemas and ignore others. If left blank, the "
137+
"tap automatically determines ALL available Postgres schemas."
138+
),
139+
),
131140
th.Property(
132141
"ssh_tunnel",
133142
th.ObjectType(

tests/test_core.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import datetime
23
import decimal
34
import json
@@ -9,7 +10,7 @@
910
from singer_sdk.testing import get_tap_test_class, suites
1011
from singer_sdk.testing.runners import TapTestRunner
1112
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, String, Table
12-
from sqlalchemy.dialects.postgresql import DATE, JSONB, TIME, TIMESTAMP, JSON
13+
from sqlalchemy.dialects.postgresql import BIGINT, DATE, JSON, JSONB, TIME, TIMESTAMP
1314
from test_replication_key import TABLE_NAME, TapTestReplicationKey
1415
from test_selected_columns_only import (
1516
TABLE_NAME_SELECTED_COLUMNS_ONLY,
@@ -290,6 +291,29 @@ def test_decimal():
290291
assert "number" in schema_message["schema"]["properties"]["column"]["type"]
291292

292293

294+
def test_filter_schemas():
    """Only return tables from a given schema.

    Creates ``new_schema`` holding a single table, then checks that a tap
    configured with ``filter_schemas=["new_schema"]`` catalogs exactly that
    one stream.
    """
    table_name = "test_filter_schemas"
    engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

    metadata_obj = MetaData()
    table = Table(table_name, metadata_obj, Column("id", BIGINT), schema="new_schema")

    # engine.begin() commits the DDL when the block exits, so the schema and
    # table are visible to the separate connection the tap opens below.
    with engine.begin() as conn:
        # Raw SQL strings must be wrapped in text(); passing a bare str to
        # execute() is deprecated in SQLAlchemy 1.4 and rejected in 2.0.
        conn.execute(sqlalchemy.text("CREATE SCHEMA IF NOT EXISTS new_schema"))
        # checkfirst=True replaces the deprecated Table.exists() probe.
        table.drop(conn, checkfirst=True)
        metadata_obj.create_all(conn)

    filter_schemas_config = copy.deepcopy(SAMPLE_CONFIG)
    filter_schemas_config.update({"filter_schemas": ["new_schema"]})
    tap = TapPostgres(config=filter_schemas_config)
    tap_catalog = json.loads(tap.catalog_json_text)
    altered_table_name = f"new_schema-{table_name}"
    # Check that the only stream in the catalog is the one table put into new_schema
    assert len(tap_catalog["streams"]) == 1
    assert tap_catalog["streams"][0]["stream"] == altered_table_name
316+
293317
class PostgresTestRunner(TapTestRunner):
294318
def run_sync_dry_run(self) -> bool:
295319
"""

0 commit comments

Comments
 (0)