RocketMQ 消息过滤

Posted 乐观男孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 消息过滤相关的知识,希望对你有一定的参考价值。

说明

消息过滤的两种方式:Tag和SQL表达式。
生产者:对Message设置Tag、用户属性
消费者:subscribe时指定Tag、SQL表达式

生产端

@Test
    public void sendMessage() throws Exception 
        DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
        Message msg = new Message(RocketMqUtil.TOPIC, "filter", "filter-message-1".getBytes(Charset.forName("UTF-8")));
        //设置消息的属性
        msg.putUserProperty("age", "30");
        msg.putUserProperty("name", "张三");
        SendResult sendResult = defaultMQProducer.send(msg);
        log.info("发送结果:", sendResult.getSendStatus().name());
        defaultMQProducer.shutdown();
    

消费端

@Test
    public void consumer() throws Exception 
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        //根据Tag过滤消息
        //defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "filter || Tag1");
        //根据Tag+SQL表达式过滤消息
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, MessageSelector.byTag("filter || Tag1").bySql("age > 20 AND name = 'zhangsan'"));
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                msgs.stream().map(MessageExt::getBody).map(String::new).forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        defaultMQPushConsumer.start();
        Thread.sleep(5000L);
        defaultMQPushConsumer.shutdown();
    

以上是关于RocketMQ 消息过滤的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 消息详解

RocketMQ系列批量发送与过滤

RocketMQ 消息过滤

RocketMQ 消息过滤

RocketMQ 消息过滤

12 SpringBoot整合RocketMQ实现过滤消息-根据TAG方式过滤消息