流表序列化及映射规则
在流表管理中,消息中间件作为实时数仓构建过程中的核心媒介,由于其序列化方式不同,对应数据的解析结果会存在较大差异。本文将围绕通过消息中间件构建流表进行展开说明。
序列化方式
目前,实时计算平台支持json
、csv
、avro
、canal-json
、debezium-json
、maxwell-json
6种序列化方式,下表列出了各序列化方式支持的消息队列数据源类型和配置时的特殊规则。
序列化方式 | 支持的数据源类型 | 主键规则 | 特殊字段类型 |
---|---|---|---|
JSON | Kafka、Pulsar、RocketMQ | 不可设置主键 | Map 的 Key 必须是 String |
CSV | Kafka、Pulsar | 不可设置主键 | 不支持 Map 字段类型;只支持一层嵌套字段 |
Avro | Kafka、Pulsar | 不可设置主键 | - Map 的 Key 必须是 String; - Timestamp 精度需小于等于3; - 字段名称1-128字符,只能包含字母、数字、下划线,以字母开头; |
Canal-JSON | Kafka、Pulsar | 可设置多个主键 | Map 的 Key 必须是 String |
Debezium-JSON | Kafka、Pulsar | 可设置多个主键 | Map 的 Key 必须是 String |
Maxwell-JSON | Kafka、Pulsar | 可设置多个主键 | Map 的 Key 必须是 String |
数据类型映射关系
以Kafka为例,Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。Kafka 消息使用格式配置进行序列化和反序列化,例如 json,csv,avro等。 因此,数据类型映射取决于使用的格式。可以参阅以下表格或Apache Flink Documentation 以获取更多细节。
1. JSON
目前 JSON Schema 将会自动从 Table Schema 之中自动推导得到。不支持显式地定义 JSON Schema。在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。
下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
Flink SQL 类型 | JSON 类型 |
---|---|
CHAR / VARCHAR / STRING |
string |
BOOLEAN |
boolean |
BINARY / VARBINARY |
string with encoding: base64 |
DECIMAL |
number |
TINYINT |
number |
SMALLINT |
number |
INT |
number |
BIGINT |
number |
FLOAT |
number |
DOUBLE |
number |
DATE |
string with format: date |
TIME |
string with format: time |
TIMESTAMP |
string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
string with format: date-time (with UTC time zone) |
INTERVAL |
number |
ARRAY |
array |
MAP / MULTISET |
object |
ROW |
object |
2. CSV
目前 CSV 的 Schema 都是从 Table Schema 推断而来的。暂不支持显式地定义 CSV Schema 。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。
下面的表格列出了flink数据和CSV数据的对应关系。
Flink SQL 类型 | CSV 类型 |
---|---|
CHAR / VARCHAR / STRING |
string |
BOOLEAN |
boolean |
BINARY / VARBINARY |
string with encoding: base64 |
DECIMAL |
number |
TINYINT |
number |
SMALLINT |
number |
INT |
number |
BIGINT |
number |
FLOAT |
number |
DOUBLE |
number |
DATE |
string with format: date |
TIME |
string with format: time |
TIMESTAMP |
string with format: date-time |
INTERVAL |
number |
ARRAY |
array |
ROW |
object |
3. Avro
目前,Avro Schema 通常是从 Table Schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。
Flink SQL 类型 | Avro 类型 | Avro 逻辑类型 |
---|---|---|
CHAR / VARCHAR / STRING |
string |
|
BOOLEAN |
boolean |
|
BINARY / VARBINARY |
bytes |
|
DECIMAL |
fixed |
decimal |
TINYINT |
int |
|
SMALLINT |
int |
|
INT |
int |
|
BIGINT |
long |
|
FLOAT |
float |
|
DOUBLE |
double |
|
DATE |
int |
date |
TIME |
int |
time-millis |
TIMESTAMP |
long |
timestamp-millis |
ARRAY |
array |
|
MAP (key 必须是 string/char/varchar 类型) |
map |
|
MULTISET (元素必须是 string/char/varchar 类型) |
map |
|
ROW |
record |
4. Canal-JSON
目前,Canal Format 使用 JSON Format 进行序列化和反序列化,请参阅上方 JSON 序列化方式说明。
5. Debezium-JSON
目前,Canal Format 使用 JSON Format 进行序列化和反序列化,请参阅上方 JSON 序列化方式说明。
6. Maxwell-JSON
目前,Maxwell Format 使用 JSON Format 进行序列化和反序列化,请参阅上方 JSON 序列化方式说明。