
[RFC-79] Improving reliability of concurrent table service executions and rollbacks #11555

Draft
wants to merge 69 commits into
base: master


@kbuci kbuci commented Jul 3, 2024

Change Logs

Describe context and summary for this change. Highlight if any code was copied.

Impact

Describe any public API or user-facing feature change or any performance impact.

Risk level (write none, low medium or high below)

If medium or high, explain what verification was done to mitigate the risks.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Jul 3, 2024


## Abstract
In order to improve latency/throughput of writes into a HUDI dataset, HUDI does not require that table service operations (such as clustering and compaction) be serially and sequentially performed before/after an ingestion write. Instead, using HUDI multiwriter, a user can orchestrate separate writers to execute table service plans concurrently with ingestion writers. This setup though may face reliability issues for clustering and compaction, as failed executions and rollbacks may cause dataset corruptions or table services plans to be prematurely aborted. This RFC proposes to address these limitations by using HUDI's heartbeating and transaction manager to update the behavior for clustering, compaction, and rollback of failed writes.

Contributor

This setup though may face reliability issues for clustering and compaction, as failed executions and rollbacks may cause dataset corruptions or table services plans to be prematurely aborted. This RFC proposes to address these limitations by using HUDI's heartbeating and transaction manager to update the behavior for clustering, compaction, and rollback of failed writes.

guess this is not conveyed properly.

lets reword it as below

In general, lock providers are required to run such concurrent writes to the same Hudi table. In a very large organization with 1000s of Hudi pipelines, such async table services might have to be orchestrated using multiple workers and not just one worker. To support these use-cases, we might need some enhancements and guard rails within Hudi to ensure reliable and smooth table services orchestration and execution.

## Background
The table service operations compact, logcompact, and cluster, have the following multiwriter issues when writers execute or rollback these table service plans (note that “removable-plan” is defined as a table service plan that is configured such that when it is rolled back, its plan is deleted from the timeline and cannot be re-executed - only clustering and logcompaction are expected to support this option)

### Concurrent writers can execute table service plan

Contributor

we also need to set context on mutable and non-mutable plans.
also wrt mutable plans, we need a good write up since we don't have that support in any officially released version yet.

for eg, we need more info about who can mutate the plans. and if yes, how does a concurrent writer could behave while it is trying to execute the same etc. or it can only work w/ single writer scenarios and things like that need to be called out upfront so that further analyzing this RFC might be more meaningful w/ full context.

1. Get the table service plan P (as usual)
2. Start a transaction
3. Check if the instant time P has an active heartbeat, if so abort transaction and throw an exception
4. If P is a removable-plan, check if (a) P is an inflight instant or (b) there is a rollback plan on the active timeline that targets P. If either is the case, then throw an exception and abort the transaction, because this instant cannot be executed again and must instead be rolled back.
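
The checks in steps 2 to 4 can be sketched as follows. This is a minimal in-memory illustration, not HUDI code: `ToyTable`, `begin_plan_execution`, `PlanExecutionAborted`, and the 60 second heartbeat timeout are hypothetical names and values, and a `threading.Lock` stands in for the table lock transaction. Starting the heartbeat before the lock is released is an assumption drawn from the explanation that at most one writer should hold an active heartbeat against P.

```python
import threading
import time
from dataclasses import dataclass, field


class PlanExecutionAborted(Exception):
    """Raised when this writer must not execute the table service plan."""


@dataclass
class ToyTable:
    """In-memory stand-in for the timeline, heartbeat store and table lock."""
    lock: threading.Lock = field(default_factory=threading.Lock)
    heartbeats: dict = field(default_factory=dict)      # instant time -> last beat (seconds)
    inflight: set = field(default_factory=set)          # instants already in inflight state
    rollback_targets: set = field(default_factory=set)  # instants targeted by a rollback plan
    heartbeat_timeout_s: float = 60.0                   # hypothetical expiry window

    def has_active_heartbeat(self, instant, now):
        last_beat = self.heartbeats.get(instant)
        return last_beat is not None and now - last_beat < self.heartbeat_timeout_s


def begin_plan_execution(table, instant, removable_plan, now=None):
    """Run steps 2-4, then claim the plan by starting a heartbeat (assumed step)."""
    now = time.time() if now is None else now
    with table.lock:  # step 2: the transaction
        # Step 3: another writer is still executing P.
        if table.has_active_heartbeat(instant, now):
            raise PlanExecutionAborted(f"{instant} has an active heartbeat")
        # Step 4: a removable-plan that was already attempted must be rolled back.
        if removable_plan and (instant in table.inflight or instant in table.rollback_targets):
            raise PlanExecutionAborted(f"{instant} must be rolled back, not re-executed")
        # Assumption: the heartbeat is started before the lock is released, so two
        # writers cannot both observe "no heartbeat" and then both proceed.
        table.heartbeats[instant] = now
```

Because the check and the heartbeat start happen under the same lock, a second writer calling `begin_plan_execution` for the same instant either sees the first writer's heartbeat and aborts, or runs only after that heartbeat has expired.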

Contributor

If P is not a removable plan? where have you covered this.

Contributor

removable/non-removable. or mutable or non-mutable plans. lets ensure we use the same standard terminology throughout

Contributor

Still don't get why the plan needs to be removable?

Contributor

I kind of think the heartbeat check should be done for any table service plan (no matter whether it is removable or not), and after the check, if the plan execution is allowed, we should put a heartbeat immediately in case to unlock the other table service execution jobs.


After a writer schedules a removable-plan table service (such as logcompaction), another writer performing clean can roll back the plan before it has a chance to be executed, deleting the plan from the timeline. This will prevent the table service from ever being executed, and if the table service execution is async (being deferred for execution later by another writer) then the chance of this happening increases.

## Implementation

Contributor

We should have a design section calling out how we plan to get this done. Some of your implementation is actually part of design

Contributor

After a writer schedules a remove-plan table service (such as logcompaction),

So from high-level, why we need the table service plan to be removable? Correctness or performance issue?


- The reason (1) needs to be within a transaction is to handle the case where a concurrent writer executing a table service plan might start the heartbeat at the same time

## Test Plan

Contributor

We might need a RFC for mutable clustering plans in general. I am sure lot of community users might be interested in it.

Contributor

Also, can you give context on how rollback for 3 diff table services happen
ie. compaction, clustering and log compaction.

- The check in (3) needs to be within a transaction in order to ensure at most one writer has an active heartbeat against P at any given time. If this was done outside a transaction, then concurrent writers may each check that no heartbeat exists, and then start one at the same time.
- It is possible for a removable-plan P to be in a requested state while a past writer has scheduled a rollback for P but failed before completing it. In order to handle this case, (4) is needed
### Changes to clean's rollback of failed writers
The clean logic for rolling back failed writes will be changed such that a table lock will be acquired while iterating through all existing inflight and pending rollback plans and scheduling new rollback plans. If a pending instant of a non removable-plan table service is encountered, the instant will be skipped (and will not have a rollback scheduled). In addition, if the plan is not at least `table_service_rollback_delay` minutes old or has an active heartbeat, it will also be skipped. Otherwise the table service plan will be marked for rollback, the same way that ingestion inflights are targeted for rollback, since it is neither a recent removable-plan nor a non removable-plan instant. Once all required rollback plans have been scheduled, the table lock will be released, and all scheduled rollback plans will be executed (this means that executions of the rollback plans won’t be under a lock). As a result of this change, the logic for rolling back failed writes will now be split into two steps:

Contributor

rollback failed writes or rolled back just log compaction for instance?

1. Within a transaction, reload the active timeline and then get all instants that require a rollback plan be scheduled. For each, schedule a rollback plan.
2. Execute any pending rollback plans (be it pending rollback plans that already existed or new pending rollback plans scheduled during (1)).
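
The two-step flow above can be sketched as follows. This is a minimal in-memory illustration rather than HUDI code: `PendingInstant`, `should_schedule_rollback`, and `rollback_failed_writes` are hypothetical names, a `threading.Lock` stands in for the table lock, and the rule used for ingestion instants (roll back once no heartbeat is active) is an assumption made only to keep the example self-contained.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingInstant:
    instant_time: str
    is_table_service: bool
    removable_plan: bool        # only meaningful for table service instants
    age_minutes: float
    has_active_heartbeat: bool


def should_schedule_rollback(instant, table_service_rollback_delay):
    """Decide whether clean should schedule a rollback plan for a pending instant."""
    if not instant.is_table_service:
        # Assumption: a pending ingestion write counts as failed once its heartbeat is gone.
        return not instant.has_active_heartbeat
    if not instant.removable_plan:
        return False  # non removable-plan table services are always skipped
    if instant.age_minutes < table_service_rollback_delay or instant.has_active_heartbeat:
        return False  # too recent, or a writer is still executing it
    return True


def rollback_failed_writes(table_lock, reload_pending, pending_rollbacks, delay, execute_rollback):
    # Step 1: under the table lock, reload the timeline and schedule new rollback plans.
    with table_lock:
        for instant in reload_pending():
            if instant.instant_time in pending_rollbacks:
                continue  # a rollback plan already targets this instant
            if should_schedule_rollback(instant, delay):
                pending_rollbacks.append(instant.instant_time)
    # Step 2: outside the lock, execute both pre-existing and newly scheduled plans.
    for target in list(pending_rollbacks):
        execute_rollback(target)
        pending_rollbacks.remove(target)
```

Keeping step 2 outside the lock means a long-running rollback does not block other writers from taking the table lock, at the cost that two clean attempts may try to execute the same pending rollback plan.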

Contributor

can we write 2 sections completely. one for immutable plans. and another one for mutable. its very confusing to consider both cases everywhere.

lets first present a design for immutable plans and ensure design is solid considering all different interplays.



## Abstract
In order to improve latency/throughput of writes into a HUDI dataset, HUDI does not require that table service operations (such as clustering and compaction) be serially and sequentially performed before/after an ingestion write. Instead, by enabling HUDI multiwriter and async table service execution, a user can orchestrate separate writers to execute table service plans concurrently with ingestion writers. This setup, though, may face reliability issues for clustering and compaction, as failed executions and rollbacks can cause delays in table service executions and prevent cluster/compaction/clean operations from being scheduled. This RFC proposes to address these limitations by using HUDI's heartbeating and transaction manager to update the behavior for clustering, compaction, and rollback of failed writes. With these changes users can build an orchestration platform for executing each table service independently without needing to make complicated/expensive changes to prevent multiple jobs/threads from targeting the same table service plan.

Contributor

In order to improve latency/throughput of writes into a HUDI dataset, HUDI does not require that table service operations (such as clustering and compaction) be serially and sequentially performed before/after an ingestion write

This is not true since 1.x, by using the TrueTime API, the compaction plan can be scheduled in any separate job instead of in the ingestion writer.

### Concurrent writers can execute table service plan

When a writer executes a compact, cluster, or logcompaction plan, it will first rollback any existing inflight attempt, and (depending on the type and configuration of the table service) optionally re-execute it. This can lead to dataset corruption if one writer is rolling back the instant while another is still executing it. See https://issues.apache.org/jira/browse/HUDI-7503 for an example.
Independent of HUDI, a user may have an orchestration setup of concurrent writers where sometimes multiple writers can execute the same plan simultaneously, due to a transient failure (at the orchestration level) or misconfiguration. While it can be argued that HUDI does not need to provide any guarantees of correctness if concurrent writers execute the same plan, updating HUDI to guard against this scenario would reduce the operational overhead and complexity of deploying a large-scale HUDI writer orchestration.

Contributor

While it can be argued that HUDI does not to provide any guarantees of correctness if concurrent writers execute the same plan,

I think it still makes sense if the guarantees induces too much complexity, which would confuse a lot.


## Implementation
In order to resolve these limitations with compact, logcompact, and cluster, a new configuration value `table_service_rollback_delay` will be introduced. The value will indicate the number of minutes that must elapse before a clean attempt is allowed to start a rollback of any removable-plan table service (logcompaction or cluster if they are configured as such). In addition, changes related to heartbeating and table lock transactions will be made to

Contributor

Can you elaborate a little on what the limitations are?

![table service lifecycle (1)](https://github.com/user-attachments/assets/4a656bde-4046-4d37-9398-db96144207aa)


If an immutable table service plan is configured for async execution then each of the listed table service instant transitions can potentially be preformed by independent concurrent jobs. Typically this is expected only if an execution attempt fails after performing a step, and from there the next execution attempt performs a rollback and re-attempts creating data files and committing the instant. But in a distributed orchestration environment there may be a scenario where multiple jobs attempt to execute a table service plan at the same time, leading to one job rolling back a table service inflight instant while another is still executing it. This can undo the progress of table service execution (and in the worst case leave the dataset in an invalid state), harming the reliability of executing table services in an async manner. Given this use case, HUDI should enforce that at most one writer will execute a table service plan, and therefore if a writer sees a plan already being executed it should self-abort.

Contributor

minor typo "preformed"

Note that a failed table service should still be able to be safely cleaned up immediately - the goal here is just to make sure an inflight plan won't stay on the timeline for an unbounded amount of time but also won't be likely to be prematurely cleaned up by clean before it has a chance to be executed.

## Design
### Enabling a plan to be cancellable

Contributor

I am thinking if its worth serializing the last known completed instant to the clustering plan as well

Contributor

instead of last known completed, should we serialize all known inflight instants.

Contributor Author

As discussed offline, this is an existing bug where current stable HUDI 0.x may miss some candidate instants during write conflict detection phase

A writer should be able to explicitly cancel a cancellable table service plan that an ongoing concurrent writer is executing, as long as it has not been committed yet. This requirement is needed due to the presence of concurrent and async writers for table service execution, as another writer should not need to wait for a table service writer to execute further or fail before confirming that its cancellation request will be honored. As will be shown later, this does not require the writer requesting the cancellation to have the ability to terminate/fail the writer of the target cancellable table service plan.

### (C) An incomplete cancellable plan should eventually have its partial writes cleaned up
Although cancellation (be it via an explicit request or due to a write conflict) can ensure that a table service write is never committed, there still needs to be a mechanism to have its data and instant files cleaned up permanently. At minimum the table service writer itself should be able to do this cleanup, but this is not sufficient, as orchestration issues, transient failures, or resource allocation can prevent table service writers from re-attempting their write. Clean can be used to guarantee that an incomplete cancellable plan is eventually cleaned up, since datasets that undergo clustering are anyway expected to undergo regular clean operations. Because an inflight plan remaining on the timeline can degrade performance of reads/writes (as mentioned earlier), a cancellable table service plan should be eligible to be targeted for cleanup if HUDI clean deems that it has remained inflight for too long (or meets some other criteria).

Contributor

since we are adding heart beats for rollback as well, may be having both cleaner and the async table service to take care of rolling back a cancellable clustering plan is fine.
might be worth considering to add some additional delays to cleaner though.

Contributor Author

might be worth considering to add some additional delays to cleaner though.
The cancellation policy will be used for this - if a cancellation policy is set to 2 hours then clean won't be able to "clean" the cancellable plan until 2 hours later

To satisfy goal (A), a new config flag "cancellable" can be added to a table service plan. A writer that intends to schedule a cancellable table service plan can enable the flag in the serialized plan metadata. Any writer executing the plan can infer that the plan is cancellable, and when trying to commit the instant should abort if it detects that any ingestion write or table service plan (without the cancellable config flag) is targeting the same file groups. As a future optimization, the cancellable table service writer can use early conflict detection (instead of waiting until committing the instant) to repeatedly poll for any conflicting write appearing on the timeline, and abort earlier if needed.
On the ingestion side, the commit finalization flow for ingestion writers can be updated to ignore any inflight table service plans if they are cancellable.
For the purpose of this design proposal, consider an ingestion job as having three steps:
1. Schedule itself on the timeline with a new instant time in a .requested file

Contributor

so the plan itself will contain some meta to understand whether this clustering instant is cancellable or not?

Contributor Author

Yes, it will contain

  • A flag indicating whether it is cancellable
  • The cancellation policy
    Only the first is needed for a plan to be considered "cancellable"

2. Process/record tag incoming records, build a workload profile, and write the updated/replaced file groups to an "inflight" instant file on the timeline. Check for conflicts and abort if needed.
3. Perform write conflict checks and commit the instant on the timeline

The aforementioned changes to the ingestion and table service flows will ensure that in the event of a conflicting ingestion and cancellable table service writer, the ingestion job will take precedence (and cause the cancellable table service instant to eventually fail) as long as the cancellable table service hasn't been completed before (2). If the cancellable table service has already been completed before (2), the ingestion job will see that a completed instant (a cancellable table service action) conflicts with its ongoing inflight write, and therefore it would not be legal to proceed.
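
The precedence rule described above can be sketched as two commit-time checks. This is a simplified model, not HUDI's conflict resolution strategy: `Instant`, `State`, `ingestion_may_commit`, and `cancellable_service_may_commit` are hypothetical names, conflicts are reduced to overlapping file group ids, and any overlapping instant other than a pending cancellable table service is treated as a conflict for the ingestion writer.

```python
from dataclasses import dataclass
from enum import Enum


class State(Enum):
    REQUESTED = "requested"
    INFLIGHT = "inflight"
    COMPLETED = "completed"


@dataclass(frozen=True)
class Instant:
    time: str
    is_table_service: bool
    cancellable: bool
    state: State
    file_groups: frozenset


def overlaps(a, b):
    """Two instants conflict only if they touch at least one common file group."""
    return bool(a.file_groups & b.file_groups)


def ingestion_may_commit(ingestion, others):
    """Ingestion ignores pending cancellable plans but not completed ones."""
    for other in others:
        if not overlaps(ingestion, other):
            continue
        pending = other.state is not State.COMPLETED
        if other.is_table_service and other.cancellable and pending:
            continue  # the cancellable plan is expected to abort itself later
        return False  # completed cancellable plan, or any other conflicting instant
    return True


def cancellable_service_may_commit(service, others):
    """A cancellable plan aborts if an ingestion write or a non-cancellable
    table service plan targets the same file groups."""
    for other in others:
        if overlaps(service, other) and not (other.is_table_service and other.cancellable):
            return False
    return True
```

With these two checks, an ingestion write that overlaps an inflight cancellable clustering plan commits and the clustering plan later refuses to commit, while an ingestion write that overlaps an already completed cancellable plan is the one that aborts.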

Contributor

lets ensure that ingestion writer even if it has higher precedence compared to clustering, does not get committed if the concurrent clustering is completed.

Contributor

lets also ensure that ingestion writer can add request for cancellation.

Contributor Author

I decided to add that change below in Adding a cancel action and abort state for cancellable plans since that's where the cancel folder is proposed



#### Enabling table service execution and clean to support cancellation and automatic cleanup

Contributor

this is regarding rolling back a "cancelled" clustering.
wondering at the end of clustering, if we detect that there is a cancellation request, can we just abort the transaction.
so next time when we resume, first thing we will check if there is any cancel and then trigger rollback and abort.
that way, rollbacks for clustering is always triggered at one place in the code for a table service and not at many diff places.


1. Start a transaction
2. Reload active timeline
3. If instant is already committed, abort

Contributor

can you clarify 3. not sure I understand.

Contributor Author

Updated phrasing


Note that similar to the proposed table service flow, this API will check for heartbeats within a transaction (to guarantee there are no writers still working on this instant) and will transition the state to .aborted within a transaction. The latter is done within a transaction (in both the table service flow and this API) to prevent a concurrent request_cancel call from creating a file under the /.cancel folder of an instant already transitioned to .aborted. Although in this scenario calling execute_cancel or the cancellable table service API again after this point would be safe, this guard should prevent files in /.cancel for already-aborted instants from accumulating.
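
The interplay between request_cancel, execute_cancel, and the .aborted state can be sketched with a small in-memory model. Only the two API names and the .aborted and /.cancel concepts come from the text above. Everything else is an assumption made for illustration: the `ToyTimeline` and `CancellableInstant` classes, the boolean return values, the rule that execute_cancel needs a prior request, and the use of a `threading.Lock` in place of the table lock transaction.

```python
import threading
from dataclasses import dataclass, field


@dataclass
class CancellableInstant:
    state: str = "inflight"  # "requested" | "inflight" | "completed" | "aborted"
    cancel_requests: set = field(default_factory=set)  # stand-in for files under /.cancel
    has_active_heartbeat: bool = False


class ToyTimeline:
    def __init__(self):
        self.lock = threading.Lock()  # stand-in for the table lock transaction
        self.instants = {}

    def request_cancel(self, instant_time, requester):
        """Record a cancellation request unless the instant is already finalized."""
        with self.lock:
            inst = self.instants[instant_time]
            if inst.state in ("completed", "aborted"):
                return False  # guard: no new /.cancel entries for finalized instants
            inst.cancel_requests.add(requester)
            return True

    def execute_cancel(self, instant_time):
        """Move a cancel-requested instant to aborted once no writer is working on it."""
        with self.lock:
            inst = self.instants[instant_time]
            if inst.state == "aborted":
                return True  # calling again after the transition is safe
            if inst.state == "completed" or not inst.cancel_requests:
                return False
            if inst.has_active_heartbeat:
                return False  # a writer may still be executing this instant
        # Cleanup of partially written data files would happen here (omitted).
        with self.lock:
            inst.state = "aborted"
        return True
```

In this sketch a request is never removed once added, and a request_cancel that races with the final transition either lands before it or is rejected, so no request accumulates against an already-aborted instant.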

Although any user can invoke request_cancel and execute_cancel, the clean operation will be required to use these APIs in order to cancel any incomplete plans that have satisfied their cancellation policy. This is analogous to how clean schedules and executes the rollback of any failed ingestion writes. Specifically, clean will now perform the following steps:

Contributor

can we also call out that, once a request for cancellation is placed, we can't take that back.

Contributor Author

Sure, I updated to add that in earlier section


## Goals
### (A) A cancellable table service plan should be capable of preventing itself from committing upon presence of write conflict
The current requirement that HUDI execute a table service plan to completion forces ingestion writers to abort a commit if a table service plan is conflicting. Because an ingestion writer typically determines the exact file groups it will be updating/replacing after building a workload profile and performing record tagging, the writer may have already spent a lot of time and resources before realizing that it needs to abort. In the face of frequent table service plans or an old inflight plan, this will cause delays in adding recent upstream records to the dataset as well as unnecessarily take away resources (such as Spark executors in the case of the Spark engine) from other applications in the data lake. A cancellable table service plan should avoid this situation by cancelling itself, rather than committing, if a conflicting ingestion job has already been committed. In conjunction, any ingestion writer or non-cancellable table service writer should be able to infer that a conflicting inflight table service plan is cancellable, and therefore can be ignored when attempting to commit the instant.

Contributor

one opportunity for us here:
when ingestion writer detects a conflicting clustering, but just before adding a cancellation, if clustering completes, the ingestion writer just have to redo the index lookup and just proceed. may be we can take this up later as an enhancement.

Contributor Author

Sure, let's take this up as an enhancement once we start working on implementation

Contributor

note on "In conjunction, any ingestion writer or non-cancellable table service writer should be able to infer that a conflicting inflight table service plan is cancellable, and therefore can be ignored when attempting to commit the instant." -> there should not be any conflict b/w multiple table services. Bcoz, while scheduling clustering or compaction, we ignore all file groups that are part of pending table service plans. So, unless we relax that constraint, this should not be an issue.

@nsivabalan nsivabalan left a comment

given feedback. can you look into them



## Abstract
Table service plans can delay ingestion writes from updating a dataset with recent data if potential write conflicts are detected. Furthermore, a table service plan that isn't executed to completion for a large amount of time (due to repeated failures, application misconfiguration, or insufficient resources) will degrade the read/write performance of a dataset by delaying clean, archival, and metadata table compaction. This is because currently HUDI table service plans, upon being scheduled, must be executed to completion. Additionally, a scheduled plan will prevent any ingestion write targeting the same files from succeeding (since it poses a write conflict) and can prevent new table service plans from targeting the same files. Enabling a user to configure a table service plan as "cancellable" can prevent frequent or repeatedly failing table service plans from delaying ingestion. Support for cancellable plans will provide HUDI an avenue to fully cancel a table service plan and allow other table service and ingestion writers to proceed.

Contributor

Currently this is an issue just for clustering. Pending compaction is not an issue anymore w/ NBCC w/ 1.x. We can leave the title as same, but the motivation can focus in clustering. We could call out that w/ NBCC going to become norm with 1.x, this should not be an issue for compaction.


## Goals
### (A) A cancellable table service plan should be capable of preventing itself from committing upon presence of write conflict
The current requirement that HUDI execute a table service plan to completion forces ingestion writers to abort a commit if a table service plan is conflicting. Because an ingestion writer typically determines the exact file groups it will be updating/replacing after building a workload profile and performing record tagging, the writer may have already spent a lot of time and resources before realizing that it needs to abort. In the face of frequent table service plans or an old inflight plan, this will cause delays in adding recent upstream records to the dataset as well as unnecessarily take away resources (such as Spark executors in the case of the Spark engine) from other applications in the data lake. A cancellable table service plan should avoid this situation by cancelling itself, rather than committing, if a conflicting ingestion job has already been committed. In conjunction, any ingestion writer or non-cancellable table service writer should be able to infer that a conflicting inflight table service plan is cancellable, and therefore can be ignored when attempting to commit the instant.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note on "In conjunction, any ingestion writer or non-cancellable table service writer should be able to infer that a conflicting inflight table service plan is cancellable, and therefore can be ignored when attempting to commit the instant." -> there should not be any conflict b/w mutliple table services. Bcoz, while scheduling clustering or compaction, we ignore all file groups that are part of pending table service plans. So, unless we relax that constraint, this should not be an issue.

### (B) A cancellable table service plan should be eligible for cancellation at any point before committing
A writer should be able to explicitly cancel a cancellable table service plan that an ongoing concurrent writer is executing, as long as the plan has not been committed yet. This requirement is needed because table service execution involves concurrent and async writers: another writer should not need to wait for a table service writer to execute further or fail before confirming that its cancellation request will be honored. As will be shown later, this does not require the writer requesting the cancellation to have the ability to terminate/fail the writer of the target cancellable table service plan.
Contributor

Does "ongoing concurrent writer" here refer to the table service execution worker? We sometimes overload the term "concurrent writer". At first look, I thought it was another concurrent ingestion writer.

Contributor

typo "tale" (last but 2nd word).
also, the last statement is not very clear. can we rephrase that a bit

### (C) An incomplete cancellable plan should eventually have its partial writes cleaned up
Although cancellation (whether via an explicit request or due to a write conflict) can ensure that a table service write is never committed, there still needs to be a mechanism to permanently clean up its data and instant files. At minimum, the table service writer itself should be able to do this cleanup, but this is not sufficient, because orchestration issues, transient failures, or resource allocation can prevent table service writers from re-attempting their write. Clean can be used to guarantee that an incomplete cancellable plan is eventually cleaned up, since datasets that undergo clustering are expected to undergo regular clean operations anyway. Because an inflight plan remaining on the timeline can degrade performance of reads/writes (as mentioned earlier), a cancellable table service plan should be eligible to be targeted for cleanup if HUDI clean deems that it has remained inflight for too long (or meets some other criterion).
Note that a failed table service should still be able to be safely cleaned up immediately - the goal here is just to make sure that an inflight plan won't stay on the timeline for an unbounded amount of time, but also won't be likely to be prematurely cleaned up by clean before it has a chance to be executed.

Contributor

I know it's implicit, but should we call out the below as the next bullet?

  • A cancelled table service (or even one whose cancellation request succeeded) should not result in aborting any future ingestion writer.


## Design
### Enabling a plan to be cancellable
To satisfy goal (A), a new config flag "cancellable" can be added to a table service plan. A writer that intends to schedule a cancellable table service plan can enable the flag in the serialized plan metadata. Any writer executing the plan can infer that the plan is cancellable, and when trying to commit the instant it should abort if it detects that any ingestion write or table service plan (without the cancellable config flag) is targeting the same file groups. As a future optimization, the cancellable table service writer can use early conflict detection (instead of waiting until committing the instant) to repeatedly poll for any conflicting write appearing on the timeline, and abort earlier if needed.
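The commit-time check described above could look roughly like the following. This is a minimal Python sketch for illustration only, not HUDI code: `Write`, `file_groups`, and `should_abort_on_commit` are hypothetical names, and it assumes a conflict means overlapping file group IDs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Write:
    """Hypothetical stand-in for an instant on the timeline."""
    instant_time: str
    file_groups: frozenset       # IDs of the file groups this write targets
    is_table_service: bool = False
    cancellable: bool = False    # only meaningful for table service plans


def should_abort_on_commit(plan: Write, others: list) -> bool:
    """Return True if a cancellable plan must abort instead of committing."""
    if not plan.cancellable:
        return False  # non-cancellable plans keep their existing behavior
    for other in others:
        # Another cancellable table service plan is not counted as a conflict.
        if other.is_table_service and other.cancellable:
            continue
        # Any shared file group with an ingestion write or a
        # non-cancellable table service plan forces an abort.
        if plan.file_groups & other.file_groups:
            return True
    return False
```

For example, a cancellable clustering plan over file groups `fg1` and `fg2` would abort when an ingestion write touches `fg2`, but not when only another cancellable plan overlaps with it.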
Contributor

I might have answered this above myself, but anyway:
is there a chance that two cancellable clustering plans overlap and both cancel/abort themselves during conflict resolution?


The aforementioned changes to the ingestion and table service flows will ensure that, in the event of a conflicting ingestion writer and cancellable table service writer, the ingestion job will take precedence (and cause the cancellable table service instant to eventually fail) as long as the cancellable table service has not been completed before (2). If the cancellable table service has already been completed before (2), the ingestion job will see that a completed instant (a cancellable table service action) conflicts with its ongoing inflight write, and it would therefore not be legal to proceed.

### Adding a cancel action and abort state for cancellable plans
Contributor

Can we split this into two paragraphs? The first one focusing on .cancel and the second one discussing the abort state.

Contributor

The abort state will also be useful in other places.
For example, while scheduling future compactions/clustering, we generally try to avoid file groups that are part of pending clustering plans. But with the abort state, we should only avoid file groups that are part of pending clustering plans that are neither up for cancellation nor already aborted.

Contributor

We might need to compensate for some actions taken on the table, though, if a pending clustering is aborted.
For example, the delete partition operation.

Let's check the bucket index flows as well, to ensure we are not missing anything with the new flow.

Contributor Author

"for eg: delete partition operation."
Currently, if delete partition targets the same partition that has file groups included in a (non-cancellable) pending clustering plan, will it also not replace/delete those file groups?


Contributor

Let's ensure we align on one of the approaches.
a. Either there is no ".cancel" request, and so the clustering job, at the end, inspects all other ingestion commits that completed and aborts itself.
OR
b. We have ".cancel", and so the clustering job never has to inspect other ingestion commits while trying to complete. If there was any overlap between an ingestion writer and a pending cancellable clustering plan, the ingestion writer needs to ensure it adds a cancel request without fail.

I am inclined towards (b), so that clustering does not need to wait until the very end.

Contributor

With (a), the ingestion writer cannot deterministically say whether a concurrent cancellable clustering instant is about to complete or will abort for sure. But with (b), the cancel request lets us be sure, without any non-determinism.
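To make the determinism argument for (b) concrete, here is a rough Python sketch. It is a hypothetical in-memory model, not HUDI's timeline API: `CancelRequests`, `request_cancel`, and `try_complete` are invented names, and a lock plus two sets stand in for the table transaction and the ".cancel" requests.

```python
import threading


class CancelRequests:
    """Hypothetical in-memory stand-in for ".cancel" requests on a table."""

    def __init__(self):
        self._lock = threading.Lock()   # stands in for the table transaction
        self._requested = set()         # instants with a cancel request
        self._completed = set()         # instants that have already committed

    def request_cancel(self, instant: str) -> bool:
        """Called by an ingestion writer; False if the plan already committed."""
        with self._lock:
            if instant in self._completed:
                return False
            self._requested.add(instant)
            return True

    def try_complete(self, instant: str) -> bool:
        """Called by the clustering job; commits only if no cancel was requested."""
        with self._lock:
            if instant in self._requested:
                return False
            self._completed.add(instant)
            return True
```

Because both calls run under the same lock, a `request_cancel` that returns `True` guarantees that the plan can never commit afterwards, which is the property the ingestion writer relies on.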

### (C) An incomplete cancellable plan should eventually have its partial writes cleaned up
Contributor

typo "cancellable" -> "cancelled"

The new ".aborted" state will allow writers to infer whether a cancelled table service plan still needs to have its partial data writes cleaned up from the dataset, which is needed for (C). The additional design change below completes the remaining requirement of (C): eventual cleanup.

### Handling cancellation of plans
An additional config "cancellation-policy" can be added to the table service plan to indicate when it is eligible to be permanently cancelled by writers other than the one responsible for executing the table service. This policy can be a threshold of hours or instants on the timeline: if that number of hours/instants has elapsed since the plan was scheduled, any call to clean will clean up the instant. This policy should be configured by the writer scheduling a cancellable table service, based on the amount of time it expects the plan to remain on the timeline before being picked up for execution. For example, if the plan's execution is expected to be deferred by a few hours, then the cancellation-policy should be lenient, allowing the plan to remain on the timeline for many hours before being subject to clean's cancellation. Note that this cancellation policy is not a replacement for determining whether a table service plan is currently being executed: as with ingestion writes, permanent cleanup of a cancellable table service plan will only start once it is confirmed that an ongoing writer is no longer progressing it.
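One possible reading of this policy, as a minimal Python sketch. The names `CancellationPolicy`, `max_hours`, `max_instants`, and `eligible_for_clean` are hypothetical and not part of HUDI; `writer_active` is assumed to come from some liveness check such as a heartbeat, and reaching either threshold is assumed to be sufficient.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass(frozen=True)
class CancellationPolicy:
    """Hypothetical policy; either threshold may be left unset."""
    max_hours: Optional[float] = None
    max_instants: Optional[int] = None


def eligible_for_clean(policy: CancellationPolicy,
                       scheduled_at: datetime,
                       now: datetime,
                       instants_since: int,
                       writer_active: bool) -> bool:
    """Return True if clean may cancel and clean up the plan."""
    if writer_active:
        # The policy never overrides an ongoing execution of the plan.
        return False
    if policy.max_hours is not None:
        if now - scheduled_at >= timedelta(hours=policy.max_hours):
            return True
    if policy.max_instants is not None:
        if instants_since >= policy.max_instants:
            return True
    return False
```

With a six-hour threshold, a plan scheduled seven hours ago with no active writer would be eligible, while the same plan with a live writer would not.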
Contributor

case a: scheduled. never got a chance to execute. no cancellation request.
case b: scheduled. cancellation request added. never got a chance to execute.
case c: scheduled. execution attempted. cancellation request added. clustering job crashed. and never resumed.
case d: scheduled. execution keeps on failing on multiple re-attempts. no cancellation request.

Contributor

We also need to account for failed rollback attempts.

Contributor

@nsivabalan left a comment

sending more

@github-actions github-actions bot added size:L PR with lines of changes in (300, 1000] and removed size:M PR with lines of changes in (100, 300] labels Dec 30, 2024

1. Start a transaction
2. Reload active timeline
3. If the instant has already been aborted, exit without throwing an exception. Clean up the file from /.cancel if it is still present
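The three steps visible here can be sketched as below. This is a hypothetical in-memory Python model, not HUDI code: `Timeline` and `handle_cancel` are invented names, a lock stands in for the transaction, and a set stands in for the files under /.cancel. Any later steps are not shown in this excerpt and are left out.

```python
import threading


class Timeline:
    """Hypothetical in-memory stand-in for the table's timeline."""

    def __init__(self):
        self.lock = threading.Lock()    # stands in for the transaction
        self.aborted = set()            # instants already in the aborted state
        self.cancel_requests = set()    # stands in for files under /.cancel


def handle_cancel(timeline: Timeline, instant: str) -> str:
    with timeline.lock:                     # 1. start a transaction
        aborted = set(timeline.aborted)     # 2. reload: re-read current state
        if instant in aborted:              # 3. already aborted, so no error
            # Remove a leftover cancel request if one is still present.
            timeline.cancel_requests.discard(instant)
            return "already_aborted"
        # Later steps are not covered by this sketch.
        return "needs_processing"
```

Calling `handle_cancel` repeatedly for an already aborted instant is a no-op after the first call, which is what makes the handler safe to retry.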
Contributor

We might need to account for the scenario below.

Our clustering table service can go through multiple attempts in general, which means there could be multiple rollbacks in the timeline. So, when processing a cancellation request, we need to deduce whether a rollback seen for a cancellable clustering plan happened before or after the cancel request.

That way, we can decide whether to trigger another rollback or just proceed to moving the state to "abort".
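A rough sketch of that decision, in Python for illustration only. `next_action` and its return values are hypothetical names, and it assumes instant times are fixed-width timestamp strings, so that string comparison matches time order.

```python
def next_action(cancel_requested_at: str, rollback_times: list) -> str:
    """Decide how to process a cancel request for a cancellable clustering plan.

    rollback_times holds the instant times of rollbacks already seen for
    the plan (possibly from several earlier execution attempts).
    """
    # A rollback newer than the cancel request already cleaned up after it.
    if any(t > cancel_requested_at for t in rollback_times):
        return "move_to_aborted"
    # Only older rollbacks (or none): a later attempt may have written data.
    return "rollback_then_abort"
```

For instance, a cancel request at `20240101120000000` with one rollback at `20240101110000000` would still need a fresh rollback before the plan is moved to the aborted state.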

Contributor

@nsivabalan left a comment

I pushed an update to the RFC w/ addressing my own feedback.

@nsivabalan
Contributor

Can we move the clean policy for threshold-based cancellation into an add-on section?
