insert_overwrite (insert+replace) partitions incremental strategy #201
Conversation
At the time of publishing, the PR is in WIP (work in progress) state, since I need advice from the community. Thus, neither documentation nor tests are updated.
inserts_only
or unique_key is none
and config.get('incremental_strategy', none) != 'insert+replace' -%}
A mix of legacy (inserts_only) and new ("append" strategy) approaches to configuring the strategy introduces a counter-intuitive and even conflicting set of configurations in dbt-clickhouse. To avoid breaking the current condition-checking flow I have implemented this dirty way of checking, though in general I would suggest refactoring this conditions tree and introducing strict config-consistency checks: e.g. inserts_only must prohibit any incremental_strategy other than "append" (emitting a warning in that case, since it is redundant) or unspecified. Please let me know if some work here has started: reference an issue or a PR.
To narrow the reviewed scope, I won't blend those changes into this PR, so I suggest keeping this dirty-yet-working workaround here.
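A minimal sketch of such a consistency check, assuming it runs early in the incremental materialization; the macro name and message wording are hypothetical, not existing dbt-clickhouse code:

{% macro validate_insert_strategy_config() %}
    {# Hypothetical check: inserts_only implies append-only behavior, so any other explicit strategy conflicts with it #}
    {% set strategy = config.get('incremental_strategy', none) %}
    {% if config.get('inserts_only') %}
        {% if strategy == 'append' %}
            {{ exceptions.warn("inserts_only is redundant when incremental_strategy='append'") }}
        {% elif strategy is not none %}
            {{ exceptions.raise_compiler_error("inserts_only conflicts with incremental_strategy='" ~ strategy ~ "'") }}
        {% endif %}
    {% endif %}
{% endmacro %}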
@@ -234,3 +244,36 @@
{% do adapter.drop_relation(new_data_relation) %}
{{ drop_relation_if_exists(distributed_new_data_relation) }}
{% endmacro %}

{% macro clickhouse__incremental_insert_replace(existing_relation, intermediate_relation, partition_by) %}
This strategy should also take care of cluster setups and distributed tables. We can insert data through the Distributed table and then do the replace for the local table on each shard; this part will be mostly the same as in the clickhouse__incremental_delete_insert macro. A rough sketch of that shape follows.
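The sketch below models the idea on the delete+insert flow; the relation and variable names (distributed_intermediate_relation, changed_partition_ids, and so on) are hypothetical placeholders, not tested code:

{# Load new data through the Distributed table, so every shard receives its rows #}
{% call statement('insert_new_data') -%}
    insert into {{ distributed_intermediate_relation }}
    select * from {{ distributed_new_data_relation }}
{%- endcall %}
{# Then swap each affected partition into the shard-local target table #}
{% for partition_id in changed_partition_ids %}
    {% call statement('replace_partition_' ~ loop.index) -%}
        alter table {{ existing_local_relation }} {{ on_cluster_clause() }}
        replace partition id '{{ partition_id }}' from {{ intermediate_relation }}
    {%- endcall %}
{% endfor %}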
Should we maybe defer the distributed implementation until the next iteration? My immediate requirement was to support the strategy for single-node setups (which my team and I are currently using). We may leave a chance for others to implement and test the cluster-specific version; I won't have a quick option to test it, so I won't be able to confirm it works.
Your suggestion with system.parts works completely fine, thanks for that! I plan to cover it with tests and document it in the next couple of weeks (I'm on vacation next week :) ).
My team will test the approach on our real use cases to make sure it works.
I'm not sure about merging without Distributed support: this is new functionality and mostly experimental, so I think this is a question for @genzgd, as he is a maintainer.
But I believe it should work with a cluster from the beginning, because this is core CH functionality, and most production users don't use a single-server setup.
We currently use dbt with ClickHouse without a cluster, so I have not had an opportunity to test it. I suggest merging the tested version of my PR now; once someone needs a clustered version of the strategy, they may contribute it and test it themselves.
If I add the cluster-related logic, I cannot guarantee that it works. However, if the maintainers are OK with relying on their review, I am completely fine with adding a related code snippet.
Please let me know which code snippet I have to add, if it is required. Once again, my suggestion is to proceed with a tested version, leaving room for someone else's contribution.
{%- endcall %}
{% if execute %}
{% set select_changed_partitions %}
select distinct {{ partition_by|join(', ') }}
We can do something like this to get partitions; this way we are guaranteed to select all partitions in the temp relation, and it will be faster:

{% set partitions = get_partitions(relation) %}
...

{% macro get_partitions(relation) %}
    {% set cluster = adapter.get_clickhouse_cluster_name() %}
    {% set source = 'system.parts' %}
    {% if cluster is not none %}
        {# Jinja expressions are not expanded inside string literals, so concatenate #}
        {% set source = "cluster('" ~ cluster ~ "', system.parts)" %}
    {% endif %}
    {% set sql -%}
        SELECT DISTINCT partition_id
        FROM {{ source }} WHERE active AND database = '{{ relation.schema }}' AND table = '{{ relation.identifier }}'
    {%- endset -%}
    {{ return(run_query(sql)) }}
{% endmacro %}
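A sketch of how the result could then be consumed, assuming run_query returns an agate table with a partition_id column (the relation names here are illustrative):

{# Query the partitions present in the freshly loaded data #}
{% set partitions = get_partitions(intermediate_relation) %}
{% for row in partitions.rows %}
    {# Atomically swap each changed partition into the target, addressing it by ID #}
    {% call statement('replace_partition_' ~ loop.index) -%}
        alter table {{ existing_relation }}
        replace partition id '{{ row["partition_id"] }}' from {{ intermediate_relation }}
    {%- endcall %}
{% endfor %}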
Hi @simpl1g and @genzgd, I've wrapped up this PR. Some discussions above are unresolved; waiting on your pearls of wisdom there :) Please take a peek at the PR when you get a chance, and hit me up with your thoughts. Hoping we can merge the feature soon and let the good times roll with the new strategy! :)
Hi @simpl1g and @genzgd! This is a kind reminder about my PR, which is ready for your review. We have successfully battle-tested it internally ✅ We currently install my version from GitHub; it would be nice if you could approve it and release it to PyPI. If any adjustments are required, please let me know! 🙏
@bryzgaloff I apologize that we haven't yet had the resources to fully review this PR. As you may have noticed, we've been focused on bug fixes and compatibility with the new dbt releases. Please know that we very much appreciate the contribution (especially with test cases and real-world usage), and your work is next on the roadmap as we get time. If you have a chance to resolve the conflicts over the next few weeks, that would be appreciated and would make the review just a bit easier.
create table {{ intermediate_relation }} as {{ existing_relation }}
{%- endcall %}
{% call statement('insert_new_data') -%}
insert into {{ intermediate_relation }} select * from {{ new_data_relation }}
@bryzgaloff as you said, you have a single-node deployment. Here there should be {{ on_cluster_clause() }}; otherwise the error below occurs in my deployment. I'm currently working on adapting it to work on a cluster.
15:04:39 :HTTPDriver for http://10.100.0.106:8123 returned response code 400)
15:04:39 Code: 36. DB::Exception: Macro 'uuid' and empty arguments of ReplicatedMergeTree are supported only for ON CLUSTER queries with Atomic database engine. (BAD_ARGUMENTS) (version 24.2.1.2248 (official build))
Also, I had to configure the model with the allow_nullable_key setting, because otherwise another error occurs. Did you run into that?
14:18:56 :HTTPDriver for http://10.100.0.106:8123 returned response code 400)
14:18:56 Code: 44. DB::Exception: There was an error on [10.100.0.106:9000]: Code: 44. DB::Exception: Partition key contains nullable columns, but merge tree setting `allow_nullable_key` is disabled. (ILLEGAL_COLUMN) (version 24.2.1.2248 (official build)). (ILLEGAL_COLUMN) (version 24.2.1.2248 (official build))
{{
config(
materialized = "incremental",
partition_by = "transaction_date_part",
incremental_strategy = "insert+replace",
engine = 'ReplicatedMergeTree',
order_by = ['city_id', 'transaction_date_part'],
schema = "analytics",
settings = {'allow_nullable_key': 1}
)
}}
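For illustration, a sketch of where that clause would go in the intermediate-table DDL, assuming the zero-argument on_cluster_clause() form used in the comment above (not the exact macro from this PR):

{% call statement('create_intermediate_relation') -%}
    {# ON CLUSTER runs the DDL on every node, which ReplicatedMergeTree requires here #}
    create table {{ intermediate_relation }} {{ on_cluster_clause() }} as {{ existing_relation }}
{%- endcall %}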
Hi @dev-mkc19, thank you for your input! I plan to get back to working on the PR next week. For now, I will keep it without cluster support, as I do not have quick-to-set-up infrastructure to test it. Feel free to make your own PR adding cluster support 🤝 I may review it to ensure it does not break any of the insert+replace semantics. Tag me as a reviewer once it is published!
@bryzgaloff After syncing, I'll review this one right away.
Hi @BentsiLeviav thank you (and the other reviewers, of course!) for your participation. I am not actively using the plugin right now, but I may get back to handling your review feedback late next week. If the conflicts are not too critical, I might be able to resolve them quickly.
…ment insert+replace strategy. TODO: convert partition_expression to ClickHouse literals
… nonempty partition_by
…ting affected partitions from system.parts. This avoids translating agate.Row to ClickHouse literals.
…m.parts.partition_id -> partition. partition_id is a String field with the internal partition ID; it cannot be used in a plain REPLACE PARTITION clause. The "partition" field is a string representation of the partition expression and can be used in a REPLACE PARTITION query as-is.
…ify partition by ID. The system.parts.partition field does not work for strings; ClickHouse allows manipulating partitions by referencing their IDs.
…sert+replace with tests
According to PR review comments
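The distinction those commits draw between the partition and partition_id fields maps to SQL roughly like this (database, table, and ID values are illustrative):

-- system.parts.partition is a human-readable expression and is unreliable for string keys;
-- system.parts.partition_id is the stable internal ID, usable via the PARTITION ID clause:
ALTER TABLE analytics.transactions
    REPLACE PARTITION ID '20240301' FROM analytics.transactions__dbt_new_data;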
Hi @BentsiLeviav I have rebased my contribution onto the latest
Does this repo have automated tests? I see there are workflows awaiting approval. I do not have quick infra to retest the contribution after the rebase, but if there is no testing workflow, I will perform manual testing.
Hi @BentsiLeviav and @genzgd, will you have a chance to review and merge the PR soon? I have updated everything according to the review feedback, and all the checks have passed. I would like to avoid the need to rebase it again 😅
Hi @bryzgaloff
Thank you @BentsiLeviav for the approval! What are the next steps for the PR to be merged?
Hi @bryzgaloff Before merging this, could you please add to the doc that this feature is experimental and wasn't tested with a cluster setup? Once we are done with that, I'll merge your PR. Thanks again for your work!
Never mind, I'll take care of it :)
Thank you for all your help and the merge, @BentsiLeviav! 🤝
Hi! I’m really interested in contributing to this project, particularly with the cluster functionality. Before diving into development, I want to ensure I write robust tests for the support of ReplicatedMergeTree and Distributed. Here’s my plan:
• Add TestInsertReplaceIncremental (adapted for Replicated) to test_base_incremental.py.
Does this approach align with the project’s practices, or would you recommend any adjustments? Thanks in advance for your guidance!
Support of insert_overwrite in cluster setup #394: here it is!
Summary
This PR implements the insert+replace strategy discussed in #128, which does the following (see the recap sketch below):
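Recapping the thread, the strategy boils down to roughly this flow (a sketch with illustrative table names, not the exact merged code):

-- 1. Clone the target's structure into an intermediate table
CREATE TABLE analytics.t__dbt_intermediate AS analytics.t;
-- 2. Insert the freshly selected increment
INSERT INTO analytics.t__dbt_intermediate SELECT * FROM analytics.t__dbt_new_data;
-- 3. Look up the affected partition IDs in system.parts (active parts only)
SELECT DISTINCT partition_id FROM system.parts
WHERE active AND database = 'analytics' AND table = 't__dbt_intermediate';
-- 4. Atomically swap each affected partition into the target
ALTER TABLE analytics.t REPLACE PARTITION ID '<partition_id>' FROM analytics.t__dbt_intermediate;
-- 5. Drop the intermediate relations
DROP TABLE analytics.t__dbt_intermediate;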
Advantages: