flink同步库,主要功能是表同步,格式基于debezium。
PS: 不要只看文档,要不然项目可能跑不起来,很多地方不清晰,要文档跟代码一起看,反正代码量也不多。
代码下载下来后,先手动将一些通用的依赖包添加到flink的lib目录下,这样打出的包就不会很大,包括:
- flink连接器
- flink-connector-kafka-1.15.0.jar
- flink-connector-jdbc-1.15.0.jar
- jdbc数据库驱动
- kafka-clients-2.8.1.jar
- mysql-connector-java-8.0.29.jar
- mssql-jdbc-8.2.2.jre8.jar
会打包出flink-table-sync.jar
mvn clean package -DskipTests
在flink web ui界面上传,参数并行度
填个合适的值,
更高的并行度就同步更快(比如cpu核数),
程序参数填--file <file>
,file是文件的绝对路径,
--env <local/prod>
,默认prod,local时会创建本地带web ui的环境,prod时获取服务器环境。
文件格式可以参考example/table_sync
目录下的json文件。
修改pom文件里build.finalName
为flink-sql-sync
,
mainClass
为com.kongkongye.flink.sync.sql.SqlSyncJob
。
会打包出flink-sql-sync.jar
mvn clean package -DskipTests
在flink web ui界面上传,参数并行度
填个合适的值,
更高的并行度就同步更快(比如cpu核数),
程序参数填--file <file>
,file是文件的绝对路径,
--env <local/prod>
,默认prod,local时会创建本地带web ui的环境,prod时获取服务器环境。
文件格式可以参考example/sql_sync
目录下的sql文件。
此库发布时,flink最新的版本是1.15.0。
表同步跟sql同步相比,表同步不用占内存,数据只是在flink里过一下,而sql同步占内存,数据都存在flink状态里(配置合适的状态后端也不占内存?)。
如果本地运行,请将pom文件里的runtime.scope
修改为compile
,如果线上运行,则改为provided
。
此库假设的是kafka上存的都是debezium格式的json数据。
放个kafka连接器配置片段参考一下(不相关配置已省略):
{
"name": "sqlserver-orders",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"decimal.handling.mode": "string",
"tombstones.on.delete": "false"
...
}
}
目前支持mysql
与sqlserver
,其他数据库需要自行适配。
在支持程度上,目前只支持部分字段类型,碰到新的字段类型就改代码添加支持。
官方jdbc连接器目前不支持sqlserver,因此复制了连接器mysql支持的源码改了改来支持。
目前没发现问题,如果后面官方jdbc连接器支持sqlserver了,可以把这部分删掉。
json里可以配置converters参数来添加多个转换器,处理逻辑为按顺序进行检测,如果一个转换器转换成功,后续就不再检测其他转换器。
星号(*
)代表匹配任意列名。
这个很简单,用的是flink自带的sql功能。
语法是flink sql的语法,此库做的就是将写在一个文件里的多个sql用----
分隔,然后分别执行。
开始以为有问题,因为不管表同步还是sql同步,读取同一条主键的修改记录的顺序都很重要,而多线程是乱序取的,因此需要设置kafka source并发度为1。
但后来发现指定了消费者组的情况下,同一个key的信息都会由同一个消费者消费,也就是被同一个线程接收走,因此kafka source并发度可以超过1。
- Serializable是否能再优化,让线程切换资源传递少点?