RabbitMq-Java-实现传输
Posted CaoPengCheng&
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq-Java-实现传输相关的知识,希望对你有一定的参考价值。
RabbitMq-实现传输
所用依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
RabbitMqConstant常量类
package rabbitmq.constant;
/**
* RabbitMq消息队列常量
*
* @author 曹鹏程
* @date 2023/3/3 23:01
*/
public class RabbitMqConstant
/**
* 主机名
*/
public static final String RABBIT_MQ_HOST = "127.0.0.1";
/**
* 端口号
*/
public static final Integer RABBIT_MQ_PORT = 5672;
/**
* vhost
*/
public static final String RABBIT_MQ_VHOST = "/";
/**
* 审批模板。
*/
public static final String RABBIT_MQ_USERNAME="guest";
/**
* 审批项目。
*/
public static final String RABBIT_MQ_PASSWORD="guest";
RabbitMq消息队列工具类
package rabbitmq.units;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.util.StringUtils;
import rabbitmq.constant.RabbitMqConstant;
import java.io.*;
/**
* RabbitMq消息队列工具类
*
* @author 曹鹏程
* @date 2023/3/3 22:57
*/
public class RabbitMqUnits
/**
* 建立RabbitMq连接(原生连接)
*
* @param
* @return @link ConnectionFactory
* @author 曹鹏程
* @date 2023/3/3 23:03
*/
public static ConnectionFactory createFactory() throws Exception
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数
factory.setHost(RabbitMqConstant.RABBIT_MQ_HOST);//主机名
factory.setPort(RabbitMqConstant.RABBIT_MQ_PORT);//端口号
factory.setVirtualHost(RabbitMqConstant.RABBIT_MQ_VHOST);//vhost
factory.setUsername(RabbitMqConstant.RABBIT_MQ_USERNAME);//用户名
factory.setPassword(RabbitMqConstant.RABBIT_MQ_PASSWORD);//密码
return factory;
/**
* 对象转化为字节码(序列化)
*
* @param obj
* @return @link byte[]
* @author 曹鹏程
* @date 2023/3/3 22:58
*/
public static byte[] getBytesFromObject(Serializable obj) throws Exception
if (obj == null)
return null;
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
return bo.toByteArray();
/**
* 字节码转化为对象(反序列化)
*
* @param objBytes
* @return @link Object
* @author 曹鹏程
* @date 2023/3/3 22:59
*/
public static Object getObjectFromBytes(byte[] objBytes) throws Exception
if (objBytes == null || objBytes.length == 0)
return null;
ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
ObjectInputStream oi = new ObjectInputStream(bi);
return oi.readObject();
/**
* 字节数组转成16进制表示格式的字符串
*
* @param byteArray 需要转换的字节数组
* @return @link String 16进制表示格式的字符串
* @author 曹鹏程
* @date 2023/3/5 22:20
*/
public static String toHexString(byte[] byteArray)
if (byteArray == null || byteArray.length < 1)
throw new IllegalArgumentException("this byteArray must not be null or empty");
final StringBuilder hexString = new StringBuilder();
for (int i = 0; i < byteArray.length; i++)
if ((byteArray[i] & 0xff) < 0x10)//0~F前面不零
hexString.append("0");
hexString.append(Integer.toHexString(0xFF & byteArray[i]));
return hexString.toString().toLowerCase();
/**
* 字符串转字节数组转成16进制表示格式
*
* @param hexString 需要转换的16进制表示格式的字符串
* @return @link byte[] 字节数组
* @author 曹鹏程
* @date 2023/3/5 22:21
*/
public static byte[] toByteArray(String hexString)
if (StringUtils.isEmpty(hexString))
throw new IllegalArgumentException("this hexString must not be empty");
hexString = hexString.toLowerCase();
final byte[] byteArray = new byte[hexString.length() / 2];
int k = 0;
for (int i = 0; i < byteArray.length; i++) //因为是16进制,最多只会占用4位,转换成字节需要两个16进制的字符,高位在先
byte high = (byte) (Character.digit(hexString.charAt(k), 16) & 0xff);
byte low = (byte) (Character.digit(hexString.charAt(k + 1), 16) & 0xff);
byteArray[i] = (byte) (high << 4 | low);
k += 2;
return byteArray;
消息的消费者
package rabbitmq.protogenesis;
import com.rabbitmq.client.*;
import rabbitmq.units.RabbitMqUnits;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息的消费者
*
* @author 曹鹏程
* @date 2023/3/3 18:17
*/
public class ConsumerTest
public static void main(String[] args) throws IOException, TimeoutException
try
// 1.建立连接
ConnectionFactory factory = RabbitMqUnits.createFactory();
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
//设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
//第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
);
System.out.println("等待接收消息。。。。");
catch (Exception e)
e.printStackTrace();
消息的生产者
package rabbitmq.protogenesis;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import rabbitmq.units.RabbitMqUnits;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息的发送者
*
* @author 曹鹏程
* @date 2023/3/3 16:50
* Copyright(c) YIJIAN All Rights Reserved
*/
public class PublisherTest
@Test
public void testSendMessage() throws IOException, TimeoutException
Connection connection = null;
Channel channel = null;
try
// 1.建立连接
ConnectionFactory factory = RabbitMqUnits.createFactory();
// 1.2.建立连接
connection = factory.newConnection();
// 2.创建通道Channel
channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
//设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
//第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "你好, 消息队列!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
catch (Exception e)
e.printStackTrace();
finally
// 5.关闭通道和连接
channel.close();
connection.close();
启动消息消费者
启动消息生产者
以上是关于RabbitMq-Java-实现传输的主要内容,如果未能解决你的问题,请参考以下文章