RabbitMq-Java-实现传输

Posted CaoPengCheng&

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMq-Java-实现传输相关的知识,希望对你有一定的参考价值。

RabbitMq-实现传输


所用依赖

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>
<!--junit-->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>

RabbitMqConstant常量类

package rabbitmq.constant;

/**
 * RabbitMq消息队列常量
 *
 * @author 曹鹏程
 * @date 2023/3/3 23:01
 */
public class RabbitMqConstant 

    /**
     * 主机名
     */
    public static final String RABBIT_MQ_HOST = "127.0.0.1";

    /**
     * 端口号
     */
    public static final Integer RABBIT_MQ_PORT = 5672;

    /**
     * vhost
     */
    public static final String RABBIT_MQ_VHOST = "/";

    /**
     * 审批模板。
     */
    public static final String RABBIT_MQ_USERNAME="guest";

    /**
     * 审批项目。
     */
    public static final String RABBIT_MQ_PASSWORD="guest";




RabbitMq消息队列工具类

package rabbitmq.units;

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.util.StringUtils;
import rabbitmq.constant.RabbitMqConstant;

import java.io.*;

/**
 * RabbitMq消息队列工具类
 *
 * @author 曹鹏程
 * @date 2023/3/3 22:57
 */
public class RabbitMqUnits 

    /**
     * 建立RabbitMq连接(原生连接)
     *
     * @param
     * @return @link ConnectionFactory
     * @author 曹鹏程
     * @date 2023/3/3 23:03
     */
    public static ConnectionFactory createFactory() throws Exception 
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数
        factory.setHost(RabbitMqConstant.RABBIT_MQ_HOST);//主机名
        factory.setPort(RabbitMqConstant.RABBIT_MQ_PORT);//端口号
        factory.setVirtualHost(RabbitMqConstant.RABBIT_MQ_VHOST);//vhost
        factory.setUsername(RabbitMqConstant.RABBIT_MQ_USERNAME);//用户名
        factory.setPassword(RabbitMqConstant.RABBIT_MQ_PASSWORD);//密码
        return factory;
    

    /**
     * 对象转化为字节码(序列化)
     *
     * @param obj
     * @return @link byte[]
     * @author 曹鹏程
     * @date 2023/3/3 22:58
     */
    public static byte[] getBytesFromObject(Serializable obj) throws Exception 
        if (obj == null) 
            return null;
        
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        ObjectOutputStream oo = new ObjectOutputStream(bo);
        oo.writeObject(obj);
        return bo.toByteArray();
    

    /**
     * 字节码转化为对象(反序列化)
     *
     * @param objBytes
     * @return @link Object
     * @author 曹鹏程
     * @date 2023/3/3 22:59
     */
    public static Object getObjectFromBytes(byte[] objBytes) throws Exception 
        if (objBytes == null || objBytes.length == 0) 
            return null;
        
        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
        ObjectInputStream oi = new ObjectInputStream(bi);
        return oi.readObject();
    

    /**
     * 字节数组转成16进制表示格式的字符串
     *
     * @param byteArray 需要转换的字节数组
     * @return @link String 16进制表示格式的字符串
     * @author 曹鹏程
     * @date 2023/3/5 22:20
     */
    public static String toHexString(byte[] byteArray) 
        if (byteArray == null || byteArray.length < 1)
            throw new IllegalArgumentException("this byteArray must not be null or empty");

        final StringBuilder hexString = new StringBuilder();
        for (int i = 0; i < byteArray.length; i++) 
            if ((byteArray[i] & 0xff) < 0x10)//0~F前面不零
                hexString.append("0");
            hexString.append(Integer.toHexString(0xFF & byteArray[i]));
        
        return hexString.toString().toLowerCase();
    

    /**
     * 字符串转字节数组转成16进制表示格式
     *
     * @param hexString 需要转换的16进制表示格式的字符串
     * @return @link byte[] 字节数组
     * @author 曹鹏程
     * @date 2023/3/5 22:21
     */
    public static byte[] toByteArray(String hexString) 
        if (StringUtils.isEmpty(hexString))
            throw new IllegalArgumentException("this hexString must not be empty");

        hexString = hexString.toLowerCase();
        final byte[] byteArray = new byte[hexString.length() / 2];
        int k = 0;
        for (int i = 0; i < byteArray.length; i++) //因为是16进制,最多只会占用4位,转换成字节需要两个16进制的字符,高位在先
            byte high = (byte) (Character.digit(hexString.charAt(k), 16) & 0xff);
            byte low = (byte) (Character.digit(hexString.charAt(k + 1), 16) & 0xff);
            byteArray[i] = (byte) (high << 4 | low);
            k += 2;
        
        return byteArray;
    




消息的消费者

package rabbitmq.protogenesis;

import com.rabbitmq.client.*;
import rabbitmq.units.RabbitMqUnits;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消息的消费者
 *
 * @author 曹鹏程
 * @date 2023/3/3 18:17
 */
public class ConsumerTest 

    public static void main(String[] args) throws IOException, TimeoutException 
        try 
            // 1.建立连接
            ConnectionFactory factory = RabbitMqUnits.createFactory();
            // 1.2.建立连接
            Connection connection = factory.newConnection();
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
            // 3.创建队列
            String queueName = "simple.queue";
            //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
            //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
            channel.queueDeclare(queueName, false, false, false, null);
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel)
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    // 5.处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                
            );
            System.out.println("等待接收消息。。。。");
         catch (Exception e) 
            e.printStackTrace();
        
    




消息的生产者

package rabbitmq.protogenesis;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import rabbitmq.units.RabbitMqUnits;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消息的发送者
 *
 * @author 曹鹏程
 * @date 2023/3/3 16:50
 * Copyright(c) YIJIAN All Rights Reserved
 */
public class PublisherTest 

    @Test
    public void testSendMessage() throws IOException, TimeoutException 
        Connection connection = null;
        Channel channel = null;
        try 
            // 1.建立连接
            ConnectionFactory factory = RabbitMqUnits.createFactory();
            // 1.2.建立连接
            connection = factory.newConnection();
            // 2.创建通道Channel
            channel = connection.createChannel();
            // 3.创建队列
            String queueName = "simple.queue";
            //设置队列的属性第一个参数为队列名。第二个参数为是否创建一个持久队列,第三个是否创建一个专用的队列,
            //第四个参数为是否自动删除队列,第五个参数为其他属性(结构参数)
            channel.queueDeclare(queueName, false, false, false, null);
            // 4.发送消息
            String message = "你好, 消息队列!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:【" + message + "】");
         catch (Exception e) 
            e.printStackTrace();
         finally 
            // 5.关闭通道和连接
            channel.close();
            connection.close();
        
    





启动消息消费者

启动消息生产者


以上是关于RabbitMq-Java-实现传输的主要内容,如果未能解决你的问题,请参考以下文章

一个队列上的多个消费者 RabbitMQ - Java

用UDP实现可靠传输

(200分)扩展UDP实现可靠传输(SR,GBN)

UDP如何实现可靠传输

TCP是如何实现可靠传输的?

如何实现TCP和UDP传输