Conversation

@voonhous
Member

No description provided.

@voonhous
Member Author

@danny0405 @cshuo FYI

@voonhous
Member Author

Changes here will require Hudi 1.1.0 to be released first.

@cshuo cshuo left a comment

@voonhous thanks for the PR. Can you also describe the scope of the PR for the Hudi CDC sink, e.g., which index types and table service (compaction) modes are supported?

     */
    private void processFlushForTableFunction(
            EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
        try {

No need to use reflection now? Call tableFunction.flushRemaining(false) directly.
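
For illustration, a minimal sketch of the direct call (method and class names are taken from the excerpt above; the boolean is assumed to mean "not an end-of-input flush"):

    private void processFlushForTableFunction(
            EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
        // Direct invocation instead of resolving flushRemaining(...) via reflection.
        // 'false' is assumed to indicate a mid-stream flush rather than end of input.
        tableFunction.flushRemaining(false);
    }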

Member Author

Done

}

// Extract record key from event data using cached field getters
String recordKey = extractRecordKeyFromEvent(dataChangeEvent);

The record key can be obtained from HoodieFlinkInternalRow directly by calling HoodieFlinkInternalRow#getRecordKey(), so extractRecordKeyFromEvent is unnecessary and primaryKeyFieldGetters can be removed.
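
A small sketch of the suggested simplification (the variable name hoodieFlinkInternalRow is illustrative):

    // Before: key re-derived from the CDC event via cached field getters.
    // String recordKey = extractRecordKeyFromEvent(dataChangeEvent);

    // After: reuse the key already carried on the Hudi row.
    String recordKey = hoodieFlinkInternalRow.getRecordKey();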

Member Author

Done


/** Base infrastructures for streaming writer function to handle Events. */
public abstract class EventStreamWriteFunction extends AbstractStreamWriteFunction<Event>
        implements EventProcessorFunction {

@cshuo cshuo Oct 29, 2025

We should make minimal changes to StreamWriteFunction and BucketStreamWriteFunction; the generic type should be kept as HoodieFlinkInternalRow. We can confine operations on Event within MultiTableEventStreamWriteFunction, and StreamWriteFunction only needs to provide the following operations (see the sketch after this list):

  • processData(HoodieFlinkInternalRow): DataChangeEvent can be converted to HoodieFlinkInternalRow in MultiTableEventStreamWriteFunction.
  • flushRemaining(): called when a flush event is received.
  • updateSchema()?: called when a schema change event is received and the inner schema or related fields (like index fields) need updating.
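
For reference, a hedged sketch of the narrowed surface described in the list above; the interface name and signatures are illustrative, not the actual Hudi or Flink CDC API:

    // Sketch only: operations a single-table writer would expose, so that all
    // Event handling stays inside MultiTableEventStreamWriteFunction.
    interface SingleTableWriteOperations {

        /** Buffer one record that was already converted from a DataChangeEvent upstream. */
        void processData(HoodieFlinkInternalRow record);

        /** Flush buffered records; invoked when a flush event is received. */
        void flushRemaining(boolean endInput);

        /** Refresh the inner schema / index fields after a schema change event. */
        void updateSchema(Schema newSchema);
    }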

@cshuo cshuo Oct 29, 2025

It seems there is no need to implement EventProcessorFunction; actually, processSchemaChange and processFlush of EventStreamWriteFunction will never be called.

Member Author

Okay, I tried this; I remember what the problem was:

Event to HoodieFlinkInternalRow conversion in MultiTableEventStreamWriteFunction:

  1. The HoodieFlinkInternalRow constructor requires fileId and instantTime upfront.
  2. These values come from defineRecordLocation(), which needs the bucket number.
  3. HoodieFlinkInternalRow therefore cannot be created before calling defineRecordLocation().

fileId and instantTime are not required to construct HoodieFlinkInternalRow; these two fields are set later in defineRecordLocation().
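
If so, the conversion could follow a construct-then-locate pattern, roughly as sketched below (constructor and setter shapes are assumed from this discussion, not checked against the Hudi 1.1.0 API):

    // Build the row with only the data-derived fields...
    HoodieFlinkInternalRow row =
            new HoodieFlinkInternalRow(recordKey, partitionPath, operationType, rowData);

    // ...and let defineRecordLocation(row) fill in the location afterwards, conceptually:
    row.setFileId(fileId);
    row.setInstantTime(instantTime);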

Member Author

> It seems there is no need to implement EventProcessorFunction; actually, processSchemaChange and processFlush of EventStreamWriteFunction will never be called.

Caused by: java.lang.RuntimeException: Failed to process schema event for table: hudi_inventory_bptbsn.products
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:296)
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:167)
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:72)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.cdc.connectors.hudi.sink.bucket.FlushEventAlignmentOperator.processElement(FlushEventAlignmentOperator.java:94)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsupportedOperationException: #processSchemaChange should not be called
	at org.apache.flink.cdc.connectors.hudi.sink.function.EventBucketStreamWriteFunction.processSchemaChange(EventBucketStreamWriteFunction.java:158)
	at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:293)
	... 24 more

It is being invoked.

Member Author

Done

 * <p>Assumes that CreateTableEvent will always arrive before DataChangeEvent for each table,
 * following the standard CDC pipeline startup sequence.
 */
public class HudiRecordEventSerializer implements HudiRecordSerializer<Event> {

@cshuo cshuo Oct 29, 2025

It seems HudiRecordEventSerializer is designed to handle serialization for multiple tables. As with the comments on EventStreamWriteFunction, could HudiRecordEventSerializer be a field of MultiTableStreamWriteOperatorCoordinator, serializing data change events to HoodieFlinkInternalRow, which are then dispatched to the corresponding table write functions?
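
Roughly, the dispatch flow being suggested might look like the following; method names such as serialize and writerFor are placeholders rather than the PR's actual API:

    // One central conversion point, then per-table dispatch.
    void handle(DataChangeEvent event) {
        HoodieFlinkInternalRow row = serializer.serialize(event); // HudiRecordEventSerializer field
        writerFor(event.tableId()).processData(row);              // route to that table's writer
    }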

Member Author

Done

        // - Data events go to their specific bucket's task
        DataStream<BucketWrapper> partitionedStream =
                bucketAssignedStream.partitionCustom(
                        (key, numPartitions) -> key % numPartitions,

Maybe we should also consider the data skew problem, since there are records from multiple tables and partitions. You can refer to BucketIndexUtil#getPartitionIndexFunc.
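
One hedged way to spread the load is to key by the (table, partition, bucket) combination and hash it, so that bucket 0 of every table does not land on the same subtask; the accessors on BucketWrapper here are hypothetical, and Hudi's BucketIndexUtil#getPartitionIndexFunc is the reference for a production-grade version:

    // Sketch: composite key instead of the bare bucket id (uses java.util.Objects).
    bucketAssignedStream.partitionCustom(
            (key, numPartitions) -> Math.floorMod(key, numPartitions),
            wrapper -> Objects.hash(
                    wrapper.getTableId(), wrapper.getPartitionPath(), wrapper.getBucket()));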

Member Author

Done

            DataChangeEvent dataChangeEvent, Schema schema) {
        List<String> partitionKeys = schema.partitionKeys();
        if (partitionKeys == null || partitionKeys.isEmpty()) {
            return "default";

Should this be "" here?

Member Author

Yeap, good catch, fixed.


    }

    /**
     * Calculate bucket from HoodieFlinkInternalRow using the record key. The record key is already

Are we going to support bucketing by hoodie.bucket.index.hash.field?

Member Author

Not yet, I was planning on standardising everything to use record keys first. Since there is an orthogonal discussion on config, I wanted to leave this out for a separate exercise.

            String instantTime) {

        // Extract record key from primary key fields
        String recordKey = extractRecordKeyFromDataChangeEvent(dataChangeEvent, schema);

Can we use RowDataKeyGen to get the record key and partition path directly?
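
A sketch of that, assuming hudi-flink's RowDataKeyGen exposes getRecordKey/getPartitionPath on a RowData (the factory call shown should be verified against the Hudi version this targets):

    // Derive both values from the converted RowData in one place.
    RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
    String recordKey = keyGen.getRecordKey(rowData);
    String partitionPath = keyGen.getPartitionPath(rowData);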

Member Author

Done

@voonhous force-pushed the hudi-connector-rework-push-to-origin branch from 9f52239 to 6254c7f on November 5, 2025 06:53