diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 12d2c2d..ed688be 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -4,7 +4,7 @@ updates:
   - package-ecosystem: "pip"
     directory: "/"
    schedule:
-      interval: "weekly"
+      interval: "daily"
 
   - package-ecosystem: "github-actions"
     directory: "/"
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index 1edaa9a..94a5373 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -28,15 +28,15 @@ jobs:
       matrix:
         os: ["ubuntu-latest"]
         python-version: [
-          "3.9",
+          "3.10",
           "3.12",
         ]
-
     env:
       OS: ${{ matrix.os }}
       PYTHON: ${{ matrix.python-version }}
-      # Do not tear down Testcontainers
-      TC_KEEPALIVE: true
+      TC_KEEPALIVE: true  # Do not tear down Testcontainers
+      UV_PYTHON_DOWNLOADS: never
+      UV_SYSTEM_PYTHON: true
 
     # https://docs.github.com/en/actions/using-containerized-services/about-service-containers
     services:
@@ -56,19 +56,19 @@ jobs:
         uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
-          architecture: x64
-          cache: 'pip'
-          cache-dependency-path: 'pyproject.toml'
+
+      - name: Set up uv
+        uses: astral-sh/setup-uv@v7
+        with:
+          cache-dependency-glob: |
+            pyproject.toml
+          cache-suffix: ${{ matrix.python-version }}
+          enable-cache: true
+          version: "latest"
 
       - name: Set up project
         run: |
-
-          # `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
-          # https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
-          pip install "setuptools>=64" --upgrade
-
-          # Install package in editable mode.
-          pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]
+          uv pip install --editable='.[all,develop,test]'
 
       - name: Run linter and software tests
         run: |
diff --git a/.gitignore b/.gitignore
index 2614b47..7279471 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
 .venv*
 *.egg-info
 .coverage*
+.http_cache
 coverage.xml
 build
 dist
diff --git a/CHANGES.md b/CHANGES.md
index 3db9c11..99d393c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -3,6 +3,10 @@
 ## In progress
 - Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
 - Improve write operations to be closer to `target-postgres`.
+- Switch to the new SQLAlchemy dialect for CrateDB.
+- Remove the workaround for `_`-prefixed column names; this requires
+  CrateDB 6.2 or higher.
+- Dependencies: Update to vanilla `meltanolabs-target-postgres` 0.6.
 
 ## 2023-12-08 v0.0.1
 - Make it work. It can run the canonical Meltano GitHub -> DB example.
diff --git a/README.md b/README.md
index cd421f4..dc3926a 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,7 @@ and loaders, and based on the [Meltano PostgreSQL target].
 
 In order to learn more about Singer, Meltano, and friends, navigate
 to the [Singer Intro](./docs/singer-intro.md).
+The package requires CrateDB 6.2 or higher to operate successfully.
 
 ## Install
 
diff --git a/pyproject.toml b/pyproject.toml
index d5a6a25..8c9ad29 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,7 +35,7 @@ license = { text = "MIT" }
 authors = [
   { name = "Andreas Motl", email = "andreas.motl@crate.io" },
 ]
-requires-python = ">=3.8,<3.13"
+requires-python = ">=3.10,<3.13"
 classifiers = [
   "Development Status :: 3 - Alpha",
   "Environment :: Console",
@@ -55,8 +55,6 @@ classifiers = [
   "Operating System :: Unix",
   "Programming Language :: Python",
   "Programming Language :: Python :: 3 :: Only",
-  "Programming Language :: Python :: 3.8",
-  "Programming Language :: Python :: 3.9",
   "Programming Language :: Python :: 3.10",
   "Programming Language :: Python :: 3.11",
   "Programming Language :: Python :: 3.12",
@@ -89,10 +87,10 @@ dynamic = [
   "version",
 ]
 dependencies = [
-  "crate[sqlalchemy]<1",
   "cratedb-toolkit",
-  "importlib-resources; python_version<'3.9'",  # "meltanolabs-target-postgres==0.0.9",
-  "meltanolabs-target-postgres @ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
+  "importlib-resources; python_version<'3.9'",  # "meltanolabs-target-postgres==0.0.9",
+  "meltanolabs-target-postgres>=0.6,<0.7",
+  "sqlalchemy-cratedb",
 ]
 optional-dependencies.all = [
   "meltano-target-cratedb[vector]",
@@ -113,10 +111,13 @@ optional-dependencies.test = [
   "pytest<9",
   "pytest-cov<5",
   "pytest-mock<4",
+  "tap-countries",
+  "tap-fundamentals",
 ]
-optional-dependencies.vector = [
-  "numpy",
-]
+# optional-dependencies.vector = [
+#   "meltanolabs-target-postgres @ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
+#   "sqlalchemy-cratedb[vector]",
+# ]
 urls.changelog = "https://github.com/crate-workbench/meltano-target-cratedb/blob/main/CHANGES.md"
 urls.documentation = "https://github.com/crate-workbench/meltano-target-cratedb"
 urls.homepage = "https://github.com/crate-workbench/meltano-target-cratedb"
@@ -126,6 +127,10 @@ scripts.target-cratedb = "target_cratedb.target:TargetCrateDB.cli"
 [tool.setuptools.packages.find]
 namespaces = false
 
+[tool.uv.sources]
+tap-countries = { git = "https://github.com/meltano/sdk.git", subdirectory = "packages/tap-countries", rev = "main" }
+tap-fundamentals = { git = "https://github.com/meltano/sdk.git", subdirectory = "packages/tap-fundamentals", rev = "main" }
+
 [tool.black]
 line-length = 120
diff --git a/target_cratedb/connector.py b/target_cratedb/connector.py
index f0b35ed..75167c1 100644
--- a/target_cratedb/connector.py
+++ b/target_cratedb/connector.py
@@ -7,13 +7,14 @@
 from datetime import datetime
 
 import sqlalchemy as sa
-from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
 from singer_sdk import typing as th
 from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
+from sqlalchemy_cratedb.type import FloatVector, ObjectType
+from sqlalchemy_cratedb.type.array import _ObjectArray
+from sqlalchemy_cratedb.type.object import ObjectTypeImpl
 from target_postgres.connector import NOTYPE, PostgresConnector
 
 from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
-from target_cratedb.sqlalchemy.vector import FloatVector
 
 
 class CrateDBConnector(PostgresConnector):
@@ -226,6 +227,9 @@ def _get_type_sort_key(
         if isinstance(sql_type, NOTYPE):
             return 0, _len
 
+        if not hasattr(sql_type, "python_type"):
+            raise TypeError(f"Resolving type for sort key failed: {sql_type}")
+
         _pytype = t.cast(type, sql_type.python_type)
         if issubclass(_pytype, (str, bytes)):
             return 900, _len
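The new guard in `_get_type_sort_key` fails fast when a resolved SQLAlchemy type does not expose a usable `python_type`, instead of failing opaquely deeper inside the type-resolution machinery. A minimal sketch of the idea, using plain SQLAlchemy types; the helper name and the weights are illustrative, not the connector's full logic:

```python
import sqlalchemy as sa

def sort_key_sketch(sql_type: sa.types.TypeEngine) -> int:
    # Fail fast with a descriptive error instead of an opaque failure later.
    if not hasattr(sql_type, "python_type"):
        raise TypeError(f"Resolving type for sort key failed: {sql_type}")
    pytype = sql_type.python_type
    # Text-like types sort late, mirroring the `return 900, _len` branch.
    return 900 if issubclass(pytype, (str, bytes)) else 0

print(sort_key_sketch(sa.TEXT()))    # 900
print(sort_key_sketch(sa.BIGINT()))  # 0
```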
diff --git a/target_cratedb/sinks.py b/target_cratedb/sinks.py
index 5040212..ec3e2e3 100644
--- a/target_cratedb/sinks.py
+++ b/target_cratedb/sinks.py
@@ -2,11 +2,10 @@
 
 import datetime
 import os
-import time
 from typing import List, Optional, Union
 
 import sqlalchemy as sa
-from pendulum import now
+from singer_sdk.sql.connector import FullyQualifiedName
 from sqlalchemy.util import asbool
 from target_postgres.sinks import PostgresSink
 
@@ -20,9 +19,6 @@ class CrateDBSink(PostgresSink):
 
     connector_class = CrateDBConnector
 
-    soft_delete_column_name = "__sdc_deleted_at"
-    version_column_name = "__sdc_table_version"
-
     def __init__(self, *args, **kwargs):
         """Initialize SQL Sink. See super class for more details."""
         super().__init__(*args, **kwargs)
@@ -32,91 +28,6 @@ def __init__(self, *args, **kwargs):
         # operations on the target table.
         self.strategy_direct = MELTANO_CRATEDB_STRATEGY_DIRECT
 
-    # Record processing
-
-    def _add_sdc_metadata_to_record(
-        self,
-        record: dict,
-        message: dict,
-        context: dict,
-    ) -> None:
-        """Populate metadata _sdc columns from incoming record message.
-
-        Record metadata specs documented at:
-        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
-
-        Args:
-            record: Individual record in the stream.
-            message: The record message.
-            context: Stream partition or context dictionary.
-        """
-        record["__sdc_extracted_at"] = message.get("time_extracted")
-        record["__sdc_received_at"] = datetime.datetime.now(
-            tz=datetime.timezone.utc,
-        ).isoformat()
-        record["__sdc_batched_at"] = (
-            context.get("batch_start_time", None) or datetime.datetime.now(tz=datetime.timezone.utc)
-        ).isoformat()
-        record["__sdc_deleted_at"] = record.get("__sdc_deleted_at")
-        record["__sdc_sequence"] = int(round(time.time() * 1000))
-        record["__sdc_table_version"] = message.get("version")
-        record["__sdc_sync_started_at"] = self.sync_started_at
-
-    def _add_sdc_metadata_to_schema(self) -> None:
-        """Add _sdc metadata columns.
-
-        Record metadata specs documented at:
-        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
-        """
-        properties_dict = self.schema["properties"]
-        for col in (
-            "__sdc_extracted_at",
-            "__sdc_received_at",
-            "__sdc_batched_at",
-            "__sdc_deleted_at",
-        ):
-            properties_dict[col] = {
-                "type": ["null", "string"],
-                "format": "date-time",
-            }
-        for col in ("__sdc_sequence", "__sdc_table_version", "__sdc_sync_started_at"):
-            properties_dict[col] = {"type": ["null", "integer"]}
-
-    def _remove_sdc_metadata_from_schema(self) -> None:
-        """Remove _sdc metadata columns.
-
-        Record metadata specs documented at:
-        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
-        """
-        properties_dict = self.schema["properties"]
-        for col in (
-            "__sdc_extracted_at",
-            "__sdc_received_at",
-            "__sdc_batched_at",
-            "__sdc_deleted_at",
-            "__sdc_sequence",
-            "__sdc_table_version",
-            "__sdc_sync_started_at",
-        ):
-            properties_dict.pop(col, None)
-
-    def _remove_sdc_metadata_from_record(self, record: dict) -> None:
-        """Remove metadata _sdc columns from incoming record message.
-
-        Record metadata specs documented at:
-        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
-
-        Args:
-            record: Individual record in the stream.
-        """
-        record.pop("__sdc_extracted_at", None)
-        record.pop("__sdc_received_at", None)
-        record.pop("__sdc_batched_at", None)
-        record.pop("__sdc_deleted_at", None)
-        record.pop("__sdc_sequence", None)
-        record.pop("__sdc_table_version", None)
-        record.pop("__sdc_sync_started_at", None)
-
     def process_batch(self, context: dict) -> None:
         """Process a batch with the given batch context.
 
@@ -303,7 +214,8 @@ def activate_version(self, new_version: int) -> None:
         if not self.connector.table_exists(self.full_table_name):
             return
 
-        deleted_at = now()
+        deleted_at = datetime.datetime.now(tz=datetime.timezone.utc)
+
         # Different from SingerSDK as we need to handle types the
         # same as SCHEMA messages
         datetime_type = self.connector.to_sql_type({"type": "string", "format": "date-time"})
@@ -388,10 +300,12 @@ def refresh_table(self, table: Union[sa.Table, str]):
         Synchronize write operations on CrateDB.
         """
         with self.connector._connect() as connection:
-            if isinstance(table, sa.Table):
+            if isinstance(table, FullyQualifiedName):
+                table_full = str(table)
+            elif isinstance(table, sa.Table):
                 table_full = f'"{table.schema}"."{table.name}"'
             elif isinstance(table, str):
                 table_full = table
             else:
-                raise TypeError(f"Unknown type for `table`: {table}")
+                raise TypeError(f"Unknown type `{type(table)}` for table: {table}")
 
             connection.exec_driver_sql(f"REFRESH TABLE {table_full};")
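`refresh_table` now also accepts `FullyQualifiedName` objects from the Singer SDK, next to `sa.Table` instances and plain strings. A sketch of the name-resolution dispatch in isolation; no database connection is required, and the `FullyQualifiedName` branch is omitted here because it simply stringifies the object:

```python
import sqlalchemy as sa

def resolve_table_name(table) -> str:
    """Sketch of the dispatch in `refresh_table`, not the production code."""
    if isinstance(table, sa.Table):
        return f'"{table.schema}"."{table.name}"'
    if isinstance(table, str):
        return table
    raise TypeError(f"Unknown type `{type(table)}` for table: {table}")

print(resolve_table_name(sa.Table("foo", sa.MetaData(), schema="melty")))  # "melty"."foo"
print(resolve_table_name("melty.foo"))                                     # melty.foo
```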
diff --git a/target_cratedb/sqlalchemy/patch.py b/target_cratedb/sqlalchemy/patch.py
index 3de6909..35d9d33 100644
--- a/target_cratedb/sqlalchemy/patch.py
+++ b/target_cratedb/sqlalchemy/patch.py
@@ -1,11 +1,11 @@
 from _decimal import Decimal
 from datetime import datetime
+from typing import Any, Union
 
+import crate.client.http
 import sqlalchemy as sa
-from crate.client.http import CrateJsonEncoder
-from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
-from crate.client.sqlalchemy.types import _ObjectArray
-from sqlalchemy.sql import sqltypes
+from sqlalchemy_cratedb.dialect import TYPES_MAP, DateTime
+from sqlalchemy_cratedb.type.array import _ObjectArray
 
 
 def patch_sqlalchemy():
@@ -19,20 +19,20 @@ def patch_types():
 
     TODO: Upstream to crate-python.
     """
-    TYPES_MAP["bigint"] = sqltypes.BIGINT
-    TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
-    TYPES_MAP["long"] = sqltypes.BIGINT
-    TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
-    TYPES_MAP["real"] = sqltypes.DOUBLE
-    TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
-    TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
-    TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP
+    TYPES_MAP["bigint"] = sa.BIGINT
+    TYPES_MAP["bigint_array"] = sa.ARRAY(sa.BIGINT)
+    TYPES_MAP["long"] = sa.BIGINT
+    TYPES_MAP["long_array"] = sa.ARRAY(sa.BIGINT)
+    TYPES_MAP["real"] = sa.DOUBLE
+    TYPES_MAP["real_array"] = sa.ARRAY(sa.DOUBLE)
+    TYPES_MAP["timestamp without time zone"] = sa.TIMESTAMP
+    TYPES_MAP["timestamp with time zone"] = sa.TIMESTAMP
 
     # TODO: Can `ARRAY` be inherited from PostgreSQL's
     # `ARRAY`, to make type checking work?
     def as_generic(self, allow_nulltype: bool = False):
-        return sqltypes.ARRAY
+        return sa.ARRAY
 
     _ObjectArray.as_generic = as_generic
@@ -58,14 +59,14 @@ def patch_json_encoder():
 
     TODO: Upstream to crate-python.
     """
-    json_encoder_default = CrateJsonEncoder.default
+    json_encoder_default = crate.client.http.json_encoder
 
-    def default(self, o):
-        if isinstance(o, Decimal):
-            return float(o)
-        return json_encoder_default(o)
+    def json_encoder_new(obj: Any) -> Union[int, str, float]:
+        if isinstance(obj, Decimal):
+            return float(obj)
+        return json_encoder_default(obj)
 
-    CrateJsonEncoder.default = default
+    crate.client.http.json_encoder = json_encoder_new
 
 
 def polyfill_refresh_after_dml_engine(engine: sa.Engine):
diff --git a/target_cratedb/sqlalchemy/vector.py b/target_cratedb/sqlalchemy/vector.py
deleted file mode 100644
index 4c99a4f..0000000
--- a/target_cratedb/sqlalchemy/vector.py
+++ /dev/null
@@ -1,139 +0,0 @@
-# TODO: Refactor to CrateDB SQLAlchemy dialect.
-import typing as t
-
-import numpy as np
-import numpy.typing as npt
-import sqlalchemy as sa
-from crate.client.sqlalchemy.compiler import CrateTypeCompiler
-from crate.client.sqlalchemy.dialect import TYPES_MAP
-from sqlalchemy import TypeDecorator
-from sqlalchemy.sql import sqltypes
-
-__all__ = ["FloatVector"]
-
-
-def from_db(value: t.Iterable) -> t.Optional[npt.ArrayLike]:
-    # from `pgvector.utils`
-    # could be ndarray if already cast by lower-level driver
-    if value is None or isinstance(value, np.ndarray):
-        return value
-
-    return np.array(value, dtype=np.float32)
-
-
-def to_db(value: t.Any, dim: t.Optional[int] = None) -> t.Optional[t.List]:
-    # from `pgvector.utils`
-    if value is None:
-        return value
-
-    if isinstance(value, np.ndarray):
-        if value.ndim != 1:
-            raise ValueError("expected ndim to be 1")
-
-        if not np.issubdtype(value.dtype, np.integer) and not np.issubdtype(value.dtype, np.floating):
-            raise ValueError("dtype must be numeric")
-
-        value = value.tolist()
-
-    if dim is not None and len(value) != dim:
-        raise ValueError("expected %d dimensions, not %d" % (dim, len(value)))
-
-    return value
-
-
-class FloatVector(TypeDecorator[t.Sequence[float]]):
-    """
-    An improved implementation of the `FloatVector` data type for CrateDB,
-    compared to the previous implementation on behalf of the LangChain adapter.
-
-    https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html#float-vector
-    https://crate.io/docs/crate/reference/en/master/general/builtins/scalar-functions.html#scalar-knn-match
-
-    The previous implementation, based on SQLAlchemy's `UserDefinedType`, didn't
-    respect the `python_type` property on backward/reverse resolution of types.
-    This was observed on Meltano's database connector machinery doing a
-    type cast, which led to a `NotImplementedError`.
-
-        typing.cast(type, sql_type.python_type) => NotImplementedError
-
-    The `UserDefinedType` approach is easier to implement, because it doesn't
-    need compiler support.
-
-    To get full SQLAlchemy type support, including support for forward- and
-    backward resolution / type casting, the custom data type should derive
-    from SQLAlchemy's `TypeEngine` base class instead.
-
-    When deriving from `TypeEngine`, you will need to set the `__visit_name__`
-    attribute, and add a corresponding visitor method to the `CrateTypeCompiler`,
-    in this case, `visit_FLOAT_VECTOR`.
-
-    Now, rendering a DDL succeeds. However, when reflecting the DDL schema back,
-    it doesn't work until you establish a corresponding reverse type mapping.
-
-    By invoking `SELECT DISTINCT(data_type) FROM information_schema.columns;`,
-    you will find out that the internal type name is `float_vector`, so you
-    announce it to the dialect using `TYPES_MAP["float_vector"] = FloatVector`.
-
-    Still not there: `NotImplementedError: Default TypeEngine.as_generic() heuristic
-    method was unsuccessful for target_cratedb.sqlalchemy.vector.FloatVector. A
-    custom as_generic() method must be implemented for this type class.`
-
-    So, as it signals that the type implementation also needs an `as_generic`
-    property, let's supply one, returning `sqltypes.ARRAY`.
-
-    It looks like, in exchange for those improvements, the `get_col_spec`
-    method is no longer needed.
-
-    TODO: Would it be a good idea to derive from SQLAlchemy's
-    `ARRAY` right away, to get a few of the features without
-    the need to redefine them?
-
-    Please note the outcome of this analysis and the corresponding implementation
-    has been derived from empirical observations, and from the feeling that we also
-    lack corresponding support for the other special data types of CrateDB (ARRAY and
-    OBJECT) within the SQLAlchemy dialect, i.e. "that something must be wrong or
-    incomplete". In this spirit, it is advisable to review and improve their
-    implementations correspondingly.
-    """
-
-    cache_ok = False
-
-    __visit_name__ = "FLOAT_VECTOR"
-
-    _is_array = True
-
-    zero_indexes = False
-
-    impl = sa.ARRAY
-
-    def __init__(self, dimensions: int = None):
-        super().__init__(sa.FLOAT, dimensions=dimensions)
-
-    def as_generic(self, allow_nulltype: bool = False):
-        return sqltypes.ARRAY
-
-    def bind_processor(self, dialect: sa.Dialect) -> t.Callable:
-        def process(value: t.Iterable) -> t.Optional[t.List]:
-            return to_db(value, self.dimensions)
-
-        return process
-
-    def result_processor(self, dialect: sa.Dialect, coltype: t.Any) -> t.Callable:
-        def process(value: t.Any) -> t.Optional[npt.ArrayLike]:
-            return from_db(value)
-
-        return process
-
-
-# Accompanies the type definition for reverse type lookups.
-TYPES_MAP["float_vector"] = FloatVector
-
-
-def visit_FLOAT_VECTOR(self, type_, **kw):
-    dimensions = type_.dimensions
-    if dimensions is None:
-        raise ValueError("FloatVector must be initialized with dimension size")
-    return f"FLOAT_VECTOR({dimensions})"
-
-
-CrateTypeCompiler.visit_FLOAT_VECTOR = visit_FLOAT_VECTOR
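The deleted module is superseded by the `FloatVector` type shipped with the `sqlalchemy-cratedb` dialect, which the connector now imports (see `target_cratedb/connector.py` above). A minimal usage sketch, assuming `sqlalchemy-cratedb` is installed and that its `FloatVector` accepts the dimension count the same way the deleted implementation did:

```python
import sqlalchemy as sa
from sqlalchemy_cratedb.type import FloatVector  # import path as used in connector.py

metadata = sa.MetaData()
embeddings = sa.Table(
    "embeddings",
    metadata,
    sa.Column("id", sa.BigInteger, primary_key=True),
    sa.Column("vector", FloatVector(4)),  # rendered as FLOAT_VECTOR(4) in CrateDB DDL
)
```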
diff --git a/target_cratedb/tests/data_files/array_boolean.singer b/target_cratedb/tests/data_files/array_boolean.singer
new file mode 100644
index 0000000..268a64a
--- /dev/null
+++ b/target_cratedb/tests/data_files/array_boolean.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_boolean", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "boolean"}}}}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 1, "value": [ true, false ]}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 2, "value": [ false ]}}
+{"type": "RECORD", "stream": "array_boolean", "record": {"id": 3, "value": [ false, true, true, false ]}}
+{"type": "STATE", "value": {"array_boolean": 3}}
diff --git a/target_cratedb/tests/data_files/array_float_vector.singer b/target_cratedb/tests/data_files/array_float_vector.singer
new file mode 100644
index 0000000..9f4cd04
--- /dev/null
+++ b/target_cratedb/tests/data_files/array_float_vector.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_float_vector", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}, "additionalProperties": {"storage": {"type": "vector", "dim": 4}}}}}}
+{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 1, "value": [ 1.1, 2.1, 1.1, 1.3 ]}}
+{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 2, "value": [ 1.0, 1.0, 1.0, 2.3 ]}}
+{"type": "RECORD", "stream": "array_float_vector", "record": {"id": 3, "value": [ 2.0, 1.2, 1.0, 0.9 ]}}
+{"type": "STATE", "value": {"array_float_vector": 3}}
diff --git a/target_cratedb/tests/data_files/array_number.singer b/target_cratedb/tests/data_files/array_number.singer
new file mode 100644
index 0000000..4eac276
--- /dev/null
+++ b/target_cratedb/tests/data_files/array_number.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_number", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "number"}}}}}
+{"type": "RECORD", "stream": "array_number", "record": {"id": 1, "value": [ 42.42, 84.84, 23 ]}}
+{"type": "RECORD", "stream": "array_number", "record": {"id": 2, "value": [ 1.0 ]}}
+{"type": "RECORD", "stream": "array_number", "record": {"id": 3, "value": [ 1.11, 2.22, 3, 4, 5.55 ]}}
+{"type": "STATE", "value": {"array_number": 3}}
diff --git a/target_cratedb/tests/data_files/array_string.singer b/target_cratedb/tests/data_files/array_string.singer
new file mode 100644
index 0000000..f14e787
--- /dev/null
+++ b/target_cratedb/tests/data_files/array_string.singer
@@ -0,0 +1,6 @@
+{"type": "SCHEMA", "stream": "array_string", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array","items": {"type": "string"}}}}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 1, "value": [ "apple", "orange", "pear" ]}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 2, "value": [ "banana", "apple" ]}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 3, "value": [ "pear" ]}}
+{"type": "RECORD", "stream": "array_string", "record": {"id": 4, "value": [ "orange", "banana", "apple", "pear" ]}}
+{"type": "STATE", "value": {"array_string": 4}}
diff --git a/target_cratedb/tests/data_files/array_timestamp.singer b/target_cratedb/tests/data_files/array_timestamp.singer
new file mode 100644
index 0000000..e5192ce
--- /dev/null
+++ b/target_cratedb/tests/data_files/array_timestamp.singer
@@ -0,0 +1,5 @@
+{"type": "SCHEMA", "stream": "array_timestamp", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "array", "items": {"type": "string", "format": "date-time"}}}}}
+{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 1, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02" ]}}
+{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 2, "value": [ "2023-12-13T01:15:02" ]}}
+{"type": "RECORD", "stream": "array_timestamp", "record": {"id": 3, "value": [ "2023-12-13T01:15:02", "2023-12-13T01:16:02", "2023-12-13T01:17:02" ]}}
+{"type": "STATE", "value": {"array_timestamp": 3}}
diff --git a/target_cratedb/tests/data_files/object_mixed.singer b/target_cratedb/tests/data_files/object_mixed.singer
new file mode 100644
index 0000000..2ed8626
--- /dev/null
+++ b/target_cratedb/tests/data_files/object_mixed.singer
@@ -0,0 +1,3 @@
+{"type": "SCHEMA", "stream": "object_mixed", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "object"}}}}
+{"type": "RECORD", "stream": "object_mixed", "record": {"id": 1, "value": {"string": "foo", "integer": 42, "float": 42.42, "timestamp": "2023-12-13T01:15:02", "array_boolean": [true, false], "array_float": [42.42, 84.84], "array_integer": [42, 84], "array_string": ["foo", "bar"], "nested_object": {"foo": "bar"}}}}
+{"type": "STATE", "value": {"object_mixed": 1}}
diff --git a/target_cratedb/tests/test_standard_target.py b/target_cratedb/tests/test_standard_target.py
index 92dee00..3a150ec 100644
--- a/target_cratedb/tests/test_standard_target.py
+++ b/target_cratedb/tests/test_standard_target.py
@@ -6,22 +6,19 @@
 import io
 from contextlib import redirect_stdout
 
-import jsonschema
 import pytest
 import sqlalchemy as sa
-from crate.client.sqlalchemy.types import ObjectTypeImpl
-from singer_sdk.exceptions import MissingKeyPropertiesError
+from singer_sdk.exceptions import InvalidRecord, MissingKeyPropertiesError
 from singer_sdk.testing import sync_end_to_end
-from target_postgres.tests.samples.aapl.aapl import Fundamentals
-from target_postgres.tests.samples.sample_tap_countries.countries_tap import (
-    SampleTapCountries,
-)
-from target_postgres.tests.test_target_postgres import AssertionHelper
+from sqlalchemy_cratedb.type import FloatVector
+from sqlalchemy_cratedb.type.object import ObjectTypeImpl
+from tap_countries.tap import TapCountries
+from tap_fundamentals import Fundamentals
+from target_postgres.tests.test_target_postgres import verify_data
 
 from target_cratedb.connector import CrateDBConnector
 from target_cratedb.sinks import MELTANO_CRATEDB_STRATEGY_DIRECT
 from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
-from target_cratedb.sqlalchemy.vector import FloatVector
 from target_cratedb.target import TargetCrateDB
 
 try:
@@ -30,7 +27,7 @@
     from importlib_resources import files as resource_files  # type: ignore[no-redef]
 
 
-METADATA_COLUMN_PREFIX = "__sdc"
+METADATA_COLUMN_PREFIX = "_sdc"
 
 
 @pytest.fixture(scope="session")
@@ -120,11 +117,6 @@ def initialize_database(cratedb_config):
         conn.exec_driver_sql("CREATE TABLE IF NOT EXISTS melty.foo (a INT);")
 
 
-@pytest.fixture
-def helper(cratedb_target) -> AssertionHelper:
-    return AssertionHelper(target=cratedb_target, metadata_column_prefix=METADATA_COLUMN_PREFIX)
-
-
 def singer_file_to_target(file_name, target) -> None:
     """Singer file to Target, emulates a tap run
 
@@ -153,6 +145,42 @@ def singer_file_to_target(file_name, target) -> None:
     target.listen(buf)
 
 
+def verify_schema(
+    target: TargetCrateDB,
+    table_name: str,
+    check_columns: dict = None,
+):
+    """Check whether the schema of a database table matches the provided column definitions.
+
+    Args:
+        target: The target to obtain a database connection from.
+        table_name: The schema and table name of the table to check data for.
+        check_columns: A dictionary mapping column names to their definitions.
+            Currently, only the `type` attribute is compared.
+    """
+    check_columns = check_columns or {}
+    engine = create_engine(target)
+    schema = target.config["default_target_schema"]
+    with engine.connect() as connection:
+        meta = sa.MetaData()
+        table = sa.Table(table_name, meta, schema=schema, autoload_with=connection)
+        for column in table.c:
+            # Ignore `_sdc` metadata columns when verifying the table schema.
+            if column.name.startswith(METADATA_COLUMN_PREFIX):
+                continue
+            try:
+                column_type_expected = check_columns[column.name]["type"]
+            except KeyError as ex:
+                raise ValueError(f"Invalid check_columns - missing definition for column: {column.name}") from ex
+            if not isinstance(column.type, column_type_expected):
+                raise TypeError(
+                    f"Column '{column.name}' (with type '{column.type}') "
+                    f"does not match expected type: {column_type_expected}"
+                )
+    engine.dispose()
+
+
 # TODO should set schemas for each tap individually so we don't collide
@@ -169,7 +197,7 @@ def test_sqlalchemy_url_config(cratedb_config):
     port = cratedb_config["port"]
 
     config = {"sqlalchemy_url": f"crate://{user}:{password}@{host}:{port}/{database}"}
-    tap = SampleTapCountries(config={}, state=None)
+    tap = TapCountries(config={}, state=None)
     target = TargetCrateDB(config=config)
     sync_end_to_end(tap, target)
 
@@ -225,7 +253,7 @@
 
 # Test name would work well
 def test_countries_to_cratedb(cratedb_config):
-    tap = SampleTapCountries(config={}, state=None)
+    tap = TapCountries(config={}, state=None)
     target = TargetCrateDB(config=cratedb_config)
     sync_end_to_end(tap, target)
 
@@ -252,9 +280,10 @@ def test_record_missing_key_property(cratedb_target):
 
 
 def test_record_missing_required_property(cratedb_target):
-    with pytest.raises(jsonschema.exceptions.ValidationError):
+    with pytest.raises(InvalidRecord) as e:
         file_name = "record_missing_required_property.singer"
         singer_file_to_target(file_name, cratedb_target)
+    assert "Record Message Validation Error: 'id' is a required property" in str(e.value)
 
 
 @pytest.mark.skipif(not MELTANO_CRATEDB_STRATEGY_DIRECT, reason="Does not work in temptable/upsert mode")
@@ -269,11 +298,11 @@ def test_special_chars_in_attributes(cratedb_target):
     singer_file_to_target(file_name, cratedb_target)
 
 
-def test_optional_attributes(cratedb_target, helper):
+def test_optional_attributes(cratedb_target):
     file_name = "optional_attributes.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "optional": "This is optional"}
-    helper.verify_data("test_optional_attributes", 4, "id", row)
+    verify_data(cratedb_target, "test_optional_attributes", 4, "id", row)
 
 
 def test_schema_no_properties(cratedb_target):
@@ -282,7 +311,7 @@ def test_schema_no_properties(cratedb_target):
     singer_file_to_target(file_name, cratedb_target)
 
 
-def test_schema_updates(cratedb_target, helper):
+def test_schema_updates(cratedb_target):
     file_name = "schema_updates.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {
@@ -294,16 +323,16 @@ def test_schema_updates(cratedb_target):
         "a5": None,
         "a6": None,
     }
-    helper.verify_data("test_schema_updates", 6, "id", row)
+    verify_data(cratedb_target, "test_schema_updates", 6, "id", row)
 
 
-def test_multiple_state_messages(cratedb_target, helper):
+def test_multiple_state_messages(cratedb_target):
    file_name = "multiple_state_messages.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "metric": 100}
-    helper.verify_data("test_multiple_state_messages_a", 6, "id", row)
+    verify_data(cratedb_target, "test_multiple_state_messages_a", 6, "id", row)
     row = {"id": 1, "metric": 110}
-    helper.verify_data("test_multiple_state_messages_b", 6, "id", row)
+    verify_data(cratedb_target, "test_multiple_state_messages_b", 6, "id", row)
 
 
 # TODO test that data is correct
@@ -321,7 +350,7 @@ def test_multiple_schema_messages(cratedb_target, caplog):
 
 
 @pytest.mark.skip("ColumnValidationException[Validation failed for id: Updating a primary key is not supported]")
-def test_relational_data(cratedb_target, helper):
+def test_relational_data(cratedb_target):
     file_name = "user_location_data.singer"
     singer_file_to_target(file_name, cratedb_target)
 
@@ -378,12 +407,12 @@ def test_relational_data(cratedb_target):
         },
     ]
 
-    helper.verify_data("test_users", 8, "id", users)
-    helper.verify_data("test_locations", 5, "id", locations)
-    helper.verify_data("test_user_in_location", 5, "id", user_in_location)
+    verify_data(cratedb_target, "test_users", 8, "id", users)
+    verify_data(cratedb_target, "test_locations", 5, "id", locations)
+    verify_data(cratedb_target, "test_user_in_location", 5, "id", user_in_location)
 
 
-def test_no_primary_keys(cratedb_target, helper):
+def test_no_primary_keys(cratedb_target):
     """We run both of these tests twice just to ensure that no records are removed and append only works properly"""
     engine = create_engine(cratedb_target)
     table_name = "test_no_pk"
@@ -403,7 +432,7 @@ def test_no_primary_keys(cratedb_target):
     singer_file_to_target(file_name, cratedb_target)
 
     # Will populate 22 records, we run this twice.
-    helper.verify_data(table_name, 16)
+    verify_data(cratedb_target, table_name, 16)
 
 
 def test_no_type(cratedb_target):
@@ -411,19 +440,20 @@
     singer_file_to_target(file_name, cratedb_target)
 
 
-def test_duplicate_records(cratedb_target, helper):
+def test_duplicate_records(cratedb_target):
     file_name = "duplicate_records.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "metric": 100}
-    helper.verify_data("test_duplicate_records", 2, "id", row)
+    verify_data(cratedb_target, "test_duplicate_records", 2, "id", row)
 
 
-def test_array_boolean(cratedb_target, helper):
+def test_array_boolean(cratedb_target):
     file_name = "array_boolean.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "value": [True, False]}
-    helper.verify_data("array_boolean", 3, "id", row)
-    helper.verify_schema(
+    verify_data(cratedb_target, "array_boolean", 3, "id", row)
+    verify_schema(
+        cratedb_target,
         "array_boolean",
         check_columns={
             "id": {"type": sa.BIGINT},
@@ -432,16 +462,18 @@ def test_array_boolean(cratedb_target):
     )
 
 
+@pytest.mark.skip("pgvector patch did not land in upstream target-postgres yet")
 @pytest.mark.skipif(not MELTANO_CRATEDB_STRATEGY_DIRECT, reason="Does not work in temptable/upsert mode")
-def test_array_float_vector(cratedb_target, helper):
+def test_array_float_vector(cratedb_target):
     file_name = "array_float_vector.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {
         "id": 1,
         "value": [1.1, 2.1, 1.1, 1.3],
     }
-    helper.verify_data("array_float_vector", 3, "id", row)
-    helper.verify_schema(
+    verify_data(cratedb_target, "array_float_vector", 3, "id", row)
+    verify_schema(
+        cratedb_target,
         "array_float_vector",
         check_columns={
             "id": {"type": sa.BIGINT},
@@ -450,12 +482,13 @@ def test_array_float_vector(cratedb_target):
     )
 
 
-def test_array_number(cratedb_target, helper):
+def test_array_number(cratedb_target):
     file_name = "array_number.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "value": [42.42, 84.84, 23]}
-    helper.verify_data("array_number", 3, "id", row)
-    helper.verify_schema(
+    verify_data(cratedb_target, "array_number", 3, "id", row)
+    verify_schema(
+        cratedb_target,
         "array_number",
         check_columns={
             "id": {"type": sa.BIGINT},
@@ -464,12 +497,13 @@ def test_array_number(cratedb_target):
     )
 
 
-def test_array_string(cratedb_target, helper):
+def test_array_string(cratedb_target):
     file_name = "array_string.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "value": ["apple", "orange", "pear"]}
-    helper.verify_data("array_string", 4, "id", row)
-    helper.verify_schema(
+    verify_data(cratedb_target, "array_string", 4, "id", row)
+    verify_schema(
+        cratedb_target,
         "array_string",
         check_columns={
             "id": {"type": sa.BIGINT},
@@ -478,12 +512,13 @@ def test_array_string(cratedb_target):
     )
 
 
-def test_array_timestamp(cratedb_target, helper):
+def test_array_timestamp(cratedb_target):
     file_name = "array_timestamp.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "value": ["2023-12-13T01:15:02", "2023-12-13T01:16:02"]}
-    helper.verify_data("array_timestamp", 3, "id", row)
-    helper.verify_schema(
+    verify_data(cratedb_target, "array_timestamp", 3, "id", row)
+    verify_schema(
+        cratedb_target,
         "array_timestamp",
         check_columns={
             "id": {"type": sa.BIGINT},
@@ -492,7 +527,7 @@ def test_array_timestamp(cratedb_target):
     )
 
 
-def test_object_mixed(cratedb_target, helper):
+def test_object_mixed(cratedb_target):
     file_name = "object_mixed.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {
@@ -509,8 +544,9 @@ def test_object_mixed(cratedb_target):
             "nested_object": {"foo": "bar"},
         },
     }
-    helper.verify_data("object_mixed", 1, "id", row)
-    helper.verify_schema(
+    verify_data(cratedb_target, "object_mixed", 1, "id", row)
+    verify_schema(
+        cratedb_target,
         "object_mixed",
         check_columns={
             "id": {"type": sa.BIGINT},
@@ -519,7 +555,7 @@ def test_object_mixed(cratedb_target):
     )
 
 
-def test_encoded_string_data(cratedb_target, helper):
+def test_encoded_string_data(cratedb_target):
     """
     We removed NUL characters from the original encoded_strings.singer as postgres doesn't allow them.
     https://www.postgresql.org/docs/current/functions-string.html#:~:text=chr(0)%20is%20disallowed%20because%20text%20data%20types%20cannot%20store%20that%20character.
@@ -534,11 +570,11 @@ def test_encoded_string_data(cratedb_target):
     file_name = "encoded_strings.singer"
     singer_file_to_target(file_name, cratedb_target)
     row = {"id": 1, "info": "simple string 2837"}
-    helper.verify_data("test_strings", 11, "id", row)
+    verify_data(cratedb_target, "test_strings", 11, "id", row)
     row = {"id": 1, "info": {"name": "simple", "value": "simple string 2837"}}
-    helper.verify_data("test_strings_in_objects", 11, "id", row)
+    verify_data(cratedb_target, "test_strings_in_objects", 11, "id", row)
     row = {"id": 1, "strings": ["simple string", "απλή συμβολοσειρά", "简单的字串"]}
-    helper.verify_data("test_strings_in_arrays", 6, "id", row)
+    verify_data(cratedb_target, "test_strings_in_arrays", 6, "id", row)
 
 
 @pytest.mark.skip("Fails with: SQLParseException[Limit of total fields [1000] in index [melty.aapl] has been exceeded]")
@@ -628,7 +664,7 @@ def test_activate_version_hard_delete(cratedb_config):
     singer_file_to_target(file_name, pg_hard_delete_true)
     with engine.connect() as connection, connection.begin():
         result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
-        assert result.rowcount == 7
+        assert result.rowcount == 8
     with engine.connect() as connection, connection.begin():
         # Add a record like someone would if they weren't using the tap target combo
        result = connection.execute(
@@ -642,14 +678,14 @@ def test_activate_version_hard_delete(cratedb_config):
         connection.execute(sa.text(f"REFRESH TABLE {full_table_name}"))
     with engine.connect() as connection, connection.begin():
         result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
-        assert result.rowcount == 9
+        assert result.rowcount == 10
 
     singer_file_to_target(file_name, pg_hard_delete_true)
 
     # Should remove the 2 records we added manually
     with engine.connect() as connection, connection.begin():
         result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
-        assert result.rowcount == 7
+        assert result.rowcount == 8
 
 
 def test_activate_version_soft_delete(cratedb_target):