在sql 任务中使用复杂嵌套JSON数据

本文基于flink 1.12 引擎

在日常开发工作中,json 格式数据属于最常用的一类数据格式。通常想要表达复杂的数据关系,json 的结构也会变得异常复杂。flink 提供了json format 解析复杂json的数据类型,能够通过定义schema,达到直接在sql 中随意取用字段的目的。本文将通过一个样例数据,演示如何在 DDL 和 metahub 两种方式下定义 Map、Array、Row 类型的数据,以及在 SQL 开发中如何获取这些字段

样例数据:

{
  "data": {
    "snapshots": [{
      "content_type": "application/x-gzip-compressed-jpeg",
      "url": "https://study.sf.163.com/documents/read/sloth-manual-v3/Content.md"
    }],
    "documents": [{
      "content_type": "documents/read",
      "url": "https://study.sf.163.com/documents/read/sloth-manual-v3/Bast_case.md"
    }]
  },
  "resultMap": {
    "result": {
      "cover": "/data/test/log.txt"
    },
    "isSuccess": true
  },
  "meta": {
    "book_type": "normal"
  },
  "type": 2,
  "timestamp": 1646616632840,
  "arr": [{
    "address": "北京市海淀区",
    "city": "beijing"
  }, {
    "address": "浙江省杭州市",
    "city": "hangzhou"
  }, {
    "address": "安徽省合肥市",
    "city": "hefei"
  }],
  "map": {
    "flink": 456
  },
  "doublemap": {
    "inner_map": {
      "key": 123
    }
  }
}

Flink JSON格式使用jackson databind API解析和生成JSON字符串. 从Flink类型到JSON类型的映射如下表所示。

Flink SQL type JSON type
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY string with encoding: base64
DECIMAL number
TINYINT number
SMALLINT number
INT number
BIGINT number
FLOAT number
DOUBLE number
DATE string with format: date
TIME string with format: time
TIMESTAMP string with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE string with format: date-time (with UTC time zone)
INTERVAL number
ARRAY array
MAP / MULTISET object
ROW object
DDL 定义如下表结构
create table documents_source(
    data ROW<snapshots ARRAY<ROW<content_type string,url string>>,documents ARRAY<ROW<content_type string,url string>>>,
    resultMap ROW<result MAP<string,string> ,isSuccess boolean>,
    meta MAP<string,string>,
    type int,
    `timestamp` bigint,
    arr ARRAY<ROW<address string,city>>,
    map MAP<string,int>,
    doublemap MAP<string,map<string,int>>
)with(
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'test',  -- kafka topic
    'properties.bootstrap.servers' = 'localhost:9092',  -- broker连接信息
    'properties.group.id' = 'documents_json', -- 消费kafkagroup_id
    'scan.startup.mode' = 'latest-offset',  -- 读取数据的位置
    'format' = 'json',  -- 数据源格式为 json
    'json.fail-on-missing-field' = 'true', -- 字段丢失任务不失败
    'json.ignore-parse-errors' = 'false'  -- 解析失败跳过
);
Metahub 定义如下表结构


任务sql

insert into print_table
select
doublemap['inner_map']['key'],
count(data.snapshots[1].url),
`type`,
TUMBLE_START(proctime, INTERVAL '30' second) as t_start
from documents_source
group by TUMBLE(proctime, INTERVAL '30' second),funcName,`type`,doublemap['inner_map']['key']

关键信息

  • Json 中的每个 {} 都需要用 Row 类型来表示
  • Json 中的每个 [] 都需要用 Arrary 类型来表示
  • 数组的下标是从 1 开始的不是 0 如上面 SQL 中的 data.snapshots[1].url
  • 关键字在任何地方都需要加反引号 如上面 SQL 中的 type
  • UDF以及官方的内置函数可以直接在建表语句中使用或者使用计算列进行转换