RabbitMq-Java-实现对象传输

Posted CaoPengCheng&

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq-Java-实现对象传输相关的知识,希望对你有一定的参考价值。

RabbitMq-Java-实现对象传输


所用依赖

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
<!--junit-->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<!--lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.6</version>
</dependency>

对象序列化与反序列化

    /**
     * 对象转化为字节码(序列化)
     *
     * @param obj
     * @return @link byte[]
     * @author 曹鹏程
     * @date 2023/3/3 22:58
     */
    public static byte[] getBytesFromObject(Serializable obj) throws Exception 
        if (obj == null) 
            return null;
        
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        ObjectOutputStream oo = new ObjectOutputStream(bo);
        oo.writeObject(obj);
        return bo.toByteArray();
    

    /**
     * 字节码转化为对象(反序列化)
     *
     * @param objBytes
     * @return @link Object
     * @author 曹鹏程
     * @date 2023/3/3 22:59
     */
    public static Object getObjectFromBytes(byte[] objBytes) throws Exception 
        if (objBytes == null || objBytes.length == 0) 
            return null;
        
        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
        ObjectInputStream oi = new ObjectInputStream(bi);
        return oi.readObject();
    

测试对象

package rabbitmq.objectTransfer.entity;

import lombok.Data;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

/**
 * rabbitmq对象传输vo
 *
 * @author 曹鹏程
 * @date 2023/3/5 10:04
 */
@Data
public class TestEntity implements Serializable 

    private static final long serialVersionUID = -6194946875333262575L;

    private Long id;

    private String name;

    private String code;

    private BigDecimal money;

    private Date createDate;



消息接收端

package rabbitmq.objectTransfer;

import com.rabbitmq.client.*;
import org.springframework.amqp.utils.SerializationUtils;
import rabbitmq.objectTransfer.entity.FileEntity;
import rabbitmq.objectTransfer.entity.TestEntity;
import rabbitmq.units.FileConvertBase64;
import rabbitmq.units.RabbitMqUnits;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * rabbitmq对象传输(消息的消费者)
 *
 * @author 曹鹏程
 * @date 2023/3/3 18:17
 * Copyright(c) YIJIAN All Rights Reserved
 */
public class ConsumerTest 

    public static void main(String[] args) throws IOException, TimeoutException 
        try 
            // 1.建立连接
            ConnectionFactory factory = RabbitMqUnits.createFactory();
            // 1.2.建立连接
            Connection connection = factory.newConnection();
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
            // 3.创建队列
            String queueName = "object.queue";
            //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
            //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
            channel.queueDeclare(queueName, false, false, false, null);
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel) 
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    // 5.处理消息
                    //将传递过来的对象反序列化
                    @SuppressWarnings("deprecation")
                    TestEntity entity = (TestEntity) SerializationUtils.deserialize(body);
                    System.out.println("接收到消息:【" + entity + "】");
                
            );
            System.out.println("等待接收消息。。。。");
         catch (Exception e) 
            e.printStackTrace();
        
    




消息发送端

package rabbitmq.objectTransfer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.springframework.amqp.utils.SerializationUtils;
import rabbitmq.objectTransfer.entity.DetailEntity;
import rabbitmq.objectTransfer.entity.FileEntity;
import rabbitmq.objectTransfer.entity.TestEntity;
import rabbitmq.units.FileConvertBase64;
import rabbitmq.units.RabbitMqUnits;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
 * rabbitmq对象传输(消息的发送者)
 *
 * @author 曹鹏程
 * @date 2023/3/3 16:50
 * Copyright(c) YIJIAN All Rights Reserved
 */
public class PublisherTest 

    @Test
    public void testSendMessage() throws IOException, TimeoutException 
        Connection connection = null;
        Channel channel = null;
        try 
            // 1.建立连接
            ConnectionFactory factory = RabbitMqUnits.createFactory();
            // 1.2.建立连接
            connection = factory.newConnection();
            // 2.创建通道Channel
            channel = connection.createChannel();
            // 3.创建队列
            String queueName = "object.queue";
            //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
            //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
            channel.queueDeclare(queueName, false, false, false, null);
            // 4.发送消息
            TestEntity entity = new TestEntity();
            entity.setId(123456L);
            entity.setName("对象传输");
            entity.setCode("rabbitmq");
            entity.setCreateDate(new Date());
            entity.setMoney(new BigDecimal("9999.8888"));
            channel.basicPublish("", queueName, null, SerializationUtils.serialize(entity));
            System.out.println("发送消息成功:【" + entity.toString() + "】");
         catch (Exception e) 
            e.printStackTrace();
         finally 
            // 5.关闭通道和连接
            assert channel != null;
            channel.close();
            connection.close();
        
    




以上是关于RabbitMq-Java-实现对象传输的主要内容,如果未能解决你的问题,请参考以下文章

java如何在网络流中传输对象

java socket之传输实体类对象

对象的传递为啥要 序列化 呢

Java实现对象的序列化

什么是java序列化,如何实现java序列化?

怎样对带有不可序列化属性的Java对象进行序列化