feat: Object validation #307

Merged (6 commits) on Dec 15, 2023
51 changes: 45 additions & 6 deletions tap_postgres/client.py
@@ -13,6 +13,7 @@
 from types import MappingProxyType
 from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Type, Union

+import pendulum
 import psycopg2
 import singer_sdk.helpers._typing
 import sqlalchemy
@@ -29,23 +30,55 @@
 if TYPE_CHECKING:
     from sqlalchemy.dialects import postgresql

 unpatched_conform = singer_sdk.helpers._typing._conform_primitive_property


 def patched_conform(
     elem: Any,
     property_schema: dict,
 ) -> Any:
-    """Overrides Singer SDK type conformance to prevent dates turning into datetimes.
+    """Overrides Singer SDK type conformance.
+
+    Most logic here is from singer_sdk.helpers._typing._conform_primitive_property, as
+    marked by "# copied". This is a full override rather than calling the "super",
+    because the final piece of logic in the super, `if is_boolean_type(property_schema):`,
+    is flawed. is_boolean_type returns True if the schema contains a boolean
+    anywhere. Therefore, a jsonschema type like ["boolean", "integer"] will return True
+    and will have its values coerced to either True or False. In practice, this occurs
+    for columns with JSONB type: no guarantees can be made about their data, so the
+    schema has every possible data type, including boolean. Without this override, all
+    JSONB columns would be coerced to True or False.
+
+    Modifications:
+    - prevent dates from turning into datetimes.
+    - prevent collapsing values to booleans (discussed above).

     Converts a primitive (i.e. not object or array) to a json compatible type.

     Returns:
         The appropriate json compatible type.
     """
-    if isinstance(elem, datetime.date):
+    if isinstance(elem, datetime.date):  # not copied, original logic
         return elem.isoformat()
-    return unpatched_conform(elem=elem, property_schema=property_schema)
+    if isinstance(elem, (datetime.datetime, pendulum.DateTime)):  # copied
+        return singer_sdk.helpers._typing.to_json_compatible(elem)
+    if isinstance(elem, datetime.timedelta):  # copied
+        epoch = datetime.datetime.fromtimestamp(0, datetime.timezone.utc)
+        timedelta_from_epoch = epoch + elem
+        if timedelta_from_epoch.tzinfo is None:
+            timedelta_from_epoch = timedelta_from_epoch.replace(
+                tzinfo=datetime.timezone.utc
+            )
+        return timedelta_from_epoch.isoformat()
+    if isinstance(elem, datetime.time):  # copied
+        return str(elem)
+    if isinstance(elem, bytes):  # copied, modified to import is_boolean_type
+        # For a BIT value, treat 0 as False and anything else as True.
+        # Will only do this for booleans, not `bytea` data.
+        return (
+            elem != b"\x00"
+            if singer_sdk.helpers._typing.is_boolean_type(property_schema)
+            else elem.hex()
+        )
+    return elem


 singer_sdk.helpers._typing._conform_primitive_property = patched_conform
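To make the flaw described in the docstring concrete, here is a minimal illustrative sketch. It is not part of this diff and not the SDK's real code: `naive_is_boolean_type` is a hypothetical stand-in for the membership-style check the docstring describes, and `patched_conform` refers to the function patched in above.

# Illustrative sketch only; not part of the diff.
def naive_is_boolean_type(property_schema: dict) -> bool:
    # Fires whenever "boolean" appears anywhere in the declared type(s).
    types = property_schema.get("type", [])
    if isinstance(types, str):
        types = [types]
    return "boolean" in types

# The catch-all schema emitted for a JSONB column includes "boolean"...
jsonb_schema = {
    "type": ["string", "number", "integer", "array", "object", "boolean", "null"]
}
assert naive_is_boolean_type(jsonb_schema) is True

# ...but patched_conform falls through its type checks and returns object
# values untouched instead of collapsing them to True or False.
assert patched_conform({"foo": "bar"}, jsonb_schema) == {"foo": "bar"}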
@@ -124,8 +157,14 @@ def to_jsonschema_type(
         elif isinstance(sql_type, sqlalchemy.types.TypeEngine):
             type_name = type(sql_type).__name__

+        # Should theoretically be th.AnyType().type_dict, but that causes errors down
+        # the line, e.g.:
+        # singer_sdk.helpers._typing.EmptySchemaTypeError: Could not detect type from
+        # empty type_dict. Did you forget to define a property in the stream schema?
         if type_name is not None and type_name in ("JSONB", "JSON"):
-            return {}
+            return {
+                "type": ["string", "number", "integer", "array", "object", "boolean"]
+            }

         if (
             type_name is not None
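As an aside (not part of the diff), this explicit union is what gives the PR its "object validation" behavior: the schema advertised for JSONB/JSON columns now accepts any JSON value instead of being empty. A minimal sketch, assuming the `jsonschema` package purely as a stand-in for target-side validation (the tap itself does not depend on it):

# Illustrative sketch only; `jsonschema` is an assumed stand-in for whatever
# validation a downstream target performs.
import jsonschema

jsonb_property = {
    "type": ["string", "number", "integer", "array", "object", "boolean"]
}

# The same payload shapes exercised by the updated test below all validate.
for value in [{"foo": "bar"}, 3.14, 22, {}, ["bar", "foo"], True]:
    jsonschema.validate(value, jsonb_property)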
45 changes: 35 additions & 10 deletions tests/test_core.py
@@ -206,13 +206,20 @@ def test_jsonb_json():
         Column("column_jsonb", JSONB),
         Column("column_json", JSON),
     )
+
+    rows = [
+        {"column_jsonb": {"foo": "bar"}, "column_json": {"baz": "foo"}},
+        {"column_jsonb": 3.14, "column_json": -9.3},
+        {"column_jsonb": 22, "column_json": 10000000},
+        {"column_jsonb": {}, "column_json": {}},
+        {"column_jsonb": ["bar", "foo"], "column_json": ["foo", "baz"]},
+        {"column_jsonb": True, "column_json": False},
+    ]
+
     with engine.begin() as conn:
         table.drop(conn, checkfirst=True)
         metadata_obj.create_all(conn)
-        insert = table.insert().values(
-            column_jsonb={"foo": "bar"},
-            column_json={"baz": "foo"},
-        )
+        insert = table.insert().values(rows)
         conn.execute(insert)
         tap = TapPostgres(config=SAMPLE_CONFIG)
         tap_catalog = json.loads(tap.catalog_json_text)
@@ -236,12 +243,30 @@
             "stream" in schema_message
             and schema_message["stream"] == altered_table_name
         ):
-            assert schema_message["schema"]["properties"]["column_jsonb"] == {}
-            assert schema_message["schema"]["properties"]["column_json"] == {}
-    assert test_runner.records[altered_table_name][0] == {
-        "column_jsonb": {"foo": "bar"},
-        "column_json": {"baz": "foo"},
-    }
+            assert schema_message["schema"]["properties"]["column_jsonb"] == {
+                "type": [
+                    "string",
+                    "number",
+                    "integer",
+                    "array",
+                    "object",
+                    "boolean",
+                    "null",
+                ]
+            }
+            assert schema_message["schema"]["properties"]["column_json"] == {
+                "type": [
+                    "string",
+                    "number",
+                    "integer",
+                    "array",
+                    "object",
+                    "boolean",
+                    "null",
+                ]
+            }
+    for i in range(len(rows)):
+        assert test_runner.records[altered_table_name][i] == rows[i]


 def test_decimal():