Skip to content

Latest commit

 

History

History
259 lines (204 loc) · 7.26 KB

SqlserverCDC-source.md

File metadata and controls

259 lines (204 loc) · 7.26 KB

SqlserverCDC Source


一、介绍

Sqlservercdc插件支持配置监听表名称以及读取起点读取日志数据。SQLservercdc在checkpoint时保存当前消费的lsn,因此支持续跑。

二、支持版本

SqlServer 2012、2014、2016、2017、2019单机版

三、数据库配置

SqlserverCDC配置

四、SqlserverCDC原理

SqlserverCDC原理

五、插件名称

Sync sqlservercdcreader、sqlservercdcsource
SQL sqlservercdc-x

六、参数说明

1、Sync

  • url
    • 描述:sqlserver数据库的JDBC URL链接
    • 必选:是
    • 参数类型:string
    • 默认值:无

  • username
    • 描述:用户名
    • 必选:是
    • 参数类型:string
    • 默认值:无

  • password
    • 描述:密码
    • 必选:是
    • 参数类型:string
    • 默认值:无

  • tableList
    • 描述: 需要监听的表,如:["schema1.table1","schema1.table2"]
    • 必选:是
    • 字段类型:数组
    • 默认值:无

  • splitUpdate
    • 描述:当数据更新类型为update时,是否将update拆分为两条数据,具体见【六、数据结构说明】
    • 必选:否
    • 字段类型:boolean
    • 默认值:false

  • cat
    • 描述:需要监听的操作数据操作类型,有UPDATE,INSERT,DELETE三种可选,大小写不敏感,多个以,分割
    • 必选:否
    • 字段类型:String
    • 默认值:UPDATE,INSERT,DELETE

  • lsn
    • 描述: 要读取SqlServer CDC日志序列号的开始位置
    • 必选: 否
    • 字段类型:String(00000032:00002038:0005)
    • 默认值:无

  • pollInterval
    • 描述: 监听拉取SqlServer CDC数据库间隔时间,该值越小,采集延迟时间越小,给数据库的访问压力越大
    • 必选:否
    • 字段类型:long(单位毫秒)
    • 默认值:1000

  • pavingData
    • 描述:是否将解析出的json数据拍平,具体见【七、数据结构说明】
    • 必选:否
    • 字段类型:boolean
    • 默认值:false

2、SQL

  • url
    • 描述:sqlserver数据库的JDBC URL链接
    • 必选:是
    • 参数类型:string
    • 默认值:无

  • username
    • 描述:用户名
    • 必选:是
    • 参数类型:string
    • 默认值:无

  • password
    • 描述:密码
    • 必选:是
    • 参数类型:string
    • 默认值:无

  • table
    • 描述:需要解析的数据表。
    • 注意:SQL任务只支持监听单张表,且数据格式为schema.table
    • 必选:否
    • 字段类型:string
    • 默认值:无

  • cat
    • 描述:需要监听的操作数据操作类型,有UPDATE,INSERT,DELETE三种可选,大小写不敏感,多个以,分割
    • 必选:否
    • 字段类型:String
    • 默认值:UPDATE,INSERT,DELETE

  • lsn
    • 描述: 要读取SqlServer CDC日志序列号的开始位置
    • 必选: 否
    • 字段类型:String(00000032:00002038:0005)
    • 默认值:无

  • poll-interval
    • 描述: 监听拉取SqlServer CDC数据库间隔时间,该值越小,采集延迟时间越小,给数据库的访问压力越大
    • 必选:否
    • 字段类型:long(单位毫秒)
    • 默认值:1000

七、数据结构

在2020-01-01 12:30:00(时间戳:1577853000000)执行:

INSERT INTO `tudou`.`kudu`(`id`, `user_id`, `name`) VALUES (1, 1, 'a');

在2020-01-01 12:31:00(时间戳:1577853060000)执行:

DELETE FROM `tudou`.`kudu` WHERE `id` = 1 AND `user_id` = 1 AND `name` = 'a';

在2020-01-01 12:32:00(时间戳:1577853180000)执行:

UPDATE `tudou`.`kudu` SET `id` = 2, `user_id` = 2, `name` = 'b' WHERE `id` = 1 AND `user_id` = 1 AND `name` = 'a';

1、pavingData = true, splitUpdate = false RowData中的数据依次为:

//schema, table, ts, opTime, type, before_id, before_user_id, before_name, after_id, after_user_id, after_name
["tudou", "kudu", 6760525407742726144, 1577853000000, "INSERT", null, null, null, 1, 1, "a"]
["tudou", "kudu", 6760525407742726144, 1577853060000, "DELETE", 1, 1, "a", null, null, null]
["tudou", "kudu", 6760525407742726144, 1577853180000, "UPDATE", 1, 1, "a", 2, 2, "b"]

2、pavingData = false, splitUpdate = false RowData中的数据依次为:

//schema, table, ts, opTime, type, before, after
["tudou", "kudu", 6760525407742726144, 1577853000000, "INSERT", null, {"id":1, "user_id":1, "name":"a"}]
["tudou", "kudu", 6760525407742726144, 1577853060000, "DELETE", {"id":1, "user_id":1, "name":"a"}, null]
["tudou", "kudu", 6760525407742726144, 1577853180000, "UPDATE", {"id":1, "user_id":1, "name":"a"}, {"id":2, "user_id":2, "name":"b"}]

3、pavingData = true, splitUpdate = true RowData中的数据依次为:

//schema, table, ts, opTime, type, before_id, before_user_id, before_name, after_id, after_user_id, after_name
["tudou", "kudu", 6760525407742726144, 1577853000000, "INSERT", null, null, null, 1, 1, "a"]
["tudou", "kudu", 6760525407742726144, 1577853060000, "DELETE", 1, 1, "a", null, null, null]

//schema, table, ts, opTime, type, before_id, before_user_id, before_name
["tudou", "kudu", 6760525407742726144, 1577853180000, "UPDATE_BEFORE", 1, 1, "a"]

//schema, table, ts, opTime, type, after_id, after_user_id, after_name
["tudou", "kudu", 6760525407742726144, 1577853180000, "UPDATE_AFTER", 2, 2, "b"]

4、pavingData = false, splitUpdate = true RowData中的数据依次为:

//schema, table, ts, opTime, type, before, after
["tudou", "kudu", 6760525407742726144, 1577853000000, "INSERT", null, {"id":1, "user_id":1, "name":"a"}]
["tudou", "kudu", 6760525407742726144, 1577853060000, "DELETE", {"id":1, "user_id":1, "name":"a"}, null]
//schema, table, ts, opTime, type, before
["tudou", "kudu", 6760525407742726144, 1577853180000, "UPDATE_BEFORE", {"id":1, "user_id":1, "name":"a"}]
//schema, table, ts, opTime, type, after
["tudou", "kudu", 6760525407742726144, 1577853180000, "UPDATE_AFTER", {"id":2, "user_id":2, "name":"b"}]
  • type:变更类型,INSERT,UPDATE、DELETE
  • opTime:数据库中SQL的执行时间
  • ts:自增ID,不重复,可用于排序,解码后为FlinkX的事件时间,解码规则如下:
long id = Long.parseLong("6760525407742726144");
long res = id >> 22;
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sdf.format(res));		//2021-01-28 19:54:21

八、数据类型

支持 BIT
TINYINT24、INT、INTEGER、FLOAT、DOUBLE、REAL、LONG、BIGINT、DECIMAL、NUMERIC
CHAR、VARCHAR、NCHAR、NVARCHAR、TEXT
DATE、TIME、TIMESTAMP、DATETIME、DATETIME2、SMALLDATETIME
BINARY、VARBINARY
暂不支持 ROWVERSION、UNIQUEIDENTIFIER、CURSOR、TABLE、SQL_VARIANT

九、脚本示例

见项目内flinkx-examples文件夹。