RabbitMq-Java-实现对象传输
Posted CaoPengCheng&
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq-Java-实现对象传输相关的知识,希望对你有一定的参考价值。
RabbitMq-Java-实现对象传输
所用依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.6</version>
</dependency>
对象序列化与反序列化
/**
* 对象转化为字节码(序列化)
*
* @param obj
* @return @link byte[]
* @author 曹鹏程
* @date 2023/3/3 22:58
*/
public static byte[] getBytesFromObject(Serializable obj) throws Exception
if (obj == null)
return null;
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
return bo.toByteArray();
/**
* 字节码转化为对象(反序列化)
*
* @param objBytes
* @return @link Object
* @author 曹鹏程
* @date 2023/3/3 22:59
*/
public static Object getObjectFromBytes(byte[] objBytes) throws Exception
if (objBytes == null || objBytes.length == 0)
return null;
ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
ObjectInputStream oi = new ObjectInputStream(bi);
return oi.readObject();
测试对象
package rabbitmq.objectTransfer.entity;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
/**
* rabbitmq对象传输vo
*
* @author 曹鹏程
* @date 2023/3/5 10:04
*/
@Data
public class TestEntity implements Serializable
private static final long serialVersionUID = -6194946875333262575L;
private Long id;
private String name;
private String code;
private BigDecimal money;
private Date createDate;
消息接收端
package rabbitmq.objectTransfer;
import com.rabbitmq.client.*;
import org.springframework.amqp.utils.SerializationUtils;
import rabbitmq.objectTransfer.entity.FileEntity;
import rabbitmq.objectTransfer.entity.TestEntity;
import rabbitmq.units.FileConvertBase64;
import rabbitmq.units.RabbitMqUnits;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* rabbitmq对象传输(消息的消费者)
*
* @author 曹鹏程
* @date 2023/3/3 18:17
* Copyright(c) YIJIAN All Rights Reserved
*/
public class ConsumerTest
public static void main(String[] args) throws IOException, TimeoutException
try
// 1.建立连接
ConnectionFactory factory = RabbitMqUnits.createFactory();
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "object.queue";
//设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
//第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
// 5.处理消息
//将传递过来的对象反序列化
@SuppressWarnings("deprecation")
TestEntity entity = (TestEntity) SerializationUtils.deserialize(body);
System.out.println("接收到消息:【" + entity + "】");
);
System.out.println("等待接收消息。。。。");
catch (Exception e)
e.printStackTrace();
消息发送端
package rabbitmq.objectTransfer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.utils.SerializationUtils;
import rabbitmq.objectTransfer.entity.DetailEntity;
import rabbitmq.objectTransfer.entity.FileEntity;
import rabbitmq.objectTransfer.entity.TestEntity;
import rabbitmq.units.FileConvertBase64;
import rabbitmq.units.RabbitMqUnits;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* rabbitmq对象传输(消息的发送者)
*
* @author 曹鹏程
* @date 2023/3/3 16:50
* Copyright(c) YIJIAN All Rights Reserved
*/
public class PublisherTest
@Test
public void testSendMessage() throws IOException, TimeoutException
Connection connection = null;
Channel channel = null;
try
// 1.建立连接
ConnectionFactory factory = RabbitMqUnits.createFactory();
// 1.2.建立连接
connection = factory.newConnection();
// 2.创建通道Channel
channel = connection.createChannel();
// 3.创建队列
String queueName = "object.queue";
//设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
//第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
TestEntity entity = new TestEntity();
entity.setId(123456L);
entity.setName("对象传输");
entity.setCode("rabbitmq");
entity.setCreateDate(new Date());
entity.setMoney(new BigDecimal("9999.8888"));
channel.basicPublish("", queueName, null, SerializationUtils.serialize(entity));
System.out.println("发送消息成功:【" + entity.toString() + "】");
catch (Exception e)
e.printStackTrace();
finally
// 5.关闭通道和连接
assert channel != null;
channel.close();
connection.close();
以上是关于RabbitMq-Java-实现对象传输的主要内容,如果未能解决你的问题,请参考以下文章