Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add sql-datatype to the SDK discovery and catalog #1872

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions singer_sdk/_singerlib/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ class StreamMetadata(Metadata):
schema_name: str | None = None


@dataclass
class SqlMetadata(Metadata):
    """Field-level metadata for columns discovered from a SQL source.

    Extends :class:`Metadata` with the column's native SQL datatype so the
    catalog can surface it alongside the JSON Schema type.
    """

    # Native SQL type rendered as a string, e.g. "VARCHAR(length=15)";
    # None when the datatype was not captured during discovery.
    sql_datatype: str | None = None


AnyMetadata: TypeAlias = t.Union[Metadata, StreamMetadata]


Expand Down Expand Up @@ -167,6 +172,7 @@ def get_standard_metadata(
valid_replication_keys: list[str] | None = None,
replication_method: str | None = None,
selected_by_default: bool | None = None,
sql_datatypes: dict | None = None,
) -> MetadataMapping:
"""Get default metadata for a stream.

Expand All @@ -177,6 +183,7 @@ def get_standard_metadata(
valid_replication_keys: Stream valid replication keys.
replication_method: Stream replication method.
selected_by_default: Whether the stream is selected by default.
sql_datatypes: SQL datatypes present in the stream.

Returns:
Metadata mapping.
Expand All @@ -196,14 +203,19 @@ def get_standard_metadata(
root.schema_name = schema_name

for field_name in schema.get("properties", {}):
if sql_datatypes is None:
entry = Metadata()
else:
entry = SqlMetadata(sql_datatype=sql_datatypes.get(field_name))

if (
key_properties
and field_name in key_properties
or (valid_replication_keys and field_name in valid_replication_keys)
):
entry = Metadata(inclusion=Metadata.InclusionType.AUTOMATIC)
entry.inclusion = Metadata.InclusionType.AUTOMATIC
else:
entry = Metadata(inclusion=Metadata.InclusionType.AVAILABLE)
entry.inclusion = Metadata.InclusionType.AVAILABLE

mapping[("properties", field_name)] = entry

Expand Down
67 changes: 54 additions & 13 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,33 @@ def get_object_names(
view_names = []
return [(t, False) for t in table_names] + [(v, True) for v in view_names]

def discover_catalog_entry_sql_datatype(
    self,
    data_type: sqlalchemy.types.TypeEngine,
) -> str:
    """Return the SQL datatype as a string for use in the catalog.

    Args:
        data_type: The column type as a ``sqlalchemy.types.TypeEngine``.

    Returns:
        A string describing the given data type, e.g. ``"VARCHAR(length=15)"``.
    """
    # Only these TypeEngine attributes are surfaced; absent or falsy
    # values (None, 0) are omitted from the rendered string.
    datatype_attributes = ("length", "scale", "precision")

    rendered = ", ".join(
        f"{attribute}={getattr(data_type, attribute)}"
        for attribute in datatype_attributes
        if getattr(data_type, attribute, None)
    )

    return f"{type(data_type).__name__}({rendered})"

# TODO: maybe this should be split into smaller parts?
def discover_catalog_entry(
self,
Expand Down Expand Up @@ -441,21 +468,34 @@ def discover_catalog_entry(

# Initialize columns list
table_schema = th.PropertiesList()
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(
t.cast(sqlalchemy.types.TypeEngine, column_def["type"]),
)
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
),
)
with warnings.catch_warnings(record=True) as inspection_warnings:
for column_def in inspected.get_columns(table_name, schema=schema_name):
column_name = column_def["name"]
is_nullable = column_def.get("nullable", False)
jsonschema_type: dict = self.to_jsonschema_type(
t.cast(sqlalchemy.types.TypeEngine, column_def["type"]),
)
table_schema.append(
th.Property(
name=column_name,
wrapped=th.CustomType(jsonschema_type),
required=not is_nullable,
),
)
if len(inspection_warnings) > 0:
for line in inspection_warnings:
expanded_msg: str = (
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
f"Discovery warning: {line.message} in '{unique_stream_id}'"
)
self.logger.info(expanded_msg)
schema = table_schema.to_dict()

sql_datatypes = {}
for column_def in inspected.get_columns(table_name, schema=schema_name):
sql_datatypes[
str(column_def["name"])
] = self.discover_catalog_entry_sql_datatype(column_def["type"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you feel comfortable adding tests for this? A good place might be in tests/core/test_connector_sql.py since this is a connector method. I think it could even be parametrized with many examples of column types.


# Initialize available replication methods
addl_replication_methods: list[str] = [""] # By default an empty list.
# Notes regarding replication methods:
Expand All @@ -480,6 +520,7 @@ def discover_catalog_entry(
replication_method=replication_method,
key_properties=key_properties,
valid_replication_keys=None, # Must be defined by user
sql_datatypes=sql_datatypes,
),
database=None, # Expects single-database context
row_count=None,
Expand Down