Skip to content

Commit a9188ce

Browse files
feat: Emit Postgres schema as _sdc_postgres_schema
1 parent d196204 commit a9188ce

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

tap_postgres/client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,13 @@ class PostgresStream(SQLStream):
264264
# JSONB Objects won't be selected without type_conformance_level to ROOT_ONLY
265265
TYPE_CONFORMANCE_LEVEL = TypeConformanceLevel.ROOT_ONLY
266266

267+
@cached_property
268+
def schema(self) -> dict:
269+
"""Override schema adding _sdc columns."""
270+
schema_dict = self._singer_catalog_entry.schema.to_dict()
271+
schema_dict["properties"]["_sdc_postgres_schema"] = {"type": ["string", "null"]}
272+
return schema_dict
273+
267274
def max_record_count(self) -> int | None:
268275
"""Return the maximum number of records to fetch in a single query."""
269276
return self.config.get("max_record_count")
@@ -326,6 +333,7 @@ def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
326333
# TODO: Standardize record mapping type
327334
# https://github.com/meltano/sdk/issues/2096
328335
transformed_record = self.post_process(dict(record))
336+
transformed_record["_sdc_postgres_schema"] = table.schema
329337
if transformed_record is None:
330338
# Record filtered out during post_process()
331339
continue

0 commit comments

Comments
 (0)