
[Feature] Introduce Merge Engines for Primary-Key Table #212

@wuchong


Search before asking

  • I searched in the issues and found nothing similar.

Motivation

We propose introducing merge engines for primary-key tables to improve data management and query capabilities. A merge engine determines how an incoming row is combined with the stored row that has the same primary key. This feature covers the following merge engines:

  • Version-Based Merge: This engine consolidates rows based on a version column. It is similar to ClickHouse's ReplacingMergeTree, which keeps the row with the highest version number. It is also similar to HBase's checkAndPut, which updates a record only when a condition holds. As a result, out-of-order data is guaranteed to become eventually consistent with the upstream.

  • First-Row-Based Merge: This engine retains only the first row written for each primary key. It improves performance by minimizing writes to the KV store. It also produces an insert-only changelog, which some downstream jobs require.

  • Aggregate-Based Merge: This engine merges rows by aggregating their field values, which is useful for summaries and analytics. It will support aggregate functions such as sum, max, min, count, avg, first_value, and last_value.

Solution

Introduce the following table properties, configured through the WITH options of a Flink DDL statement:

'table.merge-engine' = 'first_row | version | aggregate'
'table.merge-engine.version.column' = '<version_column>'
'table.merge-engine.aggregate.<agg_column>' = 'sum | max | min | count | avg | first_value | last_value'

The merge-engine properties are table storage properties, and therefore can't be changed after table creation.
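The properties listed above could be validated once, at CREATE TABLE time, along the lines of the Python sketch below. `parse_merge_engine`, `MergeEngineConfig`, and the specific validation rules are assumptions for illustration, not part of the proposal or of Fluss. The assumed rules are that the version column must exist and that aggregate keys are read only for the `aggregate` engine. The config is a frozen dataclass to mirror the rule that these options cannot change after creation.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

# Option keys and allowed values as listed in this proposal.
MERGE_ENGINE = "table.merge-engine"
VERSION_COLUMN = "table.merge-engine.version.column"
AGG_PREFIX = "table.merge-engine.aggregate."
ENGINES = {"first_row", "version", "aggregate"}
AGG_FUNCS = {"sum", "max", "min", "count", "avg", "first_value", "last_value"}


@dataclass(frozen=True)  # frozen: merge settings are fixed once the table exists
class MergeEngineConfig:
    engine: Optional[str] = None  # None means no merge engine was configured
    version_column: Optional[str] = None
    aggregates: Dict[str, str] = field(default_factory=dict)


def parse_merge_engine(options: Dict[str, str], columns: Set[str]) -> MergeEngineConfig:
    """Hypothetical validation of the merge-engine WITH options (illustration only)."""
    engine = options.get(MERGE_ENGINE)
    if engine is None:
        return MergeEngineConfig()
    if engine not in ENGINES:
        raise ValueError(f"unknown merge engine: {engine!r}")

    version_column = None
    if engine == "version":
        version_column = options.get(VERSION_COLUMN)
        # Assumption: the version engine requires an existing version column.
        if version_column not in columns:
            raise ValueError(f"'{VERSION_COLUMN}' must name an existing column")

    aggregates: Dict[str, str] = {}
    if engine == "aggregate":
        for key, func in options.items():
            if not key.startswith(AGG_PREFIX):
                continue
            column = key[len(AGG_PREFIX):]
            if column not in columns:
                raise ValueError(f"unknown aggregate column: {column!r}")
            if func not in AGG_FUNCS:
                raise ValueError(f"unknown aggregate function: {func!r}")
            aggregates[column] = func

    return MergeEngineConfig(engine, version_column, aggregates)
```

With the aggregate DDL from this issue, this sketch would return `aggregates == {"price": "max", "sales": "sum"}`.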

How to Use

Version-Based Merge

CREATE TABLE fluss_table (
  id BIGINT,
  ts BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'fluss',
  'bootstrap.address' = '...',
  'table.merge-engine' = 'version',
  'table.merge-engine.version.column' = 'ts'  -- updates when new ts >= old ts  
);
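The version semantics can be illustrated with a minimal in-memory Python sketch; it is not the Fluss implementation. `VersionMergeTable` and `upsert` are hypothetical names. The `>=` comparison follows the DDL comment above, so a write with an equal version replaces the stored row. A late row with a smaller version is dropped, which is what makes out-of-order input converge to the upstream's latest state.

```python
from typing import Dict

Row = Dict[str, object]


class VersionMergeTable:
    """Hypothetical in-memory model of a primary-key table using the version engine."""

    def __init__(self, key: str, version_column: str) -> None:
        self.key = key
        self.version_column = version_column
        self.rows: Dict[object, Row] = {}

    def upsert(self, row: Row) -> bool:
        """Apply a row; return True if it was stored, False if it was discarded."""
        pk = row[self.key]
        old = self.rows.get(pk)
        # Store the row if the key is new or its version is >= the stored version;
        # otherwise it is a stale, out-of-order write and is dropped.
        if old is None or row[self.version_column] >= old[self.version_column]:
            self.rows[pk] = row
            return True
        return False


# Rows for id=1 arrive out of order: ts=10, ts=30, then the late ts=20.
table = VersionMergeTable(key="id", version_column="ts")
table.upsert({"id": 1, "ts": 10, "data": "a"})
table.upsert({"id": 1, "ts": 30, "data": "c"})
table.upsert({"id": 1, "ts": 20, "data": "b"})  # discarded: 20 < 30
print(table.rows[1])  # {'id': 1, 'ts': 30, 'data': 'c'}
```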

First-Row-Based Merge

CREATE TABLE fluss_table (
  id BIGINT,
  version BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'fluss',
  'bootstrap.address' = '...',
  'table.merge-engine' = 'first_row'  -- only the first row for each primary key is retained
);
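The following hypothetical in-memory Python sketch shows the first-row behavior; `FirstRowMergeTable` and its changelog list are illustrative names only. Rows whose key already exists are ignored. This means nothing is rewritten, and no update or delete record is emitted, so the changelog contains only inserts. The "+I" label borrows Flink's row-kind notation for INSERT.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


class FirstRowMergeTable:
    """Hypothetical in-memory model of a primary-key table using the first_row engine."""

    def __init__(self, key: str) -> None:
        self.key = key
        self.rows: Dict[object, Row] = {}
        # Changelog entries as (row kind, row); "+I" marks an INSERT.
        self.changelog: List[Tuple[str, Row]] = []

    def upsert(self, row: Row) -> None:
        pk = row[self.key]
        if pk in self.rows:
            # Key already seen: ignore the row, so nothing is written and no
            # update or delete record is emitted.
            return
        self.rows[pk] = row
        self.changelog.append(("+I", row))


table = FirstRowMergeTable(key="id")
table.upsert({"id": 1, "version": 1, "data": "a"})
table.upsert({"id": 1, "version": 2, "data": "b"})  # ignored: id=1 already exists
table.upsert({"id": 2, "version": 1, "data": "c"})
print([kind for kind, _ in table.changelog])  # ['+I', '+I']
```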

Aggregate-Based Merge

CREATE TABLE fluss_table (
    id BIGINT,
    price DOUBLE,
    sales BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'fluss',
    'bootstrap.address' = '...',
    'table.merge-engine' = 'aggregate',
    'table.merge-engine.aggregate.price' = 'max',
    'table.merge-engine.aggregate.sales' = 'sum'
);
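Per-column aggregation can be sketched in Python as a map from function name to a two-argument merge function. The sketch makes several assumptions. `AggregateMergeTable` and `AGGREGATORS` are hypothetical names, and NULL handling is omitted. Columns without a configured function keep the latest value. Only sum, max, min, first_value, and last_value are modeled. avg cannot be merged from a stored average alone, because it needs a running sum and count. The meaning of count is not specified here.

```python
from typing import Callable, Dict

Row = Dict[str, object]

# (stored value, incoming value) -> merged value; count and avg are omitted
# from this sketch (avg would need extra sum/count state per key).
AGGREGATORS: Dict[str, Callable[[object, object], object]] = {
    "sum": lambda old, new: old + new,
    "max": max,
    "min": min,
    "first_value": lambda old, new: old,
    "last_value": lambda old, new: new,
}


class AggregateMergeTable:
    """Hypothetical in-memory model of a primary-key table using the aggregate engine."""

    def __init__(self, key: str, aggregates: Dict[str, str]) -> None:
        self.key = key
        self.funcs = {col: AGGREGATORS[name] for col, name in aggregates.items()}
        self.rows: Dict[object, Row] = {}

    def upsert(self, row: Row) -> None:
        pk = row[self.key]
        old = self.rows.get(pk)
        if old is None:
            self.rows[pk] = dict(row)  # the first row for a key is stored as-is
            return
        merged = dict(old)
        for col, value in row.items():
            if col == self.key:
                continue
            func = self.funcs.get(col)
            # Assumption: columns with no configured function keep the latest value.
            merged[col] = func(old[col], value) if func else value
        self.rows[pk] = merged


# Mirrors the DDL above: price uses max, sales uses sum.
table = AggregateMergeTable(key="id", aggregates={"price": "max", "sales": "sum"})
table.upsert({"id": 1, "price": 23.0, "sales": 15})
table.upsert({"id": 1, "price": 30.2, "sales": 20})
table.upsert({"id": 1, "price": 10.0, "sales": 5})
print(table.rows[1])  # {'id': 1, 'price': 30.2, 'sales': 40}
```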

Anything else?

Subtasks:

Willingness to contribute

  • I'm willing to submit a PR!
