RockeyMQ消息处理

Posted huangxinglei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RockeyMQ消息处理相关的知识,希望对你有一定的参考价值。

1:topic下面的queue 奇偶数会影响消费端的消费数量,

假设 4个队列,8个消息,每个节点各自消费两条,如果不对等的话,就没办法做一个负载均衡

 

队列的数量要大于consumer数量,不然多出来的consumer无法分配到queue,消息没办法消费,需要控制队列大于消费端

 

2:集群模式和广播模式

集群模式:消息只会被一个consumser消费一次,比如下单业务等;

广播模式:消息会被每个Consumer消费,比如群发公告等,广播模式中的ConsumerGroup暂时无用

通过 setMessageModel 进行修改

技术图片

 

 

3:一个Message里面只有一个topic,一个topic可以有多个tag,比如一个任务, 分为垫付任务,礼品任务,高权重任务,topic就是任务大类,tag就是任务大类的细分,二级类目,consumer端接受消息过滤有很多种

1:通过制定的tag接受

product

技术图片

 

 

consumer

技术图片

 

 

 

2:通过 * 监听,获取topic下面的全部消息

技术图片

 

 

 

3:通过 || 运算符监听,只要匹配到就接受

技术图片

 

 

4:通过SLQ92监听,支持 sql运算符  < >  >= <= =  is null or and not  等

 

启动报错

org.apache.rocketmq.client.exception.MQClientException: CODE: 1  DESC: The broker does not support consumer to filter message by SQL92

 

技术图片

 

 

解决方法:增加配置

进入 /distribution/target/apache-rocketmq/conf/2m-2s-async  主修改 

broker-a.properties

添加配置

enablePropertyFilter=true

 

技术图片

 

停掉之前的brokey服务   sh bin/mqshutdown broker

技术图片

 

 

重启brokey

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &

技术图片

 

 

从节点的话进入  /distribution/target/apache-rocketmq/conf/2m-2s-async

修改   

broker-a-s.properties 添加配置
 
 
回到 
mqbroker目录  停掉之前服务  启动  
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &

 

再次启动服务

技术图片

 

 

product

message.putUserProperty("orderNum","1");

技术图片

 

 

consumer

技术图片

 

 

过滤的话分为brokey过滤和consumer端过滤

brokey过滤:能减少一些无用的消息发送,节约带宽,会增加Brokey的负担

consumer过滤:能够减少brokey的压力

 

在brokey过滤首先product把tag转成哈希码发送到brokey,然后会去遍历queue里面的tag,和订阅方传递过来的tag进行hashcode匹配,不一样的劵就跳过,一样的话就发送到consumer,

然后consumer 防止hash 碰撞 会把tag 哈希码装换成原生的字符串进行对比过滤

 

以上是关于RockeyMQ消息处理的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

没有带有拉动刷新的互联网消息 webview 片段

论如何设计一款端对端加密通讯软件

终于懂了:Delphi重定义消息结构随心所欲,只需要前4个字节是消息编号就行了,跟Windows消息虽然尽量保持一致,但其实相互没有特别大的关系。有了这个,就有了主动,带不带句柄完全看需要。(代码片段

处理屏幕旋转上的片段重复(带有示例代码)

在 Python 多处理进程中运行较慢的 OpenCV 代码片段