HBase流表
更新时间: 2024-12-04 15:43:48
本文将为您介绍 EasyStream 中如何创建 HBase 类型流表。
操作步骤
- 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
- 单击 创建表 按键,进行表创建。
- 选择 数据源类型 为 HBase。
- 完成剩余表单内容,创建HBase流表。
字段类型映射
HBase 以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。
Flink 的 HBase 连接器利用 HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes
进行字节数组和 Flink 数据类型转换。
Flink 的 HBase 连接器将所有数据类型(除字符串外)null
值编码成空字节。对于字符串类型,null
值的字面值由null-string-literal
选项值决定。
数据类型映射表如下:
Flink 数据类型 | HBase 转换 |
---|---|
CHAR / VARCHAR / STRING |
byte[] toBytes(String s) String toString(byte[] b) |
BOOLEAN |
byte[] toBytes(boolean b) boolean toBoolean(byte[] b) |
BINARY / VARBINARY |
返回 byte[] 。 |
DECIMAL |
byte[] toBytes(BigDecimal v) BigDecimal toBigDecimal(byte[] b) |
TINYINT |
new byte[] { val } bytes[0] // returns first and only byte from bytes |
SMALLINT |
byte[] toBytes(short val) short toShort(byte[] bytes) |
INT |
byte[] toBytes(int val) int toInt(byte[] bytes) |
BIGINT |
byte[] toBytes(long val) long toLong(byte[] bytes) |
FLOAT |
byte[] toBytes(float val) float toFloat(byte[] bytes) |
DOUBLE |
byte[] toBytes(double val) double toDouble(byte[] bytes) |
DATE |
从 1970-01-01 00:00:00 UTC 开始的天数,int 值。 |
TIME |
从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。 |
TIMESTAMP |
从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。 |
ARRAY |
不支持 |
MAP / MULTISET |
不支持 |
ROW |
不支持 |
使用从数据库获取字段功能时,平台会将 rowkey 字段置为 string 类型,将 column family 置为 row 类型,column 字段作为 column family 的二级字段,字段类型置为空。
流表配置
配置名称 | 是否必填 | 配置生效类型 | 参数值字段类型 | 参数值官方默认值 | 参数说明 |
---|---|---|---|---|---|
null-string-literal | 可选 | 源表、目标表 | String | null | 当字符串值为 null 时的存储形式,默认存成 "null" 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)将 null 值以空字节来存储。 |
sink.buffer-flush.max-size | 可选 | 目标表 | MemorySize | 2mb | 每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.buffer-flush.max-rows | 可选 | 目标表 | Integer | 1000 | 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.buffer-flush.interval | 可选 | 目标表 | Duration | 1s | 刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.parallelism | 可选 | 目标表 | Integer | - | 定义 sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
lookup.async | 可选 | 维表 | Boolean | false | 是否启用异步查找。注意:异步方式只支持 hbase-2.2 连接器 |
lookup.cache.max-rows | 可选 | 维表 | Long | -1 | 查找缓存的最大行数,超过这个值,最旧的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的 |
lookup.cache.ttl | 可选 | 维表 | Duration | 0s | 查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的。 |
lookup.max-retries | 可选 | 维表 | Integer | 3 | 查找数据库失败时的最大重试次数。 |
lookup.cache.type | 可选 | 维表 | CacheTypeEnum | none | 维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存) |
lookup.cache.metric.enable | 可选 | 维表 | Boolean | false | 是否启用维表缓存指标。默认关闭。 |
is.related.mammunt | 可选 | 源表、目标表 | Boolean | false | 是否关联猛犸。默认为false |
krb.conf | 可选 | 源表、目标表 | String | - | 使用flink-conf配置的krb信息登录 (包括security.kerberos.login.keytab, security.kerberos.login.principal, -Djava.security.krb5.conf) 由于语法检查无法获得上述配置信息,因此需要在connector中配置。如不使用语法检查,可不配置: krb.conf, krb.principal, krb.keytab。 |
krb.principal | 可选 | 源表、目标表 | String | - | krb.principal。带krb认证时必填。 |
krb.keytab | 可选 | 源表、目标表 | String | - | krb.leytab文件。带krb认证时必填。 |
hbase-site | 可选 | 源表、目标表 | String | - | hbase-site 文件名。带krb认证时必填。 |
core-site | 可选 | 源表、目标表 | String | - | core-site 文件名。带krb认证时必填。 |
hdfs-site | 可选 | 源表、目标表 | String | - | hdfs-site 文件名。带krb认证时必填。 |
特殊字段规则
数据源 | 主键 | 特殊字段类型 |
---|---|---|
HBase | 请将HBase表的rowkey(非row类型)定义为主键,column family定义为row类型的一级字段,column qualifier 定义为对应 column family 的嵌套字段。 一级字段不支持 map、array 字段类型;二级字段不支持 map、array、row 字段类型;不支持metadata、计算列字段 |
仅可设置两个字段,第一个为非嵌套类型且为主键,第二个为row类型且子字段不可为嵌套类型 |