
[FLINK-36763 / 36690][runtime] Support new "distributed" schema evolution topology & fix parallelized hang glitch #3801

Open
wants to merge 11 commits into master
Conversation

@yuxiqian (Contributor) commented on Dec 13, 2024

This closes FLINK-36763 and FLINK-36690.

As explained in #3680, the current pipeline design does not cooperate well with tables whose data and schema change events are distributed across multiple partitions, a.k.a. distributed tables.

Unfortunately, some data sources (like Kafka) are naturally distributed in this way and cannot easily be fitted into the current pipeline framework.

To resolve this issue while keeping backward compatibility, the following changes have been made:

  1. Added another suite of SchemaOperator and SchemaCoordinator for the distributed topology. (See details below.)
  • The previous operators remain in the schema.regular package, while the new ones are located in the schema.distributed package.
  • Common code has been extracted into an abstract base class, SchemaRegistry, to reduce duplication.
  2. Added a new @Experimental optional method to DataSource to switch between the two topologies:
@PublicEvolving
public interface DataSource {
    // ...

    @Experimental
    default boolean canContainDistributedTables() {
        return false;
    }
}

The composer detects the data source's distribution trait to determine which operator topology to generate.
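To illustrate the branching, here is a minimal, self-contained sketch of how a composer might select a topology from the trait method. The `DataSource` interface is re-declared here in simplified form, and `TopologyChooser` is a hypothetical name, not the actual FlinkPipelineComposer API:

```java
// Simplified re-declaration of the trait method added in this PR.
interface DataSource {
    default boolean canContainDistributedTables() {
        return false;
    }
}

class TopologyChooser {
    // The composer picks the operator topology based on the source's trait:
    // "regular" maps to schema.regular, "distributed" to schema.distributed.
    static String chooseTopology(DataSource source) {
        return source.canContainDistributedTables() ? "distributed" : "regular";
    }
}
```

Since the method is `default` and returns `false`, existing sources keep the regular topology without any code change.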

  3. Extracted schema merging utilities into SchemaMergingUtils and deprecated the corresponding functions in SchemaUtils.

Schema merging is now required in the Transform, Routing, and Schema Evolution stages, and sources that support schema inference might need it as well. Unifying these utilities in one place makes them easier to maintain.
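The core idea behind such merging is "widening" column types along a chain until both inputs fit. The sketch below uses plain strings for types and a hypothetical `SchemaMergeSketch` class, not the real flink-cdc DataType classes or the actual SchemaMergingUtils API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SchemaMergeSketch {
    // A tiny widening chain: each type can hold all values of the types before it.
    static final List<String> WIDENING = List.of("INT", "BIGINT", "DOUBLE", "STRING");

    // Pick the wider of the two types along the widening chain.
    static String mergeType(String a, String b) {
        return WIDENING.get(Math.max(WIDENING.indexOf(a), WIDENING.indexOf(b)));
    }

    // Column-wise merge of two schemas keyed by column name; columns present
    // in only one schema are kept as-is.
    static Map<String, String> mergeSchemas(Map<String, String> left,
                                            Map<String, String> right) {
        Map<String, String> merged = new LinkedHashMap<>(left);
        right.forEach((col, type) -> merged.merge(col, type, SchemaMergeSketch::mergeType));
        return merged;
    }
}
```

For example, merging `{id: INT}` with `{id: BIGINT, name: STRING}` yields `{id: BIGINT, name: STRING}`.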

  4. Updated migration test cases to cover CDC 3.2.0+ only.

CDC 3.1.1 was released over six months ago; keeping state compatibility with earlier versions is no longer worthwhile.


P.S.: A detailed type merging tree looks like this:

[image: output]

@yuxiqian (Contributor, Author) commented on Dec 13, 2024

Here's a detailed write-up about the new topology for "distributed tables":

Currently, a YAML pipeline job has a typical topology like this:

[image: current_topology]

It relies on a basic assumption: data from a single table must either:

  • be present and evolve in one single partition only...
  • or be present in multiple partitions, but with a globally static schema.

The underlying reason is that we lack a coordination mechanism across schema operators. For example, if Schema Operator 1 triggers a schema change request, the other schema operators will not even be aware of it, since an operator only communicates with the coordinator when it receives a schema change event from upstream.

This becomes a problem when handling distributed sources: each partition may emit its own schema change stream, yet we must maintain a globally effective schema for writing downstream.

However, simply requesting operators to block and align is not viable in the current architecture, because there is a broadcast topology right after the schema operator; blocking there might freeze the entire downstream and leave us no chance to flush pending data records (see #3680 for more details about barrier alignment).


To coordinate among schema operators without blocking the record stream, the schema operator is moved closer to the sink, after the hash shuffle. That means data from different partitions will be mixed together and will no longer satisfy normal schema change semantics.

[image: mixed]

So, in the distributed topology, we trace schemas independently for each source partition, within which they are guaranteed to be well-formed.

[image: sep-trace]
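Per-partition tracing boils down to keeping a separate schema map for each source partition. The following is a minimal sketch under that assumption; `PartitionedSchemaMap` is a hypothetical name, not a class from this PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class PartitionedSchemaMap {
    // partitionId -> (tableId -> schema). Schemas evolve independently per
    // partition, where each partition's stream is guaranteed well-formed.
    private final Map<Integer, Map<String, String>> schemas = new HashMap<>();

    void update(int partition, String tableId, String schema) {
        schemas.computeIfAbsent(partition, p -> new HashMap<>()).put(tableId, schema);
    }

    Optional<String> get(int partition, String tableId) {
        return Optional.ofNullable(schemas.getOrDefault(partition, Map.of()).get(tableId));
    }
}
```

A schema change from partition 0 never clobbers the schema traced for partition 1, even after their records are mixed by the shuffle.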

Also, since any schema change event will be broadcast and copied $N$ times (where $N$ is the sink-side parallelism), schema operators are guaranteed to initiate $kN$ requests in total for $k$ upstream schema change events, so we do not need to notify each schema operator to block and talk with the coordinator.

In the first step, an arbitrary schema operator initiates the schema change request:

step_1

The other operators will do so eventually, since any event from upstream will be broadcast to all schema operators, so the coordinator does not need to notify them:

step_2

Notice that when a schema operator receives a schema change event and blocks its upstream, it also emits a FlushEvent to the sink writers, telling them that all pending data change events must be handled and persistently flushed. After that, the sink writers report success to the coordinator directly.

[image: step_3]
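Put together, the operator's reaction to a schema change event can be sketched as three actions: block upstream, flush downstream, and report to the coordinator. The class and event names below are illustrative stand-ins, not the real flink-cdc-runtime API:

```java
import java.util.ArrayList;
import java.util.List;

class SchemaOperatorSketch {
    final List<String> emittedDownstream = new ArrayList<>();
    final List<String> sentToCoordinator = new ArrayList<>();
    boolean upstreamBlocked = false;

    void onSchemaChangeEvent(String event) {
        upstreamBlocked = true;                          // stop pulling more records
        emittedDownstream.add("FlushEvent");             // ask sink writers to flush pending data
        sentToCoordinator.add("Request(" + event + ")"); // report the schema change request
    }
}
```

Note that the flush success acknowledgements do not pass back through the operator; the sink writers report to the coordinator directly.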

Now, the coordinator knows that:

  • All upcoming streams are already blocked (since all schema operators have initiated requests)
  • All pending data change events in the pipeline have been flushed (since it has collected success reports from all sink writers)
  • What the current upstream schemas from all partitions are (reported along with the schema change requests)

At this point, it can simply deduce the widest schema, apply it to the external database, and broadcast the consensus result to all schema operators while releasing them from blocking.

[image: step_4]
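The coordinator's release condition reduces to simple bookkeeping: with parallelism $N$, wait until all $N$ schema operators have requested and all $N$ sink writers have acked their flushes. A minimal sketch under that assumption, with illustrative names rather than the real SchemaCoordinator API:

```java
import java.util.HashSet;
import java.util.Set;

class CoordinatorSketch {
    private final int parallelism;
    private final Set<Integer> requests = new HashSet<>();
    private final Set<Integer> flushAcks = new HashSet<>();

    CoordinatorSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    void onSchemaChangeRequest(int operatorId) { requests.add(operatorId); }

    void onFlushSuccess(int writerId) { flushAcks.add(writerId); }

    // All upcoming streams are blocked and all pending data is flushed once
    // every schema operator has requested and every sink writer has acked.
    boolean readyToEvolve() {
        return requests.size() == parallelism && flushAcks.size() == parallelism;
    }
}
```

Only when `readyToEvolve()` holds does the coordinator apply the merged schema externally and broadcast the result back.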

@yuxiqian (Contributor, Author): Polished, and marked this ready for review.

@yuxiqian yuxiqian marked this pull request as ready for review December 16, 2024 12:00
@yuxiqian yuxiqian force-pushed the FLINK-36763-V3 branch 2 times, most recently from d5f38a4 to 7c5f8d8 Compare December 18, 2024 09:51
@lvyanquan (Contributor): Thanks @yuxiqian for this contribution, left some comments.

@yuxiqian (Contributor, Author): Thanks to @lvyanquan and @Shawn-Hx for the kind review; addressed your comments.

@Shawn-Hx (Contributor) left a comment: Thanks @yuxiqian, LGTM.
