
Commit 6254c7f

Checkpoint 51 - Refactor and remove Event*Functions
1 parent: e845dec

9 files changed: +308, -1089 lines

.idea/vcs.xml

Lines changed: 1 addition & 3 deletions
(Generated file; diff not rendered.)

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ limitations under the License.
     <name>flink-cdc-pipeline-connector-hudi</name>

     <properties>
-        <hudi.version>1.1.0-SNAPSHOT</hudi.version>
+        <hudi.version>1.2.0-SNAPSHOT</hudi.version>
         <hadoop.version>2.10.2</hadoop.version>
         <mockito.version>3.4.6</mockito.version>
         <metrics.version>4.1.1</metrics.version>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java

Lines changed: 68 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.connectors.hudi.sink.event.CreateTableOperatorEvent;
 import org.apache.flink.cdc.connectors.hudi.sink.event.EnhancedWriteMetadataEvent;
+import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
 import org.apache.flink.cdc.connectors.hudi.sink.util.RowDataUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -453,6 +454,8 @@ public void handleEventFromOperator(
                 () -> {
                     if (operatorEvent instanceof CreateTableOperatorEvent) {
                         handleCreateTableEvent((CreateTableOperatorEvent) operatorEvent);
+                    } else if (operatorEvent instanceof SchemaChangeOperatorEvent) {
+                        handleSchemaChangeEvent((SchemaChangeOperatorEvent) operatorEvent);
                     } else if (operatorEvent instanceof EnhancedWriteMetadataEvent) {
                         handleEnhancedWriteMetadataEvent(
                                 (EnhancedWriteMetadataEvent) operatorEvent);
@@ -507,6 +510,71 @@ private void handleCreateTableEvent(CreateTableOperatorEvent createTableOperator
                 });
     }

+    /**
+     * Handles schema change events from the sink functions. Updates the cached schema and
+     * recreates the write client to ensure it uses the new schema.
+     *
+     * @param event The schema change event containing the table ID and new schema
+     */
+    private void handleSchemaChangeEvent(SchemaChangeOperatorEvent event) {
+        TableId tableId = event.getTableId();
+        Schema newSchema = event.getNewSchema();
+
+        LOG.info(
+                "Received schema change event for table {}: {} columns",
+                tableId,
+                newSchema.getColumnCount());
+
+        // Update the cached schema
+        tableSchemas.put(tableId, newSchema);
+        LOG.info("Updated coordinator's schema cache for table: {}", tableId);
+
+        // Get the existing table context
+        TableContext oldContext = tableContexts.get(tableId);
+        if (oldContext == null) {
+            LOG.warn(
+                    "Received schema change for unknown table: {}. Skipping write client update.",
+                    tableId);
+            return;
+        }
+
+        try {
+            // Close the old write client
+            if (oldContext.writeClient != null) {
+                oldContext.writeClient.close();
+                LOG.info("Closed old write client for table: {}", tableId);
+            }
+
+            // Create new configuration with updated schema
+            Configuration tableConfig = createTableSpecificConfig(tableId);
+
+            // Create new write client with updated schema
+            HoodieFlinkWriteClient<?> newWriteClient =
+                    FlinkWriteClients.createWriteClient(tableConfig);
+            LOG.info("Created new write client with updated schema for table: {}", tableId);
+
+            // Update the table context with the new write client
+            // Keep the same eventBuffers, tableState, and tablePath
+            TableContext newContext =
+                    new TableContext(
+                            newWriteClient,
+                            oldContext.eventBuffers,
+                            oldContext.tableState,
+                            oldContext.tablePath);
+            tableContexts.put(tableId, newContext);
+
+            LOG.info("Successfully updated write client for table {} after schema change", tableId);
+        } catch (Exception e) {
+            LOG.error("Failed to update write client for table {} after schema change", tableId, e);
+            context.failJob(
+                    new HoodieException(
+                            "Failed to update write client for table "
+                                    + tableId
+                                    + " after schema change",
+                            e));
+        }
+    }
+
     private void handleEnhancedWriteMetadataEvent(EnhancedWriteMetadataEvent enhancedEvent) {
         String tablePath = enhancedEvent.getTablePath();
         WriteMetadataEvent event = enhancedEvent.getOriginalEvent();
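
For context: the sending side is not included in this commit's diff. Below is a minimal, hypothetical sketch of how a sink write function might evolve its local schema and notify the coordinator. The SchemaChangeForwarder class, its gateway wiring, and the schema map are illustrative assumptions, not code from this repository; OperatorEventGateway and SchemaUtils.applySchemaChangeEvent are existing Flink / flink-cdc APIs.

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;

import java.util.HashMap;
import java.util.Map;

/** Hypothetical sender-side helper; not part of this commit. */
class SchemaChangeForwarder {

    private final OperatorEventGateway gateway; // supplied by the enclosing operator
    private final Map<TableId, Schema> localSchemas = new HashMap<>();

    SchemaChangeForwarder(OperatorEventGateway gateway) {
        this.gateway = gateway;
    }

    /** Applies the CDC schema change locally, then reports the evolved schema. */
    void onSchemaChange(SchemaChangeEvent change) {
        TableId tableId = change.tableId();
        // Derive the post-change schema from the current schema plus the delta event.
        Schema evolved = SchemaUtils.applySchemaChangeEvent(localSchemas.get(tableId), change);
        localSchemas.put(tableId, evolved);
        // Received by handleSchemaChangeEvent(...) in the coordinator above.
        gateway.sendEventToCoordinator(new SchemaChangeOperatorEvent(tableId, evolved));
    }
}

Note the design choice visible here and in the coordinator: the event carries the fully evolved Schema rather than the delta, so the coordinator can simply overwrite its cache and rebuild the write client without replaying any change history.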

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/event/SchemaChangeOperatorEvent.java

Lines changed: 77 additions & 0 deletions

@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.hudi.sink.event;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+
+/**
+ * An operator event that encapsulates a schema change and the resulting new schema.
+ *
+ * <p>This event is sent from the {@code MultiTableEventStreamWriteFunction} to the {@code
+ * MultiTableStreamWriteOperatorCoordinator} to signal that a table's schema has changed in the
+ * CDC stream. The coordinator uses this event to update its cached schema and recreate the write
+ * client to ensure subsequent operations use the correct schema.
+ */
+public class SchemaChangeOperatorEvent implements OperatorEvent {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableId tableId;
+    private final Schema newSchema;
+
+    /**
+     * Constructs a new SchemaChangeOperatorEvent.
+     *
+     * @param tableId The ID of the table whose schema changed
+     * @param newSchema The new schema after applying the schema change
+     */
+    public SchemaChangeOperatorEvent(TableId tableId, Schema newSchema) {
+        this.tableId = tableId;
+        this.newSchema = newSchema;
+    }
+
+    /**
+     * Gets the ID of the table whose schema changed.
+     *
+     * @return The table ID
+     */
+    public TableId getTableId() {
+        return tableId;
+    }
+
+    /**
+     * Gets the new schema after the change.
+     *
+     * @return The new schema
+     */
+    public Schema getNewSchema() {
+        return newSchema;
+    }
+
+    @Override
+    public String toString() {
+        return "SchemaChangeOperatorEvent{"
+                + "tableId="
+                + tableId
+                + ", newSchema columns="
+                + newSchema.getColumnCount()
+                + '}';
+    }
+}
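
Since OperatorEvent extends Serializable and this event crosses the task-to-JobManager RPC boundary, both fields must survive Java serialization. A minimal round-trip sketch follows; the table name and columns are invented for illustration, while TableId, Schema.newBuilder(), DataTypes, and InstantiationUtil are existing flink-cdc / Flink APIs.

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.hudi.sink.event.SchemaChangeOperatorEvent;
import org.apache.flink.util.InstantiationUtil;

public class SchemaChangeOperatorEventRoundTrip {
    public static void main(String[] args) throws Exception {
        // Build an event for a hypothetical table with two columns.
        SchemaChangeOperatorEvent event =
                new SchemaChangeOperatorEvent(
                        TableId.tableId("inventory", "orders"),
                        Schema.newBuilder()
                                .physicalColumn("id", DataTypes.BIGINT())
                                .physicalColumn("amount", DataTypes.DECIMAL(10, 2))
                                .build());

        // Serialize and deserialize, as the RPC layer effectively does.
        byte[] bytes = InstantiationUtil.serializeObject(event);
        SchemaChangeOperatorEvent copy =
                InstantiationUtil.deserializeObject(
                        bytes, SchemaChangeOperatorEvent.class.getClassLoader());

        // e.g. SchemaChangeOperatorEvent{tableId=inventory.orders, newSchema columns=2}
        System.out.println(copy);
    }
}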
