Add Hudi sink connector support #4164
Conversation
@danny0405 @cshuo FYI

Changes here will require Hudi 1.1.0 to be released first.
@voonhous thanks for the PR. Can you also describe the scope of the PR for the Hudi CDC sink, e.g., which index types and table service (compaction) modes are supported?
 */
private void processFlushForTableFunction(
        EventBucketStreamWriteFunction tableFunction, Event flushEvent) {
    try {
No need to use reflection now? Call tableFunction.flushRemaining(false); directly.
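A minimal before/after sketch of that suggestion; the class below is a hypothetical stand-in for EventBucketStreamWriteFunction, not the real one:

```java
// Hypothetical stand-in for EventBucketStreamWriteFunction; only the method
// under discussion is modeled here.
class BucketWriteFunctionStub {
    /** Flushes buffered records; the flag marks whether this is the final flush. */
    public void flushRemaining(boolean endInput) {
        // ... write out buffered records for this table ...
    }
}

class FlushCallSketch {
    // Before (earlier revision of the PR): reflective invocation.
    static void flushViaReflection(BucketWriteFunctionStub fn) throws Exception {
        java.lang.reflect.Method m =
                BucketWriteFunctionStub.class.getMethod("flushRemaining", boolean.class);
        m.invoke(fn, false);
    }

    // After (reviewer's suggestion): just call the method directly.
    static void flushDirectly(BucketWriteFunctionStub fn) {
        fn.flushRemaining(false);
    }
}
```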
Done
}

// Extract record key from event data using cached field getters
String recordKey = extractRecordKeyFromEvent(dataChangeEvent);
The record key can be obtained from HoodieFlinkInternalRow directly by calling HoodieFlinkInternalRow#getRecordKey(). So extractRecordKeyFromEvent is unnecessary, and primaryKeyFieldGetters can be removed.
Done
/** Base infrastructures for streaming writer function to handle Events. */
public abstract class EventStreamWriteFunction extends AbstractStreamWriteFunction<Event>
        implements EventProcessorFunction {
We should make minimal changes to StreamWriteFunction and BucketStreamWriteFunction; the generic type should be kept as HoodieFlinkInternalRow. We can confine operations on Event to MultiTableEventStreamWriteFunction, and StreamWriteFunction only needs to provide the following operations (rough sketch below):
- processData(HoodieFlinkInternalRow): DataChangeEvent can be converted to HoodieFlinkInternalRow in MultiTableEventStreamWriteFunction.
- flushRemaining(): called when a flush event is received.
- updateSchema()?: called when a schema change event is received and the inner schema or related fields (like index fields) need to be updated.
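A rough sketch of the surface being proposed; the method names follow the list above, but HoodieFlinkInternalRow is replaced by a hypothetical stub so the snippet stands alone:

```java
// Hypothetical placeholder so the snippet is self-contained; stands in for
// Hudi's HoodieFlinkInternalRow.
class HoodieRowStub {}

// Per-table writer keeps Hudi's original generic type and never sees CDC Events;
// MultiTableEventStreamWriteFunction does the Event -> row conversion upstream.
abstract class PerTableWriteFunctionSketch {
    /** Buffers/writes one already-converted row. */
    abstract void processData(HoodieRowStub row);

    /** Called when a flush event is received. */
    abstract void flushRemaining();

    /** Called when a schema change event is received; refreshes the inner schema and index fields. */
    abstract void updateSchema();
}
```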
Seems there is no need to implement EventProcessorFunction; actually, processSchemaChange and processFlush of EventStreamWriteFunction will never be called.
Okay, I tried this; I remember what the problem was:
Event to HoodieFlinkInternalRow conversion in MultiTableEventStreamWriteFunction.
- The HoodieFlinkInternalRow constructor requires fileId and instantTime upfront.
- These values come from defineRecordLocation(), which needs the bucket number.
- So HoodieFlinkInternalRow cannot be created before calling defineRecordLocation().
fileId and instantTime are not required to construct HoodieFlinkInternalRow; these two fields are set later in defineRecordLocation().
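So the conversion can happen before bucketing; a sketch under that assumption (stand-in types only, not the real HoodieFlinkInternalRow API):

```java
// Hypothetical stand-in mirroring the fields discussed above; the real
// HoodieFlinkInternalRow may differ in names and signatures.
class RowWithLocationStub {
    private final String recordKey;
    private final String partitionPath;
    private String fileId;       // not required at construction time
    private String instantTime;  // not required at construction time

    RowWithLocationStub(String recordKey, String partitionPath) {
        this.recordKey = recordKey;
        this.partitionPath = partitionPath;
    }

    String getRecordKey() { return recordKey; }
    void setFileId(String fileId) { this.fileId = fileId; }
    void setInstantTime(String instantTime) { this.instantTime = instantTime; }
}

class RecordLocationSketch {
    // Mirrors the defineRecordLocation() flow: the bucket is derived from the
    // record key, and only then are fileId/instantTime filled in on the row.
    void defineRecordLocation(RowWithLocationStub row, String fileId, String instantTime) {
        row.setFileId(fileId);
        row.setInstantTime(instantTime);
    }
}
```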
Seems there is no need to implement EventProcessorFunction; actually, processSchemaChange and processFlush of EventStreamWriteFunction will never be called.
Caused by: java.lang.RuntimeException: Failed to process schema event for table: hudi_inventory_bptbsn.products
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:296)
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:167)
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processElement(MultiTableEventStreamWriteFunction.java:72)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.cdc.connectors.hudi.sink.bucket.FlushEventAlignmentOperator.processElement(FlushEventAlignmentOperator.java:94)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.UnsupportedOperationException: #processSchemaChange should not be called
at org.apache.flink.cdc.connectors.hudi.sink.function.EventBucketStreamWriteFunction.processSchemaChange(EventBucketStreamWriteFunction.java:158)
at org.apache.flink.cdc.connectors.hudi.sink.function.MultiTableEventStreamWriteFunction.processSchemaChange(MultiTableEventStreamWriteFunction.java:293)
... 24 more
It is being invoked.
Done
 * <p>Assumes that CreateTableEvent will always arrive before DataChangeEvent for each table,
 * following the standard CDC pipeline startup sequence.
 */
public class HudiRecordEventSerializer implements HudiRecordSerializer<Event> {
Seems HudiRecordEventSerializer is designed to handle serialization for multiple tables. As with the comments on EventStreamWriteFunction, could HudiRecordEventSerializer be a field of MultiTableStreamWriteOperatorCoordinator, serializing data change events to HoodieFlinkInternalRow, which are then dispatched to the corresponding table write functions?
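A sketch of that arrangement; every type here is a simplified, hypothetical stand-in for the classes named above, and the real coordinator/function wiring may differ:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for the classes discussed above.
class DataChangeEventStub { String tableId; }
class HoodieRowStub2 {}

interface RecordSerializerStub {
    HoodieRowStub2 serialize(DataChangeEventStub event);
}

// The multi-table level holds the serializer once and dispatches converted rows
// to the per-table write functions.
class MultiTableDispatchSketch {
    private final RecordSerializerStub serializer;
    private final Map<String, PerTableWriter> writers = new HashMap<>();

    MultiTableDispatchSketch(RecordSerializerStub serializer) {
        this.serializer = serializer;
    }

    void processElement(DataChangeEventStub event) {
        HoodieRowStub2 row = serializer.serialize(event);
        writers.computeIfAbsent(event.tableId, id -> new PerTableWriter()).processData(row);
    }

    static class PerTableWriter {
        void processData(HoodieRowStub2 row) { /* buffer/write for this table */ }
    }
}
```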
Done
// - Data events go to their specific bucket's task
DataStream<BucketWrapper> partitionedStream =
        bucketAssignedStream.partitionCustom(
                (key, numPartitions) -> key % numPartitions,
Maybe we should also consider the data skew problem, since there are records from multiple tables and partitions. You can refer to BucketIndexUtil#getPartitionIndexFunc.
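For illustration, the concern with `(key, numPartitions) -> key % numPartitions` and one way a partition-aware index avoids it (the hashing details below are assumptions, not the actual BucketIndexUtil implementation):

```java
// Illustrative only: contrast of the naive index with a skew-aware one in the
// spirit of BucketIndexUtil#getPartitionIndexFunc.
class PartitionIndexSketch {
    // Naive: bucket 0 of every table/partition always lands on subtask 0, and so on.
    static int naiveIndex(int bucketNumber, int numSubtasks) {
        return bucketNumber % numSubtasks;
    }

    // Skew-aware: offset by a hash of table + partition path so the same bucket
    // number from different tables/partitions spreads across subtasks.
    static int skewAwareIndex(String tableId, String partitionPath, int bucketNumber, int numSubtasks) {
        int offset = Math.floorMod((tableId + "|" + partitionPath).hashCode(), numSubtasks);
        return (offset + bucketNumber) % numSubtasks;
    }
}
```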
Done
        DataChangeEvent dataChangeEvent, Schema schema) {
    List<String> partitionKeys = schema.partitionKeys();
    if (partitionKeys == null || partitionKeys.isEmpty()) {
        return "default";
should be "" here?
Yeap, good catch, fixed.
}

/**
 * Calculate bucket from HoodieFlinkInternalRow using the record key. The record key is already
Are we going to support bucketing by hoodie.bucket.index.hash.field?
Not yet, I was planning on standardising everything to use record keys first. Since there is an orthogonal discussion on config, I wanted to leave this out for a separate exercise.
        String instantTime) {

// Extract record key from primary key fields
String recordKey = extractRecordKeyFromDataChangeEvent(dataChangeEvent, schema);
Can we use RowDataKeyGen to get the record key and partition path directly?
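For context, the appeal is that a key generator derives both the record key and the partition path from configured field lists instead of hand-rolled extraction; a minimal self-contained mimic (not the real RowDataKeyGen API) could look like:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative mimic of a key generator: both values are derived from configured
// field lists. The real RowDataKeyGen works on Flink RowData and Hudi
// configuration, not on a simple Map.
class KeyGenSketch {
    private final List<String> recordKeyFields;
    private final List<String> partitionFields;

    KeyGenSketch(List<String> recordKeyFields, List<String> partitionFields) {
        this.recordKeyFields = recordKeyFields;
        this.partitionFields = partitionFields;
    }

    String getRecordKey(Map<String, Object> row) {
        return recordKeyFields.stream()
                .map(f -> String.valueOf(row.get(f)))
                .collect(Collectors.joining(","));
    }

    String getPartitionPath(Map<String, Object> row) {
        return partitionFields.stream()
                .map(f -> String.valueOf(row.get(f)))
                .collect(Collectors.joining("/"));
    }
}
```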
Done
Force-pushed from 9f52239 to 6254c7f.