CDC 任务是Easystream用于从不同的数据源库摄取数据变更作业的任务类型。目前支持页面向导模式配置,底层整合NDC订阅能力基于Flink CDC 2.0版本研发,支持多种异构数据源的变更数据捕获,同时也支持历史数据的全量导入。

使用限制

  • 引擎版本:Flink 1.13。
  • 在LTS7.0-Update1.1版本(对应实时计算-v3.9.17版本)更新后, CDC 任务中源端为 MySQL、Oracle、Kafka,目标端为Kafka的创建入口关闭,用户需前往数据传输-实时同步任务模块创建对应数据源类型的任务。

操作步骤

1. 创建CDC作业

  1. 登录 EasyStream实时计算平台
  2. 快捷导航栏 选择 实时开发 ,若 快捷导航栏 中无相关选项可参考 快速导航栏 进行操作。
  3. 切换至目标 工作空间
  4. 单击 新建CDC任务 ,在弹出窗口中填写任务配置信息。
  5. 单击 确定

2.配置Source

以下配置操作将以MySQl为例,其他数据源类型操作过程相似,具体配置不同可参考 CDC任务数据源配置说明

  1. 选择CDC任务Source端右上角,单击下拉列表,切换CDC Source端数据源类型。
    choose_datasource

  2. 根据表单内容完成 CDC Source 部分配置。

表单参数 说明
查找库表 - 库表选择:通过手动方式选择采集库表,支持多选;
- 正则匹配:通过输入正则表达式匹配指定数据源和数据库下的批量数据表。
订阅类型 支持采集 插入(Insert)、更新(Update)、删除(Delete) 操作数据,默认全部勾选。
增量读取方式 - 日志读取:CDC任务默认采用日志采集的方式进行实时数据采集,根据数据源类型不同各有差异,比如MySQL binlog日志,Oracle Logminer日志等。
- 间隔轮询:支持用户对间隔轮询周期、数据量进行设置从而控制采集速度,支持手动设定间隔轮询传输位点。
传输起始位点 - 最新数据:当任务启动时开始进行数据捕获。
- 全量初始化:先全量读取表数据,读取完成后从自动切换任务,开始从任务启动时间记录的点位进行变更数据捕获。
说明:全量初始化时采集数据与最新数据会存在一定延迟,资源充足的情况下会逐渐恢复。
流量限制 默认不限制,如果源端库存在业务压力,可以配置适当限制,缓解源库压力。
读取并行度 除 MySQL、Kafka 数据源外,其余数据源目前均为单并行度读取数据(包括全量初始化过程)。

3. 配置sink

单击sink页面右上角下拉框选择相应的数据源,以mysql 为例。 在此之前必须提前在数据源管理处引入数据源,kafka数据源引入后,可以在数据库创建数仓表,引入时直接选择topic 对应的数仓表即可。详情请参考数仓管理

- 目标数据源  
选择需要导入的sql server 数据源名称
- 导入方式  
支持单表和多表导入,选择单表导入只需要选择相应的库表即可,多表导入,需要为源端的每一张表选择对应的目标端库表,或者通过“批量设置”进行配置,目前批量设置只支持导入到同一个库表中,如果是分库分表,在批量应用后,还需要手动修改对应库表。
- 写入方式  
支持“insert”和“upsert”更新写入,注意更新写入必须配置主键,否则相当于“insert
- 异常处理  
为了防止脏数据导致任务失败,提供了忽略错误记录的选项,可以根据实际需求进行配置
- 批量写入   
对数据进行分批插入,建议开启,同时配置批量的写入规则
- 写入并行度  
当前所有sink 端数据源默认均支持多并行度写入,可以按需求调整作业运行配置的并行数量  
![](/documents/uploads/projects/easystream-v4.8.0/202201/16c8cad20d3dee00.png)
  1. 字段映射
    CDC 作业会根据sink 和souce 的配置生成对应的字段映射关系。默认按照字段名称映射,如果源端与目标表字段名称不一致,源端字段默认为不导入。如果不符合按照字段名称映射的需求,需要手动调整字段映射关系,否则可能出现字段未同步情况。 如果有需求对输出字段进行调整,可以在源端表字段进行手动指定。新增了不导入字段以及自定义表达式,支持在同步过程中对数据进行转换。

    • 不导入
      选择不导入,该字段内容不会写入结果表,结果表字段输出为字段默认值或null
    • 自定义表达式
      CDC 作业增加了自定义表达式功能,可以对字段使用函数进行数据转换。目前支持Flink 的标量(一进一出)内置函数,暂时不支持自定义函数。配置方式选择自定义表达式,书写函数即可。
      如图:
  2. 运行配置检查
    CDC 作业实际运行的是Flink 作业,可以按需对任务资源进行调整配置,以达到期望的性能要求。 作业资源配置可参考Flink 作业推荐配置

  3. 保存并启动


OGG配置

若企业已有OGG(Oracle GoldenGate)产品,平台支持基于OGG实现对应实时采集链路。此方案要求业务方主动将OGG采集任务启动并写入到目标Kafka Topic,Flink CDC 将会从 Kafka中以OGG标准格式对数据内容进行解析,并转写至目标端。

在CDC任务中,选择Source端为Oracle时,用户选择增量读取方式为OGG

此时需要选择一个已经注册在项目中的消息中间件(目前仅支持Kafka)作为数据来源。手动输入Topic名称时,当输入已存在的Topic名称时任务可正常执行;若输入的Topic不存在,系统将自动创建Topic并执行任务。Topic输入名称对大小写敏感。