
Commit 94e6930

Checkpoint 49 - Use RowDataKeyGen
1 parent 82aba79 commit 94e6930

2 files changed: +62 -195 lines changed


flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/bucket/BucketAssignOperator.java

Lines changed: 5 additions & 190 deletions
@@ -17,16 +17,14 @@
 
 package org.apache.flink.cdc.connectors.hudi.sink.bucket;
 
-import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.FlushEvent;
-import org.apache.flink.cdc.common.event.OperationType;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
 import org.apache.flink.cdc.connectors.hudi.sink.v2.OperatorIDGenerator;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.configuration.Configuration;
@@ -46,7 +44,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,9 +83,6 @@ public class BucketAssignOperator extends AbstractStreamOperator<BucketWrapper>
     /** Cache of primary key fields per table. */
     private final Map<TableId, List<String>> primaryKeyCache = new HashMap<>();
 
-    /** Cache of field getters per table. */
-    private final Map<TableId, List<RecordData.FieldGetter>> fieldGetterCache = new HashMap<>();
-
     public BucketAssignOperator(Configuration conf, String schemaOperatorUid) {
         this.numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
         this.schemaOperatorUid = schemaOperatorUid;
@@ -131,8 +125,7 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
             Schema newSchema = SchemaUtils.applySchemaChangeEvent(existingSchema, schemaEvent);
             schemaCache.put(schemaEvent.tableId(), newSchema);
 
-            // Clear caches when schema changes
-            fieldGetterCache.remove(schemaEvent.tableId());
+            // Clear primary key cache when schema changes
            primaryKeyCache.remove(schemaEvent.tableId());
 
            // Broadcast to all tasks
@@ -219,195 +212,17 @@ private int calculateTaskIndex(DataChangeEvent event) {
                     "Cannot calculate bucket: table " + tableId + " has no primary keys");
         }
 
-        // Create final references for use in lambda
-        final List<String> finalPrimaryKeys = primaryKeys;
-
-        // Get or cache field getters
-        List<RecordData.FieldGetter> fieldGetters =
-                fieldGetterCache.computeIfAbsent(
-                        tableId,
-                        k -> {
-                            List<RecordData.FieldGetter> getters =
-                                    new ArrayList<>(finalPrimaryKeys.size());
-                            for (String primaryKeyField : finalPrimaryKeys) {
-                                int fieldIndex =
-                                        finalSchema.getColumnNames().indexOf(primaryKeyField);
-                                if (fieldIndex == -1) {
-                                    throw new IllegalStateException(
-                                            "Primary key field '"
-                                                    + primaryKeyField
-                                                    + "' not found in schema for table "
-                                                    + tableId);
-                                }
-                                DataType fieldType =
-                                        finalSchema.getColumns().get(fieldIndex).getType();
-                                getters.add(RecordData.createFieldGetter(fieldType, fieldIndex));
-                            }
-                            return getters;
-                        });
-
-        // Extract record key
-        String recordKey = extractRecordKey(event, primaryKeys, fieldGetters);
+        // Use RowDataUtils to extract record key and partition path
+        String recordKey = RowDataUtils.extractRecordKeyFromDataChangeEvent(event, finalSchema);
+        String partition = RowDataUtils.extractPartitionPathFromDataChangeEvent(event, finalSchema);
 
         // Calculate bucket using Hudi's logic (0 to numBuckets-1)
         String tableIndexKeyFields = String.join(",", primaryKeys);
         int bucketNumber = BucketIdentifier.getBucketId(recordKey, tableIndexKeyFields, numBuckets);
 
-        // Extract partition path from the event
-        String partition = extractPartitionPath(event, finalSchema, fieldGetters);
-
         // Use partition function to map bucket to task index for balanced distribution
         int taskIndex = partitionIndexFunc.apply(numBuckets, partition, bucketNumber);
 
         return taskIndex;
     }
-
-    private String extractRecordKey(
-            DataChangeEvent event,
-            List<String> primaryKeys,
-            List<RecordData.FieldGetter> fieldGetters) {
-        // For DELETE, use 'before' data; for INSERT/UPDATE, use 'after' data
-        RecordData recordData = event.op() == OperationType.DELETE ? event.before() : event.after();
-
-        if (recordData == null) {
-            throw new IllegalStateException(
-                    "Cannot extract record key: " + event.op() + " event has null data");
-        }
-
-        List<String> recordKeyPairs = new ArrayList<>(primaryKeys.size());
-        for (int i = 0; i < primaryKeys.size(); i++) {
-            RecordData.FieldGetter fieldGetter = fieldGetters.get(i);
-            Object fieldValue = fieldGetter.getFieldOrNull(recordData);
-
-            if (fieldValue == null) {
-                throw new IllegalStateException(
-                        "Primary key field '" + primaryKeys.get(i) + "' is null in record");
-            }
-
-            // Format as "fieldName:value"
-            recordKeyPairs.add(primaryKeys.get(i) + ":" + fieldValue);
-        }
-
-        return String.join(",", recordKeyPairs);
-    }
-
-    /**
-     * Extract partition path from the DataChangeEvent based on schema partition keys.
-     *
-     * <p>If the schema has partition keys defined:
-     *
-     * <ul>
-     *   <li>Extracts partition field values from the record data
-     *   <li>Formats them as "field1=value1/field2=value2" (Hive-style partitioning)
-     * </ul>
-     *
-     * <p>If no partition keys are defined, returns empty string (for unpartitioned tables).
-     *
-     * @param event The DataChangeEvent to extract partition from
-     * @param schema The table schema containing partition key definitions
-     * @param fieldGetters Field getters for extracting values (not used currently, may be needed
-     *     for optimization)
-     * @return The partition path string (empty string for unpartitioned tables)
-     */
-    private String extractPartitionPath(
-            DataChangeEvent event, Schema schema, List<RecordData.FieldGetter> fieldGetters) {
-
-        // Check if schema has partition keys defined
-        List<String> partitionKeys = schema.partitionKeys();
-        if (partitionKeys == null || partitionKeys.isEmpty()) {
-            // Hudi convention: unpartitioned tables use empty string, not "default"
-            return "";
-        }
-
-        // Get the record data to extract from (after for INSERT/UPDATE/REPLACE, before for DELETE)
-        RecordData recordData;
-        switch (event.op()) {
-            case INSERT:
-            case UPDATE:
-            case REPLACE:
-                recordData = event.after();
-                break;
-            case DELETE:
-                recordData = event.before();
-                break;
-            default:
-                throw new IllegalArgumentException("Unsupported operation: " + event.op());
-        }
-
-        if (recordData == null) {
-            throw new IllegalStateException(
-                    "Cannot extract partition path: " + event.op() + " event has null data");
-        }
-
-        // Extract partition values and build partition path
-        List<String> partitionParts = new ArrayList<>(partitionKeys.size());
-        for (String partitionKey : partitionKeys) {
-            int fieldIndex = schema.getColumnNames().indexOf(partitionKey);
-            if (fieldIndex == -1) {
-                throw new IllegalStateException(
-                        "Partition key field '"
-                                + partitionKey
-                                + "' not found in schema for table "
-                                + event.tableId());
-            }
-
-            // Get field value
-            Object fieldValue;
-            if (recordData.isNullAt(fieldIndex)) {
-                // Handle null partition values - use "__HIVE_DEFAULT_PARTITION__" as per Hive
-                // convention
-                fieldValue = "__HIVE_DEFAULT_PARTITION__";
-            } else {
-                // Get the field value based on the field type
-                DataType fieldType = schema.getColumns().get(fieldIndex).getType();
-                fieldValue = getFieldValue(recordData, fieldIndex, fieldType);
-            }
-
-            // Format as "key=value" (Hive-style partitioning)
-            partitionParts.add(partitionKey + "=" + fieldValue);
-        }
-
-        // Join partition parts with "/"
-        return String.join("/", partitionParts);
-    }
-
-    /**
-     * Extract field value from RecordData based on field type. This is a simplified version -
-     * complex types may need additional handling.
-     */
-    private Object getFieldValue(RecordData recordData, int fieldIndex, DataType fieldType) {
-        switch (fieldType.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-                return recordData.getString(fieldIndex).toString();
-            case BOOLEAN:
-                return recordData.getBoolean(fieldIndex);
-            case TINYINT:
-                return recordData.getByte(fieldIndex);
-            case SMALLINT:
-                return recordData.getShort(fieldIndex);
-            case INTEGER:
-            case DATE:
-                return recordData.getInt(fieldIndex);
-            case BIGINT:
-                return recordData.getLong(fieldIndex);
-            case FLOAT:
-                return recordData.getFloat(fieldIndex);
-            case DOUBLE:
-                return recordData.getDouble(fieldIndex);
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return recordData.getTimestamp(
-                        fieldIndex,
-                        org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision(fieldType));
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return recordData.getLocalZonedTimestampData(
-                        fieldIndex,
-                        org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision(fieldType));
-            default:
-                // For other types, create a field getter and use it
-                RecordData.FieldGetter fieldGetter =
-                        RecordData.createFieldGetter(fieldType, fieldIndex);
-                return fieldGetter.getFieldOrNull(recordData);
-        }
-    }
 }
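
The net effect in BucketAssignOperator: calculateTaskIndex now delegates record-key and partition extraction to RowDataUtils and keeps only the hash-and-route step. Below is a minimal standalone sketch of that routing. The two-field primary key, the sample values, and the plain modulo standing in for the operator's partitionIndexFunc are illustrative assumptions, not part of this commit; only the key format and the BucketIdentifier call shape come from the diff above.

import org.apache.hudi.index.bucket.BucketIdentifier;

public class BucketRoutingSketch {
    public static void main(String[] args) {
        int numBuckets = 4; // FlinkOptions.BUCKET_INDEX_NUM_BUCKETS in the operator

        // Key shape produced by RowDataUtils.extractRecordKeyFromDataChangeEvent:
        // "field:value" pairs joined with commas (sample values, assumed schema).
        String recordKey = "id:42,region:eu";
        String indexKeyFields = "id,region"; // String.join(",", primaryKeys)

        // Hudi hashes the key-field values to a bucket in [0, numBuckets).
        int bucketNumber = BucketIdentifier.getBucketId(recordKey, indexKeyFields, numBuckets);

        // The operator then maps (numBuckets, partition, bucketNumber) to a task
        // index through partitionIndexFunc; a plain modulo stands in for it here.
        int parallelism = 2; // assumed sink parallelism
        int taskIndex = bucketNumber % parallelism;
        System.out.printf("bucket=%d -> taskIndex=%d%n", bucketNumber, taskIndex);
    }
}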

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/util/RowDataUtils.java

Lines changed: 57 additions & 5 deletions
@@ -243,7 +243,51 @@ public static HoodieFlinkInternalRow convertDataChangeEventToHoodieFlinkInternal
 
     /**
      * Convert a DataChangeEvent to a HoodieFlinkInternalRow with automatic record key and partition
-     * path extraction.
+     * path extraction using Hudi's RowDataKeyGen. This is the preferred method as it uses Hudi's
+     * built-in key generation logic.
+     *
+     * @param dataChangeEvent The DataChangeEvent to convert
+     * @param schema Schema for the table
+     * @param zoneId Time zone for timestamp conversion
+     * @param keyGen Hudi's RowDataKeyGen for extracting record keys and partition paths
+     * @param fileId The file ID for the record
+     * @param instantTime The instant time for the record
+     * @return HoodieFlinkInternalRow containing the converted data
+     */
+    public static HoodieFlinkInternalRow convertDataChangeEventToHoodieFlinkInternalRow(
+            DataChangeEvent dataChangeEvent,
+            Schema schema,
+            ZoneId zoneId,
+            org.apache.hudi.sink.bulk.RowDataKeyGen keyGen,
+            String fileId,
+            String instantTime) {
+
+        // Convert DataChangeEvent to RowData using existing utility
+        List<FieldGetter> fieldGetters = createFieldGetters(schema, zoneId);
+        RowData rowData = convertDataChangeEventToRowData(dataChangeEvent, fieldGetters);
+
+        // Use Hudi's RowDataKeyGen to extract record key and partition path
+        String recordKey = keyGen.getRecordKey(rowData);
+        String partitionPath = keyGen.getPartitionPath(rowData);
+
+        // Map CDC operation to Hudi operation type
+        String operationType = mapCdcOperationToHudiOperation(dataChangeEvent.op());
+
+        // Create and return HoodieFlinkInternalRow
+        return new HoodieFlinkInternalRow(
+                recordKey, // Record key
+                partitionPath, // Partition path
+                fileId, // File ID
+                instantTime, // Instant time
+                operationType, // Operation type
+                false, // isIndexRecord
+                rowData // Row data
+                );
+    }
+
+    /**
+     * Convert a DataChangeEvent to a HoodieFlinkInternalRow with automatic record key and partition
+     * path extraction. Falls back to manual extraction when RowDataKeyGen is not available.
      *
      * @param dataChangeEvent The DataChangeEvent to convert
      * @param schema Schema for the table
@@ -284,8 +328,15 @@ private static String mapCdcOperationToHudiOperation(OperationType cdcOp) {
         }
     }
 
-    /** Extract record key from DataChangeEvent based on primary key fields in schema. */
-    private static String extractRecordKeyFromDataChangeEvent(
+    /**
+     * Extract record key from DataChangeEvent based on primary key fields in schema. Public utility
+     * method for use by operators that need to calculate record keys.
+     *
+     * @param dataChangeEvent The DataChangeEvent to extract record key from
+     * @param schema The table schema containing primary key definitions
+     * @return The record key string in format "field1:value1,field2:value2"
+     */
+    public static String extractRecordKeyFromDataChangeEvent(
             DataChangeEvent dataChangeEvent, Schema schema) {
         List<String> primaryKeyFields = schema.primaryKeys();
         if (primaryKeyFields.isEmpty()) {
@@ -346,7 +397,8 @@ private static String extractRecordKeyFromDataChangeEvent(
     }
 
     /**
-     * Extract partition path from DataChangeEvent based on partition key fields in schema.
+     * Extract partition path from DataChangeEvent based on partition key fields in schema. Public
+     * utility method for use by operators that need to calculate partition paths.
      *
      * <p>If the schema has partition keys defined:
      *
@@ -361,7 +413,7 @@ private static String extractRecordKeyFromDataChangeEvent(
      * @param schema The table schema containing partition key definitions
      * @return The partition path string (empty string for unpartitioned tables)
      */
-    private static String extractPartitionPathFromDataChangeEvent(
+    public static String extractPartitionPathFromDataChangeEvent(
             DataChangeEvent dataChangeEvent, Schema schema) {
         List<String> partitionKeys = schema.partitionKeys();
         if (partitionKeys == null || partitionKeys.isEmpty()) {
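
For context, a caller-side sketch of the new RowDataKeyGen overload. The wrapper class, the dummy fileId/instantTime values, and the assumption that RowDataKeyGen.instance(conf, rowType) is the factory for the key generator are all illustrative; this commit only adds the RowDataUtils method itself.

import java.time.ZoneId;

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.sink.bulk.RowDataKeyGen;

public class KeyGenUsageSketch {
    public static void convert(
            DataChangeEvent event, Schema schema, Configuration conf, RowType rowType) {
        // Assumption: RowDataKeyGen.instance(conf, rowType) builds the key generator
        // from the Hudi Flink config and the table's physical row type.
        RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);

        // Dummy fileId/instantTime, only to show the new overload's parameter order;
        // real values come from the write handle and the active instant.
        var row = RowDataUtils.convertDataChangeEventToHoodieFlinkInternalRow(
                event, schema, ZoneId.of("UTC"), keyGen, "fileId-0", "00000000000000");
        System.out.println(row);
    }
}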
