如何用多线程方式,提高rabbitmq消息处理效率?
Posted Java知音_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何用多线程方式,提高rabbitmq消息处理效率?相关的知识,希望对你有一定的参考价值。
点击关注公众号,实用技术文章及时了解
来源:blog.csdn.net/qq_37936542/
article/details/82012991
问题描述:项目中接收到rabbitmq消息后,先进行一系列的处理,等所有处理完成后,将消息推送到前台,但是在处理消息的过程中,每个方法中都有与数据库交互的代码,直接导致消息推送不及时。
单线程代码模型:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class MqHandler implements MessageListener {
//消息处理
public void onMessage(Message msg) {
//处理A
dealA(msg);
//处理B
dealB(msg);
//处理C
dealC(msg);
//处理D
dealD(msg);
//处理E
dealE(msg);
//处理F
dealF(msg);
}
public void dealA(Message msg){}
public void dealB(Message msg){}
public void dealC(Message msg){}
public void dealD(Message msg){}
public void dealE(Message msg){}
public void dealF(Message msg){}
}
单线程处理图示:
解决方案:采用多线程的方式,每个线程处理一个或多个逻辑,提高CPU的使用率,优化消息响应时间。
多线程处理图示:
代码实现
一:配置spring线程池
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="10" />
<!-- 最大线程数 -->
<property name="maxPoolSize" value="50" />
<!-- 队列最大长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="1000" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="300" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
二:线程类
ABProcess:将方法A和方法B中的逻辑交给该线程处理
import java.util.ArrayList;
import java.util.List;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class ABProcess implements Runnable {
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
private List<Message> ABList = new ArrayList<Message>();
// 初始化方法:启动线程
public void init() {
taskExecutor.execute(this);
}
/**
* 对外提供添加数据的方法
* ABList是共享资源,主线程MqHandler对此进行添加,子线程ABProcess对此进行删除,存在线程安全问题,所以需要加同步
* notify():此方法必须在synchronized修饰的代码块或者方法中使用
* @param msg
*/
public synchronized void addList(Message msg) {
ABList.add(msg);
notify(); // 唤醒在此对象监视器(锁)上等待的单个线程,
}
@Override
public void run() {
while (true) {
try {
thread(); //调用实现方法
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 因为涉及共享资源的操作,需要同步
* wait():此方法必须在synchronized修饰的代码块或者方法中使用
* @throws Exception
*/
public synchronized void thread() throws Exception {
if (ABList.size() > 0) { // 判断集合中是否有消息
dealA(ABList.get(0)); //方法A
dealB(ABList.get(0));//方法B
ABList.remove(0); // 处理完后,删除这条数据
System.out.println("dealABSuccess");
} else {
wait(); // 若集合中没有消息,让线程等待,
}
}
public void dealA(Message msg) {
}
public void dealB(Message msg) {
}
}
spring配置配置Bean,并初始化init方法
<bean id="ABProcess" class="com.thread.ABProcess" init-method="init"/>
CDProcess:将方法C和方法D的逻辑交给该线程处理,具体实现与ABProcess一致
三:修改主线程MqHandler逻辑
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
public class MqHandler implements MessageListener {
@Autowired
private ABProcess acProcess;
@Autowired
private CDProcess cdProcess;
//消息处理
public void onMessage(Message msg) {
acProcess.addList(msg);//主线程将消息添加到集合,交给子线程ABProcess处理
cdProcess.addList(msg);//主线程将消息添加到集合,交给子线程CDProcess处理
//E和F逻辑代码简单,直接交给主线程
dealE(msg);
dealF(msg);
}
public void dealE(Message msg){}
public void dealF(Message msg){}
}
推荐:
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!
以上是关于如何用多线程方式,提高rabbitmq消息处理效率?的主要内容,如果未能解决你的问题,请参考以下文章