基于JMS实现activemq订阅的断线自动重连示例

Posted 10km

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于JMS实现activemq订阅的断线自动重连示例相关的知识,希望对你有一定的参考价值。

在使用基于JMS的ActiveMQ消息系统时,当正常订阅消息后,如果连接中断(如消息服务挂了,或网络中断)发生,订阅者肯定不能再收到订阅消息,即使消息服务重启或网络正常后,也不能正常收到订阅消息
这是因为考虑到应用层的复杂性,JMS消息框架并没有实现自动重连功能,
但是它提供了ExceptionListener接口用于侦听JMS消息连接异常,JMS的连接接口(javax.jms.Connection)提供了setExceptionListener方法用于为连接实例指定ExceptionListener侦听器,如果侦听了Connection的连接异常,就可以基于此实现自动重连接功能,以下是基于JMS的ExceptionListener接口实现断线自动重连接的示例:

import java.util.Timer;
import java.util.TimerTask;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;

/**
 * JMS 重连接实现<br>
 * 通过实现@link ExceptionListener接口侦听连接异常,
 * 使用定时任务迟延执行重连接尝试直至连接成功
 * @author guyadong
 * @since 2.3.8
 */
class AutoReconnectAdapter implements ExceptionListener,JmsConstants
	private static long START_RECONNECTDELAY = 1;
	/**
	 * 用于执行自动重连的定时器对象
	 */
	private static final Timer reconnectTimer = new Timer("AMQP Reconnect"); 
	/**
	 * 定时重连的延迟时间(秒),从1秒开始,每次增加一倍,最大128
	 */
	private long reconnectDelay = START_RECONNECTDELAY; 
	/**
	 * 最大重连延迟时间
	 */
	private long maxReconnectDelay = 128;
	/**
	 * 应用层实现的重连回调接口
	 */
	private final JMSReconnectCallback jmsReconnectCallback;
	public AutoReconnectAdapter(JMSReconnectCallback jmsReconnectCallback) 
		this.jmsReconnectCallback = jmsReconnectCallback;
	

	@Override
	public void onException(JMSException exception) 
		if(null != jmsReconnectCallback) 
			try 
				jmsReconnectCallback.onConnectionLost();
				scheduleReconnectCycle();
			 catch (Exception e) 
				logger.error(e.getMessage(),e);
			
		
	
	/**
	 * 尝试将客户端重新连接到服务器。如果成功,它将确保不再计划重新连接。
	 * 但是,如果连接失败,延迟将增加一倍(最大128秒),并将在延迟后重新安排重新连接。
	 */
	private void attemptReconnect() 
		if(null != jmsReconnectCallback) 
			try 
				jmsReconnectCallback.tryReconnecting();
				// restore to default value
				reconnectDelay = START_RECONNECTDELAY;
				catch (Exception e) 
				if(e instanceof JMSException || e.getCause() instanceof JMSException ) 
					reconnectDelay = Math.min(reconnectDelay*2, maxReconnectDelay);
					scheduleReconnectCycle();
				else 
					logger.error(e.getMessage(),e);
				
			
		
	
	/**
	 * 安排在@link #reconnectDelay指定的延迟时间后执行重连接尝试
	 */
	private void scheduleReconnectCycle() 
		logger.info(" Scheduling reconnect timer, delay  seconds",jmsReconnectCallback.ownerName(), reconnectDelay);
		reconnectTimer.schedule(new TimerTask() 
			@Override
			public void run() 
				attemptReconnect();
			, reconnectDelay*1000);
	


为了适应应用层不同的重连接实现需要,我通过定义JMSReconnectCallback接口,来让断连接和重连实现抽象化,应用层可以根据自己的需要,实现此接口,执行断开连接和重连的动作

/**
 * JMS 重连机制回调接口
 * @author guyadong
 * @since 2.3.8
 */
public interface JMSReconnectCallback
	/**
	 * 连接异常侦听
	 * @throws Exception 
	 */
	public void onConnectionLost() throws Exception;
	/**
	 * 尝试重连动作
	 * @throws Exception 失败抛出异常 JMSException 或异常原因(cause)为JMSException 则继续异步执行重试
	 */
	public void tryReconnecting() throws Exception;
	/**
	 * 返回当前接口对象所属的模块名,用于日志输出
	 */
	public String ownerName();

AutoReconnectAdapter 的使用示例:

/**
 * JMS消息分发器抽象实现(线程安全)<br>
 * @author guyadong
 *
 */
abstract class BaseJmsDispatcher implements JmsConstants,JMSReconnectCallback
	private final AutoReconnectAdapter autoReconnectAdapter = new AutoReconnectAdapter(this);
	@Override
	protected void doInit() throws JMSException
		Connection c = getConnection();
		/** 成功获取连接后设置异常侦听器 */
		new ExceptionListenerContainer(autoReconnectAdapter).bind(c);
		c.start();
		session = c.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
	
	/**
	 * 当连接中断时执行反初始化动作
	 */
	@Override
	public void onConnectionLost() throws Exception 
		synchronized (consumers) 
			for(MessageConsumer messageConsumer : consumers.values()) 
				try 
					messageConsumer.close();
				 catch (Exception e) 
					logger.error(e.getMessage());
				
			
			consumers.clear();
		
		uninit();
	
	/**
	 * 尝试重装初始化
	 */
	@Override
	public void tryReconnecting() throws Exception 
		init();
		/** 重新初始化并订阅所有频道 */
		subscribe();
	
	@Override
	public String ownerName() 
		return getClass().getSimpleName();
	

完整代码参见码云仓库:https://gitee.com/l0km/simplemq/blob/dev/simplemq-jms/src/main/java/gu/simplemq/jms/AutoReconnectAdapter.java

以上是关于基于JMS实现activemq订阅的断线自动重连示例的主要内容,如果未能解决你的问题,请参考以下文章

3,ActiveMQ-入门(基于JMS发布订阅模型)

Spring Boot的JMS发送和接收队列消息,基于ActiveMQ

干货 | 从零开始学 Java - Spring 集成 ActiveMQ 配置

JMS发布/订阅消息传送例子

用python编写与mysql数据库连接并实现断线重连的问题

深入浅出JMS--Spring和ActiveMQ整合的完整实例