基于JMS实现activemq订阅的断线自动重连示例
Posted 10km
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于JMS实现activemq订阅的断线自动重连示例相关的知识,希望对你有一定的参考价值。
在使用基于JMS的ActiveMQ消息系统时,当正常订阅消息后,如果连接中断(如消息服务挂了,或网络中断)发生,订阅者肯定不能再收到订阅消息,即使消息服务重启或网络正常后,也不能正常收到订阅消息
这是因为考虑到应用层的复杂性,JMS消息框架并没有实现自动重连功能,
但是它提供了ExceptionListener
接口用于侦听JMS消息连接异常,JMS的连接接口(javax.jms.Connection)提供了setExceptionListener
方法用于为连接实例指定ExceptionListener
侦听器,如果侦听了Connection
的连接异常,就可以基于此实现自动重连接功能,以下是基于JMS的ExceptionListener
接口实现断线自动重连接的示例:
import java.util.Timer;
import java.util.TimerTask;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
/**
* JMS 重连接实现<br>
* 通过实现@link ExceptionListener接口侦听连接异常,
* 使用定时任务迟延执行重连接尝试直至连接成功
* @author guyadong
* @since 2.3.8
*/
class AutoReconnectAdapter implements ExceptionListener,JmsConstants
private static long START_RECONNECTDELAY = 1;
/**
* 用于执行自动重连的定时器对象
*/
private static final Timer reconnectTimer = new Timer("AMQP Reconnect");
/**
* 定时重连的延迟时间(秒),从1秒开始,每次增加一倍,最大128
*/
private long reconnectDelay = START_RECONNECTDELAY;
/**
* 最大重连延迟时间
*/
private long maxReconnectDelay = 128;
/**
* 应用层实现的重连回调接口
*/
private final JMSReconnectCallback jmsReconnectCallback;
public AutoReconnectAdapter(JMSReconnectCallback jmsReconnectCallback)
this.jmsReconnectCallback = jmsReconnectCallback;
@Override
public void onException(JMSException exception)
if(null != jmsReconnectCallback)
try
jmsReconnectCallback.onConnectionLost();
scheduleReconnectCycle();
catch (Exception e)
logger.error(e.getMessage(),e);
/**
* 尝试将客户端重新连接到服务器。如果成功,它将确保不再计划重新连接。
* 但是,如果连接失败,延迟将增加一倍(最大128秒),并将在延迟后重新安排重新连接。
*/
private void attemptReconnect()
if(null != jmsReconnectCallback)
try
jmsReconnectCallback.tryReconnecting();
// restore to default value
reconnectDelay = START_RECONNECTDELAY;
catch (Exception e)
if(e instanceof JMSException || e.getCause() instanceof JMSException )
reconnectDelay = Math.min(reconnectDelay*2, maxReconnectDelay);
scheduleReconnectCycle();
else
logger.error(e.getMessage(),e);
/**
* 安排在@link #reconnectDelay指定的延迟时间后执行重连接尝试
*/
private void scheduleReconnectCycle()
logger.info(" Scheduling reconnect timer, delay seconds",jmsReconnectCallback.ownerName(), reconnectDelay);
reconnectTimer.schedule(new TimerTask()
@Override
public void run()
attemptReconnect();
, reconnectDelay*1000);
为了适应应用层不同的重连接实现需要,我通过定义JMSReconnectCallback
接口,来让断连接和重连实现抽象化,应用层可以根据自己的需要,实现此接口,执行断开连接和重连的动作
/**
* JMS 重连机制回调接口
* @author guyadong
* @since 2.3.8
*/
public interface JMSReconnectCallback
/**
* 连接异常侦听
* @throws Exception
*/
public void onConnectionLost() throws Exception;
/**
* 尝试重连动作
* @throws Exception 失败抛出异常 JMSException 或异常原因(cause)为JMSException 则继续异步执行重试
*/
public void tryReconnecting() throws Exception;
/**
* 返回当前接口对象所属的模块名,用于日志输出
*/
public String ownerName();
AutoReconnectAdapter 的使用示例:
/**
* JMS消息分发器抽象实现(线程安全)<br>
* @author guyadong
*
*/
abstract class BaseJmsDispatcher implements JmsConstants,JMSReconnectCallback
private final AutoReconnectAdapter autoReconnectAdapter = new AutoReconnectAdapter(this);
@Override
protected void doInit() throws JMSException
Connection c = getConnection();
/** 成功获取连接后设置异常侦听器 */
new ExceptionListenerContainer(autoReconnectAdapter).bind(c);
c.start();
session = c.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
/**
* 当连接中断时执行反初始化动作
*/
@Override
public void onConnectionLost() throws Exception
synchronized (consumers)
for(MessageConsumer messageConsumer : consumers.values())
try
messageConsumer.close();
catch (Exception e)
logger.error(e.getMessage());
consumers.clear();
uninit();
/**
* 尝试重装初始化
*/
@Override
public void tryReconnecting() throws Exception
init();
/** 重新初始化并订阅所有频道 */
subscribe();
@Override
public String ownerName()
return getClass().getSimpleName();
完整代码参见码云仓库:https://gitee.com/l0km/simplemq/blob/dev/simplemq-jms/src/main/java/gu/simplemq/jms/AutoReconnectAdapter.java
以上是关于基于JMS实现activemq订阅的断线自动重连示例的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot的JMS发送和接收队列消息,基于ActiveMQ
干货 | 从零开始学 Java - Spring 集成 ActiveMQ 配置