高效开发——内置connectors
内置connectors
flink 1.11 支持了 3种内置的connectors。
connector | 描述 | 使用场景 |
---|---|---|
'connector'='datagen' | 用于生成随机数据的source | 常用于测试 |
'connector'='blackhole' | 不做任何处理的 sink | 常用于性能测试 |
'connector'='print' | 打印到标准输出流(.out文件)的 sink | 常用于调试 |
在外部 connector 环境还没有 ready 时,用户可以选择 datagen source 和 print sink 快速构建作业熟悉 Flink SQL 对于想要测试 Flink SQL 性能的用户,可以使用 blackhole 作为 sink;对于调试排错场景,print sink 会将计算结果打到标准输出(在“任务页面”Flink web ui 查看Task Managers “Stdout” 选项卡),使得定位问题的成本大大降低。
DataGen
datagen 可以根据指定的数据类型,构造source 表数据,支持指定'number-of-rows' 构造有界流,不指定默认为无界流。支持的数据类型及表创建的参数请参考connector DataGen DataGen 可以单独作为source 使用,进行flink 的功能熟悉。但更多的时候是搭配CREATE TABLE LIKE 语法使用,Mock 数据,进行开发测试,能够极大的提升开发的效率。
使用方式如下:
定义源表表结构
CREATE TABLE random_source (
amount int,
course_code string,
`event_time` TIMESTAMP(3) ,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'sensor',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
使用dataGen mock 数据:
create table datagen_source
with( 'connector' = 'datagen',
'number-of-rows' = '10000') like random_source(EXCLUDING OPTIONS);
--如果使用了元数据方式去定义的表结构,like 后指定的表名,使用数据库.表名即可
--在sql 替换源表为datagen_source
insert xxx select * from datagen_source;
Print sink 会将计算结果打到标准输出(在“任务页面”Flink web ui 查看Task Managers “Stdout” 选项卡)中,在测试过程中,可以不用访问远程数据库 进行查看。支持的数据类型及表创建的参数请参考connector Print 搭配CREATE TABLE LIKE 语法使用能够快速测试数据结果 使用方式如下:
create catalog hive_catalog WITH (
'type' = 'hive',
'default-database'='sloth',
'hive-version'='2.1.1',
'hive-site'='hive-site.xml',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'warehouse'='hdfs://bdms-test/user/sloth/hive_db',
'krb.keytab'='sloth.keytab',
'krb.conf'='krb5.conf',
'krb.principal'='sloth/dev@BDMS.163.COM',
'auth.method'='kerberos',
'sys.db.url'='',
'sys.db.user'='',
'sys.db.password'=''
);
使用print :
CREATE TABLE print_table WITH ('connector' = 'print')
LIKE hive_catalog (EXCLUDING ALL)
如果是使用元数据方式注册的表,使用print 可能存在PROCTIME字段类型不匹配的问题,建议使用ddl 方式注册结果表
BlackHole
BlackHole 类似 linux 系统的 /dev/null .其本质是忽略了sink 对性能的影响,测试整个作业消费能力。支持的数据类型及表创建的参数请参考connector BlackHole .BlackHole 和其他两个内置连接器相似,大多情况下使用create table like 语法进行使用。需要注意的是BlackHole和Print 相同只能用在sink 连接器的定义上。
使用BlackHole:
CREATE TABLE blackhole_table WITH ('connector' = 'blackhole')
LIKE sink_table (EXCLUDING ALL)
关于LIKE 子句的说明
LIKE 子句可以用于定义表的多个部分,不仅仅是 schema 部分。
可以使用该子句,重用(或改写)指定的连接器配置属性或者可以向外部表添加 watermark 定义,例如可以向Hive 中定义的表添加 watermark 定义。
可以控制合并的表属性如下:
CONSTRAINTS - 主键和唯一键约束
GENERATED - 计算列
OPTIONS - 连接器信息、格式化方式等配置项
PARTITIONS - 表分区信息
WATERMARKS - watermark 定义
并且有三种不同的表属性合并策略:
INCLUDING - 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,例如新表和源表存在相同 key 的属性。
EXCLUDING - 新表不包含源表指定的任何表属性。
OVERWRITING - 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,例如,两个表中都存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。
并且你可以使用 INCLUDING/EXCLUDING ALL 这种声明方式来指定使用怎样的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,那么代表只有源表的 WATERMARKS 属性才会被包含进新表。
详情请参考LIKE 语句