Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
We propose introducing merge engines for primary-key tables. A merge engine decides how a new write is combined with the existing row that has the same primary key, which enables richer data management and query patterns. We plan to support the following merge engines:
- Version-Based Merge: Consolidates rows by version number, keeping the row with the highest version for each primary key. It is very similar to ClickHouse's ReplacingMergeTree, which keeps the row with the highest version number, and to HBase's checkAndPut, which updates a record only when a condition holds. As a result, out-of-order data is guaranteed to eventually become consistent with the upstream.
- First-Row-Based Merge: Retains the first row written for each primary key. This improves performance by minimizing writes to the KV store and produces only insert-only changelogs, which some downstream jobs require.
- Aggregate-Based Merge: Merges rows that share a primary key by aggregating their column values, which is particularly useful for summarization and analytics. It will support functions such as sum, max, min, count, and avg, among others.
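To make the write-time rules of the first two engines concrete, here is a minimal Python sketch that models them on an in-memory dictionary. It is a simplified illustration, not Fluss code: the MergeTable class, its upsert method, and the Flink-style change kinds (+I for insert, -U/+U for update-before/update-after) are assumptions chosen for the example. The version rule uses >= to match the "updates when new ts >= old ts" comment in the version-based DDL example.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]
Change = Tuple[str, Row]  # (change kind, row); Flink-style kinds +I, -U, +U (assumed notation)


class MergeTable:
    """Hypothetical in-memory model of the 'version' and 'first_row' engines."""

    def __init__(self, key: str, engine: str, version_column: Optional[str] = None):
        if engine not in ("version", "first_row"):
            raise ValueError(f"unsupported engine: {engine}")
        if engine == "version" and version_column is None:
            raise ValueError("the version engine needs a version column")
        self.key = key
        self.engine = engine
        self.version_column = version_column
        self.rows: Dict[Any, Row] = {}

    def upsert(self, row: Row) -> List[Change]:
        """Apply one write and return the changelog entries it produces."""
        pk = row[self.key]
        old = self.rows.get(pk)
        if old is None:
            # First write for this key: both engines store it and emit an insert.
            self.rows[pk] = row
            return [("+I", row)]
        if self.engine == "first_row":
            # Existing row wins; later writes are dropped, so no update is emitted.
            return []
        # Version engine: accept the write only if its version is >= the stored one.
        if row[self.version_column] >= old[self.version_column]:
            self.rows[pk] = row
            return [("-U", old), ("+U", row)]
        return []  # stale (out-of-order) write is ignored


if __name__ == "__main__":
    writes = [{"id": 1, "ts": ts, "data": d} for ts, d in [(1, "a"), (3, "c"), (2, "b")]]
    for order in (writes, list(reversed(writes))):
        table = MergeTable("id", "version", version_column="ts")
        for w in order:
            table.upsert(w)
        print(table.rows[1])  # {'id': 1, 'ts': 3, 'data': 'c'} for both orders
```

Assuming distinct versions per key, the version engine's final row depends only on the highest version seen, not on arrival order, which is the eventual-consistency property described above. With equal versions the later write wins, so ties remain order-dependent. The first-row engine never emits -U/+U pairs, so its changelog is insert-only.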
Solution
Introduce the following table properties, configured as options in the WITH clause of a Flink DDL statement:
- 'table.merge-engine' = 'first_row|version|aggregate'
- 'table.merge-engine.version.column' = '<version_column>'
- 'table.merge-engine.aggregate.<agg_column>' = 'sum | max | min | count | avg | first_value | last_value'

The merge-engine properties are table storage properties, so they cannot be changed after the table is created.
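As an illustration of how the per-column options could be interpreted, here is a minimal Python sketch of the aggregate engine, assuming non-null inputs. All helper names (column_functions, init_state, merge_state, result, aggregate) are hypothetical and not part of Fluss. Because avg cannot be merged from a previous average alone, the sketch keeps a (sum, count) pair and divides only when the value is read. Columns without an aggregate option are simply dropped here, which is a simplification, not documented behavior.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]
AGG_PREFIX = "table.merge-engine.aggregate."
# Functions listed for 'table.merge-engine.aggregate.<agg_column>'.
SUPPORTED = {"sum", "max", "min", "count", "avg", "first_value", "last_value"}


def column_functions(options: Dict[str, str]) -> Dict[str, str]:
    """Map each aggregated column to its function, e.g. {'price': 'max'}."""
    funcs: Dict[str, str] = {}
    for key, value in options.items():
        if key.startswith(AGG_PREFIX):
            func = value.strip()
            if func not in SUPPORTED:
                raise ValueError(f"unsupported aggregate function: {func}")
            funcs[key[len(AGG_PREFIX):]] = func
    return funcs


def init_state(func: str, value: Any) -> Any:
    """Accumulator for the first value seen for a key."""
    if func == "count":
        return 1
    if func == "avg":
        return (value, 1)  # keep (sum, count), not the mean itself
    return value


def merge_state(func: str, state: Any, value: Any) -> Any:
    """Fold one new value into the accumulator."""
    if func == "sum":
        return state + value
    if func == "max":
        return max(state, value)
    if func == "min":
        return min(state, value)
    if func == "count":
        return state + 1
    if func == "avg":
        total, n = state
        return (total + value, n + 1)
    if func == "first_value":
        return state
    if func == "last_value":
        return value
    raise ValueError(f"unsupported aggregate function: {func}")


def result(func: str, state: Any) -> Any:
    """Value a reader would see for the column."""
    if func == "avg":
        total, n = state
        return total / n
    return state


def aggregate(rows: Iterable[Row], key: str, funcs: Dict[str, str]) -> Dict[Any, Row]:
    """Merge all rows per primary key using the configured column functions."""
    states: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        pk = row[key]
        if pk not in states:
            states[pk] = {c: init_state(f, row[c]) for c, f in funcs.items()}
        else:
            for c, f in funcs.items():
                states[pk][c] = merge_state(f, states[pk][c], row[c])
    return {pk: {c: result(funcs[c], s) for c, s in st.items()} for pk, st in states.items()}


if __name__ == "__main__":
    opts = {
        "table.merge-engine": "aggregate",
        "table.merge-engine.aggregate.price": "max",
        "table.merge-engine.aggregate.sales": "sum",
    }
    rows = [
        {"id": 1, "price": 2.0, "sales": 10},
        {"id": 1, "price": 5.0, "sales": 3},
        {"id": 1, "price": 4.0, "sales": 7},
    ]
    print(aggregate(rows, "id", column_functions(opts)))  # {1: {'price': 5.0, 'sales': 20}}
```

The options in the demo mirror the aggregate-based DDL example: price keeps its maximum and sales accumulates a running sum across all writes for the same id.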
How to Use
Version-Based Merge
CREATE TABLE fluss_table (
id BIGINT,
ts BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'fluss',
'bootstrap.address' = '...',
'table.merge-engine' = 'version',
'table.merge-engine.version.column' = 'ts' -- updates when new ts >= old ts
);First-Row-Based Merge
CREATE TABLE fluss_table (
id BIGINT,
version BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'fluss',
'bootstrap.address' = '...',
'table.merge-engine' = 'first_row' -- only the first row of the primary key will be retained.
);
Aggregate-Based Merge
CREATE TABLE fluss_table (
id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'fluss',
'bootstrap.address' = '...',
'table.merge-engine' = 'aggregate',
'table.merge-engine.aggregate.price' = 'max',
'table.merge-engine.aggregate.sales' = 'sum'
);
Anything else?
Subtasks:
- [Feature] Introduce first-row merge engine for primary key table #133
- [Feature] Introduce version merge engine for primary key table #213
- Introduce aggregate merge engine for primary key table
Willingness to contribute
- I'm willing to submit a PR!