Skip to content

Commit e845dec

Browse files
committed
Checkpoint 50 - Remove code duplication via overloading
1 parent 94e6930 commit e845dec

File tree

2 files changed

+3
-61
lines changed

2 files changed

+3
-61
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/HudiRecordEventSerializer.java

Lines changed: 1 addition & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -134,49 +134,7 @@ public HoodieFlinkInternalRow serialize(Event event, String fileId, String insta
134134
*/
135135
@Override
136136
public HoodieFlinkInternalRow serialize(Event event) {
137-
if (event instanceof CreateTableEvent) {
138-
CreateTableEvent createTableEvent = (CreateTableEvent) event;
139-
schemaMaps.put(createTableEvent.tableId(), createTableEvent.getSchema());
140-
// Clear field getter cache for this table since schema changed
141-
fieldGetterCache.remove(createTableEvent.tableId());
142-
// Schema events don't produce records
143-
return null;
144-
145-
} else if (event instanceof SchemaChangeEvent) {
146-
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
147-
Schema existingSchema = schemaMaps.get(schemaChangeEvent.tableId());
148-
if (existingSchema != null
149-
&& !SchemaUtils.isSchemaChangeEventRedundant(
150-
existingSchema, schemaChangeEvent)) {
151-
Schema newSchema =
152-
SchemaUtils.applySchemaChangeEvent(existingSchema, schemaChangeEvent);
153-
schemaMaps.put(schemaChangeEvent.tableId(), newSchema);
154-
// Clear field getter cache for this table since schema changed
155-
fieldGetterCache.remove(schemaChangeEvent.tableId());
156-
}
157-
// Schema events don't produce records
158-
return null;
159-
160-
} else if (event instanceof DataChangeEvent) {
161-
DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
162-
Schema schema = schemaMaps.get(dataChangeEvent.tableId());
163-
164-
if (schema == null) {
165-
throw new IllegalStateException(
166-
"No schema available for table "
167-
+ dataChangeEvent.tableId()
168-
+ ". CreateTableEvent should arrive before DataChangeEvent.");
169-
}
170-
171-
// Convert DataChangeEvent to HoodieFlinkInternalRow using utility function
172-
// Use temporary values that will be overridden later
173-
return RowDataUtils.convertDataChangeEventToHoodieFlinkInternalRow(
174-
dataChangeEvent, schema, zoneId, "temp", "temp");
175-
} else {
176-
throw new IllegalArgumentException(
177-
"Unsupported event type for Hudi serialization: "
178-
+ event.getClass().getSimpleName());
179-
}
137+
return serialize(event, "temp", "temp");
180138
}
181139

182140
/**
@@ -199,22 +157,6 @@ public boolean hasSchema(TableId tableId) {
199157
return schemaMaps.containsKey(tableId);
200158
}
201159

202-
/**
203-
* Get cached field getters for a table, creating them if needed.
204-
*
205-
* @param tableId The table identifier
206-
* @return List of field getters or null if schema not available
207-
*/
208-
public List<RecordData.FieldGetter> getFieldGetters(TableId tableId) {
209-
Schema schema = schemaMaps.get(tableId);
210-
if (schema == null) {
211-
return null;
212-
}
213-
214-
return fieldGetterCache.computeIfAbsent(
215-
tableId, k -> RowDataUtils.createFieldGetters(schema, zoneId));
216-
}
217-
218160
/**
219161
* Set schema for a table. Used to initialize table-specific serializers with schema.
220162
*

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/function/MultiTableEventStreamWriteFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,8 +436,8 @@ private EventBucketStreamWriteFunction createTableFunction(TableId tableId) thro
436436
tableFunction.setCorrespondent(tableCorrespondent);
437437
tableFunction.setTableId(tableId);
438438

439-
// This is the key change: instead of passing the raw gateway, we pass a proxy
440-
// that intercepts and enhances events with the table path.
439+
// Instead of passing the raw gateway, we pass a proxy that intercepts and enhances events
440+
// with the table path
441441
String tablePath = tableConfig.getString(FlinkOptions.PATH);
442442
tableFunction.setOperatorEventGateway(
443443
new InterceptingGateway(this.getOperatorEventGateway(), tablePath));

0 commit comments

Comments
 (0)