在SQL 任务中使用kerberos 认证 的Kafka

本文基于flink 1.12 引擎测试

由于有数开发平台集成了kerberos 做集群的安全认证,在任务提交运行过程中,都需要去认证hdfs和yarn. 平台已在flink conf 配置中添加作业运行过程中需要的认证配置,可以在任意任务的flink 运行页面, Job Manager -> Configuration 参数中查看。相关参数请查阅社区参数配置https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#auth-with-external-systems 由于实时计算产品目前不支持多用户的方式进行集群认证,因此kafka 的认证用户 需要和平台配置的认证用户保持一致。目前平台使用sloth 用户登录yarn 和hdfs ,如果需要在平台使用kerberos 认证的kafka ,则需要kafka 管理人员通过ACLs 对作业涉及到的topic、消费者组为sloth用户进行相关授权配置


ACLs授权 相关操作,请参阅kafka 官方文档 https://kafka.apache.org/documentation/#security_authz

授权示例

  1. 查看授权

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -list
  2. 生产者授权

    注意 用户必须指定为sloth

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka -add --allow-principal User:sloth --producer  --topic test_demo63
  3. 消费者组授权

    注意 用户必须指定为sloth

    /kafka-acls.sh --authorizer-properties zookeeper.connect=bigdata1:2182/kafka-add --allow-principal User:sloth --consumer --group=*  --topic test_demo63

作业配置

flink sql kafka DDL 作业配置示例如下

CREATE TABLE sink_table (
  order_id int,
  order_date VARCHAR,
  customer_name VARCHAR,
  price decimal(10,3),
  product_id int,
  order_status boolean
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_demo63',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'broker1:9092',
  'format' = 'json',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI', 
  'properties.sasl.kerberos.service.name'='kafka'
);

相较于不带kerberos 认证的kafka ,需要增加如下3个参数,无论kafka 作为source 或者sink。 作为sink 时需要注意的是,指定的消费者组必须为sloth 用户做授权配置

  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI', 
  'properties.sasl.kerberos.service.name'='kafka'

同时需要为作业添加一个高级参数配置,以将平台配置的kerberos 认证同时也作为kafka 的认证 参数如下:

security.kerberos.login.contexts KafkaClient

Sloth 版本为384 时,在任务页面右侧的高级参数处添加即可

Sloth 版本为大于3901 时,需要先将任务提交发布,然后在作业配置的,Flink 高级配置处添加

作业完整sql 示例

create table datagen_source(
order_id int,
order_date varchar,
customer_name varchar,
price decimal(10,3),
product_id int,
order_status boolean
)with(
'connector' = 'datagen',
'rows-per-second' = '1'
);

CREATE TABLE sink_table (
order_id int,
order_date VARCHAR,
customer_name VARCHAR,
price decimal(10,3),
product_id int,
order_status boolean
) WITH (
'connector' = 'kafka',
'topic' = 'test_demo63',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'broker1:9092',
'format' = 'json',
 'properties.security.protocol' = 'SASL_PLAINTEXT',
 'properties.sasl.mechanism' = 'GSSAPI', 
 'properties.sasl.kerberos.service.name'='kafka'
);

insert into sink_table select * from datagen_source;