[FLINK-36763 / 36690][runtime] Support new "distributed" schema evolution topology & fix parallelized hang glitch #3801
base: master
Here's a detailed write-up about the new topology for "distributed tables".

Currently, a YAML pipeline job relies on a basic assumption about where data from a single table may come from. The underlying reason is that we lack a coordination mechanism across schema operators. For example, if Schema Operator 1 triggers a schema change request, the other schema operators will not even be aware of it, since an operator only communicates with the coordinator when it receives a schema change event from upstream. This is a problem when handling distributed sources: each partition may emit a schema change stream on its own, but we must have a single, globally effective schema to write downstream.

However, simply requesting operators to block and align is not viable in the current design architecture, because a broadcast topology sits right after the schema operator, and blocking might freeze the entire downstream from receiving events, leaving us no chance to flush pending data records (see #3680 for more details about barrier alignment).

To coordinate among schema operators without blocking the record stream, the schema operator is moved closer to the sink, after records have been hashed and shuffled. That means data from various partitions will be mixed together and will no longer satisfy normal schema change semantics. So, in the distributed topology, we trace schemas independently for each source partition, within which they are guaranteed to be well-formed. Any schema change event will also be broadcast and copied to all schema operators.

The protocol works as follows. First, some schema operator initiates the schema change request. The other operators will eventually do the same, since any event from upstream is broadcast to all schema operators, so the coordinator does not need to notify them. Notice that when a schema operator receives a schema change event and blocks its upstream, it also emits a flush event so pending data records can be written out first.

Once the coordinator has collected requests from all schema operators, it can simply deduce the widest schema, apply it to the external DB, and broadcast the consensus result to all schema operators when releasing them from blocking.
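The "deduce a widest schema" step above can be sketched as follows. This is an illustrative model only, not the actual Flink CDC implementation: the class name, the string-based type representation, and the simplified widening order are all assumptions for the sake of the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch of how a coordinator could deduce a "widest" schema
 * from the per-partition schemas reported by blocked schema operators.
 * Types are modeled as plain strings in a simplified widening order.
 */
public class WidestSchemaSketch {

    // Simplified widening order: a later type can hold any earlier one.
    private static final List<String> WIDENING = List.of("INT", "BIGINT", "DOUBLE", "STRING");

    static String widerType(String a, String b) {
        return WIDENING.indexOf(a) >= WIDENING.indexOf(b) ? a : b;
    }

    /** Merge column-name -> type maps reported by each schema operator. */
    static Map<String, String> deduceWidestSchema(List<Map<String, String>> reported) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Map<String, String> schema : reported) {
            // For columns seen more than once, keep the wider of the two types.
            schema.forEach((col, type) -> merged.merge(col, type, WidestSchemaSketch::widerType));
        }
        return merged;
    }

    public static void main(String[] args) {
        // Partition 1 still sees `age` as INT; partition 2 has already widened it.
        Map<String, String> p1 = Map.of("id", "BIGINT", "age", "INT");
        Map<String, String> p2 = Map.of("id", "BIGINT", "age", "DOUBLE", "note", "STRING");
        System.out.println(deduceWidestSchema(List.of(p1, p2)));
    }
}
```

The coordinator would then apply the merged result to the external database before unblocking the operators, so every sink writer observes the same schema.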
Polished, and marked this ready for review.
Thanks @yuxiqian for this contribution; left some comments.
Thanks to @lvyanquan and @Shawn-Hx for your kind reviews; addressed your comments.
Also removed unused methods and rewrote outdated JavaDocs.
Thanks @yuxiqian, LGTM.
This closes FLINK-36763 and FLINK-36690.
As explained in #3680, the current pipeline design doesn't cooperate well with tables whose data and schema change events are distributed among different partitions, a.k.a. distributed tables.
Unfortunately, some data sources (like Kafka) are naturally distributed this way and cannot easily be brought into the current pipeline framework.
To resolve this issue while keeping backward compatibility, the following changes have been made:

- Introduced a dedicated `SchemaOperator` and `SchemaCoordinator` for the distributed topology (see details below). The existing ones stay in the `schema.regular` package, while the new ones live in the `schema.distributed` package.
- Extracted shared logic into a common `SchemaRegistry` to reduce duplication.
- Added an `@Experimental` optional method to `DataSource` to switch between the two topologies. The composer detects the data source's distribution trait to determine which operator topology to generate.
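The shape of that optional switch might look like the sketch below. The interface and method names here (`DataSourceSketch`, `isParallelMetadataSource`) are illustrative assumptions, not the exact Flink CDC API; the point is that the default preserves the regular topology for existing sources.

```java
/**
 * Illustrative sketch of an optional, experimental switch on the data source
 * interface. Names and defaults are assumptions for illustration; consult the
 * real Flink CDC DataSource interface for the actual API.
 */
interface DataSourceSketch {
    /**
     * Whether schema change events may arrive independently from multiple
     * partitions (e.g. Kafka). Defaults to false so that existing sources
     * keep the regular (non-distributed) topology.
     */
    default boolean isParallelMetadataSource() {
        return false;
    }
}

/** A hypothetical Kafka-like source that opts in to the distributed topology. */
class KafkaLikeSource implements DataSourceSketch {
    @Override
    public boolean isParallelMetadataSource() {
        return true; // composer generates the distributed topology for this source
    }
}
```

A composer could then branch on this flag at plan time: sources returning `false` get the regular schema operator chain, while sources returning `true` get the new distributed chain.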
- Unified schema merging logic in `SchemaMergingUtils`, and deprecated the corresponding functions in `SchemaUtils`. Schema merging is now required in the Transform, Routing, and Schema evolution stages, and sources that support schema inference might need it too; unifying it in one place makes it easier to maintain.
Note: CDC 3.1.1 was released over six months ago; keeping state compatibility with earlier versions is not really worthwhile.
P.S: A detailed type merging tree would be like:
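As an illustration of what such pairwise type merging looks like, here is a minimal sketch. The widening chain below is a simplified assumption, not the exact Flink CDC merging tree in `SchemaMergingUtils`, which covers many more types and edge cases.

```java
import java.util.List;

/**
 * Illustrative sketch of pairwise type merging along a simplified widening
 * chain. The real merging tree is larger and handles precision, nullability,
 * and many more type families; this is only a model of the idea.
 */
public class TypeMergeSketch {

    // Simplified chain: each type can hold every type to its left.
    private static final List<String> CHAIN =
            List.of("TINYINT", "SMALLINT", "INT", "BIGINT", "DECIMAL", "STRING");

    /** Returns the narrowest type in the chain that can hold both inputs. */
    static String merge(String a, String b) {
        int ia = CHAIN.indexOf(a);
        int ib = CHAIN.indexOf(b);
        if (ia < 0 || ib < 0) {
            return "STRING"; // unknown type: fall back to the universal top type
        }
        return CHAIN.get(Math.max(ia, ib));
    }

    public static void main(String[] args) {
        System.out.println(merge("INT", "BIGINT"));    // BIGINT
        System.out.println(merge("BIGINT", "DOUBLE")); // STRING (DOUBLE not in this chain)
    }
}
```

In a real merging tree, incompatible branches (say, numeric vs. temporal types) would meet at `STRING` as the common ancestor, which the fallback above mimics crudely.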