SpringCloud项目整合RabbitMQ

Posted 秃秃头头

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud项目整合RabbitMQ相关的知识,希望对你有一定的参考价值。

Spring Cloud是一个基于Spring框架的开源微服务框架,它提供了一套完整的微服务解决方案。而RabbitMQ是一个高度可扩展、可靠的消息中间件,它支持消息传递、消息路由、消息确认和消息持久化等功能。

在Spring Cloud中,我们可以通过使用Spring Cloud Stream模块来集成RabbitMQ。Spring Cloud Stream提供了一种简单的方式来构建消息驱动的应用程序。通过使用Spring Cloud Stream和RabbitMQ,我们可以轻松地实现异步消息传递,从而提高应用程序的可伸缩性和可靠性。

以下是使用Spring Cloud和RabbitMQ实现异步消息传递的步骤:

1.添加依赖
在pom.xml文件中添加以下依赖:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.配置RabbitMQ
在application.yml文件中添加RabbitMQ的配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3.定义消息通道
通过定义消息通道,我们可以在应用程序中定义输入和输出通道来实现异步消息传递。在Spring Cloud Stream中,我们可以通过使用@Input和@Output注释来定义消息通道。

public interface MyMessageChannels 
    String INPUT = "my-input-channel";
    String OUTPUT = "my-output-channel";

    @Input(INPUT)
    SubscribableChannel myInputChannel();

    @Output(OUTPUT)
    MessageChannel myOutputChannel();

4.发送消息
在需要发送消息的地方,我们可以通过注入MessageChannel并使用send()方法来发送消息。

@Autowired
private MessageChannel myOutputChannel;

public void sendMessage(String message) 
    myOutputChannel.send(MessageBuilder.withPayload(message).build());

5.接收消息
在需要接收消息的地方,我们可以通过注入SubscribableChannel并使用subscribe()方法来接收消息。

@Autowired
private SubscribableChannel myInputChannel;

@StreamListener(MyMessageChannels.INPUT)
public void receiveMessage(String message) 
    System.out.println("Received message: " + message);

通过使用Spring Cloud和RabbitMQ,我们可以轻松地实现异步消息传递,从而提高应用程序的可伸缩性和可靠性。

6.配置绑定器
在Spring Cloud Stream中,我们需要配置一个绑定器来连接应用程序和消息中间件之间的通道。在本例中,我们使用RabbitMQ作为消息中间件,因此我们需要配置一个RabbitMQ绑定器。

在application.yml文件中添加以下配置:

spring:
  cloud:
    stream:
      bindings:
        my-input-channel:
          destination: my-queue
          binder: rabbit
        my-output-channel:
          destination: my-queue
          binder: rabbit
      rabbit:
        bindings:
          my-input-channel:
            consumer:
              bindingRoutingKey: my-queue
          my-output-channel:
            producer:
              bindingRoutingKey: my-queue

7.使用注解处理器
在我们定义消息通道时,我们使用了@Input和@Output注释。Spring Cloud Stream提供了一个注解处理器,它可以自动创建所需的Bean,以便我们无需手动创建它们。

我们只需要在应用程序的主类上添加@EnableBinding注解,并指定我们定义的消息通道接口即可。

@SpringBootApplication
@EnableBinding(MyMessageChannels.class)
public class MyApplication 
    public static void main(String[] args) 
        SpringApplication.run(MyApplication.class, args);
    

现在,我们已经成功地集成了RabbitMQ和Spring Cloud,可以实现异步消息传递了。
当我们发送消息时,消息将被发送到名为“my-queue”的RabbitMQ队列中。当我们应用程序中定义的SubscribableChannel上有消息到达时,@StreamListener方法将被调用,从而处理接收到的消息。通过使用此方法,我们可以实现对消息的处理。

8.处理消息
我们可以通过使用@StreamListener注解来处理接收到的消息。我们可以在任何类中定义一个或多个带有@StreamListener注解的方法,并指定接收的消息通道。
在我们的示例中,我们已经定义了一个名为“myInputChannel”的输入通道,并在MyMessageChannels接口中定义了它。我们可以使用@StreamListener注解来处理此通道上接收到的消息。

@StreamListener(MyMessageChannels.INPUT)
public void handleMessage(String message) 
    System.out.println("Received message: " + message);

在上面的示例中,我们使用@StreamListener注解来处理名为“myInputChannel”的输入通道上接收到的消息。当有消息到达时,handleMessage()方法将被调用,并打印接收到的消息。

9.发布消息
我们可以通过使用MessageChannel接口来发布消息。我们可以通过注入MessageChannel并使用send()方法来发布消息。

@Autowired
private MessageChannel myOutputChannel;

public void publishMessage(String message) 
    myOutputChannel.send(MessageBuilder.withPayload(message).build());

在上面的示例中,我们注入了名为“myOutputChannel”的输出通道,并在publishMessage()方法中使用它来发布消息。我们使用MessageBuilder来创建消息,然后使用MessageChannel的send()方法来发布它。

10.使用监听器
我们可以使用ApplicationListener接口来监听与Spring Cloud Stream相关的事件。例如,我们可以使用ApplicationStartedEvent来监听应用程序启动事件,并在应用程序启动时执行某些操作。

@Component
public class MyApplicationListener implements ApplicationListener<ApplicationStartedEvent> 
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) 
        System.out.println("Application started!");
    

在上面的示例中,我们实现了ApplicationListener接口,并使用@Component注解将其声明为Spring Bean。我们实现了onApplicationEvent()方法来处理ApplicationStartedEvent事件,并在应用程序启动时打印一条消息。
以上是使用Spring Cloud和RabbitMQ实现异步消息传递的步骤。我们可以通过定义消息通道、发送和接收消息以及配置绑定器来实现异步消息传递。使用Spring Cloud和RabbitMQ,我们可以轻松地构建可靠的分布式应用程序。

11.完整示例
下面是一个完整的示例,演示了如何使用Spring Cloud和RabbitMQ实现异步消息传递。

11.1 添加依赖项

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

11.2 创建消息通道接口

public interface MyMessageChannels 
    String INPUT = "my-input-channel";
    String OUTPUT = "my-output-channel";

    @Input(INPUT)
    SubscribableChannel input();

    @Output(OUTPUT)
    MessageChannel output();

11.3 配置RabbitMQ连接

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

11.4 定义消息发送方法

@Autowired
private MessageChannel myOutputChannel;

public void publishMessage(String message) 
    myOutputChannel.send(MessageBuilder.withPayload(message).build());

11.5 定义消息处理方法

@StreamListener(MyMessageChannels.INPUT)
public void handleMessage(String message) 
    System.out.println("Received message: " + message);

11.6 启用消息绑定器

@SpringBootApplication
@EnableBinding(MyMessageChannels.class)
public class MyApplication 
    public static void main(String[] args) 
        SpringApplication.run(MyApplication.class, args);
    

通过上述步骤,我们就成功地使用Spring Cloud和RabbitMQ实现了异步消息传递。在应用程序中,我们可以使用publishMessage()方法来发布消息,并使用handleMessage()方法来处理接收到的消息。同时,我们还可以使用ApplicationListener接口来监听与Spring Cloud Stream相关的事件。

#yyds干货盘点# springcloud整合stream,rabbitmq实现消息驱动功能

springcloud整合stream,rabbitmq实现消息驱动功能

1.代码实现:

创建项目stream

添加依赖

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.cxh</groupId>
<artifactId>stream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>stream</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>8</java.version>
<spring-cloud-alibaba-dependencies.version>2021.1</spring-cloud-alibaba-dependencies.version>
<spring-cloud-dependencies.version>2021.0.0</spring-cloud-dependencies.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>$spring-cloud-dependencies.version</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>$spring-cloud-alibaba-dependencies.version</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>

监听类

@EnableBinding(Sink.class)
public class SinkReceiver

private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

@StreamListener(Sink.INPUT)
public void receive(String payload)
logger.info("Received: " + payload);


2.实现效果:

启动rabbitmq, 项目stream

打开浏览器​​http://localhost:15672/​​,使用账号密码guest登录rabbitmq, 在队列中发现消息:

测试send

#yyds干货盘点#

 查看控制台消息:

com.cxh.stream.SinkReceiver              : Received: 测试send


以上是关于SpringCloud项目整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

18.SpringCloud实战项目- 整合OpenFeign实现声明式远程调用

SpringCloud整合WebSocket实现用户监控

#yyds干货盘点# springcloud整合ribbon实现服务负载均衡

#yyds干货盘点# springcloud整合feign实现服务负载均衡,断路器

Nacos学习笔记 Nacos整合SpringCloud流程

Nacos学习笔记 Nacos整合SpringCloud流程