RabbitMQ Client封装连接及业务处理接口
Posted HiveDark
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ Client封装连接及业务处理接口相关的知识,希望对你有一定的参考价值。
一、RabbitMQ介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
<更新明细>
- 20210109:factory加入断线重连机制。
二、rabbitMQ安装
三、封装RabbitMqClient.java
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.mina.proxy.utils.ByteUtilities;
import org.slf4j.Logger;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.extern.slf4j.Slf4j;
/**
* RabbitMQ连接客户端工具类
* @author david(857332533@qq.com)
* 2020年12月31日
*/
@Slf4j
public class RabbitMqClient implements Runnable
private final static Logger rabbitLogger = org.slf4j.LoggerFactory.getLogger("rabbit");
private Connection connection = null;
private Channel channel = null;
private String host = "";
private int port = 5672;
private String userName = "";
private String password = "";
private String virtualHost = "";
private String queueName = "";
private boolean isConnected = false;
private MessageHandler messageHandler;
private Thread thread;
public RabbitMqClient(String host,int port,String queueName,MessageHandler messageHandler)
this(host, port, null, queueName, messageHandler);
public RabbitMqClient(String host,int port,String virtualHost,String queueName,MessageHandler messageHandler)
this(host,port,null,null,virtualHost,queueName,messageHandler);
public RabbitMqClient(String host,int port,String userName,String password,String virtualHost,String queueName,MessageHandler messageHandler)
this.host = host;
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.queueName = queueName;
this.messageHandler = messageHandler;
//启动
start();
//启动线程
this.thread = new Thread(this);
this.thread.start();
public void setMessageHandler(MessageHandler messageHandler)
this.messageHandler = messageHandler;
public Thread getThread()
return thread;
public String getQueueName()
return queueName;
public boolean isConnected()
return isConnected;
/**
* #启动监听
*/
public void start()
try
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
if(StringUtils.isNotBlank(userName))
factory.setUsername(userName);
if(StringUtils.isNotBlank(password))
factory.setPassword(password);
if(StringUtils.isNotBlank(virtualHost))
factory.setVirtualHost(virtualHost);
factory.setExceptionHandler(new DefaultExceptionHandler()
@Override
public void handleConnectionRecoveryException(Connection conn, Throwable exception)
// TODO Auto-generated method stub
super.handleConnectionRecoveryException(conn, exception);
try
log.info("自动检测到连接断开,尝试重连");
close();
catch (Exception e)
finally
isConnected = false;
);
connection = factory.newConnection();
channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
handleDeliveryClient(consumerTag, envelope, properties, body);
;
// channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
channel.basicConsume(queueName, true, consumer);
isConnected = true;
log.info("start "+queueName+" success");
catch (Exception e)
log.error("启动MQ监听异常",e);
isConnected = false;
close();
/**
* #释放资源
*/
public void close()
if(null != channel)
try
channel.close();
channel = null;
catch (Exception e)
if(null != connection)
try
connection.close();
connection = null;
catch (Exception e)
isConnected = false;
log.info("close "+queueName+" success");
/**
* #消息数据解析处理
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
public void handleDeliveryClient(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
try
rabbitLogger.info("exchang->,routingKey->,queueName->->recv:",
envelope.getExchange(),envelope.getRoutingKey(),this.queueName,ByteUtilities.asHex(body));
if(null != this.messageHandler)
this.messageHandler.handlerMessage(this.queueName,body);
catch (Exception e)
log.error("RabbitMQ消息解析异常",e);
@Override
public void run()
// TODO Auto-generated method stub
while (true)
try
log.info(this.queueName+" is running");
if(!isConnected)
log.info("重连MQ:",this.queueName);
start();
Thread.sleep(1000*60);
catch (Exception e)
四、消息处理MessageHandler.java
public interface MessageHandler
/**
* 处理MQ消息数据
* @param body
*/
public void handlerMessage(String queueName,byte[] body);
五、总结
封装RabbitMqClient对象,用于连接RabbitMQ,自动ACK确认消息,开启线程自动心跳检测,如果connect失败,则会自动重连。
定义MessageHandler接口,用于处理业务数据。
以上是关于RabbitMQ Client封装连接及业务处理接口的主要内容,如果未能解决你的问题,请参考以下文章