RabbitMQ Client封装连接及业务处理接口

Posted HiveDark

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ Client封装连接及业务处理接口相关的知识,希望对你有一定的参考价值。

一、RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

<更新明细>

  • 20210109:factory加入断线重连机制。

二、rabbitMQ安装

RabbitMQ Download
参考安装博客

三、封装RabbitMqClient.java

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.mina.proxy.utils.ByteUtilities;
import org.slf4j.Logger;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import lombok.extern.slf4j.Slf4j;

/**
 * RabbitMQ连接客户端工具类
 * @author david(857332533@qq.com)
 * 2020年12月31日
 */
@Slf4j
public class RabbitMqClient implements Runnable
	private final static Logger rabbitLogger = org.slf4j.LoggerFactory.getLogger("rabbit");
	private Connection connection = null;
	private Channel channel = null;
	private String host = "";
	private int port = 5672;
	private String userName = "";
	private String password = "";
	private String virtualHost = "";
	private String queueName = "";
	private boolean isConnected = false;
	private MessageHandler messageHandler;
	private Thread thread;
	
	public RabbitMqClient(String host,int port,String queueName,MessageHandler messageHandler) 
        this(host, port, null, queueName, messageHandler);
	
	
	public RabbitMqClient(String host,int port,String virtualHost,String queueName,MessageHandler messageHandler) 
        this(host,port,null,null,virtualHost,queueName,messageHandler);
	
	
	public RabbitMqClient(String host,int port,String userName,String password,String virtualHost,String queueName,MessageHandler messageHandler) 
        this.host = host;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.virtualHost = virtualHost;
        this.queueName = queueName;
        this.messageHandler = messageHandler;
        //启动
        start();
        //启动线程
        this.thread = new Thread(this);
        this.thread.start();
	
	
	public void setMessageHandler(MessageHandler messageHandler) 
		this.messageHandler = messageHandler;
	
	
	public Thread getThread() 
		return thread;
	
	
	public String getQueueName() 
		return queueName;
	
	
	public boolean isConnected() 
		return isConnected;
	

	/**
	 * #启动监听
	 */
	public void start() 
		try 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            factory.setPort(port);
            if(StringUtils.isNotBlank(userName)) 
                factory.setUsername(userName);
            
            if(StringUtils.isNotBlank(password)) 
                factory.setPassword(password);
            
            if(StringUtils.isNotBlank(virtualHost)) 
                factory.setVirtualHost(virtualHost);
            
            factory.setExceptionHandler(new DefaultExceptionHandler() 

				@Override
				public void handleConnectionRecoveryException(Connection conn, Throwable exception) 
					// TODO Auto-generated method stub
					super.handleConnectionRecoveryException(conn, exception);
					try 
						log.info("自动检测到连接断开,尝试重连");
						close();
					 catch (Exception e) 
					 finally 
						isConnected = false;
					
				
            );
            connection = factory.newConnection();
            channel = connection.createChannel();
            Consumer consumer = new DefaultConsumer(channel) 
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException 
                	handleDeliveryClient(consumerTag, envelope, properties, body);
                
            ;
            // channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
            channel.basicConsume(queueName, true, consumer);
            isConnected = true;
    		log.info("start "+queueName+" success");
        catch (Exception e) 
        	log.error("启动MQ监听异常",e);
        	isConnected = false;
        	close();
		
	
	
	/**
	 * #释放资源
	 */
	public void close() 
		if(null != channel) 
			try 
				channel.close();
				channel = null;
			 catch (Exception e) 
		
		if(null != connection) 
			try 
				connection.close();
				connection = null;
			 catch (Exception e) 
		
		isConnected = false;
		log.info("close "+queueName+" success");
	
	
	/**
	 * #消息数据解析处理
	 * @param consumerTag
	 * @param envelope
	 * @param properties
	 * @param body
	 * @throws IOException
	 */
	public void handleDeliveryClient(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException 
		try 
			rabbitLogger.info("exchang->,routingKey->,queueName->->recv:",
					envelope.getExchange(),envelope.getRoutingKey(),this.queueName,ByteUtilities.asHex(body));
			if(null != this.messageHandler) 
				this.messageHandler.handlerMessage(this.queueName,body);
			
		 catch (Exception e) 
			log.error("RabbitMQ消息解析异常",e);
		
	

	@Override
	public void run() 
		// TODO Auto-generated method stub
		while (true) 
			try 
				log.info(this.queueName+" is running");
				if(!isConnected) 
					log.info("重连MQ:",this.queueName);
					start();
				
				Thread.sleep(1000*60);
			 catch (Exception e) 
		
	


四、消息处理MessageHandler.java

public interface MessageHandler 
	
	/**
	 * 处理MQ消息数据
	 * @param body
	 */
	public void handlerMessage(String queueName,byte[] body);


五、总结

封装RabbitMqClient对象,用于连接RabbitMQ,自动ACK确认消息,开启线程自动心跳检测,如果connect失败,则会自动重连。
定义MessageHandler接口,用于处理业务数据。

以上是关于RabbitMQ Client封装连接及业务处理接口的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ接口封装

RabbitMq之工作队列(轮询发送消息)

rabbitMQ队列处理导致连接池耗尽Tomcat假死问题排查处理

进阶盘点Rabbitmq工具特性及常规处理方法

封装php的RabbitMq

记录下关于RabbitMQ常用知识点(持续更新)