insert_overwrite (insert+replace) partitions incremental strategy #201

Merged · 10 commits · Aug 1, 2024
42 changes: 28 additions & 14 deletions README.md
@@ -91,20 +91,20 @@ your_profile_name:

## Model Configuration

| Option | Description | Default if any |
|------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------|
| engine | The table engine (type of table) to use when creating tables | `MergeTree()` |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | `tuple()` |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | |
| sharding_key           | Sharding key determines the destination server when inserting into a distributed engine table. The sharding key can be random or the output of a hash function | `rand()` |
| primary_key | Like order_by, a ClickHouse primary key expression. If not specified, ClickHouse will use the order by expression as the primary key | |
| unique_key | A tuple of column names that uniquely identify rows. Used with incremental models for updates. | |
| inserts_only | If set to True for an incremental model, incremental updates will be inserted directly to the target table without creating intermediate table. It has been deprecated in favor of the `append` incremental `strategy`, which operates in the same way. If `inserts_only` is set, `incremental_strategy` is ignored. | |
| incremental_strategy | Incremental model update strategy: `delete+insert`, `append`, or `insert_overwrite`. See the following Incremental Model Strategies | `default` |
| incremental_predicates | Additional conditions to be applied to the incremental materialization (only applied to the `delete+insert` strategy) | |
| settings               | A map/dictionary of "TABLE" settings to be used in DDL statements such as `CREATE TABLE` with this model | |
| query_settings         | A map/dictionary of ClickHouse user-level settings to be used with `INSERT` or `DELETE` statements in conjunction with this model | |
| ttl | A TTL expression to be used with the table. The TTL expression is a string that can be used to specify the TTL for the table. | |

## ClickHouse Cluster

@@ -172,6 +172,20 @@
This strategy replaces the `inserts_only` setting in previous versions of dbt-clickhouse.
As a result, duplicate rows are not eliminated, and there is no temporary or intermediate table. It is the fastest approach if duplicates are either permitted
in the data or excluded by the incremental query's WHERE clause/filter.
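A minimal model using the `append` strategy might look like the following sketch (the source and column names are illustrative, not from this PR):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='append',
    engine='MergeTree()',
    order_by=['ts']
) }}

select ts, event
from {{ source('raw', 'events') }}
{% if is_incremental() %}
  -- the user-supplied filter is the only defense against duplicate rows
  where ts > (select max(ts) from {{ this }})
{% endif %}
```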

### The insert_overwrite Strategy

Performs the following steps:
1. Create a staging (temporary) table with the same structure as the incremental model relation: `CREATE TABLE <staging> AS <target>`.
2. Insert only new records (produced by `SELECT`) into the staging table.
3. Replace only new partitions (present in the staging table) into the target table.
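In ClickHouse terms, the three steps translate roughly into the following statements (a sketch with illustrative names `staging`, `target`, and `new_data`; the adapter generates the real identifiers):

```sql
-- 1. Staging table with the same structure as the target
CREATE TABLE staging AS target;

-- 2. Only the new records produced by the model's SELECT
INSERT INTO staging SELECT * FROM new_data;

-- 3. Swap only the partitions present in the staging table
ALTER TABLE target REPLACE PARTITION ID '<partition_id>' FROM staging;
```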

This approach has the following advantages:
- It is faster than the default strategy because it doesn't copy the entire table.
- It is safer than other strategies because it doesn't modify the original table until the INSERT operation completes successfully: if an intermediate step fails, the original table is left untouched.
- It implements the "partition immutability" data engineering best practice, which simplifies incremental and parallel data processing, rollbacks, and so on.

The strategy requires `partition_by` to be set in the model configuration and ignores all other strategy-specific parameters of the model config.
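A model opting into this strategy only needs `partition_by` in addition to the usual incremental configuration (a sketch; the column and source names are illustrative):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['event_date'],
    order_by=['event_date', 'id']
) }}

select event_date, id, payload
from {{ source('raw', 'events') }}
```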

## Additional ClickHouse Macros

### Model Materialization Utility Macros
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/impl.py
@@ -179,7 +179,7 @@ def calculate_incremental_strategy(self, strategy: str) -> str:
if not strategy or strategy == 'default':
strategy = 'delete_insert' if conn.handle.use_lw_deletes else 'legacy'
strategy = strategy.replace('+', '_')
if strategy not in ['legacy', 'append', 'delete_insert', 'insert_overwrite']:
raise DbtRuntimeError(
f"The incremental strategy '{strategy}' is not valid for ClickHouse"
)
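The validation above can be exercised in isolation; a standalone sketch of the normalization logic (no dbt imports, with `use_lw_deletes` standing in for the connection attribute):

```python
VALID_STRATEGIES = ['legacy', 'append', 'delete_insert', 'insert_overwrite']

def normalize_strategy(strategy: str, use_lw_deletes: bool = False) -> str:
    # 'default' resolves based on whether lightweight deletes are available
    if not strategy or strategy == 'default':
        strategy = 'delete_insert' if use_lw_deletes else 'legacy'
    # accept the user-facing spelling, e.g. 'delete+insert'
    strategy = strategy.replace('+', '_')
    if strategy not in VALID_STRATEGIES:
        raise ValueError(f"The incremental strategy '{strategy}' is not valid for ClickHouse")
    return strategy
```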
@@ -41,10 +41,14 @@
{% endcall %}
{% set need_swap = true %}

{% elif inserts_only or (unique_key is none and config.get('incremental_strategy', none) != 'insert_overwrite') -%}
-- There are no updates/deletes, or duplicate keys are allowed. Simply add all of the new rows to the existing
-- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter
-- specific configuration option used to avoid creating an expensive intermediate table.
-- The insert_overwrite strategy does not require unique_key, so it is exempt from this branch.
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{% endcall %}
@@ -74,6 +78,15 @@
{% call statement('main') %}
{{ clickhouse__insert_into(target_relation, sql) }}
{% endcall %}
{% elif incremental_strategy == 'insert_overwrite' %}
{%- set partition_by = config.get('partition_by') -%}
{% if partition_by is none or partition_by|length == 0 %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %}
{% endif %}
{% if inserts_only or unique_key is not none or incremental_predicates is not none %}
{% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, or incremental_predicates.') %}
{% endif %}
{% do clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %}
{% endif %}
{% endif %}

@@ -234,3 +247,42 @@
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
{% endmacro %}

{% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %}
{% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+ '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
{{ drop_relation_if_exists(new_data_relation) }}
{% call statement('create_new_data_temp') -%}
{{ get_create_table_as_sql(False, new_data_relation, sql) }}
{%- endcall %}
{% call statement('main') -%}
create table {{ intermediate_relation }} as {{ existing_relation }}
{%- endcall %}
{% call statement('insert_new_data') -%}
insert into {{ intermediate_relation }} select * from {{ new_data_relation }}
{%- endcall %}
{% if execute %}
{% set select_changed_partitions %}
SELECT DISTINCT partition_id
FROM system.parts
WHERE active
AND database = '{{ intermediate_relation.schema }}'
AND table = '{{ intermediate_relation.identifier }}'
{% endset %}
{% set changed_partitions = run_query(select_changed_partitions).rows %}
{% else %}
{% set changed_partitions = [] %}
{% endif %}
{% if changed_partitions %}
{% call statement('replace_partitions') %}
alter table {{ existing_relation }}
{%- for partition in changed_partitions %}
replace partition id '{{ partition['partition_id'] }}'
from {{ intermediate_relation }}
{{- ', ' if not loop.last }}
{%- endfor %}
{% endcall %}
{% endif %}
{% do adapter.drop_relation(intermediate_relation) %}
{% do adapter.drop_relation(new_data_relation) %}
{% endmacro %}
54 changes: 54 additions & 0 deletions tests/integration/adapter/incremental/test_base_incremental.py
@@ -181,3 +181,57 @@ def models(self):
"incremental.sql": incremental_sql,
"schema.yml": schema_base_yml,
}


insert_overwrite_inc = """
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by=['partitionKey1', 'partitionKey2'],
order_by=['orderKey'],
)
}}
{% if not is_incremental() %}
SELECT partitionKey1, partitionKey2, orderKey, value
FROM VALUES(
'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
(1, 'p1', 1, 'a'), (1, 'p1', 1, 'b'), (2, 'p1', 1, 'c'), (2, 'p2', 1, 'd')
)
{% else %}
SELECT partitionKey1, partitionKey2, orderKey, value
FROM VALUES(
'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
(1, 'p1', 2, 'e'), (3, 'p1', 2, 'f')
)
{% endif %}
"""


class TestInsertReplaceIncremental:
@pytest.fixture(scope="class")
def models(self):
return {"insert_overwrite_inc.sql": insert_overwrite_inc}

def test_insert_overwrite_incremental(self, project):
run_dbt()
result = project.run_sql(
"select * from insert_overwrite_inc order by partitionKey1, partitionKey2, orderKey",
fetch="all",
)
assert result == [
(1, 'p1', 1, 'a'),
(1, 'p1', 1, 'b'),
(2, 'p1', 1, 'c'),
(2, 'p2', 1, 'd'),
]
run_dbt()
result = project.run_sql(
"select * from insert_overwrite_inc order by partitionKey1, partitionKey2, orderKey",
fetch="all",
)
assert result == [
(1, 'p1', 2, 'e'),
(2, 'p1', 1, 'c'),
(2, 'p2', 1, 'd'),
(3, 'p1', 2, 'f'),
]
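The expected rows in the second assertion follow directly from partition replacement; a toy Python simulation of the semantics (illustrative, not the adapter's code) reproduces them:

```python
def insert_overwrite(existing_rows, new_rows, partition_key):
    # partitions touched by the new data are replaced wholesale
    changed = {partition_key(r) for r in new_rows}
    kept = [r for r in existing_rows if partition_key(r) not in changed]
    return sorted(kept + new_rows)

initial = [(1, 'p1', 1, 'a'), (1, 'p1', 1, 'b'), (2, 'p1', 1, 'c'), (2, 'p2', 1, 'd')]
increment = [(1, 'p1', 2, 'e'), (3, 'p1', 2, 'f')]
key = lambda row: (row[0], row[1])  # partition_by=['partitionKey1', 'partitionKey2']

result = insert_overwrite(initial, increment, key)
# result == [(1, 'p1', 2, 'e'), (2, 'p1', 1, 'c'), (2, 'p2', 1, 'd'), (3, 'p1', 2, 'f')]
```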