CDC任务数据源配置说明
更新时间: 2024-12-04 15:44:24
CDC 任务数据源配置说明
数据源自定义参数配置
数据源类型 | 作用域 | 自定义参数配置说明 |
---|---|---|
MySQL/Oracle/SQLServer/PostgreSQL/db2 | source | 基于flink-cdc datastream api实现,自定义参数可以填写对应数据源的debezium相关配置参数 |
kafka | source | 基于flink kafka datastream api实现,自定义参数可以填写kafka官方原生的参数配置 |
MySQL/Oracle/SQLServer | sink | 基于flink sql jdbc连接器实现,自定义参数可以填写flink sql jdbc连接器配置参数 |
kafka | sink | 基于flink kafka datastream api实现,自定义参数可以填写kafka官方原生的配置参数 |
流表 | sink | 基于flink kafka sql api实现,自定义参数可以参考flink sql kafka连接器的配置参数 |
作用域 source 是指来源端数据源,sink是指去向端数据源
额外新增自定义参数说明
自定义参数名称 | 默认值 | 支持版本 | 作用域 | 数据源类型 | 描述 |
---|---|---|---|---|---|
useSid | true | 2.0.0+ | source | Oracle | true表示oracle使用sid,false表示oracle使用service name |
server-time-zone | utc | 2.0.0+ | source | MySQL | mysql的timestamp字段类型存储是的UTC时间,会出现时区问题,需要配置为'Asia/Shanghai' |
fields.convert-tinyint-one-boolean.enabled | true | 2.1.5+ | source/sink | MySQL | true时候表示mysql的tinyint(1)类型转为flink的boolean类型, false时候表示mysql的tinyint(1)类型转为flink的tinyint类型 |
json.ignore-parse-errors | false | 2.1.7+ | source | kafka以及增量读取方式为ogg的Oracle、SQLServer | 默认false不忽略kafka消息解析错误并抛出异常终止程序异常信息中带有消息明细,设置为true会忽略该消息 |
json.parse-errors.print | false | 2.1.7+ | source | kafka以及增量读取方式为ogg的Oracle、SQLServer | false表示kafka消息出现解析异常被忽略的时候不会打印消息日志,设置为true息出现解析异常被忽略的时候会打印该消息日志级别为warn |
json.ignore-key-null | false | 2.1.7+ | source | 增量读取方式为ogg的Oracle、SQLServer | 默认false不忽略kafka消息中主键key为null并会抛出异常并打印主键为null的消息,设置为true会忽略该消息 |
json.key-null.print | false | 2.1.7+ | source | 增量读取方式为ogg的Oracle、SQLServer | 默认false不忽略kafka消息中主键key为null并会抛出异常并打印主键为null的消息,设置为true会忽略该消息 |
database.schema.ignore-unique-key | false | 2.1.7+ | source | Oracle | 默认false任务初始化schema时候不忽略唯一索引作为key,为true的时候忽略唯一索引作为key,解决oracle表无主键且联合唯一索引首个字段为函数索引报错问题。 |
库名.表名.primary.key | 无 | 2.1.7+ | sink | MySQL/Oracle/SQLServer | 例如schema.table.primary.key=a,b,c 用于定义目标端逻辑主键,目标端表无主键且只有唯一索引可以通过自定义参数用唯一索引列作为逻辑主键 |
json.timestamp-format.standard | RFC3339 | 2.1.8+ | sink | kafka | timestamp类型字段JSON序列化方式,默认RFC3339可选项SQL\ISO_8601\RFC3339,例如RFC3339序列化为"2022-01-10T10:30:41Z",SQL序列化为"2022-01-10 10:30:41",ISO_8601序列化为"2022-01-10T10:30:41" |
json.key.is-table-name | true | 2.1.9+ | source | oracle | ture表示ogg发往kafka中的key为表名称,false表示可以为非表名称 |
json.fields.event-ts.enabled | false | 2.1.9+ | source | mysql/oracle | 默认false表示不包含事件时间,true表示包含事件时间 |
fields.type.auto-conversion.enabled | false | 2.1.9+ | source | all | 默认false表示不自动将source字段转换成sink字段类型, true表示自动转换字段类型 |
ogg-json.downstream-key.is-table-name-pk | false | 2.1.12+ | source | Oracle/SQLServer | 源端为oggjson 目标端为kakfa时候,是否指定目标端kafka的key为表名称+主键,false表示不是,true表示是 |
其他补充说明
流表选择
source为"kafka"或则sink端为"流表"表示流表,流表的列表下拉框中定义的流表名称仅仅显示canal-json、debezium-json、maxwell-json三种序列化类型,使用的时候需要注意
kafka 数据源作为cdc 目标表时
kafka 对应的表序列化方式只支持选择debezium-json、canal-json、 maxwell-json这3种之一。 这3种不同的序列化方式输出的数据结构各不相同。具体如下,请根据数据使用情况进行选择。
debezium-json
insert
{"before":null,"after":{"id":22,"name":"opt1","age":null},"op":"c"}
update
{"before":{"id":22,"name":"opt1","age":null},"after":null,"op":"d"} {"before":null,"after":{"id":22,"name":"opt1123","age":null},"op":"c"}
delete
{"before":{"id":22,"name":"opt1123","age":null},"after":null,"op":"d"}
canal-json
insert
{"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"INSERT"}
update
{"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"DELETE"} {"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"INSERT"}
delete
{"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"DELETE"}
maxwell-json
insert
{"data":{"id":6,"salary":88.8,"gmt_created":"2015-01-01 11:15:10"},"type":"insert"}
update
{"data":{"id":6,"salary":88.8,"gmt_created":"2015-01-01 11:15:10"},"type":"delete"} {"data":{"id":6,"salary":111.8,"gmt_created":"2015-01-01 11:15:10"},"type":"insert"}
delete
{"data":{"id":6,"salary":111.8,"gmt_created":"2015-01-01 11:15:10"},"type":"delete"}
MySQL 作为cdc 源表
- mysql 表字段timestamp 类型时区问题
如果出现时区问题需要在自定义参数里 额外增加参数'server-time-zone' = 'Asia/Shanghai'