RocketMQ ACL使用指南

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ ACL使用指南相关的知识,希望对你有一定的参考价值。

参考技术A ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

另外,RocketMQ还支持按照客户端IP进行白名单设置。

在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:

对于上述具体的实现,将在后续文章中重点讲解,本文的目的只是希望给读者一个大概的了解。

acl默认的配置文件名:plain_acl.yml,需要放在$ROCKETMQ_HOME/store/config目录下。下面对其配置项一一介绍。

全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:

配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

登录用户名,长度必须大于6个字符。

登录密码。长度必须大于6个字符。

用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。

boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。

默认topic权限。该值默认为DENY(拒绝)。

默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。

设置topic的权限。其类型为数组,其可选择值在下节介绍。

设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。

上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。

首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到$ROCKETMQ_HOME/conf目录。

broker.conf的配置文件如下:

plain_acl.yml文件内容如下:

从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。

运行效果如图所示:

发现并不没有消费消息,符合预期。

关于RocketMQ ACL的使用就介绍到这里了,下一篇将介绍RocketMQ ACL实现原理。

推荐阅读:
1、 RocketMQ实战:生产环境中,autoCreateTopicEnable为什么不能设置为true

2、 RocketMQ 消息发送system busy、broker busy原因分析与解决方案

3、 RocketMQ HA机制(主从同步)

4、 RocketMQ事务消息实战

配置RocketMQ ACL权限

==环境==

系统:Linux Centos7.2

RocketMQ版本:4.6.1

 

==集群形态==

技术图片

 

==修改前配置文件==

broker-a.properties

brokerClusterName=rexel
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=192.168.29.100
namesrvAddr=192.168.29.100:9876;192.168.29.101:9876
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/data/rocketmq/rootdir-a-m
storePathCommitLog=/home/data/rocketmq/commitlog-a-m
storePathConsumerQueue=/home/data/rocketmq/consumequeue-a-m
storePathIndex=/home/data/rocketmq/index-a-m
storeCheckpoint=/home/data/rocketmq/checkpoint-a-m

broker-a-s.properties

brokerClusterName=rexel
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=192.168.29.101
namesrvAddr=192.168.29.100:9876;192.168.29.101:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/data/rocketmq/rootdir-a-s
storePathCommitLog=/home/data/rocketmq/commitlog-a-s
storePathConsumerQueue=/home/data/rocketmq/consumequeue-a-s
storePathIndex=/home/data/rocketmq/index-a-s
storeCheckpoint=/home/data/rocketmq/checkpoint-a-s

broker-b.properties

brokerClusterName=rexel
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=192.168.29.101
namesrvAddr=192.168.29.100:9876;192.168.29.101:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/data/rocketmq/rootdir-b-m
storePathCommitLog=/home/data/rocketmq/commitlog-b-m
storePathConsumerQueue=/home/data/rocketmq/consumequeue-b-m
storePathIndex=/home/data/rocketmq/index-b-m
storeCheckpoint=/home/data/rocketmq/checkpoint-b-m

broker-b-s.properties

brokerClusterName=rexel
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=192.168.29.102
namesrvAddr=192.168.29.100:9876;192.168.29.101:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/data/rocketmq/rootdir-b-s
storePathCommitLog=/home/data/rocketmq/commitlog-b-s
storePathConsumerQueue=/home/data/rocketmq/consumequeue-b-s
storePathIndex=/home/data/rocketmq/index-b-s
storeCheckpoint=/home/data/rocketmq/checkpoint-b-s

 

==增加ACL权限==

1. 在Broker文件中增加aclEnable=true的配置

如下图

技术图片

 

2.重新启动集群

[1][2]nohup sh /home/rocketmq-4.6.1/bin/mqnamesrv >/home/rocketmq-4.6.1/logs/mqnamesrv.log &
[1]nohup sh /home/rocketmq-4.6.1/bin/mqbroker -c /home/rocketmq-4.6.1/conf/rexel/broker-a.properties >/home/rocketmq-4.6.1/logs/broker-a-m.log 2>&1 &
[2]nohup sh /home/rocketmq-4.6.1/bin/mqbroker -c /home/rocketmq-4.6.1/conf/rexel/broker-a-s.properties >/home/rocketmq-4.6.1/logs/broker-a-s.log 2>&1 &
[2]nohup sh /home/rocketmq-4.6.1/bin/mqbroker -c /home/rocketmq-4.6.1/conf/rexel/broker-b.properties >/home/rocketmq-4.6.1/logs/broker-b-m.log 2>&1 &
[3]nohup sh /home/rocketmq-4.6.1/bin/mqbroker -c /home/rocketmq-4.6.1/conf/rexel/broker-b-s.properties >/home/rocketmq-4.6.1/logs/broker-b-s.log 2>&1 &

 

这个时候通过集群查看命令已经会提示acl权限错误了,证明权限已经生效。

命令:sh /home/rocketmq-4.6.1/bin/mqadmin clusterList -n "192.168.29.100:9876;192.168.29.101:9876"

错误日志:

org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1  DESC: org.apache.rocketmq.acl.common.AclException: Check signature failed for accessKey=rocketmq2, org.apache.rocketmq.acl.plain.PlainPermissionManager.validate(PlainPermissionManager.java:410)
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerRuntimeInfo(MQClientAPIImpl.java:1288)
    at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.fetchBrokerRuntimeStats(DefaultMQAdminExtImpl.java:266)
    at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.fetchBrokerRuntimeStats(DefaultMQAdminExt.java:231)
    at org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand.printClusterBaseInfo(ClusterListSubCommand.java:212)
    at org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand.execute(ClusterListSubCommand.java:88)
    at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:139)
    at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:90)

 

3.配置权限

文件路径:/home/rocketmq-4.6.1/conf/plain_acl.yml

网上给的例子:

globalWhiteRemoteAddresses:
- 10.10.15.*
- 192.168.0.*
 
accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - topicA=DENY
  - topicB=PUB|SUB
  - topicC=SUB
  groupPerms:
  # the group should convert to retry topic
  - groupA=DENY
  - groupB=PUB|SUB
  - groupC=SUB
 
- accessKey: rocketmq2
  secretKey: 12345678
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true

 

plain_acl.yml文件中相关的参数含义及使用

字段取值含义
globalWhiteRemoteAddresses *;192.168.*.*;192.168.0.1 全局IP白名单
accessKey 字符串 Access Key 用户名
secretKey 字符串 Secret Key 密码
whiteRemoteAddress *;192.168.*.*;192.168.0.1 用户IP白名单
admin true;false 是否管理员账户
defaultTopicPerm DENY;PUB;SUB;PUB|SUB 默认的Topic权限
defaultGroupPerm DENY;PUB;SUB;PUB|SUB 默认的ConsumerGroup权限
topicPerms topic=权限 各个Topic的权限
groupPerms group=权限 各个ConsumerGroup的权限

权限标识符的含义

权限含义
DENY 拒绝
ANY PUB 或者 SUB 权限
PUB 发送权限
SUB 订阅权限

 

我的配置。配置完成之后不需要重启。

globalWhiteRemoteAddresses:
- 192.168.29.100
- 192.168.29.101
- 192.168.29.102

accounts:
- accessKey: rexel_developer
  secretKey: 19@ljWo2iUow
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - rexel_notice=PUB|SUB
  groupPerms:
  - rexel_notice_g1=SUB
  - rexel_notice_p1=PUB

- accessKey: rexel_admin
  secretKey: 98&UIwowu@9o
  whiteRemoteAddress:
  admin: true

 

==编写验证程序==

增加acl的maven依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.6.1</version>
</dependency>

 

生产者代码:

package acl;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class AclProducer {
    public static void main(String[] args)
        throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("rexel_notice_p1", getAclRPCHook());
        producer.setNamesrvAddr("192.168.29.100:9876;192.168.29.101:9876");
        producer.start();
        Message msg = new Message("rexel_notice" ,"*" , ("Hello RocketMQ ").getBytes());
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("rexel_developer","19@ljWo2iUow"));
    }
}

 

消费者代码:

package acl;

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class AclConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
            "rexel_notice_g1", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("rexel_notice", "*");
        consumer.setNamesrvAddr("192.168.29.100:9876;192.168.29.101:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("rexel_developer","19@ljWo2iUow"));
    }
}

 

最后可以看到消费者可以正常消费的日志:

技术图片

 

==补充==

增加了权限之后,貌似就没有办法通过控制台命令上创建topic了。

我目前是通过rocketmq-console来进行Topic及ConsumerGroup管理的。

 

以上是关于RocketMQ ACL使用指南的主要内容,如果未能解决你的问题,请参考以下文章

docker安装rocketmq 开通acl鉴权 rocketmq-dashboard

docker安装rocketmq 开通acl鉴权 rocketmq-dashboard

RocketMQ ACL版本升级过程中的曲折经历(大厂线上环境大规模MQ升级开启ACL实战)

配置RocketMQ ACL权限

RocketMQ中ACL权限控制

RocketMQ中ACL权限控制