Support for insert_overwrite in cluster setup #394

Open · wants to merge 1 commit into main
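This PR adds an insert_overwrite incremental strategy that works in cluster setups: the new data is materialized into a temporary table, the partitions it touches are discovered via system.parts (wrapped in the cluster() table function for distributed tables), and those partitions are swapped into the target with ALTER TABLE ... REPLACE PARTITION. As a hedged sketch of the intended usage, mirroring the test fixtures at the bottom of the diff (model body, source, and column names are illustrative):

{{ config(
    materialized='distributed_incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['partitionKey'],
    order_by=['orderKey'],
    sharding_key='shardingKey'
) }}
select shardingKey, partitionKey, orderKey, value
from {{ source('raw', 'events') }} -- illustrative source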
@@ -65,7 +65,7 @@
{% do adapter.drop_relation(distributed_intermediate_relation) or '' %}
{% set need_swap = true %}

- {% elif inserts_only or unique_key is none -%}
+ {% elif inserts_only -%}
-- There are no updates/deletes or duplicate keys are allowed. Simply add all of the new rows to the existing
-- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter
-- specific configurable that is used to avoid creating an expensive intermediate table.
@@ -91,6 +91,15 @@
{% set need_swap = true %}
{% elif incremental_strategy == 'delete_insert' %}
{% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
+ {% elif incremental_strategy == 'insert_overwrite' %}
+ {%- set partition_by = config.get('partition_by') -%}
+ {% if partition_by is none or partition_by|length == 0 %}
+ {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %}
+ {% endif %}
+ {% if inserts_only or unique_key is not none or incremental_predicates is not none %}
+ {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
+ {% endif %}
+ {% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, True) %}
{% elif incremental_strategy == 'append' %}
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
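The branch above passes True for the new is_distributed flag; its twin in the next file passes False. Both share the same validation: partition_by must be nonempty, and inserts_only, unique_key, and incremental_predicates are unsupported with this strategy. A minimal sketch of a config the guard would reject with a compiler error (names are illustrative):

-- rejected: unique_key cannot be combined with insert_overwrite
{{ config(
    materialized='distributed_incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['partitionKey'],
    unique_key='orderKey'
) }}
select 1 as partitionKey, 1 as orderKey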
(next file)
@@ -76,15 +76,15 @@
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{% endcall %}
- {% elif incremental_strategy == 'insert_overwrite' %}#}
+ {% elif incremental_strategy == 'insert_overwrite' %}
{%- set partition_by = config.get('partition_by') -%}
{% if partition_by is none or partition_by|length == 0 %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %}
{% endif %}
{% if inserts_only or unique_key is not none or incremental_predicates is not none %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
{% endif %}
- {% do clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %} %}
+ {% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, False) %}
{% endif %}
{% endif %}

@@ -246,41 +246,58 @@
{{ drop_relation_if_exists(distributed_new_data_relation) }}
{% endmacro %}

- {% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %}
- {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+ {% macro clickhouse__incremental_insert_overwrite(existing_relation, partition_by, is_distributed=False) %}
+ {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{{ drop_relation_if_exists(new_data_relation) }}
- {% call statement('create_new_data_temp') -%}
- {{ get_create_table_as_sql(False, new_data_relation, sql) }}
- {%- endcall %}
- {% call statement('main') -%}
- create table {{ intermediate_relation }} as {{ existing_relation }}
- {%- endcall %}
- {% call statement('insert_new_data') -%}
- insert into {{ intermediate_relation }} select * from {{ new_data_relation }}
- {%- endcall %}
+ {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
+
+ {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%}
+ {%- set local_db_prefix = adapter.get_clickhouse_local_db_prefix() -%}
+ {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
+
+ {% if is_distributed %}
+ {{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
+ {% else %}
+ {% call statement('main') %}
+ {{ get_create_table_as_sql(False, new_data_relation, sql) }}
+ {% endcall %}
+ {% endif %}
+ {# Get the parts from the cluster table, since the partitions between shards may not overlap due to distribution #}
{% if execute %}
{% set select_changed_partitions %}
SELECT DISTINCT partition_id
- FROM system.parts
+ {% if is_distributed %}
+ FROM cluster({{ adapter.get_clickhouse_cluster_name() }}, system.parts)
+ {% else %}
+ FROM system.parts
+ {% endif %}
WHERE active
- AND database = '{{ intermediate_relation.schema }}'
- AND table = '{{ intermediate_relation.identifier }}'
+ AND database = '{{ new_data_relation.schema }}'
+ AND table = '{{ new_data_relation.identifier }}'
{% endset %}
{% set changed_partitions = run_query(select_changed_partitions).rows %}
{% else %}
{% set changed_partitions = [] %}
{% endif %}

{% if changed_partitions %}
- {% call statement('replace_partitions') %}
- alter table {{ existing_relation }}
- {%- for partition in changed_partitions %}
- replace partition id '{{ partition['partition_id'] }}'
- from {{ intermediate_relation }}
- {{- ', ' if not loop.last }}
- {%- endfor %}
+ {% call statement('replace_partitions') %}
+ {% if is_distributed %}
+ alter table {{ existing_local }} {{ on_cluster_clause(existing_relation) }}
+ {% else %}
+ alter table {{ existing_relation }}
+ {% endif %}
+ {%- for partition in changed_partitions %}
+ replace partition id '{{ partition['partition_id'] }}'
+ from {{ new_data_relation }}
+ {{- ', ' if not loop.last }}
+ {%- endfor %}
{% endcall %}
{% endif %}
- {% do adapter.drop_relation(intermediate_relation) %}

+ {% do adapter.drop_relation(distributed_new_data_relation) %}
+ {% do adapter.drop_relation(new_data_relation) %}
{% endmacro %}
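For a non-distributed table, the rewritten macro boils down to roughly this sequence of statements (identifiers are illustrative; the real temporary table name embeds the dbt invocation_id, and the engine, ordering, and partitioning come from the model config):

-- 1. materialize the incremental query into a fresh table
create table db.model__dbt_new_data as select ... ; -- engine/order by from the model config

-- 2. find which partitions the new data touches
select distinct partition_id
from system.parts -- cluster('<cluster>', system.parts) in the distributed case
where active
  and database = 'db' and table = 'model__dbt_new_data';

-- 3. swap each touched partition into the target (one clause per partition id)
alter table db.model -- the local table plus ON CLUSTER in the distributed case
    replace partition id '<partition_id>' from db.model__dbt_new_data;

-- 4. clean up the temporary table
drop table db.model__dbt_new_data;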
tests/integration/adapter/incremental/test_base_incremental.py (62 additions & 7 deletions)
@@ -195,7 +195,7 @@ def models(self):
SELECT partitionKey1, partitionKey2, orderKey, value
FROM VALUES(
'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
- (1, 'p1', 1, 'a'), (1, 'p1', 1, 'b'), (2, 'p1', 1, 'c'), (2, 'p2', 1, 'd')
+ (1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd')
)
{% else %}
SELECT partitionKey1, partitionKey2, orderKey, value
@@ -207,7 +207,7 @@
"""


- class TestInsertReplaceIncremental:
+ class TestInsertOverwriteIncremental:
@pytest.fixture(scope="class")
def models(self):
return {"insert_overwrite_inc.sql": insert_overwrite_inc}
@@ -220,9 +220,9 @@ def test_insert_overwrite_incremental(self, project):
)
assert result == [
(1, 'p1', 1, 'a'),
- (1, 'p1', 1, 'b'),
- (2, 'p1', 1, 'c'),
- (2, 'p2', 1, 'd'),
+ (1, 'p1', 2, 'b'),
+ (2, 'p1', 3, 'c'),
+ (2, 'p2', 4, 'd'),
]
run_dbt()
result = project.run_sql(
@@ -231,7 +231,62 @@
)
assert result == [
(1, 'p1', 2, 'e'),
- (2, 'p1', 1, 'c'),
- (2, 'p2', 1, 'd'),
+ (2, 'p1', 3, 'c'),
+ (2, 'p2', 4, 'd'),
(3, 'p1', 2, 'f'),
]

# "ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}/{uuid}/', '{replica}')"
insert_overwrite_replicated_inc = """
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by=['partitionKey1', 'partitionKey2'],
order_by=['orderKey'],
engine="ReplicatedMergeTree('/clickhouse/tables/{uuid}/one_shard', '{server_index}')"
)
}}
{% if not is_incremental() %}
SELECT partitionKey1, partitionKey2, orderKey, value
FROM VALUES(
'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
(1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd')
)
{% else %}
SELECT partitionKey1, partitionKey2, orderKey, value
FROM VALUES(
'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
(1, 'p1', 2, 'e'), (3, 'p1', 2, 'f')
)
{% endif %}
"""


class TestInsertOverwriteReplicatedIncremental:
@pytest.fixture(scope="class")
def models(self):
return {"insert_overwrite_replicated_inc.sql": insert_overwrite_replicated_inc}

def test_insert_overwrite_replicated_incremental(self, project):
run_dbt()
result = project.run_sql(
"select * from insert_overwrite_replicated_inc order by partitionKey1, partitionKey2, orderKey",
fetch="all",
)
assert result == [
(1, 'p1', 1, 'a'),
(1, 'p1', 2, 'b'),
(2, 'p1', 3, 'c'),
(2, 'p2', 4, 'd'),
]
run_dbt()
result = project.run_sql(
"select * from insert_overwrite_replicated_inc order by partitionKey1, partitionKey2, orderKey",
fetch="all",
)
assert result == [
(1, 'p1', 2, 'e'),
(2, 'p1', 3, 'c'),
(2, 'p2', 4, 'd'),
(3, 'p1', 2, 'f'),
]
(next file)
@@ -203,3 +203,60 @@
)
def test_incremental_not_schema_change(self, project):
super().test_incremental_not_schema_change(project)


insert_overwrite_dist_inc = """
{{ config(
materialized='distributed_incremental',
incremental_strategy='insert_overwrite',
partition_by=['partitionKey'],
order_by=['orderKey'],
sharding_key='shardingKey'
)
}}
{% if not is_incremental() %}
SELECT shardingKey, partitionKey, orderKey, value
FROM VALUES(
'shardingKey UInt8, partitionKey String, orderKey UInt8, value String',
(1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd')
)
{% else %}
SELECT shardingKey, partitionKey, orderKey, value
FROM VALUES(
'shardingKey UInt8, partitionKey String, orderKey UInt8, value String',
(1, 'p1', 2, 'e'), (3, 'p1', 2, 'f')
)
{% endif %}
"""


class TestInsertOverwriteDistributedIncremental:
@pytest.fixture(scope="class")
def models(self):
return {"insert_overwrite_dist_inc.sql": insert_overwrite_dist_inc}

@pytest.mark.skipif(
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
)
def test_insert_overwrite_distributed_incremental(self, project):
run_dbt()
result = project.run_sql(
"select * from insert_overwrite_dist_inc order by shardingKey, partitionKey, orderKey",
fetch="all",
)
assert result == [
(1, 'p1', 1, 'a'),
(1, 'p1', 2, 'b'),
(2, 'p1', 3, 'c'),
(2, 'p2', 4, 'd'),
]
run_dbt()
result = project.run_sql(
"select * from insert_overwrite_dist_inc order by shardingKey, partitionKey, orderKey",
fetch="all",
)
assert result == [
(1, 'p1', 2, 'e'),
(2, 'p2', 4, 'd'),
(3, 'p1', 2, 'f'),
]