13 SpringBoot整合RocketMQ实现过滤消息-根据SQL表达式过滤消息
Posted java1234_小锋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13 SpringBoot整合RocketMQ实现过滤消息-根据SQL表达式过滤消息相关的知识,希望对你有一定的参考价值。
SQL表达式方式可以根据发送消息时输入的属性进行一些计算。
RocketMQ的SQL表达式语法 只定义了一些基本的语法功能。
数字比较,如>,>=,<,<=,BETWEEN,=;
字符比较,如:=,<>,IN;
IS NULL or IS NOT NULL;
逻辑运算符:AND, OR, NOT;
常量类型:
数值,如:123, 3.1415;
字符, 如:‘abc’, 必须使用单引号;
NULL,特殊常量
Boolean, TRUE or FALSE;
上实例,发送三个消息,分别带上不同的header头信息;
/**
* 发送SQL表达式头信息消息,测试根据SQL表达式过滤消息
*/
public void sendMessageWithSQL(){
// 构造消息1
Message msg1 = MessageBuilder.withPayload("rocketmq过滤消息测试01").build();
Map<String, Object> headers = new HashMap<>() ;
headers.put("type", "pay") ;
headers.put("a", 10) ;
rocketMQTemplate.convertAndSend("java1234-filter-rocketmq", msg1, headers);
// 构造消息2
Message msg2 = MessageBuilder.withPayload("rocketmq过滤消息测试02").build();
Map<String, Object> headers2 = new HashMap<>() ;
headers2.put("type", "store") ;
headers2.put("a", 4) ;
rocketMQTemplate.convertAndSend("java1234-filter-rocketmq", msg2, headers2);
// 构造消息3
Message msg3 = MessageBuilder.withPayload("rocketmq过滤消息测试03").build();
Map<String, Object> headers3 = new HashMap<>() ;
headers3.put("type", "user") ;
headers3.put("a", 7) ;
rocketMQTemplate.convertAndSend("java1234-filter-rocketmq", msg3, headers3);
}
消费者端,selectorExpression = “type=‘user’ or a <7”,selectorType = SelectorType.SQL92 ,指定selectorType 以及设置表达式selectorExpression
@RocketMQMessageListener(topic = "java1234-filter-rocketmq",consumerGroup ="${rocketmq.consumer.group}" ,selectorExpression = "type='user' or a <7",selectorType = SelectorType.SQL92)
@Component
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费者:收到消息内容:"+s);
}
}
默认不支持SQL表达式,启动报错:
The broker does not support consumer to filter message by SQL92
找到broker.conf
配置文件
加下:
enablePropertyFilter=true
重新启动borker
.\\bin\\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true -c .\\conf\\broker.conf
运行测试:
过滤,收到2条消息,没问题;
说明:这个是锋哥的RocketMQ备课笔记,等备课完,会发布配套的视频教程,如有需要,可以先加锋哥WX:java1239 欢迎白嫖
没问题!
微信搜一搜公众号【java1234】关注这个放荡不羁的程序员,关注后回复【资料】有我准备的一线大厂笔试面试资料以及简历模板。
以上是关于13 SpringBoot整合RocketMQ实现过滤消息-根据SQL表达式过滤消息的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot(17)---SpringBoot整合RocketMQ