2 changes: 1 addition & 1 deletion .github/dependabot.yml
@@ -4,7 +4,7 @@ updates:
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "weekly"
interval: "daily"

- package-ecosystem: "github-actions"
directory: "/"
28 changes: 14 additions & 14 deletions .github/workflows/main.yml
@@ -28,15 +28,15 @@ jobs:
matrix:
os: ["ubuntu-latest"]
python-version: [
"3.9",
"3.10",
"3.12",
]

env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
# Do not tear down Testcontainers
TC_KEEPALIVE: true
TC_KEEPALIVE: true # Do not tear down Testcontainers
UV_PYTHON_DOWNLOADS: never
UV_SYSTEM_PYTHON: true

# https://docs.github.com/en/actions/using-containerized-services/about-service-containers
services:
@@ -56,19 +56,19 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
architecture: x64
cache: 'pip'
cache-dependency-path: 'pyproject.toml'

- name: Set up uv
uses: astral-sh/setup-uv@v7
with:
cache-dependency-glob: |
pyproject.toml
cache-suffix: ${{ matrix.python-version }}
enable-cache: true
version: "latest"

- name: Set up project
run: |

# `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
# https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[all,develop,test]
uv pip install --editable='.[all,develop,test]'

- name: Run linter and software tests
run: |
1 change: 1 addition & 0 deletions .gitignore
@@ -3,6 +3,7 @@
.venv*
*.egg-info
.coverage*
.http_cache
coverage.xml
build
dist
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -3,6 +3,10 @@
## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
- Improve write operations to be closer to `target-postgres`.
- Switch to new SQLAlchemy dialect for CrateDB.
- Remove the workaround for `_`-prefixed column names; this requires
CrateDB 6.2 or higher.
- Dependencies: Update to vanilla `meltanolabs-target-postgres` 0.6

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
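
For orientation, and not part of this changeset: a minimal sketch of how the container types mentioned in the "In progress" changelog entry above could be declared with sqlalchemy-cratedb. The table and column names are hypothetical, and the `FloatVector` dimension argument is an assumption taken from the sqlalchemy-cratedb documentation.

```python
# Sketch only (not part of this changeset): declaring CrateDB container types
# with sqlalchemy-cratedb. Table and column names are hypothetical.
import sqlalchemy as sa
from sqlalchemy_cratedb.type import FloatVector, ObjectType

metadata = sa.MetaData()

demo = sa.Table(
    "demo",
    metadata,
    sa.Column("id", sa.String, primary_key=True),
    sa.Column("tags", sa.ARRAY(sa.String)),   # maps to a CrateDB ARRAY
    sa.Column("attributes", ObjectType),      # maps to a CrateDB OBJECT
    sa.Column("embedding", FloatVector(3)),   # maps to a CrateDB FLOAT_VECTOR
)
```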
1 change: 1 addition & 0 deletions README.md
@@ -18,6 +18,7 @@ and loaders, and based on the [Meltano PostgreSQL target].
In order to learn more about Singer, Meltano, and friends, navigate to the
[Singer Intro](./docs/singer-intro.md).

Operating the package successfully requires CrateDB 6.2 or higher.

## Install

23 changes: 14 additions & 9 deletions pyproject.toml
@@ -35,7 +35,7 @@ license = { text = "MIT" }
authors = [
{ name = "Andreas Motl", email = "[email protected]" },
]
requires-python = ">=3.8,<3.13"
requires-python = ">=3.10,<3.13"
classifiers = [
"Development Status :: 3 - Alpha",
"Environment :: Console",
@@ -55,8 +55,6 @@ classifiers = [
"Operating System :: Unix",
"Programming Language :: Python",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
@@ -89,10 +87,10 @@ dynamic = [
"version",
]
dependencies = [
"crate[sqlalchemy]<1",
"cratedb-toolkit",
"importlib-resources; python_version<'3.9'", # "meltanolabs-target-postgres==0.0.9",
"meltanolabs-target-postgres @ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
"importlib-resources; python_version<'3.9'", # "meltanolabs-target-postgres==0.0.9",
"meltanolabs-target-postgres>=0.6,<0.7",
"sqlalchemy-cratedb",
]
optional-dependencies.all = [
"meltano-target-cratedb[vector]",
@@ -113,10 +111,13 @@ optional-dependencies.test = [
"pytest<9",
"pytest-cov<5",
"pytest-mock<4",
"tap-countries",
"tap-fundamentals",
]
optional-dependencies.vector = [
"numpy",
]
# optional-dependencies.vector = [
# "meltanolabs-target-postgres @ git+https://github.com/singer-contrib/meltanolabs-target-postgres.git@pgvector",
# "sqlalchemy-cratedb[vector]",
# ]
urls.changelog = "https://github.com/crate-workbench/meltano-target-cratedb/blob/main/CHANGES.md"
urls.documentation = "https://github.com/crate-workbench/meltano-target-cratedb"
urls.homepage = "https://github.com/crate-workbench/meltano-target-cratedb"
@@ -126,6 +127,10 @@ scripts.target-cratedb = "target_cratedb.target:TargetCrateDB.cli"
[tool.setuptools.packages.find]
namespaces = false

[tool.uv.sources]
tap-countries = { git = "https://github.com/meltano/sdk.git", subdirectory = "packages/tap-countries", rev = "main" }
tap-fundamentals = { git = "https://github.com/meltano/sdk.git", subdirectory = "packages/tap-fundamentals", rev = "main" }

[tool.black]
line-length = 120

8 changes: 6 additions & 2 deletions target_cratedb/connector.py
@@ -7,13 +7,14 @@
from datetime import datetime

import sqlalchemy as sa
from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray
from singer_sdk import typing as th
from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type
from sqlalchemy_cratedb.type import FloatVector, ObjectType
from sqlalchemy_cratedb.type.array import _ObjectArray
from sqlalchemy_cratedb.type.object import ObjectTypeImpl
from target_postgres.connector import NOTYPE, PostgresConnector

from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.vector import FloatVector


class CrateDBConnector(PostgresConnector):
@@ -226,6 +227,9 @@ def _get_type_sort_key(
if isinstance(sql_type, NOTYPE):
return 0, _len

if not hasattr(sql_type, "python_type"):
raise TypeError(f"Resolving type for sort key failed: {sql_type}")

_pytype = t.cast(type, sql_type.python_type)
if issubclass(_pytype, (str, bytes)):
return 900, _len
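
For context, an illustration (not the connector code itself) of why the new guard in `_get_type_sort_key` is useful: SQLAlchemy type objects advertise a `python_type` property, but custom types may omit it and the base implementation raises `NotImplementedError`, so not every type resolves to a usable Python type.

```python
# Illustration only: not every SQLAlchemy type object provides a usable
# `python_type`; the base implementation raises NotImplementedError.
import sqlalchemy as sa

for sql_type in (sa.String(), sa.Integer(), sa.types.NullType()):
    try:
        print(f"{sql_type!r} -> {sql_type.python_type}")
    except NotImplementedError:
        print(f"{sql_type!r} -> no python_type available")
```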
100 changes: 7 additions & 93 deletions target_cratedb/sinks.py
@@ -2,11 +2,10 @@

import datetime
import os
import time
from typing import List, Optional, Union

import sqlalchemy as sa
from pendulum import now
from singer_sdk.sql.connector import FullyQualifiedName
from sqlalchemy.util import asbool
from target_postgres.sinks import PostgresSink

@@ -20,9 +19,6 @@ class CrateDBSink(PostgresSink):

connector_class = CrateDBConnector

soft_delete_column_name = "__sdc_deleted_at"
version_column_name = "__sdc_table_version"

def __init__(self, *args, **kwargs):
"""Initialize SQL Sink. See super class for more details."""
super().__init__(*args, **kwargs)
@@ -32,91 +28,6 @@ def __init__(self, *args, **kwargs):
# operations on the target table.
self.strategy_direct = MELTANO_CRATEDB_STRATEGY_DIRECT

# Record processing

def _add_sdc_metadata_to_record(
self,
record: dict,
message: dict,
context: dict,
) -> None:
"""Populate metadata _sdc columns from incoming record message.

Record metadata specs documented at:
https://sdk.meltano.com/en/latest/implementation/record_metadata.html

Args:
record: Individual record in the stream.
message: The record message.
context: Stream partition or context dictionary.
"""
record["__sdc_extracted_at"] = message.get("time_extracted")
record["__sdc_received_at"] = datetime.datetime.now(
tz=datetime.timezone.utc,
).isoformat()
record["__sdc_batched_at"] = (
context.get("batch_start_time", None) or datetime.datetime.now(tz=datetime.timezone.utc)
).isoformat()
record["__sdc_deleted_at"] = record.get("__sdc_deleted_at")
record["__sdc_sequence"] = int(round(time.time() * 1000))
record["__sdc_table_version"] = message.get("version")
record["__sdc_sync_started_at"] = self.sync_started_at

def _add_sdc_metadata_to_schema(self) -> None:
"""Add _sdc metadata columns.

Record metadata specs documented at:
https://sdk.meltano.com/en/latest/implementation/record_metadata.html
"""
properties_dict = self.schema["properties"]
for col in (
"__sdc_extracted_at",
"__sdc_received_at",
"__sdc_batched_at",
"__sdc_deleted_at",
):
properties_dict[col] = {
"type": ["null", "string"],
"format": "date-time",
}
for col in ("__sdc_sequence", "__sdc_table_version", "__sdc_sync_started_at"):
properties_dict[col] = {"type": ["null", "integer"]}

def _remove_sdc_metadata_from_schema(self) -> None:
"""Remove _sdc metadata columns.

Record metadata specs documented at:
https://sdk.meltano.com/en/latest/implementation/record_metadata.html
"""
properties_dict = self.schema["properties"]
for col in (
"__sdc_extracted_at",
"__sdc_received_at",
"__sdc_batched_at",
"__sdc_deleted_at",
"__sdc_sequence",
"__sdc_table_version",
"__sdc_sync_started_at",
):
properties_dict.pop(col, None)

def _remove_sdc_metadata_from_record(self, record: dict) -> None:
"""Remove metadata _sdc columns from incoming record message.

Record metadata specs documented at:
https://sdk.meltano.com/en/latest/implementation/record_metadata.html

Args:
record: Individual record in the stream.
"""
record.pop("__sdc_extracted_at", None)
record.pop("__sdc_received_at", None)
record.pop("__sdc_batched_at", None)
record.pop("__sdc_deleted_at", None)
record.pop("__sdc_sequence", None)
record.pop("__sdc_table_version", None)
record.pop("__sdc_sync_started_at", None)

def process_batch(self, context: dict) -> None:
"""Process a batch with the given batch context.

@@ -303,7 +214,8 @@ def activate_version(self, new_version: int) -> None:
if not self.connector.table_exists(self.full_table_name):
return

deleted_at = now()
deleted_at = datetime.datetime.now(tz=datetime.timezone.utc)

# Different from SingerSDK as we need to handle types the
# same as SCHEMA messages
datetime_type = self.connector.to_sql_type({"type": "string", "format": "date-time"})
@@ -388,10 +300,12 @@ def refresh_table(self, table: Union[sa.Table, str]):
Synchronize write operations on CrateDB.
"""
with self.connector._connect() as connection:
if isinstance(table, sa.Table):
if isinstance(table, FullyQualifiedName):
table_full = str(table)
elif isinstance(table, sa.Table):
table_full = f'"{table.schema}"."{table.name}"'
elif isinstance(table, str):
table_full = table
else:
raise TypeError(f"Unknown type for `table`: {table}")
raise TypeError(f"Unknown type `{type(table)}` for table: {table}")
connection.exec_driver_sql(f"REFRESH TABLE {table_full};")
39 changes: 20 additions & 19 deletions target_cratedb/sqlalchemy/patch.py
@@ -1,11 +1,11 @@
from _decimal import Decimal
from datetime import datetime
from typing import Any, Union

import crate.client.http
import sqlalchemy as sa
from crate.client.http import CrateJsonEncoder
from crate.client.sqlalchemy.dialect import ARRAY, TYPES_MAP, DateTime
from crate.client.sqlalchemy.types import _ObjectArray
from sqlalchemy.sql import sqltypes
from sqlalchemy_cratedb.dialect import TYPES_MAP, DateTime
from sqlalchemy_cratedb.type.array import _ObjectArray


def patch_sqlalchemy():
@@ -19,20 +19,21 @@ def patch_types():

TODO: Upstream to crate-python.
"""
TYPES_MAP["bigint"] = sqltypes.BIGINT
TYPES_MAP["bigint_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["long"] = sqltypes.BIGINT
TYPES_MAP["long_array"] = ARRAY(sqltypes.BIGINT)
TYPES_MAP["real"] = sqltypes.DOUBLE
TYPES_MAP["real_array"] = ARRAY(sqltypes.DOUBLE)
TYPES_MAP["timestamp without time zone"] = sqltypes.TIMESTAMP
TYPES_MAP["timestamp with time zone"] = sqltypes.TIMESTAMP
TYPES_MAP["bigint"] = sa.BIGINT
TYPES_MAP["bigint_array"] = sa.ARRAY(sa.BIGINT)
TYPES_MAP["long"] = sa.BIGINT
TYPES_MAP["long_array"] = sa.ARRAY(sa.BIGINT)
TYPES_MAP["real"] = sa.DOUBLE
TYPES_MAP["real_array"] = sa.ARRAY(sa.DOUBLE)
TYPES_MAP["timestamp without time zone"] = sa.TIMESTAMP
TYPES_MAP["timestamp with time zone"] = sa.TIMESTAMP

# TODO: Can `ARRAY` be inherited from PostgreSQL's
# `ARRAY`, to make type checking work?

def as_generic(self, allow_nulltype: bool = False):
return sqltypes.ARRAY
return sa.ARRAY

_ObjectArray.as_generic = as_generic

@@ -58,14 +59,14 @@ def patch_json_encoder():
TODO: Upstream to crate-python.
"""

json_encoder_default = CrateJsonEncoder.default
json_encoder_default = crate.client.http.json_encoder

def default(self, o):
if isinstance(o, Decimal):
return float(o)
return json_encoder_default(o)
def json_encoder_new(obj: Any) -> Union[int, str, float]:
if isinstance(obj, Decimal):
return float(obj)
return json_encoder_default(obj)

CrateJsonEncoder.default = default
crate.client.http.json_encoder = json_encoder_new


def polyfill_refresh_after_dml_engine(engine: sa.Engine):
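
To make the intent of the `patch_json_encoder` change above concrete, a standalone sketch using only the standard library (not crate-python's internals): `Decimal` values are not JSON-serializable by default, so they are converted to `float` before encoding.

```python
# Standalone sketch of the idea (stdlib `json`, not crate-python internals):
# Decimal is not JSON-serializable by default, so convert it to float first.
import json
from decimal import Decimal


def encode_default(obj):
    """Fallback encoder mirroring the patch: Decimal -> float."""
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


print(json.dumps({"price": Decimal("42.5")}, default=encode_default))
# -> {"price": 42.5}
```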