Skip to content

Commit e8aa7b6

Browse files
committed
Add json_as_object option to allow JSON and JSONB to be tapped as plain objects.
1 parent aaa3b7e commit e8aa7b6

File tree

4 files changed

+90
-2
lines changed

4 files changed

+90
-2
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com).
2626
| sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] |
2727
| filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. |
2828
| dates_as_string | False | 0 | Defaults to false, if true, date, and timestamp fields will be Strings. If you see ValueError: Year is out of range, try setting this to True. |
29+
| json_as_object | False | 0 | Defaults to false. If true, json and jsonb fields will be typed as Objects. |
2930
| ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object |
3031
| ssh_tunnel.enable | False | 0 | Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details |
3132
| ssh_tunnel.host | False | None | Host of the bastion server, this is the host we'll connect to via ssh |
@@ -132,6 +133,8 @@ Create tests within the `tap_postgres/tests` subfolder and
132133
poetry run pytest
133134
```
134135

136+
NOTE: Running the tests requires a locally running PostgreSQL instance. See `tests/settings.py` for the expected configuration.
137+
135138
You can also test the `tap-postgres` CLI interface directly using `poetry run`:
136139

137140
```bash

tap_postgres/client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@
3636
class PostgresSQLToJSONSchema(SQLToJSONSchema):
3737
"""Custom SQL to JSON Schema conversion for Postgres."""
3838

39-
def __init__(self, dates_as_string: bool, *args, **kwargs):
39+
def __init__(self, dates_as_string: bool, json_as_object: bool, *args, **kwargs):
4040
"""Initialize the SQL to JSON Schema converter."""
4141
super().__init__(*args, **kwargs)
4242
self.dates_as_string = dates_as_string
43+
self.json_as_object = json_as_object
4344

4445
@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
4546
def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict:
@@ -55,6 +56,8 @@ def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict:
5556
@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
5657
def json_to_jsonschema(self, column_type: postgresql.JSON) -> dict:
5758
"""Override the default mapping for JSON and JSONB columns."""
59+
if self.json_as_object:
60+
return {"type": ["object", "null"]}
5861
return {"type": ["string", "number", "integer", "array", "object", "boolean"]}
5962

6063
@SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined]
@@ -159,7 +162,10 @@ def __init__(
159162
@functools.cached_property
160163
def sql_to_jsonschema(self):
161164
"""Return a mapping of SQL types to JSON Schema types."""
162-
return PostgresSQLToJSONSchema(dates_as_string=self.config["dates_as_string"])
165+
return PostgresSQLToJSONSchema(
166+
dates_as_string=self.config["dates_as_string"],
167+
json_as_object=self.config["json_as_object"],
168+
)
163169

164170
def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
165171
"""Return a list of schema names in DB, or overrides with user-provided values.

tap_postgres/tap.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,14 @@ def __init__(
176176
),
177177
default=False,
178178
),
179+
th.Property(
180+
"json_as_object",
181+
th.BooleanType,
182+
description=(
183+
"Defaults to false, if true, json and jsonb fields will be Objects."
184+
),
185+
default=False,
186+
),
179187
th.Property(
180188
"ssh_tunnel",
181189
th.ObjectType(

tests/test_core.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,77 @@ def test_jsonb_array():
338338
assert test_runner.records[altered_table_name][i] == rows[i]
339339

340340

341+
def test_json_as_object():
    """Check that ``json_as_object`` types JSON and JSONB columns as objects.

    Creates a table with one JSONB and one JSON column, inserts rows holding
    objects, numbers, arrays and booleans, then syncs it with
    ``json_as_object`` enabled. The emitted schema for both columns must be
    ``{"type": ["object", "null"]}`` and every synced record must equal the
    row that was inserted.
    """
    table_name = "test_json_as_object"
    engine = sa.create_engine(SAMPLE_CONFIG["sqlalchemy_url"], future=True)

    metadata_obj = sa.MetaData()
    table = sa.Table(
        table_name,
        metadata_obj,
        sa.Column("column_jsonb", JSONB),
        sa.Column("column_json", JSON),
    )

    rows = [
        {"column_jsonb": {"foo": "bar"}, "column_json": {"baz": "foo"}},
        {"column_jsonb": 3.14, "column_json": -9.3},
        {"column_jsonb": 22, "column_json": 10000000},
        {"column_jsonb": {}, "column_json": {}},
        {"column_jsonb": ["bar", "foo"], "column_json": ["foo", "baz"]},
        {"column_jsonb": True, "column_json": False},
    ]

    # Recreate the table from scratch so earlier runs cannot leak rows in.
    with engine.begin() as conn:
        table.drop(conn, checkfirst=True)
        metadata_obj.create_all(conn)
        insert = table.insert().values(rows)
        conn.execute(insert)

    copied_config = copy.deepcopy(SAMPLE_CONFIG)
    # This should cause the same data to pass
    copied_config["json_as_object"] = True

    tap = TapPostgres(config=copied_config)
    tap_catalog = json.loads(tap.catalog_json_text)
    altered_table_name = f"{DB_SCHEMA_NAME}-{table_name}"

    # Select only the table under test and force a full-table sync for it.
    for stream in tap_catalog["streams"]:
        if stream.get("stream") and altered_table_name not in stream["stream"]:
            for metadata in stream["metadata"]:
                metadata["metadata"]["selected"] = False
        else:
            for metadata in stream["metadata"]:
                metadata["metadata"]["selected"] = True
                if metadata["breadcrumb"] == []:
                    metadata["metadata"]["replication-method"] = "FULL_TABLE"

    # Run the sync with the config that has json_as_object enabled; passing
    # SAMPLE_CONFIG here would leave the option under test switched off.
    test_runner = PostgresTestRunner(
        tap_class=TapPostgres, config=copied_config, catalog=tap_catalog
    )
    test_runner.sync_all()

    expected_schema = {"type": ["object", "null"]}
    schema_checked = False
    for schema_message in test_runner.schema_messages:
        if (
            "stream" in schema_message
            and schema_message["stream"] == altered_table_name
        ):
            properties = schema_message["schema"]["properties"]
            assert properties["column_jsonb"] == expected_schema
            assert properties["column_json"] == expected_schema
            schema_checked = True
    # Without this, the schema assertions above could be skipped entirely.
    assert schema_checked, f"no schema message emitted for {altered_table_name}"

    records = test_runner.records[altered_table_name]
    for i, row in enumerate(rows):
        assert records[i] == row
411+
341412
def test_numeric_types():
342413
"""Schema was wrong for Decimal objects. Check they are correctly selected."""
343414
table_name = "test_decimal"

0 commit comments

Comments
 (0)