From 94e9242983f45364c424c9ef4c5278a839e929d9 Mon Sep 17 00:00:00 2001
From: "f.abapolov"
Date: Thu, 28 Nov 2024 13:54:14 +0300
Subject: [PATCH] 1) Support insert_overwrite on cluster
 2) Test for insert_overwrite with "incremental" and ReplicatedMergeTree
 3) Test for insert_overwrite with "distributed_incremental"
 4) Renamed tests to match what they actually test
 5) Refactored the old test to be deterministic

---
 .../incremental/distributed_incremental.sql | 11 ++-
 .../incremental/incremental.sql             | 65 ++++++++++-------
 .../incremental/test_base_incremental.py    | 69 +++++++++++++++++--
 .../test_distributed_incremental.py         | 57 +++++++++++++++
 4 files changed, 170 insertions(+), 32 deletions(-)

diff --git a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
index ef31a76c..0ed4dec9 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/distributed_incremental.sql
@@ -65,7 +65,7 @@
     {% do adapter.drop_relation(distributed_intermediate_relation) or '' %}

     {% set need_swap = true %}
-  {% elif inserts_only or unique_key is none -%}
+  {% elif inserts_only -%}
     -- There are no updates/deletes or duplicate keys are allowed. Simply add all of the new rows to the existing
     -- table. It is the user's responsibility to avoid duplicates. Note that "inserts_only" is a ClickHouse adapter
     -- specific configurable that is used to avoid creating an expensive intermediate table.
@@ -91,6 +91,15 @@
     {% set need_swap = true %}
   {% elif incremental_strategy == 'delete_insert' %}
     {% do clickhouse__incremental_delete_insert(existing_relation, unique_key, incremental_predicates, True) %}
+  {% elif incremental_strategy == 'insert_overwrite' %}
+    {%- set partition_by = config.get('partition_by') -%}
+    {% if partition_by is none or partition_by|length == 0 %}
+      {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %}
+    {% endif %}
+    {% if inserts_only or unique_key is not none or incremental_predicates is not none %}
+      {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
+    {% endif %}
+    {% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, True) %}
   {% elif incremental_strategy == 'append' %}
     {% call statement('main') %}
       {{ clickhouse__insert_into(target_relation, sql) }}
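
For reference, a model that exercises this new branch would be configured
roughly like the sketch below; the model and column names are illustrative,
mirroring the test added later in this patch:

    {{ config(
           materialized='distributed_incremental',
           incremental_strategy='insert_overwrite',
           partition_by=['partitionKey'],
           order_by=['orderKey'],
           sharding_key='shardingKey'
       )
    }}
    select 1 as shardingKey, 'p1' as partitionKey, 1 as orderKey, 'a' as value

Omitting partition_by, or combining the strategy with inserts_only, unique_key,
or incremental_predicates, now fails fast with a compiler error instead of
silently producing a wrong table.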
diff --git a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
index c4dcb1b8..cd25e9db 100644
--- a/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/clickhouse/macros/materializations/incremental/incremental.sql
@@ -76,7 +76,7 @@
     {% call statement('main') %}
       {{ clickhouse__insert_into(target_relation, sql) }}
     {% endcall %}
-  {% elif incremental_strategy == 'insert_overwrite' %}#}
+  {% elif incremental_strategy == 'insert_overwrite' %}
     {%- set partition_by = config.get('partition_by') -%}
     {% if partition_by is none or partition_by|length == 0 %}
       {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy requires nonempty partition_by. Current partition_by is ' ~ partition_by) %}
@@ -84,7 +84,7 @@
     {% if inserts_only or unique_key is not none or incremental_predicates is not none %}
       {% do exceptions.raise_compiler_error(incremental_strategy + ' strategy does not support inserts_only, unique_key, and incremental predicates.') %}
     {% endif %}
-    {% do clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %} %}
+    {% do clickhouse__incremental_insert_overwrite(existing_relation, partition_by, False) %}
   {% endif %}
 {% endif %}

@@ -246,41 +246,58 @@
   {{ drop_relation_if_exists(distributed_new_data_relation) }}
 {% endmacro %}

-{% macro clickhouse__incremental_insert_overwrite(existing_relation, intermediate_relation, partition_by) %}
-  {% set new_data_relation = existing_relation.incorporate(path={"identifier": model['name']
+{% macro clickhouse__incremental_insert_overwrite(existing_relation, partition_by, is_distributed=False) %}
+  {% set new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier
   + '__dbt_new_data_' + invocation_id.replace('-', '_')}) %}
   {{ drop_relation_if_exists(new_data_relation) }}
-  {% call statement('create_new_data_temp') -%}
-    {{ get_create_table_as_sql(False, new_data_relation, sql) }}
-  {%- endcall %}
-  {% call statement('main') -%}
-    create table {{ intermediate_relation }} as {{ existing_relation }}
-  {%- endcall %}
-  {% call statement('insert_new_data') -%}
-    insert into {{ intermediate_relation }} select * from {{ new_data_relation }}
-  {%- endcall %}
+  {%- set distributed_new_data_relation = existing_relation.incorporate(path={"identifier": existing_relation.identifier + '__dbt_distributed_new_data'}) -%}
+
+  {%- set local_suffix = adapter.get_clickhouse_local_suffix() -%}
+  {%- set local_db_prefix = adapter.get_clickhouse_local_db_prefix() -%}
+  {% set existing_local = existing_relation.incorporate(path={"identifier": this.identifier + local_suffix, "schema": local_db_prefix + this.schema}) if existing_relation is not none else none %}
+
+  {% if is_distributed %}
+    {{ create_distributed_local_table(distributed_new_data_relation, new_data_relation, existing_relation, sql) }}
+  {% else %}
+    {% call statement('main') %}
+      {{ get_create_table_as_sql(False, new_data_relation, sql) }}
+    {% endcall %}
+  {% endif %}
+
+  {# Get the parts from the cluster table, since the partitions between shards may not overlap due to distribution #}
   {% if execute %}
     {% set select_changed_partitions %}
       SELECT DISTINCT partition_id
-      FROM system.parts
+      {% if is_distributed %}
+        FROM cluster({{ adapter.get_clickhouse_cluster_name() }}, system.parts)
+      {% else %}
+        FROM system.parts
+      {% endif %}
       WHERE active
-        AND database = '{{ intermediate_relation.schema }}'
-        AND table = '{{ intermediate_relation.identifier }}'
+        AND database = '{{ new_data_relation.schema }}'
+        AND table = '{{ new_data_relation.identifier }}'
     {% endset %}
     {% set changed_partitions = run_query(select_changed_partitions).rows %}
   {% else %}
     {% set changed_partitions = [] %}
   {% endif %}
+
   {% if changed_partitions %}
-    {% call statement('replace_partitions') %}
-      alter table {{ existing_relation }}
-      {%- for partition in changed_partitions %}
-        replace partition id '{{ partition['partition_id'] }}'
-        from {{ intermediate_relation }}
-        {{- ', ' if not loop.last }}
-      {%- endfor %}
+    {% call statement('replace_partitions') %}
+      {% if is_distributed %}
+        alter table {{ existing_local }} {{ on_cluster_clause(existing_relation) }}
+      {% else %}
+        alter table {{ existing_relation }}
+      {% endif %}
+      {%- for partition in changed_partitions %}
+        replace partition id '{{ partition['partition_id'] }}'
+        from {{ new_data_relation }}
+        {{- ', ' if not loop.last }}
+      {%- endfor %}
     {% endcall %}
   {% endif %}
-  {% do adapter.drop_relation(intermediate_relation) %}
+
+  {% do adapter.drop_relation(distributed_new_data_relation) %}
   {% do adapter.drop_relation(new_data_relation) %}
 {% endmacro %}
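
On an incremental run the rewritten macro (1) materializes the model query into
a <table>__dbt_new_data_<invocation_id> table, (2) asks system.parts which
partitions that table received, and (3) swaps exactly those partitions into the
target. For a single-node run the generated SQL looks roughly like the sketch
below; the database, table, and partition ids are illustrative:

    -- step 2: find the partitions touched by the new data
    SELECT DISTINCT partition_id
    FROM system.parts
    WHERE active
      AND database = 'dbt_test'
      AND table = 'my_model__dbt_new_data_<invocation_id>';

    -- step 3: swap only those partitions into the target
    ALTER TABLE dbt_test.my_model
        REPLACE PARTITION ID '<partition_id_1>' FROM dbt_test.my_model__dbt_new_data_<invocation_id>,
        REPLACE PARTITION ID '<partition_id_2>' FROM dbt_test.my_model__dbt_new_data_<invocation_id>;

In the distributed case the same swap targets the local table and carries an
ON CLUSTER clause, and step 2 unions system.parts across the cluster, since a
partition may exist on some shards only.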
diff --git a/tests/integration/adapter/incremental/test_base_incremental.py b/tests/integration/adapter/incremental/test_base_incremental.py
index 0c522df7..a31cbb05 100644
--- a/tests/integration/adapter/incremental/test_base_incremental.py
+++ b/tests/integration/adapter/incremental/test_base_incremental.py
@@ -195,7 +195,7 @@ def models(self):
     SELECT partitionKey1, partitionKey2, orderKey, value
     FROM VALUES(
         'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
-        (1, 'p1', 1, 'a'), (1, 'p1', 1, 'b'), (2, 'p1', 1, 'c'), (2, 'p2', 1, 'd')
+        (1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd')
     )
 {% else %}
     SELECT partitionKey1, partitionKey2, orderKey, value
@@ -207,7 +207,7 @@
 """


-class TestInsertReplaceIncremental:
+class TestInsertOverwriteIncremental:
     @pytest.fixture(scope="class")
     def models(self):
         return {"insert_overwrite_inc.sql": insert_overwrite_inc}
@@ -220,9 +220,9 @@ def test_insert_overwrite_incremental(self, project):
         )
         assert result == [
             (1, 'p1', 1, 'a'),
-            (1, 'p1', 1, 'b'),
-            (2, 'p1', 1, 'c'),
-            (2, 'p2', 1, 'd'),
+            (1, 'p1', 2, 'b'),
+            (2, 'p1', 3, 'c'),
+            (2, 'p2', 4, 'd'),
         ]
         run_dbt()
         result = project.run_sql(
@@ -231,7 +231,62 @@
         )
         assert result == [
             (1, 'p1', 2, 'e'),
-            (2, 'p1', 1, 'c'),
-            (2, 'p2', 1, 'd'),
+            (2, 'p1', 3, 'c'),
+            (2, 'p2', 4, 'd'),
             (3, 'p1', 2, 'f'),
         ]
+
+
+# "ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/{table}/{uuid}/', '{replica}')"
+insert_overwrite_replicated_inc = """
+{{ config(
+        materialized='incremental',
+        incremental_strategy='insert_overwrite',
+        partition_by=['partitionKey1', 'partitionKey2'],
+        order_by=['orderKey'],
+        engine="ReplicatedMergeTree('/clickhouse/tables/{uuid}/one_shard', '{server_index}')"
+    )
+}}
+{% if not is_incremental() %}
+    SELECT partitionKey1, partitionKey2, orderKey, value
+    FROM VALUES(
+        'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
+        (1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd')
+    )
+{% else %}
+    SELECT partitionKey1, partitionKey2, orderKey, value
+    FROM VALUES(
+        'partitionKey1 UInt8, partitionKey2 String, orderKey UInt8, value String',
+        (1, 'p1', 2, 'e'), (3, 'p1', 2, 'f')
+    )
+{% endif %}
+"""
+
+
+class TestInsertOverwriteReplicatedIncremental:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"insert_overwrite_replicated_inc.sql": insert_overwrite_replicated_inc}
+
+    def test_insert_overwrite_replicated_incremental(self, project):
+        run_dbt()
+        result = project.run_sql(
+            "select * from insert_overwrite_replicated_inc order by partitionKey1, partitionKey2, orderKey",
+            fetch="all",
+        )
+        assert result == [
+            (1, 'p1', 1, 'a'),
+            (1, 'p1', 2, 'b'),
+            (2, 'p1', 3, 'c'),
+            (2, 'p2', 4, 'd'),
+        ]
+        run_dbt()
+        result = project.run_sql(
+            "select * from insert_overwrite_replicated_inc order by partitionKey1, partitionKey2, orderKey",
+            fetch="all",
+        )
+        assert result == [
+            (1, 'p1', 2, 'e'),
+            (2, 'p1', 3, 'c'),
+            (2, 'p2', 4, 'd'),
+            (3, 'p1', 2, 'f'),
+        ]
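
Two things are worth noting about the rewritten base test. First, the seed rows
now carry distinct orderKey values (1 through 4): with the old data, several
rows tied on every ORDER BY column, so the asserted row order was not
deterministic. Second, the second-run expectation follows directly from
partition replacement; roughly, over the test's own data:

    -- first run:  (1,'p1') -> {a,b}   (2,'p1') -> {c}   (2,'p2') -> {d}
    -- second run writes e into (1,'p1') and f into (3,'p1'), so only those two
    -- partitions are replaced; (2,'p1') and (2,'p2') are left untouched
    SELECT * FROM insert_overwrite_inc ORDER BY partitionKey1, partitionKey2, orderKey;
    -- -> (1,'p1',2,'e'), (2,'p1',3,'c'), (2,'p2',4,'d'), (3,'p1',2,'f')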
diff --git a/tests/integration/adapter/incremental/test_distributed_incremental.py b/tests/integration/adapter/incremental/test_distributed_incremental.py
index f132933d..e704dc4a 100644
--- a/tests/integration/adapter/incremental/test_distributed_incremental.py
+++ b/tests/integration/adapter/incremental/test_distributed_incremental.py
@@ -203,3 +203,60 @@ def models(self):
     )
     def test_incremental_not_schema_change(self, project):
         super().test_incremental_not_schema_change(project)
+
+
+insert_overwrite_dist_inc = """
+{{ config(
+        materialized='distributed_incremental',
+        incremental_strategy='insert_overwrite',
+        partition_by=['partitionKey'],
+        order_by=['orderKey'],
+        sharding_key='shardingKey'
+    )
+}}
+{% if not is_incremental() %}
+    SELECT shardingKey, partitionKey, orderKey, value
+    FROM VALUES(
+        'shardingKey UInt8, partitionKey String, orderKey UInt8, value String',
+        (1, 'p1', 1, 'a'), (1, 'p1', 2, 'b'), (2, 'p1', 3, 'c'), (2, 'p2', 4, 'd')
+    )
+{% else %}
+    SELECT shardingKey, partitionKey, orderKey, value
+    FROM VALUES(
+        'shardingKey UInt8, partitionKey String, orderKey UInt8, value String',
+        (1, 'p1', 2, 'e'), (3, 'p1', 2, 'f')
+    )
+{% endif %}
+"""
+
+
+class TestInsertOverwriteDistributedIncremental:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"insert_overwrite_dist_inc.sql": insert_overwrite_dist_inc}
+
+    @pytest.mark.skipif(
+        os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
+    )
+    def test_insert_overwrite_distributed_incremental(self, project):
+        run_dbt()
+        result = project.run_sql(
+            "select * from insert_overwrite_dist_inc order by shardingKey, partitionKey, orderKey",
+            fetch="all",
+        )
+        assert result == [
+            (1, 'p1', 1, 'a'),
+            (1, 'p1', 2, 'b'),
+            (2, 'p1', 3, 'c'),
+            (2, 'p2', 4, 'd'),
+        ]
+        run_dbt()
+        result = project.run_sql(
+            "select * from insert_overwrite_dist_inc order by shardingKey, partitionKey, orderKey",
+            fetch="all",
+        )
+        assert result == [
+            (1, 'p1', 2, 'e'),
+            (2, 'p2', 4, 'd'),
+            (3, 'p1', 2, 'f'),
+        ]
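
Note how the distributed expectation differs from the single-node tests: this
model partitions only by partitionKey, so 'a', 'b', and 'c' all share partition
'p1'. The second run writes 'e' and 'f' into 'p1', and that whole partition is
replaced on every shard, which is why 'c' disappears while 'd' in 'p2'
survives. Roughly, over the test's own data:

    SELECT * FROM insert_overwrite_dist_inc ORDER BY shardingKey, partitionKey, orderKey;
    -- -> (1,'p1',2,'e'), (2,'p2',4,'d'), (3,'p1',2,'f')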