RocketMQ 消息过滤
Posted 乐观男孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 消息过滤相关的知识,希望对你有一定的参考价值。
说明
消息过滤的两种方式:Tag和SQL表达式。
生产者:对Message设置Tag、用户属性
消费者:subscribe时指定Tag、SQL表达式
生产端
@Test
public void sendMessage() throws Exception
DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
Message msg = new Message(RocketMqUtil.TOPIC, "filter", "filter-message-1".getBytes(Charset.forName("UTF-8")));
//设置消息的属性
msg.putUserProperty("age", "30");
msg.putUserProperty("name", "张三");
SendResult sendResult = defaultMQProducer.send(msg);
log.info("发送结果:", sendResult.getSendStatus().name());
defaultMQProducer.shutdown();
消费端
@Test
public void consumer() throws Exception
DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
//根据Tag过滤消息
//defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "filter || Tag1");
//根据Tag+SQL表达式过滤消息
defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, MessageSelector.byTag("filter || Tag1").bySql("age > 20 AND name = 'zhangsan'"));
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
msgs.stream().map(MessageExt::getBody).map(String::new).forEach(System.out::println);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
defaultMQPushConsumer.start();
Thread.sleep(5000L);
defaultMQPushConsumer.shutdown();
以上是关于RocketMQ 消息过滤的主要内容,如果未能解决你的问题,请参考以下文章