使用Easystream 实现网站UV、PV、转化率指标的实时统计

  • 网站的独立访客数量 UV
  • 网站商品页面的单击量 PV
  • 转化率(转化率 = 成交次数 / 单击量)

测试数据

测试数据需要直接打入Kafka去任务开发中使用。

# 购买记录
{
  "record_type":0,  # 0 表示浏览记录
  "user_id": 6, 
  "client_ip": "100.0.0.6", 
  "product_id": 101, 
  "create_time": "2021-12-06 16:00:00"
}

{
  "record_type":1, # 1 表示购买记录
  "user_id": 6, 
  "client_ip": "100.0.0.8", 
  "product_id": 101, 
  "create_time": "2021-12-08 18:00:00"
}

定义source 表

Source表定义可通过流表注册、SQL代码注册两种方式进行。

流表注册

  1. 数仓中选择数据源管理,单击登记数据源按键。
  2. 在弹出的新建数据源窗口中选择数据源类型为“Kafka”,并根据必选配置信息进行连接。
  3. 确认连接成功后,进入流表管理,单击创建逻辑数据库与数据表。在数据表中维护具体的Kafka字段及 'Connector With' 部分参数。
  4. 完成流表注册后,可在SQL代码中添加Source段,选各种已创建的流表进行开发使用。

SQL代码

直接在SQL任务中使用 create table 语法进行表创建。

CREATE TABLE `input_web_record` (
    `record_type`    INT,
    `user_id`        INT,
    `client_ip`      VARCHAR,
    `product_id`     INT,
    `create_time`    TIMESTAMP,
    `times`          AS create_time,
    WATERMARK FOR times AS times - INTERVAL '10' MINUTE 
 ) WITH (
     'connector' = 'kafka',
     'topic' = 'uvpv_log',  
     'scan.startup.mode' = 'earliest-offset', 
     'properties.bootstrap.servers' = 'xxx:9092',  
     'properties.group.id' = 'WebRecordGroup',  -- 必选参数, 一定要指定 Group ID
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',       -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'     -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );

定义sink

Sink表创建与Source表一致,只是在具体开发过程中由SQL写法决定其用途。

-- UV sink
为了方便测试使用print 连接进行演示,实际开发使用Easystream处理后可以在支持更新输出库通过主键更新达到了对同一访客的数据去重的目的。以实际业务需求选择对应的sink connector
CREATE TABLE `output_uv` (  
 `userids`      STRING,
  uv_c          BIGINT
) WITH (
  'connector' = 'print'
);

-- PV sink
CREATE TABLE `output_pv` (  
 `pagevisits`       STRING,
 `product_id`       STRING,
 `hour_count`       BIGINT
) WITH (
  'connector' = 'print'
);

-- 转化率 sink
CREATE TABLE `output_conversion_rate` (  
 `conversion_rate`  STRING,
 `rate`             STRING
) WITH (
  'connector' = 'print'
);

SQL 实现

在SQL任务中,单独添加SQL段,并添加如下代码内容:

-- 加工得到 UV 指标,统计所有一天内的 UV
INSERT INTO output_uv 
SELECT 
 'uv' as userids,
  count(distinct user_id)  AS uv_c 
FROM input_web_record 
group by TUMBLE(times, INTERVAL '24' HOUR) ;

-- 加工并得到 PV 指标,统计每 10 分钟内的 PV
INSERT INTO output_pv 
SELECT 
  'pagevisits'               AS `pagevisits`, 
  CAST(product_id AS string) AS product_id, 
  SUM(product_id) AS hour_count
FROM input_web_record WHERE record_type = 0 
GROUP BY 
  HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), 
  product_id, 
  user_id;

-- 加工并得到转化率指标,统计每 10 分钟内的转化率
INSERT INTO output_conversion_rate 
SELECT 
  'conversion_rate' AS `conversion_rate`, 
  CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string) 
FROM (SELECT * FROM input_web_record where record_type = 1) AS a
GROUP BY  
  HOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), 
  product_id;

任务提交并发布

  1. 完成代码开发后,单击调试对代码语法进行校验,并通过采样数据集进行逻辑验证。
  2. 确认无误后,单击提交并发布,将代码发布至任务运维页面。
  3. 在任务运维列表,选择对应任务单击配置按键,为任务分配对应的集群及队列信息,用户也可以在这一步进行高级参数的配置。
  4. 完成所有配置后,单击保存并运行,任务即可正常启动。