Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子相关的知识,希望对你有一定的参考价值。
1. 声明
当前内容主要为本人学习Apache RocketMQ的使用,主要参考官方文档
主要包括:
- 批量发送Message
- 设置DelayTime(不能再批量发送的时候使用)
- 设置用户属性
- 开启SQL过滤
2. 开启SQL过滤
如果使用了SQL过滤但当前的broker开启SQL过滤就会导致消费者连接时报错:The broker does not support consumer to filter message by SQL92
,经过查询找到需要在broker.conf中设置一个属性enablePropertyFilter=true
1. 手动关闭broker和namesrv(使用mqshutdown方式)
2.找到broker.conf文件(位于conf文件夹中)
3.添加以下内容并保存:
4.重新启动namesrv和broker,注意:这里的broker启动方式必须修改为该配置文件,否则采用默认属性(不会有enablePropertyFilter)
./mqbroker -c ../conf/broker.conf -n localhost:9876
其中-c指明当前broker使用的配置文件
此时使用带有SQL32的过滤的消费者就不会报错了
3. 基本demo
消费者:
public class ScheduledAndBatchFilterMessageConsumer
public static void main(String[] args) throws Exception
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
consumer.setNamesrvAddr("192.168.1.102:9876");
// Subscribe topics
// 使用消息选择器方式进行过滤消息(按照手动设置的属性方式)
// The broker does not support consumer to filter message by SQL92,需要在broker.conf文件中添加enablePropertyFilter = true
consumer.subscribe("TestTopic", /*"*"*/MessageSelector.bySql("index > 50 and index <80"));
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context)
for (MessageExt message : messages)
// Print approximate delay time period
try
System.out.println("Receive message[msg=" + new String(message.getBody(),RemotingHelper.DEFAULT_CHARSET).intern() + "] "
+ (message.getStoreTimestamp() - message.getBornTimestamp() ) + "ms later"+(System.currentTimeMillis() - message.getStoreTimestamp()));
catch (UnsupportedEncodingException e)
// TODO Auto-generated catch block
e.printStackTrace();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// Launch consumer
consumer.start();
System.out.println("comsumer start");
生产者:
public class ScheduledAndBatchFilterMessageProducer
public static void main(String[] args) throws Exception
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("192.168.1.102:9876");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
List<Message> messages=new ArrayList<>(totalMessagesToSend);
for (int i = 0; i < totalMessagesToSend; i++)
Message message = new Message("TestTopic", ("Hello scheduled message index=" + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
// 设置在10秒之后才发送给消费者
// message.setDelayTimeLevel(3); 批量发送方式不支持DelayTime
// 设置用户属性
message.putUserProperty("index",String.valueOf(i));
// Send the message
messages.add(message);
// 使用批量方式发送消息
producer.send(messages);
// Shutdown producer after use.
producer.shutdown();
System.out.println("send message finish");
测试结果:
测试成功
4. 总结
1.批量发送主要为使用producer的时候发送集合数据即可(不可和设置DelayTime一起使用)
2.设置用户属性为:message.putUserProperty("index",String.valueOf(i));
,可以手动添加设置属性
3.SQL过滤,主要为消费者那边使用MessageSelector.bySql("index > 50 and index <80")
,主要为使用用户设置的属性的表达式
以上是关于Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子的主要内容,如果未能解决你的问题,请参考以下文章
Apache RocketMQ:理解官方的Broadcasting的例子
Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子
Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子