From 6efa7d0c7aeefaa2f2d1f82126a6e9ffa7ee23f3 Mon Sep 17 00:00:00 2001 From: Jon Cass Date: Fri, 1 Nov 2024 10:33:51 -0400 Subject: [PATCH] feat: Add json_as_object option (#526) To allow JSON and JSONB to be tapped as plain objects See [this Slack thread](https://meltano.slack.com/archives/C069CQNHDNF/p1729784981090099) for context --- README.md | 3 ++ tap_postgres/client.py | 10 ++++-- tap_postgres/tap.py | 8 +++++ tests/test_core.py | 73 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 69bedbc..e6f6d07 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Built with the [Meltano Singer SDK](https://sdk.meltano.com). | sqlalchemy_url | False | None | Example postgresql://[username]:[password]@localhost:5432/[db_name] | | filter_schemas | False | None | If an array of schema names is provided, the tap will only process the specified Postgres schemas and ignore others. If left blank, the tap automatically determines ALL available Postgres schemas. | | dates_as_string | False | 0 | Defaults to false, if true, date, and timestamp fields will be Strings. If you see ValueError: Year is out of range, try setting this to True. | +| json_as_object | False | 0 | Defaults to false, if true, json and jsonb fields will be Objects. | | ssh_tunnel | False | None | SSH Tunnel Configuration, this is a json object | | ssh_tunnel.enable | False | 0 | Enable an ssh tunnel (also known as bastion server), see the other ssh_tunnel.* properties for more details | | ssh_tunnel.host | False | None | Host of the bastion server, this is the host we'll connect to via ssh | @@ -132,6 +133,8 @@ Create tests within the `tap_postgres/tests` subfolder and poetry run pytest ``` +NOTE: Running the tests requires a locally running postgres. See tests/settings.py for the expected configuration. 
+ You can also test the `tap-postgres` CLI interface directly using `poetry run`: ```bash diff --git a/tap_postgres/client.py b/tap_postgres/client.py index 40e2f4b..f3ceecf 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -36,10 +36,11 @@ class PostgresSQLToJSONSchema(SQLToJSONSchema): """Custom SQL to JSON Schema conversion for Postgres.""" - def __init__(self, dates_as_string: bool, *args, **kwargs): + def __init__(self, dates_as_string: bool, json_as_object: bool, *args, **kwargs): """Initialize the SQL to JSON Schema converter.""" super().__init__(*args, **kwargs) self.dates_as_string = dates_as_string + self.json_as_object = json_as_object @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict: @@ -55,6 +56,8 @@ def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict: @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] def json_to_jsonschema(self, column_type: postgresql.JSON) -> dict: """Override the default mapping for JSON and JSONB columns.""" + if self.json_as_object: + return {"type": ["object", "null"]} return {"type": ["string", "number", "integer", "array", "object", "boolean"]} @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] @@ -159,7 +162,10 @@ def __init__( @functools.cached_property def sql_to_jsonschema(self): """Return a mapping of SQL types to JSON Schema types.""" - return PostgresSQLToJSONSchema(dates_as_string=self.config["dates_as_string"]) + return PostgresSQLToJSONSchema( + dates_as_string=self.config["dates_as_string"], + json_as_object=self.config["json_as_object"], + ) def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: """Return a list of schema names in DB, or overrides with user-provided values. 
diff --git a/tap_postgres/tap.py b/tap_postgres/tap.py
index ed75a47..31831c5 100644
--- a/tap_postgres/tap.py
+++ b/tap_postgres/tap.py
@@ -176,6 +176,14 @@ def __init__(
             ),
             default=False,
         ),
+        th.Property(
+            "json_as_object",
+            th.BooleanType,
+            description=(
+                "Defaults to false, if true, json and jsonb fields will be Objects."
+            ),
+            default=False,
+        ),
         th.Property(
             "ssh_tunnel",
             th.ObjectType(
diff --git a/tests/test_core.py b/tests/test_core.py
index 0c29b24..3288466 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -338,6 +338,79 @@ def test_jsonb_array():
         assert test_runner.records[altered_table_name][i] == rows[i]
 
 
+def test_json_as_object():
+    """Some use cases require JSON and JSONB columns to be typed as Objects."""
+    table_name = "test_json_as_object"
+    engine = sa.create_engine(SAMPLE_CONFIG["sqlalchemy_url"], future=True)
+
+    metadata_obj = sa.MetaData()
+    table = sa.Table(
+        table_name,
+        metadata_obj,
+        sa.Column("column_jsonb", JSONB),
+        sa.Column("column_json", JSON),
+    )
+
+    rows = [
+        {"column_jsonb": {"foo": "bar"}, "column_json": {"baz": "foo"}},
+        {"column_jsonb": 3.14, "column_json": -9.3},
+        {"column_jsonb": 22, "column_json": 10000000},
+        {"column_jsonb": {}, "column_json": {}},
+        {"column_jsonb": ["bar", "foo"], "column_json": ["foo", "baz"]},
+        {"column_jsonb": True, "column_json": False},
+    ]
+
+    with engine.begin() as conn:
+        table.drop(conn, checkfirst=True)
+        metadata_obj.create_all(conn)
+        insert = table.insert().values(rows)
+        conn.execute(insert)
+
+    copied_config = copy.deepcopy(SAMPLE_CONFIG)
+    # Enable the option under test for both catalog discovery and the sync run
+    copied_config["json_as_object"] = True
+
+    tap = TapPostgres(config=copied_config)
+    tap_catalog = json.loads(tap.catalog_json_text)
+    altered_table_name = f"{DB_SCHEMA_NAME}-{table_name}"
+
+    for stream in tap_catalog["streams"]:
+        if stream.get("stream") and altered_table_name not in stream["stream"]:
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = False
+        else:
+            for metadata in stream["metadata"]:
+                metadata["metadata"]["selected"] = True
+                if metadata["breadcrumb"] == []:
+                    metadata["metadata"]["replication-method"] = "FULL_TABLE"
+
+    test_runner = PostgresTestRunner(
+        tap_class=TapPostgres, config=copied_config, catalog=tap_catalog
+    )
+    test_runner.sync_all()
+    for schema_message in test_runner.schema_messages:
+        if (
+            "stream" in schema_message
+            and schema_message["stream"] == altered_table_name
+        ):
+            assert schema_message["schema"]["properties"]["column_jsonb"] == {
+                "type": [
+                    "object",
+                    "null",
+                ]
+            }
+            assert schema_message["schema"]["properties"]["column_json"] == {
+                "type": [
+                    "object",
+                    "null",
+                ]
+            }
+
+    assert len(rows) == len(test_runner.records[altered_table_name])
+    for expected_row, actual_row in zip(rows, test_runner.records[altered_table_name]):
+        assert actual_row == expected_row
+
+
 def test_numeric_types():
     """Schema was wrong for Decimal objects. Check they are correctly selected."""
     table_name = "test_decimal"