feat: Dates can now be parsed only as strings, also migrated to the non dep… #247

Merged · 4 commits · Sep 27, 2023

Changes from all commits
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
@@ -26,3 +26,7 @@ repos:
    exclude: tests
    additional_dependencies:
      - types-paramiko
+- repo: https://github.com/pycqa/flake8
+  rev: 6.1.0
+  hooks:
+    - id: flake8
1 change: 1 addition & 0 deletions README.md
@@ -24,6 +24,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
| database | False | None | Database name. Note if sqlalchemy_url is set this will be ignored. |
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
+| dates_as_string | False | False | Defaults to false; if true, date and timestamp fields will be strings. If you see 'ValueError: Year is out of range', try setting this to true. |
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
| ssh_tunnel.enable | True (if ssh_tunnel set) | False | Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details. |
| ssh_tunnel.host | True (if ssh_tunnel set) | False | Host of the bastion server, this is the host we'll connect to via ssh |
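The "Year is out of range" error mentioned above comes from Python itself, not Postgres: Python's `datetime` types only cover years 1–9999, while Postgres accepts a much wider range, including BC dates. A quick illustration of the failure mode:

```python
import datetime

# Python's datetime range is hard-capped:
print(datetime.MINYEAR, datetime.MAXYEAR)  # 1 9999

# Postgres happily stores DATE '4713-04-03 BC', but the equivalent
# Python date cannot be constructed:
try:
    datetime.date(-4712, 4, 3)
except ValueError as err:
    print(err)  # year -4712 is out of range
```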
52 changes: 45 additions & 7 deletions tap_postgres/client.py
@@ -7,6 +7,7 @@
import datetime
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Type, Union

+import psycopg2
import singer_sdk.helpers._typing
import sqlalchemy
from singer_sdk import SQLConnector, SQLStream
@@ -44,20 +45,53 @@ def patched_conform(
class PostgresConnector(SQLConnector):
"""Connects to the Postgres SQL source."""

-    @staticmethod
+    def __init__(
+        self,
+        config: dict | None = None,
+        sqlalchemy_url: str | None = None,
+    ) -> None:
+        """Initialize the SQL connector.

+        Args:
+            config: The parent tap or target object's config.
+            sqlalchemy_url: Optional URL for the connection.

+        """
+        # Dates in postgres don't all convert to python datetime objects, so we
+        # need to register a custom type caster to convert these to a string
+        # See https://www.psycopg.org/psycopg3/docs/advanced/adapt.html#example-handling-infinity-date # noqa: E501
+        # For more information
+        if config is not None and config["dates_as_string"] is True:
+            string_dates = psycopg2.extensions.new_type(
+                (1082, 1114, 1184), "STRING_DATES", psycopg2.STRING
+            )
+            string_date_arrays = psycopg2.extensions.new_array_type(
+                (1182, 1115, 1188), "STRING_DATE_ARRAYS[]", psycopg2.STRING
+            )
+            psycopg2.extensions.register_type(string_dates)
+            psycopg2.extensions.register_type(string_date_arrays)

+        super().__init__(config=config, sqlalchemy_url=sqlalchemy_url)

+    # Note super is static, we can get away with this because this is called once
+    # and is luckily referenced via the instance of the class
    def to_jsonschema_type(
+        self,
        sql_type: Union[
            str,
            sqlalchemy.types.TypeEngine,
            Type[sqlalchemy.types.TypeEngine],
            postgresql.ARRAY,
            Any,
-        ]
+        ],
    ) -> dict:
"""Return a JSON Schema representation of the provided type.

Overidden from SQLConnector to correctly handle JSONB and Arrays.

Also Overridden in order to call our instance method `sdk_typing_object()`
instead of the static version

By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy
types.

@@ -89,12 +123,12 @@ def to_jsonschema_type(
            and isinstance(sql_type, sqlalchemy.dialects.postgresql.ARRAY)
            and type_name == "ARRAY"
        ):
-            array_type = PostgresConnector.sdk_typing_object(sql_type.item_type)
+            array_type = self.sdk_typing_object(sql_type.item_type)
            return th.ArrayType(array_type).type_dict
-        return PostgresConnector.sdk_typing_object(sql_type).type_dict
+        return self.sdk_typing_object(sql_type).type_dict

-    @staticmethod
    def sdk_typing_object(
+        self,
        from_type: str
        | sqlalchemy.types.TypeEngine
        | type[sqlalchemy.types.TypeEngine],
@@ -148,6 +182,9 @@ def sdk_typing_object(
"bool": th.BooleanType(),
"variant": th.StringType(),
}
if self.config["dates_as_string"] is True:
sqltype_lookup["date"] = th.StringType()
sqltype_lookup["datetime"] = th.StringType()
if isinstance(from_type, str):
type_name = from_type
elif isinstance(from_type, sqlalchemy.types.TypeEngine):
@@ -235,5 +272,6 @@ def get_records(self, context: Optional[dict]) -> Iterable[Dict[str, Any]]:
        if start_val:
            query = query.filter(replication_key_col >= start_val)

-        for row in self.connector.connection.execute(query):
-            yield dict(row)
+        with self.connector._connect() as con:
+            for row in con.execute(query):
+                yield dict(row)
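For context, the `__init__` override above uses psycopg2's global type-casting hooks: OIDs 1082, 1114, and 1184 are Postgres's `date`, `timestamp`, and `timestamptz` types (1182, 1115, and 1188 are their array counterparts), and `psycopg2.STRING` tells the driver to hand those values back as raw strings instead of building `datetime` objects. A minimal standalone sketch of the mechanism, assuming a locally reachable Postgres and a placeholder DSN:

```python
import psycopg2
import psycopg2.extensions

# Return date/timestamp/timestamptz values as raw strings rather than
# letting psycopg2 construct datetime objects (which fails for BC dates).
string_dates = psycopg2.extensions.new_type(
    (1082, 1114, 1184), "STRING_DATES", psycopg2.STRING
)
psycopg2.extensions.register_type(string_dates)

# Placeholder connection details -- adjust for your environment.
conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/postgres")
with conn.cursor() as cur:
    cur.execute("SELECT DATE '4713-04-03 BC'")
    print(cur.fetchone()[0])  # '4713-04-03 BC' as a plain str, not datetime.date
conn.close()
```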
14 changes: 12 additions & 2 deletions tap_postgres/tap.py
@@ -6,7 +6,7 @@
import signal
from functools import cached_property
from os import chmod, path
-from typing import Any, Mapping, cast
+from typing import Any, Dict, cast

import paramiko
from singer_sdk import SQLTap, Stream
@@ -137,6 +137,16 @@ def __init__(
"tap automatically determines ALL available Postgres schemas."
),
),
th.Property(
"dates_as_string",
th.BooleanType,
description=(
"Defaults to false, if true, date, and timestamp fields will be "
"Strings. If you see ValueError: Year is out of range, "
"try setting this to True."
),
default=False,
),
th.Property(
"ssh_tunnel",
th.ObjectType(
@@ -274,7 +284,7 @@ def __init__(
        ),
    ).to_dict()

-    def get_sqlalchemy_url(self, config: Mapping[str, Any]) -> str:
+    def get_sqlalchemy_url(self, config: Dict[Any, Any]) -> str:
        """Generate a SQLAlchemy URL.

        Args:
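With the new `dates_as_string` property in place, enabling string dates is a one-line config change. A quick usage sketch (the connection URL is a placeholder, mirroring `SAMPLE_CONFIG` in the tests below):

```python
from tap_postgres.tap import TapPostgres

config = {
    # Placeholder URL -- adjust for your environment.
    "sqlalchemy_url": "postgresql://postgres:postgres@localhost:5432/postgres",
    # New in this PR: emit date and timestamp columns as strings.
    "dates_as_string": True,
}

TapPostgres(config=config).sync_all()
```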
98 changes: 94 additions & 4 deletions tests/test_core.py
@@ -136,7 +136,7 @@ def test_temporal_datatypes():
    This test checks that dates are being parsed correctly, and additionally implements
    schema checks, and performs similar tests on times and timestamps.
    """
-    table_name = "test_date"
+    table_name = "test_temporal_datatypes"
    engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])

    metadata_obj = MetaData()
@@ -237,11 +237,17 @@ def test_jsonb_json():
"stream" in schema_message
and schema_message["stream"] == altered_table_name
):
assert "object" in schema_message["schema"]["properties"]["column_jsonb"]["type"]
assert "object" in schema_message["schema"]["properties"]["column_json"]["type"]
assert (
"object"
in schema_message["schema"]["properties"]["column_jsonb"]["type"]
)
assert (
"object"
in schema_message["schema"]["properties"]["column_json"]["type"]
)
assert test_runner.records[altered_table_name][0] == {
"column_jsonb": {"foo": "bar"},
"column_json": {"baz": "foo"}
"column_json": {"baz": "foo"},
}


@@ -323,3 +329,87 @@ def run_sync_dry_run(self) -> bool:
        new_tap = self.new_tap()
        new_tap.sync_all()
        return True


+def test_invalid_python_dates():
+    """Some dates are invalid in python, but valid in Postgres.
+
+    Check out https://www.psycopg.org/psycopg3/docs/advanced/adapt.html#example-handling-infinity-date
+    for more information.
+    """
+    table_name = "test_invalid_python_dates"
+    engine = sqlalchemy.create_engine(SAMPLE_CONFIG["sqlalchemy_url"])
+
+    metadata_obj = MetaData()
+    table = Table(
+        table_name,
+        metadata_obj,
+        Column("date", DATE),
+        Column("datetime", DateTime),
+    )
+    with engine.connect() as conn:
+        if table.exists(conn):
+            table.drop(conn)
+        metadata_obj.create_all(conn)
+        insert = table.insert().values(
+            date="4713-04-03 BC",
+            datetime="4712-10-19 10:23:54 BC",
+        )
+        conn.execute(insert)
+    tap = TapPostgres(config=SAMPLE_CONFIG)
+    # Alter config and then check the data comes through as a string
+    tap_catalog = json.loads(tap.catalog_json_text)
+    altered_table_name = f"public-{table_name}"
+    for stream in tap_catalog["streams"]:
+        if stream.get("stream") and altered_table_name not in stream["stream"]:
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = False
+        else:
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = True
+                if metadata["breadcrumb"] == []:
+                    metadata["metadata"]["replication-method"] = "FULL_TABLE"

+    test_runner = PostgresTestRunner(
+        tap_class=TapPostgres, config=SAMPLE_CONFIG, catalog=tap_catalog
+    )
+    with pytest.raises(ValueError):
+        test_runner.sync_all()

+    copied_config = copy.deepcopy(SAMPLE_CONFIG)
+    # This should cause the same data to pass
+    copied_config["dates_as_string"] = True
+    tap = TapPostgres(config=copied_config)
+    tap_catalog = json.loads(tap.catalog_json_text)
+    altered_table_name = f"public-{table_name}"
+    for stream in tap_catalog["streams"]:
+        if stream.get("stream") and altered_table_name not in stream["stream"]:
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = False
+        else:
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = True
+                if metadata["breadcrumb"] == []:
+                    metadata["metadata"]["replication-method"] = "FULL_TABLE"

+    test_runner = PostgresTestRunner(
+        tap_class=TapPostgres, config=copied_config, catalog=tap_catalog
+    )
+    test_runner.sync_all()

+    for schema_message in test_runner.schema_messages:
+        if (
+            "stream" in schema_message
+            and schema_message["stream"] == altered_table_name
+        ):
+            assert ["string", "null"] == schema_message["schema"]["properties"][
+                "date"
+            ]["type"]
+            assert ["string", "null"] == schema_message["schema"]["properties"][
+                "datetime"
+            ]["type"]
+    assert test_runner.records[altered_table_name][0] == {
+        "date": "4713-04-03 BC",
+        "datetime": "4712-10-19 10:23:54 BC",
+    }