Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support chDB as a driver #369

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,5 @@ dbt-tut
dev/
.python-version
*_project/
.user.yml
chdb_state
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### Release [1.8.5], 2024-11-22
### Improvement
* Added an adapter for chDB in dbt-clickhouse [#369](https://github.com/ClickHouse/dbt-clickhouse/pull/369) and updated documentation for the new feature.

### Release [1.8.4], 2024-09-17
### Improvement
* The S3 help macro now support a `role_arn` parameter as an alternative way to provide authentication for S3 based models. Thanks to
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ your_profile_name:
schema: [default] # ClickHouse database for dbt models

# optional
driver: [http] # http or native. If not set this will be autodetermined based on port setting
driver: [http] # http, native or chdb. If not set this will be autodetermined based on port setting
host: [localhost]
port: [8123] # If not set, defaults to 8123, 8443, 9000, 9440 depending on the secure and driver settings
user: [default] # User for all database operations
Expand All @@ -92,6 +92,10 @@ your_profile_name:
# Native (clickhouse-driver) connection settings
sync_request_timeout: [5] # Timeout for server ping
compress_block_size: [1048576] # Compression block size if compression is enabled

# chDB (clikchouse-driver) path settings
chdb_state_dir: [<empty string>] # The path where chdb will leave it's states
chdb_dump_dir: [<empty string>] # The path where dbt-clickhouse will find the dumps to initialise chdb

```

Expand Down
181 changes: 181 additions & 0 deletions dbt/adapters/clickhouse/chdbclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import json
from pathlib import Path
from typing import List

import pkg_resources
from chdb import ChdbError, session
from chdb.dbapi import converters
from dbt_common.exceptions import DbtDatabaseError

from dbt.adapters.clickhouse import ClickHouseColumn, ClickHouseCredentials
from dbt.adapters.clickhouse.dbclient import ChClientWrapper
from dbt.adapters.clickhouse.logger import logger

try:
driver_version = pkg_resources.get_distribution("chdb").version
except pkg_resources.ResolutionError:
driver_version = "unknown"


class ChDBClient(ChClientWrapper):
def query(self, sql, **kwargs):
# TODO: we might need to preprocess `sql`
try:
result = self._client.query(sql, "JSON", **kwargs)
result = CHDBResult(result=result)
result.read()
return result
except CHDBResultError as ex:
raise DbtDatabaseError(
f"reading result from chdb query using json failed: {str(ex).strip()}"
) from ex
except ChdbError as ex:
raise DbtDatabaseError(f"chdb query failed with exception: {str(ex).strip()}") from ex
except Exception as ex:
raise DbtDatabaseError(str(ex).strip()) from ex

def command(self, sql, **kwargs):
try:
result = self._client.query(sql, **kwargs)
if result.has_error():
raise DbtDatabaseError(str(result.error_message.strip()))
elif result.size() == 0:
return True
else:
result = int(result.data())
return result
except Exception as ex:
raise DbtDatabaseError(f"chdb command failed with exception: {str(ex).strip()}") from ex

def columns_in_query(self, sql: str, **kwargs) -> List[ClickHouseColumn]:
try:
query_result = self._client.query(
f"SELECT * FROM ( \n" f"{sql} \n" f") LIMIT 0",
**kwargs,
)
return [
ClickHouseColumn.create(name, ch_type.name)
for name, ch_type in zip(query_result.column_names, query_result.column_types)
]
except ChdbError as ex:
raise DbtDatabaseError(
f"chdb columns_in_query failed with exception: {str(ex).strip()}"
) from ex
except Exception as ex:
raise DbtDatabaseError(str(ex).strip()) from ex

def get_ch_setting(self, setting_name):
try:
result = self._client.query(
f"SELECT value, readonly FROM system.settings WHERE name = '{setting_name}'",
"JSON",
)
if result.has_error():
raise DbtDatabaseError(str(result.error_message.strip()))
else:
result = json.loads(result.data())
result = result["data"][0]
return (result["value"], int(result["readonly"])) if result else (None, 0)
except Exception as ex:
logger.warning("Unexpected error retrieving ClickHouse server setting", ex)
return None

def close(self):
pass
# self._client.cleanup()

def _create_client(self, credentials: ClickHouseCredentials):
# We want to append the path below to target_dir to have relative paths implementation in the configuration
chdb_state_dir = Path(credentials.chdb_state_dir)

if not chdb_state_dir.exists():
logger.debug(f"Provided chdb_state_dir doesn't exist: {chdb_state_dir}")
chdb_state_dir.mkdir(parents=True, exist_ok=True)

session_dir = chdb_state_dir / f"{self._conn_settings['session_id']}"
logger.info(f"Provided session_dir: {session_dir}")
client = session.Session(path=session_dir.as_posix())

# We want to append the path below to target_dir to have relative paths implementation in the configuration
chdb_dump_dir = Path.cwd() / credentials.chdb_dump_dir
chdb_dump_files = list(chdb_dump_dir.glob("**/*.sql"))
if len(chdb_dump_files) == 0:
logger.warning(f"Provided chdb_dump_files is empty: {chdb_dump_files}")
return

for chdb_dump_file in chdb_dump_files:
sql_content = chdb_dump_file.read_text()
try:
client.query(sql_content)
except ChdbError as ex:
raise DbtDatabaseError(
f"client creation failed with exception: {str(ex).strip()}"
) from ex
return client

def _set_client_database(self):
pass

def _server_version(self):
return self._client.query("select version()").data().strip().replace('"', "")


class CHDBResultError(Exception):
pass


# TODO: This is from https://github.com/chdb-io/chdb/blob/e326128df44248b187b4f421bf6a5c796791b2dc/chdb/dbapi/connections.py#L175C1-L217C70
# We might want to use the dbApi instead
class CHDBResult:
def __init__(self, result):
"""
:type connection: Connection
"""
self.result = result
self.affected_rows = 0
self.insert_id = None
self.warning_count = 0
self.message = None
self.field_count = 0
self.description = None
self.rows = None
self.has_next = None
self.result_set = None
self.column_names = None

def read(self):
# Handle empty responses (for instance from CREATE TABLE)
if self.result is None:
return

if self.result.has_error():
raise CHDBResultError(str(self.result.error_message.strip()))

try:
data = json.loads(self.result.data())
except Exception as error:
raise CHDBResultError("Unexpected error when loading query result in JSON") from error

try:
self.field_count = len(data["meta"])
description = []
column_names = []
for meta in data["meta"]:
fields = [meta["name"], meta["type"]]
column_names.append(meta["name"])
description.append(tuple(fields))
self.description = tuple(description)
self.column_names = column_names
rows = []
for line in data["data"]:
row = []
for i in range(self.field_count):
column_data = converters.convert_column_data(
self.description[i][1], line[self.description[i][0]]
)
row.append(column_data)
rows.append(tuple(row))
self.rows = tuple(rows)
self.result_set = tuple(rows)
except Exception as error:
raise CHDBResultError("Read return data err") from error
2 changes: 2 additions & 0 deletions dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ClickHouseCredentials(Credentials):
local_db_prefix: str = ''
allow_automatic_deduplication: bool = False
tcp_keepalive: Union[bool, tuple[int, int, int], list[int]] = False
chdb_state_dir: str = ""
chdb_dump_dir: str = ""

@property
def type(self):
Expand Down
18 changes: 16 additions & 2 deletions dbt/adapters/clickhouse/dbclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ def get_db_client(credentials: ClickHouseCredentials):
elif driver == 'native':
if not port:
port = 9440 if credentials.secure else 9000
elif driver == "chdb":
logger.debug(f"using chdb driver with {credentials}")
else:
raise FailedToConnectError(f'Unrecognized ClickHouse driver {driver}')
raise FailedToConnectError(f"Unrecognized ClickHouse driver {driver}")

credentials.driver = driver
credentials.port = port
Expand All @@ -56,8 +58,20 @@ def get_db_client(credentials: ClickHouseCredentials):
return ChNativeClient(credentials)
except ImportError as ex:
raise FailedToConnectError(
'Native adapter required but package clickhouse-driver is not installed'
"Native adapter required but package clickhouse-driver is not installed"
) from ex
elif driver == "chdb":
try:
import chdb

from dbt.adapters.clickhouse.chdbclient import ChDBClient

return ChDBClient(credentials)
except ImportError as ex:
raise FailedToConnectError(
"chDB adapter required but package chdb is not installed"
) from ex

try:
import clickhouse_connect # noqa

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/logger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from dbt.adapters.events.logging import AdapterLogger

logger = AdapterLogger('dbt_clickhouse')
logger = AdapterLogger("dbt_clickhouse")
8 changes: 4 additions & 4 deletions dbt/adapters/clickhouse/nativeclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
from dbt.adapters.clickhouse.logger import logger

try:
driver_version = pkg_resources.get_distribution('clickhouse-driver').version
driver_version = pkg_resources.get_distribution("clickhouse-driver").version
except pkg_resources.ResolutionError:
driver_version = 'unknown'
driver_version = "unknown"


class ChNativeClient(ChClientWrapper):
Expand Down Expand Up @@ -48,7 +48,7 @@ def get_ch_setting(self, setting_name):
f"SELECT value, readonly FROM system.settings WHERE name = '{setting_name}'"
)
except clickhouse_driver.errors.Error as ex:
logger.warn('Unexpected error retrieving ClickHouse server setting', ex)
logger.warning("Unexpected error retrieving ClickHouse server setting", ex)
return None
return (result[0][0], result[0][1]) if result else (None, 0)

Expand Down Expand Up @@ -88,7 +88,7 @@ def _set_client_database(self):
def _server_version(self):
server_info = self._client.connection.server_info
return (
f'{server_info.version_major}.{server_info.version_minor}.{server_info.version_patch}'
f"{server_info.version_major}.{server_info.version_minor}.{server_info.version_patch}"
)


Expand Down
6 changes: 6 additions & 0 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.utils import deep_merge

from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier

NODE_TYPE_SOURCE = 'source'
Expand Down Expand Up @@ -127,6 +128,11 @@ def create_from(
relation_config.config.get('engine') if relation_config.config.get('engine') else ''
)
can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)
if quoting.credentials.driver == "chdb":
logger.debug("Driver is chDB, forcing engine to be MergeTree")
engine = "MergeTree"
relation_config.config.engine = engine
can_on_cluster = False

return cls.create(
database='',
Expand Down
15 changes: 7 additions & 8 deletions examples/taxis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ CREATE TABLE taxis.trips (
)
ENGINE = MergeTree
ORDER BY trip_id;

SET input_format_skip_unknown_fields = 1;

INSERT INTO taxis.trips
Expand Down Expand Up @@ -62,26 +62,25 @@ FROM s3(

## Create a dbt profile entry

Use the following profile to create the associated dbt profile in the dbt_profiles.yml in ~/.dbt
Use the following profile to create the associated dbt profile in the dbt_profiles.yml in ~/.dbt

```yml
taxis:
outputs:

dev:
type: clickhouse
threads: 4
threads: 4
host: localhost
port: 8123
port: 8123
user: dbt_test
password: dbt_password
use_lw_deletes: true
schema: taxis_dbt
schema: taxis_dbt

target: dev

```

## Run the model

`dbt run` in this directory should execute the model. Each run will create a somewhat larger dataset (by adding
`dbt run` in this directory should execute the model. Each run will create a somewhat larger dataset (by adding
additional random trip_ids).
23 changes: 23 additions & 0 deletions examples/taxis/dump/taxi.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
CREATE DATABASE taxis;

CREATE TABLE taxis.trips (
trip_id UInt32,
pickup_datetime DateTime,
dropoff_datetime DateTime,
pickup_longitude Nullable(Float64),
pickup_latitude Nullable(Float64),
dropoff_longitude Nullable(Float64),
dropoff_latitude Nullable(Float64),
passenger_count UInt8,
trip_distance Float32,
fare_amount Float32,
extra Float32,
tip_amount Float32,
tolls_amount Float32,
total_amount Float32,
payment_type LowCardinality(String),
pickup_ntaname LowCardinality(String),
dropoff_ntaname LowCardinality(String)
)
ENGINE = MergeTree
ORDER BY trip_id;
Loading