[HUDI-8780][RFC-83] Incremental Table Service #12514
base: master
Conversation
rfc/rfc-83/rfc-83.md
### Work Flow for Incremental Table Service

Table Service Planner
Currently we are using completion time; should we indicate here whether instant refers to request time or completion time?
Request time, I believe. If using completion time, it may miss some instants with multi-writer.
Also, this getEarliestCommitToRetain can reference CleanPlanner#getEarliestCommitToRetain.
rfc/rfc-83/rfc-83.md
Table Service Planner
1. Retrieve the instant recorded in the last completed table service commit as **INSTANT 1**.
2. Calculate the current instant to be processed as **INSTANT 2**.
3. Obtain all partitions involved from **INSTANT 1** to **INSTANT 2** as incremental partitions and perform the table service plan operation.
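The three planner steps above could be sketched roughly as follows. This is an illustrative stand-in, not actual Hudi code: instants are plain strings, the "timeline" is an in-memory sorted map, and the (INSTANT 1, INSTANT 2] range semantics are an assumption.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical sketch of the three planner steps: given the instant recorded by the
// last completed table service (INSTANT 1) and the current instant (INSTANT 2),
// collect the distinct partitions written in between.
public class IncrementalPlannerSketch {

    // commits: instantTime -> partitions touched by that commit
    public static List<String> incrementalPartitions(SortedMap<String, List<String>> commits,
                                                     String lastServiceInstant,   // INSTANT 1
                                                     String currentInstant) {     // INSTANT 2
        // Step 3: collect partitions written in (INSTANT 1, INSTANT 2],
        // assuming an exclusive-inclusive range.
        return commits.entrySet().stream()
                .filter(e -> e.getKey().compareTo(lastServiceInstant) > 0
                          && e.getKey().compareTo(currentInstant) <= 0)
                .flatMap(e -> e.getValue().stream())
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SortedMap<String, List<String>> commits = new TreeMap<>();
        commits.put("001", Arrays.asList("p_1", "p_2"));
        commits.put("002", Arrays.asList("p_2"));
        commits.put("003", Arrays.asList("p_3"));
        // Last table service completed at 001; planning now at 003.
        System.out.println(incrementalPartitions(commits, "001", "003")); // [p_2, p_3]
    }
}
```

Only partitions touched after INSTANT 1 are scanned, which is the whole point for tables with very many cold historical partitions.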
If we turn on the incremental table service mode, are the various flexible partition selection mechanisms now unavailable? Consider the following scenario:
- at ts_0, write to two partitions: p_1 and p_2
- at ts_1, schedule a compaction with a partition-selection strategy that only compacts p_2
- at ts_2, write to p_2 again
- at ts_3, compaction will only process partitions written between ts_1 and ts_3, so it will still only merge p_2. When can a compaction occur that compacts p_1?
For common strategies, the various flexible partition selection mechanisms still work.
For an IncrementalxxxxStrategy, the flexible partition selection mechanisms will apply to the incrementally fetched partitions.
Also, in an IncrementalxxxxStrategy, maybe we can record the missing partitions in the plan and process them together with the newly fetched incremental partitions next time.
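The "record missing partitions" idea above could look roughly like this. Everything here is a hypothetical sketch (names and the selected/missing plan fields are assumptions, not Hudi code):

```java
import java.util.*;
import java.util.function.Predicate;

// Hypothetical sketch: the strategy filters the candidate partitions, records the
// ones it skipped as "missing" in the plan, and the next round retries the recorded
// missing partitions together with the newly fetched incremental partitions.
public class MissingPartitionsSketch {

    // Split candidates into (selected, missing) according to the strategy's filter.
    public static Map<String, List<String>> split(List<String> candidates,
                                                  Predicate<String> strategyFilter) {
        Map<String, List<String>> result = new HashMap<>();
        result.put("selected", new ArrayList<>());
        result.put("missing", new ArrayList<>());
        for (String p : candidates) {
            result.get(strategyFilter.test(p) ? "selected" : "missing").add(p);
        }
        return result;
    }

    // Next round: previously missing partitions plus the new incremental ones.
    public static List<String> nextCandidates(List<String> missingFromLastPlan,
                                              List<String> newIncremental) {
        LinkedHashSet<String> union = new LinkedHashSet<>(missingFromLastPlan);
        union.addAll(newIncremental);
        return new ArrayList<>(union);
    }

    public static void main(String[] args) {
        // Round 1: the strategy only wants p_2, so p_1 is recorded as missing.
        Map<String, List<String>> round1 = split(Arrays.asList("p_1", "p_2"), "p_2"::equals);
        System.out.println(round1.get("selected")); // [p_2]
        System.out.println(round1.get("missing"));  // [p_1]
        // Round 2: p_1 is retried alongside the new incremental partition p_3.
        System.out.println(nextCandidates(round1.get("missing"), Arrays.asList("p_3"))); // [p_1, p_3]
    }
}
```

This addresses the scenario raised above: p_1, skipped at ts_1, would eventually be picked up once it is carried forward as a missing partition.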
rfc/rfc-83/rfc-83.md
`EarliestCommitToReta` in clean commit meta
retain
Changed.
rfc/rfc-83/rfc-83.md
Add `EarliestCommitToReta` in the HoodieCommitMetadata extra meta map for clustering and compaction operations, which are all written commits:
{"name": "earliestCommitToRetain", "type": "string"}
earliestInstantToRetain or earliestCommitToRetain ?
Maybe we can record this earliestInstantToRetain in the clustering/compaction plan request meta file, so this change would no longer be needed.
rfc/rfc-83/rfc-83.md
{"name": "earliestCommitToRetain", "type": "string"}

We also need a unified interface/abstract-class to control the Plan behavior and Commit behavior of the TableService.
Can you elaborate why this is needed?
Use PartitionBaseTableServicePlanStrategy to control the behavior of getting partitions, filtering partitions, generating the table service plan, etc.
Because we want to control the logic of partition acquisition, partition filtering, and plan generation through different strategies, the first step is an abstraction that converges that logic into the base strategy.
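As a rough illustration of what such a base abstraction could converge (the class name follows the discussion, but the shape is an assumption, not the actual RFC code):

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical sketch of the base strategy: one place for partition acquisition,
// partition filtering, and plan generation, so concrete strategies only override
// the pieces they care about. P stands in for the concrete plan type.
public abstract class PartitionBaseTableServicePlanStrategySketch<P> {

    // Partition acquisition: incremental strategies would override this to fetch
    // only the partitions touched since the last table service.
    protected abstract List<String> getPartitionPaths();

    // Partition filtering: identity by default; day-based or size-based strategies narrow it.
    protected List<String> filterPartitionPaths(List<String> partitions) {
        return partitions;
    }

    // Plan generation from the filtered partitions.
    protected abstract P generatePlan(List<String> partitions);

    // The converged pipeline: acquire -> filter -> plan.
    public final P plan() {
        return generatePlan(filterPartitionPaths(getPartitionPaths()));
    }

    // Minimal demo: a strategy that only plans over partitions under "2024".
    public static void main(String[] args) {
        PartitionBaseTableServicePlanStrategySketch<List<String>> s =
            new PartitionBaseTableServicePlanStrategySketch<List<String>>() {
                protected List<String> getPartitionPaths() {
                    return Arrays.asList("2023/12/31", "2024/01/01", "2024/01/02");
                }
                protected List<String> filterPartitionPaths(List<String> p) {
                    return p.stream().filter(x -> x.startsWith("2024")).collect(Collectors.toList());
                }
                protected List<String> generatePlan(List<String> p) {
                    return p; // a real plan would bundle file groups per partition
                }
            };
        System.out.println(s.plan()); // [2024/01/01, 2024/01/02]
    }
}
```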
rfc/rfc-83/rfc-83.md
private List<String> getIncrementalPartitionPaths(Option<HoodieInstant> instantToRetain, TableServiceType type) {
Shouldn't each table service executor already distinguish the service type? Maybe `type` can be eliminated.
removed.
rfc/rfc-83/rfc-83.md
public R buildCommitMeta() {
Not sure we need this.
No need actually. Removed.
rfc/rfc-83/rfc-83.md
### Work Flow for Incremental Table Service

Table Service Planner
1. Retrieve the instant recorded in the last completed table service commit as **INSTANT 1**.
Do we want to take care of the case where INSTANT 1 has already been archived?
We record EarliestCommitToRetain in the TableService request metadata file and use it as the basis for retrieving incremental partitions.
Therefore, when Incremental Table Service is enabled, we should always ensure that there is a Clustering/Compaction request metadata file in the active timeline.
Also, we can use getAllPartitions as a cover-up plan.
Therefore, when Incremental Table Service is enabled, we should always ensure that there is a Clustering/Compaction request metadata in the active timeline.
Not really. For cleaning this is true, because there could be data quality issues if the wrong files are cleaned, but compaction and clustering are just rewrites.
@zhangyue19921010 Thanks for the contribution; let's eliminate the unnecessary refactoring first and focus on the core logic.
I kind of think we should expose the incremental partitions to the specific XXXStrategy class, because the strategy class can decide the partition filtering itself, which is closely related.
Let's clarify the behavior for archived table service commits.
Thanks for your review @danny0405 @yuzhaojing and @TheR1sing3un.
Totally agree. We need to implement different behaviors such as partition acquisition, partition filtering, and plan construction through different strategies, so that it is more flexible and controllable. But one of the prerequisites for doing so is a unified abstraction of the above API, which is why a base abstraction is needed first.
Maybe we can make some changes in the archive service, such as: when Incremental Table Service is enabled, we should always ensure that there is a Clustering/Compaction request metadata in the active timeline. Also, we can use getAllPartitions as a cover-up plan.
rfc/rfc-83/rfc-83.md
### Abstraction

Use `PartitionBaseTableServicePlanStrategy` to control the behavior of getting partitions, filter partitions and generate table service plan etc.
Maybe we name it IncrementalPartitionAwareStrategy to emphasize it is "incremental".
changed
rfc/rfc-83/rfc-83.md
 * Returns the earliest commit to retain from instant meta
 */
public Option<HoodieInstant> getEarliestCommitToRetain() {
  throw new UnsupportedOperationException("Not support yet");
The IncrementalPartitionAwareStrategy should be a user interface IMO; the only API we expose to the user is the incremental partitions since the last table service. So the following logic should be removed:
- generate plan (should be the responsibility of the planner)
- getEarliestCommitToRetain (should be the responsibility of the planner within the plan executor)
And because the implementations of compaction and clustering are quite different, maybe we just add two new interfaces: IncrementalPartitionAwareCompactionStrategy and IncrementalPartitionAwareClusteringStrategy.
- generatePlan and getEarliestCommitToRetain are removed.
- As for the base abstraction: although the implementations of compaction and clustering are quite different, partition-aware compaction and clustering share the same partition processing logic, that is, first obtain the partitions and then filter them, so maybe we can use one interface for both to control partition-related operations. What do you think? :)
In addition, Danny, what's your opinion on the logic of incremental partition acquisition?
Option 1: Record a metadata field in the commit to indicate where the last processing was done. The partition acquisition behavior under Option 1 is more flexible.
Option 2: Directly use the last completed table service commit time as the new starting point. Option 2 is simpler and does not require modifying and processing commit metadata fields.
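The two options can be contrasted on a toy timeline. This is purely an illustrative sketch: the `Commit` class and method names are stand-ins, and the `earliestCommitToRetain` key mirrors the field discussed above.

```java
import java.util.*;

// Hypothetical sketch: Option 1 reads a recorded metadata field from the last
// table service commit; Option 2 uses that commit's own instant time directly.
public class StartInstantSketch {

    static class Commit {
        final String instant;
        final boolean tableService;
        final Map<String, String> extraMetadata;
        Commit(String instant, boolean tableService, Map<String, String> extraMetadata) {
            this.instant = instant;
            this.tableService = tableService;
            this.extraMetadata = extraMetadata;
        }
    }

    // Option 1: flexible, but requires writing/reading an extra metadata field.
    public static Optional<String> option1(List<Commit> timeline) {
        return lastTableService(timeline).map(c -> c.extraMetadata.get("earliestCommitToRetain"));
    }

    // Option 2: simpler, no metadata changes; the commit time itself is the start point.
    public static Optional<String> option2(List<Commit> timeline) {
        return lastTableService(timeline).map(c -> c.instant);
    }

    private static Optional<Commit> lastTableService(List<Commit> timeline) {
        Commit last = null;
        for (Commit c : timeline) {
            if (c.tableService) last = c;
        }
        return Optional.ofNullable(last);
    }

    public static void main(String[] args) {
        List<Commit> timeline = Arrays.asList(
            new Commit("001", false, Collections.emptyMap()),
            new Commit("002", true, Collections.singletonMap("earliestCommitToRetain", "001")),
            new Commit("003", false, Collections.emptyMap()));
        System.out.println(option1(timeline).orElse("none")); // 001
        System.out.println(option2(timeline).orElse("none")); // 002
    }
}
```

Note the difference in the demo: Option 1 can point back before the table service commit (001), while Option 2 always starts at the table service commit itself (002).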
rfc/rfc-83/rfc-83.md
List<String> getIncrementalPartitionPaths(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient);
Should we just add one interface, List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> allPartitionPaths, List<String> incrementalPartitionPaths), so that the strategy can decide which partitions are chosen?
The getXXXPartitionPaths methods should belong to the scope of the executor/planner; let's move them out.
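The single-interface shape suggested above could be sketched like this. It is a simplified stand-in: the HoodieWriteConfig parameter is dropped for self-containment, and the planner is assumed to fetch both partition lists before calling the strategy.

```java
import java.util.*;

// Hypothetical sketch: the planner owns partition acquisition; the strategy only
// decides which partitions to keep, given both the full and incremental lists.
public class FilterPartitionsSketch {

    @FunctionalInterface
    interface IncrementalPartitionAwareStrategy {
        List<String> filterPartitionPaths(List<String> allPartitionPaths,
                                          List<String> incrementalPartitionPaths);
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("p_1", "p_2", "p_3");
        List<String> incremental = Arrays.asList("p_2", "p_3");

        // An incremental strategy keeps only the incrementally fetched partitions...
        IncrementalPartitionAwareStrategy incrementalOnly = (a, inc) -> inc;
        // ...while a "common" strategy can still ignore them and use the full list.
        IncrementalPartitionAwareStrategy fullScan = (a, inc) -> a;

        System.out.println(incrementalOnly.filterPartitionPaths(all, incremental)); // [p_2, p_3]
        System.out.println(fullScan.filterPartitionPaths(all, incremental));        // [p_1, p_2, p_3]
    }
}
```

This keeps the flexible partition-selection mechanisms available: a strategy is free to blend, narrow, or ignore the incremental list.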
Hi @danny0405, as we discussed offline, all comments are addressed. PTAL. Thanks for your patience!
Change Logs
In Hudi, when scheduling Compaction and Clustering, the default behavior is to scan all partitions under the current table. When there are many historical partitions, such as 640,000 in our production environment, this scanning and planning operation becomes very inefficient. For Flink, it often leads to checkpoint timeouts, resulting in data delays.
As for cleaning, we already have the ability to do cleaning for incremental partitions.
This RFC will draw on the design of Incremental Clean to generalize the capability of processing incremental partitions to all table services, such as Clustering and Compaction.
Impact
no
Risk level (write none, low medium or high below)
low
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Contributor's checklist