Rabbitmq基本使用 SpringBoot整合Rabbit SpringCloud Stream+Rabbit

Posted 微笑点燃希望

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq基本使用 SpringBoot整合Rabbit SpringCloud Stream+Rabbit相关的知识,希望对你有一定的参考价值。

https://blog.csdn.net/m0_37867405/article/details/80793601

四、docker中使用rabbitmq

1. 搭建和启动

使用地址:rabbitmq docker

#1. 拉去rabbitmq的镜像
docker pull hub.c.163.com/library/rabbitmq:3.6.11-management
#2. 由于rabbitmq远程访问是不允许guest的,所以启动时候需要设置一个用户名和密码
docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 hub.c.163.com/library/rabbitmq:3.6.11-management
# 提示信息:d525b1c1004f50284ca5bab76e8e5e2fea55462b72d9f923cea0da1a29e9aa9dnetstat
  • 1
  • 2
  • 3
  • 4
  • 5

在浏览器中访问:192.168.186.135:15672然后输入用户名和密码

2. java Hello world

简单的一对一生产消息

这里写图片描述

官网地址示例

1.引入 pom.xml

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.0.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.消息提供者

package com.itcloud.concurrency.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MessageProvider {

    private static final String HOST = "192.168.186.135";

    private static final int PORT = 5672; // rabbitmq端口是5672

    private static final String USERNAME = "admin";

    private static final String PASSWORD = "admin";

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" 生产消息:\'" + message + "\'");

    }

}

3.消费端

package com.itcloud.concurrency.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class MessageConsumer {

    private static final String HOST = "192.168.186.135";

    private static final int PORT = 5672; // rabbitmq端口是5672

    private static final String USERNAME = "admin";

    private static final String PASSWORD = "admin";

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费端接收消息:" + message);
            }

        };
        //true  异步接收消息
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

4.先后启动生产者和消费者

3. 一对多

官网示例

这里写图片描述

4. SpringBoot整合RabbitMQ

第一种交换模式:Direct

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 

application.yml

spring:
  rabbitmq:
    host: 192.168.186.135
    port: 5672
    username: admin
    password: admin

2.配置org.springframework.amqp.core.Queue

​ 【MQConfig.java】

@Configuration
public class MQConfig {

    public static final String QUEUE = "queue";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE);
    }

}

 

  1. 【MQSender.java】消息发送端
@Component
@Slf4j
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String msg) {
        log.info("发送消息:" + msg);
        amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    }

}

4.【MQReceiver.java】消息接收端

@Component
@Slf4j
public class MQReceiver {

    //绑定队列名称
    @RabbitListener(queues = { MQConfig.QUEUE })
    public void receive(String message) {
        log.info("springboot rabbitmq recevie message:" + message);
    }

5.简单的controller测试

    @PostMapping("/send")
    public String sendMsg(String msg) {
        mqSender.sendMsg(msg);
        return "success";
    }

第二种交换模式:Topic

:biking_woman:特点:

:ballot_box_with_check: 可以根据routing_key自由的绑定不同的队列

:ballot_box_with_check:发送端不需要知道发送到哪个队列,由routing_key去分发到队列中

2.配置org.springframework.amqp.core.Queue

​ 【MQConfig.java】

    public static final String TOPIC_EXCHANGE = "topic_exchange";

    public static final String QUEUE_HELLO = "topic_hello";

    public static final String QUEUE_WORLD = "topic_world";

    public static final String ROUT_HELLO = "hello_key";
    public static final String ROUT_WORLD = "world_key";
    ...
    //=============定义两个队列========================    
    @Bean
    public Queue queueHello() {
        return new Queue(QUEUE_HELLO);
    }

    @Bean
    public Queue queueWorld() {
        return new Queue(QUEUE_WORLD);
    }
    //===================声明topinc交换模式=========================
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);

    }
    //队列绑定到交换模式上
    @Bean
    @Autowired
    public Binding bindingHello(Queue queueHello, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueHello).to(topicExchange).with(ROUT_HELLO);
    }

    @Bean
    @Autowired
    public Binding bindingWorld(Queue queueWorld, TopicExchange topicExchange) {
        return BindingBuilder.bind(queueWorld).to(topicExchange).with(ROUT_WORLD);
    }
  1. 【MQSender.java】消息发送端
    public void sendHello(String msg) {
        log.info("hello topic send messsgae:" + msg);
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, MQConfig.ROUT_HELLO, msg);
    }

    public void sendWorld(String msg) {
        log.info("world topic send messsgae:" + msg);
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, MQConfig.ROUT_WORLD, msg);
    }

4.【MQReceiver.java】消息接收端

    @RabbitListener(queues = MQConfig.QUEUE_HELLO)
    public void receiveHello(String msg) {
        log.info("springboot rabbitmq recevie message topicHello:" + msg);
    }

    @RabbitListener(queues = MQConfig.QUEUE_WORLD)
    public void receiveWorld(String msg) {
        log.info("springboot rabbitmq recevie message topicWorld:" + msg);
    }

5.简单的controller测试

    @PostMapping("/send/topic/hello")
    public String sendHello(String msg) {
        mqSender.sendHello(msg);
        return "success";
    }

    @PostMapping("/send/topic/world")
    public String sendWorld(String msg) {
        mqSender.sendWorld(msg);
        return "success";
    }

第三种交换:Fanout

:biking_woman: 特点:只要绑定该交换机的消费者都可以接受到消息

1.配置org.springframework.amqp.core.Queue

​ 【MQConfig.java】

    public static final String FANOUT_EXCHANGE = "fanout_exchange";
    public static final String QUEUE_FANOUT = "queue_fanout";
    public static final String QUEUE_FANOUT2 = "queue_fanout2";
....
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Queue queueFanout() {
        return new Queue(QUEUE_FANOUT);
    }

    @Bean
    public Queue queueFanout2() {
        return new Queue(QUEUE_FANOUT2);
    }

    @Bean
    @Autowired
    public Binding bindingFanout(FanoutExchange fanoutExchange, Queue queueFanout) {
        return BindingBuilder.bind(queueFanout).to(fanoutExchange);
    }

    @Bean
    @Autowired
    public Binding bindingFanout2(FanoutExchange fanoutExchange, Queue queueFanout2) {
        return BindingBuilder.bind(queueFanout2).to(fanoutExchange);
    }   

 

2.【MQSender.java】消息发送端

    public void sendFanout(String msg) {
        log.info("fanout send messsgae:" + msg);
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", "自定义消息");
    }

3.【MQReceiver.java】消息接收端

    @RabbitListener(queues = MQConfig.QUEUE_FANOUT)
    public void receiveFanout(String msg) {
        log.info("springboot rabbitmq recevie message fanout:" + msg);
    }

    @RabbitListener(queues = MQConfig.QUEUE_FANOUT2)
    public void receiveFanout2(String msg) {
        log.info("springboot rabbitmq recevie message fanout2:" + msg);
    }

第四种交换模式:Headers

1.【MQConfig.java】

//======================headers交换模式============================

    public static final String HEADERS_EXCHANGE = "headers_exchange";

    public static final String QUEUE_HEADERS = "queue_headers";

    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHANGE);
    }

    @Bean
    public Queue headersQueue() {
        return new Queue(QUEUE_HEADERS, true);
    }

    @Bean
    @Autowired
    public Binding headersBind(HeadersExchange headersExchange, Queue headersQueue ) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("key1", "value1");
        headers.put("key2", "value2");
        return BindingBuilder.bind(headersQueue).to(headersExchange).whereAny(headers).match();

2.【MQSender.java】消息发送端

    public void sendHeaders(String msg) {
        log.info("headers send messsgae:" + msg);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("key1", "value1");
        messageProperties.setHeader("key3", "value2");
        Message message = new Message(msg.getBytes(), messageProperties);
        amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", message);
    }

3.【MQReceiver.java】消息接收端

    @RabbitListener(queues = MQConfig.QUEUE_HEADERS)
    public void receiveHeaders(byte [] bytes) {
        log.info("springboot rabbitmq recevie message headers:" + new String(bytes));
    }

参考博客:

博客1 博客2

6. SpringCloud Stream

1. 创建生产者和消费者

1. 消息生产者:

新建【stream】springcloud项目

pom.xml

​ 其他依赖略

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
//注意导包
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;


//消息发送端接口
public interface SendMsg {

    public void send(Object obj);

}

//接口实现类
@EnableBinding(Source.class)
public class SendMsgImpl implements SendMsg {

    @Autowired
    private MessageChannel output;

    @Override
    public void send(Object obj) {
        this.output.send(MessageBuilder.withPayload(obj).build());
    }

}

【application.yml】文件

server:
  port: 8083
spring:
  cloud:
    stream:
      binders: 
        defaultRabbit: 
          type: rabbit
          environment: #配置rabbimq连接环境
            spring: 
              rabbitmq:
                host: 192.168.186.135
                username: admin
                password: admin
                virtual-host: / 
      bindings: 
        output:
         destination: myExchange  #exchange名称,交换模式默认是topic
         content-type: application/json
         binder: defaultRabbit

 

//测试使用的Controller
@RestController
public class SendController {

    @Autowired
    private SendMsg sendMsg;

    @GetMapping("/send")
    public String  msgSend(String msg) {
        this.sendMsg.send(msg);
        return "success";
    }
}

2 .消息消费者

新建**【input】**springlcloud项目

//接口
public interface ReceiveMsg {

    public void receive(Message<Object> message);

}
//实现类
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMsgImpl implements ReceiveMsg {

    @Override
    @StreamListener(Sink.INPUT) 
    public void receive(Message<Object> message) {
        System.out.println("接收消息" + message.getPayload());
    }
}

 

 

application.yml

server:
  port: 8085

spring:
  cloud:
    stream:
      binders:
        defaultRabbit: 
          type: rabbit
          environment:
            spring: 
              rabbitmq:
                host: 192.168.186.135
                username: admin
                password: admin
                virtual-host: / 
      bindings: 
        input:
         destination: myExchange
         content-type: application/json
         binder: defaultRabbit

以上是关于Rabbitmq基本使用 SpringBoot整合Rabbit SpringCloud Stream+Rabbit的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合rabbitmq

SpringBootSpringBoot 整合RabbitMQ(二十)

springboot整合RabbitMQ Mqtt

SpringBoot整合RabbitMQ报错 org.springframework.amqp.AmqpIOException: java.io.IOException

SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ -fanout模式