Skip to content

Commit

Permalink
dbt/include/clickhouse/macros/materializations/incremental.sql: imple…
Browse files Browse the repository at this point in the history
…ment insert+replace strategy

TODO: convert partition_expression to ClickHouse literals
  • Loading branch information
Anton Bryzgalov committed Oct 26, 2023
1 parent b79669a commit 262eaec
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 2 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
if not strategy or strategy == 'default':
strategy = 'delete_insert' if conn.handle.use_lw_deletes else 'legacy'
strategy = strategy.replace('+', '_')
if strategy not in ['legacy', 'append', 'delete_insert']:
if strategy not in ['legacy', 'append', 'delete_insert', 'insert_replace']:
raise DbtRuntimeError(
f"The incremental strategy '{strategy}' is not valid for ClickHouse"
)
Expand Down
45 changes: 44 additions & 1 deletion dbt/include/clickhouse/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@
{% endcall %}
{% set need_swap = true %}

{% elif inserts_only or unique_key is none -%}
{% elif
inserts_only
or unique_key is none
and config.get('incremental_strategy', none) != 'insert+replace' -%}
-- There are no updates/deletes or duplicate keys are allowed. Simply add all of the new rows to the existing
-- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter
-- specific configurable that is used to avoid creating an expensive intermediate table.
-- Insert+replace strategy does not require unique_key => is an exception.
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{% endcall %}
Expand Down Expand Up @@ -72,6 +76,12 @@
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{% endcall %}
{% elif incremental_strategy == 'insert_replace' %}#}
{%- set partition_by = config.get('partition_by') -%}
{% if inserts_only or unique_key is not none or incremental_predicates is not none %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
{% endif %}
{% do clickhouse__incremental_insert_replace(existing_relation, intermediate_relation, partition_by) %} %}
{% endif %}
{% endif %}

Expand Down Expand Up @@ -234,3 +244,36 @@
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
{% endmacro %}

{% macro clickhouse__incremental_insert_replace(existing_relation, intermediate_relation, partition_by) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{{ drop_relation_if_exists(new_data_relation) }}
{% call statement('create_new_data_temp') -%}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
{%- endcall %}
{% call statement('main') -%}
create table {{ intermediate_relation }} as {{ existing_relation }}
{%- endcall %}
{% call statement('insert_new_data') -%}
insert into {{ intermediate_relation }} select * from {{ new_data_relation }}
{%- endcall %}
{% if execute %}
{% set select_changed_partitions %}
select distinct {{ partition_by|join(', ') }}
from {{ intermediate_relation }}
{% endset %}
{% set partitions_expressions = run_query(select_changed_partitions).rows %}
{% else %}
{% set partitions_expressions = [] %}
{% endif %}
{% for partition_expression in partitions_expressions %}
{% call statement('replace_partition_' ~ loop.index) %}
alter table {{ existing_relation }}
replace partition {{ partition_expression }}
from {{ existing_relation }}
{% endcall %}
{% endfor %}
{% do adapter.drop_relation(intermediate_relation) %}
{% do adapter.drop_relation(new_data_relation) %}
{% endmacro %}

0 comments on commit 262eaec

Please sign in to comment.