RocketMQ中ACL权限控制
Posted 李晓LOVE向阳
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ中ACL权限控制相关的知识,希望对你有一定的参考价值。
1、什么是ACL?
ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?
- 用户
用户是访问控制的基础要素,也不难理解,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。 - 资源
资源,需要保护的对象,在RocketMQ中,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。 - 权限
针对资源,能进行的操作, - 角色
RocketMQ中,只定义两种角色:是否是管理员。
另外,RocketMQ还支持按照客户端IP进行白名单设置。
2、ACL基本流程图
在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:
对于上述具体的实现,将在后续文章中重点讲解,本文的目的只是希望给读者一个大概的了解。
3、如何配置ACL
3.1 acl配置文件
acl默认的配置文件名:plain_acl.yml,需要放在$ROCKETMQ_HOME/store/config目录下。下面对其配置项一一介绍。
3.1.1 globalWhiteRemoteAddresses
全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:
- 空
表示不设置白名单,该条规则默认返回false。 - "*"
表示全部匹配,该条规则直接返回true,将会阻断其他规则的判断,请慎重使用。 - 192.168.0.100,101
多地址配置模式,ip地址的最后一组,使用,大括号中多个ip地址,用英文逗号(,)隔开。 - 192.168.1.100,192.168.2.100
直接使用,分隔,配置多个ip地址。 - 192.168..或192.168.100-200.10-20
每个IP段使用 "*" 或"-"表示范围。
3.1.2 accounts
配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。
3.1.2.1 accessKey
登录用户名,长度必须大于6个字符。
3.1.2.2 secretKey
登录密码。长度必须大于6个字符。
3.1.2.3 whiteRemoteAddress
用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。
3.1.2.4 admin
boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。
- UPDATE_AND_CREATE_TOPIC
更新或创建主题。 - UPDATE_BROKER_CONFIG
更新Broker配置。 - DELETE_TOPIC_IN_BROKER
删除主题。 - UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
更新或创建订阅组信息。 - DELETE_SUBSCRIPTIONGROUP
删除订阅组信息。
3.1.2.5 defaultTopicPerm
默认topic权限。该值默认为DENY(拒绝)。
3.1.2.6 defaultGroupPerm
默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。
3.1.2.7 topicPerms
设置topic的权限。其类型为数组,其可选择值在下节介绍。
3.1.2.8 groupPerms
设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。
3.2 RocketMQ ACL权限可选值
- DENY
拒绝。 - PUB
拥有发送权限。 - SUB
拥有订阅权限。
3.3、权限验证流程
上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。
4、使用示例
4.1 Broker端安装
首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到$ROCKETMQ_HOME/conf目录。
broker.conf的配置文件如下:
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort=10915
storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
aclEnable=true
plain_acl.yml文件内容如下:
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- TopicTest=PUB
groupPerms:
# the group should convert to retry topic
- oms_consumer_group=DENY
- accessKey: admin
secretKey: 12345678
whiteRemoteAddress:
# if it is admin, it could access all resources
admin: true
从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。
4.2 消息发送端示例
public class AclProducer
public static void main(String[] args) throws MQClientException, InterruptedException
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++)
try
Message msg = new Message("TopicTest3" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
catch (Exception e)
e.printStackTrace();
Thread.sleep(1000);
producer.shutdown();
static RPCHook getAclRPCHook()
return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
运行效果如图所示:
4.3 消息消费端示例
public class AclConsumer
public static void main(String[] args) throws InterruptedException, MQClientException
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4", getAclRPCHook(),new AllocateMessageQueueAveragely());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context)
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
consumer.start();
System.out.printf("Consumer Started.%n");
static RPCHook getAclRPCHook()
return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
发现并不没有消费消息,符合预期。
参考Demo:消费者
@Override
public void run(String... args) throws Exception
System.out.println("通过实现CommandLineRunner接口,在spring boot项目启动后打印参数");
// AccessKey 身份验证
AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey,secretKey));
consumer = new DefaultMQPushConsumer(consumerGroupId, rpcHook,new AllocateMessageQueueAveragely());
//consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
//设置 TCP 接入域名
consumer.setNamesrvAddr(nameserverMQ);
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setVipChannelEnabled(false);
consumer.setConsumeTimeout(3);
consumer.setMessageModel(MessageModel.BROADCASTING);//广播订阅方式
//订阅主题和 标签( * 代表所有标签)下信息
String later = "_" + rocketmqAreacode + "_" + hikvisionId;
consumer.subscribe(MQIOTDEV001 + later, MqServerTypeConstant.YCCK);
// //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
try
for (Message msg : msgs)
//消费者获取消息 这里只输出 不做后面逻辑处理
String body = new String(msg.getBody(), "utf-8");
log.info("Consumer-获取消息-主题topic为=, 消费消息为=", msg.getTopic(), body);
catch (Exception e)
log.error("0x80517800消费远程操控接口失败"+e.getMessage());
catch (UnsupportedEncodingException e)
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
consumer.start();
System.out.println("消费者 启动成功=======");
消费者:
public void sendControl(BaseResponse baseResponse)
try
String later = "_" + rocketmqAreacode + "_" + hikvisionId;
// DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer("my_test_group");
AclClientRPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(remoteProperties.getAccessKey(),remoteProperties.getSecretKey()));
DefaultMQProducer producer = new DefaultMQProducer(producerGroupIOTDEV001, rpcHook);
producer.setNamesrvAddr(nameserverMQ);
producer.setInstanceName(RunTimeUtil.getRocketMqUniqeInstanceName());
producer.start();
Message msg = new Message(
MQIOTDEV001 + later, MqServerTypeConstant.YCCK,
// Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
MqServerTypeConstant.YCCK,
// Message Body
// 任何二进制形式的数据, MQ不做任何干预,
// 需要Producer与Consumer协商好一致的序列化和反序列化方式
JsonUtils.object2Json(baseResponse).getBytes("UTF-8"));
//同步发送消息
producer.send(msg);
// 在callback返回之前即可取得msgId。
// 在应用退出前,销毁Producer对象。注意:如果不销毁也没有问题
log.info("推送反控响应信息到rocketmq:同步发送消息");
producer.shutdown();
catch (Exception e)
log.error("0x80517800推送响应信息到rocketmq失败:"+e.getMessage());
e.printStackTrace();
关于RocketMQ ACL的使用就介绍到这里了,下一篇将介绍RocketMQ ACL实现原理。
以上是关于RocketMQ中ACL权限控制的主要内容,如果未能解决你的问题,请参考以下文章