在SQL任务中使用UDTF
更新时间: 2024-12-04 15:43:10
在SQL任务中使用UDTF
UDTF函数是flink 提供的自定义函数中的一类表值函数,和标量函数不同的是,它可以返回任意多行,返回的行也可以包含1到多列。 要定义一个表值函数,需要扩展org.apache.flink.table.functions 下的TableFunction,可以实现多个名为eval 的方法对求值进行重载。 详情可查看UDTF
函数开发
以解析原始日志为例,编写UDTF 从原始日志中提取appid,uid,cid等信息。原始日志如下:
2021-11-20 12:34:25,346 48135186 [app-2-4] INFO - {"appid":3,"uid":207051,"consid":"d6eaf66c-f2eb-4fhc-b9f0-c238b295496b","ctype":"AOS","sdkVer":62,"protVer":1,"sid":6,"cid":11,"spendtime":0,"retcode":200,"props":{"antispamVer":"0","taskDeltaTime":0},"begintime":1637382865000,"packsize":104,"cluster":"defaultGroup","traceid":"b5006527c5184712bab255a14b16bee7"}
2020-11-20 12:35:00,346 48135186 [app-2-4] INFO - {"appid":7,"uid":207351,"consid":"d6eaf66c-f2eb-4fbc-b9f0-c298b295496b","ctype":"AOS","sdkVer":62,"protVer":1,"sid":6,"cid":17,"spendtime":0,"retcode":200,"props":{"antispamVer":"0","taskDeltaTime":0},"begintime":1637382900000,"packsize":104,"cluster":"defaultGroup","traceid":"b5006527c5184712bab255a14b16bee7"}
2021-11-20 12:35:49,346 48135186 [app-2-4] INFO - {"appid":1,"uid":207451,"consid":"d4eaf66c-f2eb-4fbc-b9f0-c438b295496b","ctype":"AOS","sdkVer":62,"protVer":1,"sid":6,"cid":18,"spendtime":0,"retcode":200,"props":{"antispamVer":"0","taskDeltaTime":0},"begintime":1637382949000,"packsize":104,"cluster":"defaultGroup","traceid":"b5006527c5184712bab255a14b16bee7"}
pom.xml
<properties>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!-- Provided by the Flink cluster at runtime: compile-only, excluded from the UDF jar. -->
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.2.4</version>
        <!-- NOTE: gson is NOT shipped by the Flink cluster, so it must be bundled into
             the UDF jar (default "compile" scope). Marking it "provided" would make the
             task fail at runtime with ClassNotFoundException: com.google.gson.JsonParser. -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <!-- Table API classes (TableFunction) are also supplied by the cluster. -->
        <scope>provided</scope>
    </dependency>
</dependencies>
程序代码:
package com.netease;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;
/**
* @author ZhiYuan
* @date 2021/12/7 10:38 上午
**/
/**
 * Table-valued function (UDTF) that extracts (uid, sid, cid) from a raw
 * application log line of the form:
 *   "2021-11-20 12:34:25,346 48135186 [app-2-4] INFO - {json payload}"
 *
 * Emits one Tuple3&lt;uid, sid, cid&gt; per well-formed line; malformed lines
 * (null, or lacking the "- " separator before the JSON payload) are silently
 * skipped so a single bad record does not fail the whole job.
 */
public class UDTFLogSplit extends TableFunction<Tuple3<Long, Integer, Integer>> {

    /**
     * Parses one raw log line and collects the extracted fields.
     *
     * @param message a full raw log line; may be null or malformed
     */
    public void eval(String message) {
        if (message == null) {
            return;
        }
        // The JSON payload starts right after the first "- " separator
        // (the "INFO - " marker). Using indexOf instead of split("- ")
        // avoids an ArrayIndexOutOfBoundsException on malformed lines and
        // avoids truncating a payload that itself contains "- ".
        int sep = message.indexOf("- ");
        if (sep < 0) {
            return; // no JSON payload: emit nothing for this line
        }
        String json = message.substring(sep + 2);
        JsonObject jsonObject = new JsonParser().parse(json).getAsJsonObject();
        long uid = jsonObject.get("uid").getAsLong();
        int sid = jsonObject.get("sid").getAsInt();
        int cid = jsonObject.get("cid").getAsInt();
        collect(Tuple3.of(uid, sid, cid));
    }
}
打包上传
jdk版本使用1.8
使用maven 工具,单击package 对当前项目进行打包即可。注意flink 集群依赖不参与打包
上传函数包到平台,为了分类管理不同业务的函数,可以创建文件夹,并将函数上传到指定的目录位置。
函数创建&注册
在“UDF管理” 中新增函数
在任务开发中添加新增的函数到函数依赖中
通过create function语句申明使用
CREATE FUNCTION splitAppLog AS 'com.netease.UDTFLogSplit';
SQL 写法
使用lateral table 语句:
INSERT INTO sink_print
SELECT
    T.uid,
    T.sid,
    T.cid
FROM appLog,
    LATERAL TABLE(splitAppLog(log)) AS T(uid, sid, cid);
使用 left join on true 配合lateral table
INSERT INTO sink_print
SELECT
    T.uid,
    T.sid,
    T.cid
FROM appLog
LEFT JOIN LATERAL TABLE(splitAppLog(log)) AS T(uid, sid, cid) ON TRUE;
需要注意的是在程序中如果未指定与字段对应的变量名称,默认按照顺序输出字段。如果出现字段值对应错误问题,请检查字段顺序与程序解析字段顺序。
完整sql:
-- Comment: add the business description for this job here
--********************************************************************--

-- Kafka source: each record is one raw application log line.
CREATE TABLE appLog (
    log STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'app_log',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'logs_moudle',
    'scan.startup.mode' = 'latest-offset',
    'parallelism' = '1',
    'format' = 'raw'
);

-- Print sink: receives the fields extracted by the UDTF.
CREATE TABLE sink_print (
    uid BIGINT,
    sid INT,
    cid INT
) WITH (
    'connector' = 'print'
);

-- Register the UDTF uploaded to the platform.
CREATE FUNCTION splitAppLog AS 'com.netease.UDTFLogSplit';

-- Expand each raw log line into (uid, sid, cid); LEFT JOIN ... ON TRUE
-- keeps source rows even when the UDTF emits nothing for a line.
INSERT INTO sink_print
SELECT
    T.uid,
    T.sid,
    T.cid
FROM appLog
LEFT JOIN LATERAL TABLE(splitAppLog(log)) AS T(uid, sid, cid) ON TRUE;
输出结果:
+I(207451,6,18)
+I(207451,6,18)
+I(207451,6,18)
+I(207451,3,11)
+I(207451,6,18)
+I(207451,6,18)