springcloud Stream整合rabbitmq消息驱动生产者踩坑
Posted 帆再小也能远航
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springcloud Stream整合rabbitmq消息驱动生产者踩坑相关的知识,希望对你有一定的参考价值。
消息驱动之生产者8801(踩坑记录)
1.首先说一下情况,我是跟着尚硅谷周阳老师的springcloud2020教程学习的,前面也踩了不少坑,但是这个坑,是我找的比较久的坑了,所以希望大家能直接看到我这个,顺顺利利学习下去哈哈哈!
2.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2020</artifactId>
<groupId>com.atguigu.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基础配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
3.application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.213.130
port: 5672
username: admin
password: 123
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
4.主启动类
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
/**
* @author zx
* @create 2023-03-23 9:50
*/
@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801
public static void main(String[] args)
SpringApplication.run(StreamMQMain8801.class,args);
5.发送消息的接口
package com.atguigu.springcloud.service;
/**
* @author zx
* @create 2023-03-23 9:54
*/
public interface IMessageProvider
public String send();
6.接口的实现类
package com.atguigu.springcloud.service.impl;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author zx
* @create 2023-03-23 9:55
*/
@EnableBinding(Source.class)//定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
@Resource
private MessageChannel output;
@Override
public String send()
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*******serial"+serial);
return null;
7.Controller层代码
package com.atguigu.springcloud.controller;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author zx
* @create 2023-03-23 10:12
*/
@RestController
public class SendMessageController
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage()
return messageProvider.send();
8.测试,先启动7001,再启动rabbitmq(成功登录进去),然后启动8801
发现已经成功注入进Eureka服务中心
然后发现8801的控制台打印消息(就是这个该死的错误,排除了很多原因,最后终于找到原因了)
9.解决方法(修改application.yml文件)
management:
health:
rabbit:
enabled: false
加入此配置后,能够正常运行消息生产者了。
浏览器访问
http://localhost:8801/sendMessage
10.bug原因
因为使用了spring.cloud.stream.binders.*.environment属性配置rabbitMQ的相关信息,但是没配置spring.rabbitmq。这就导致自动配置检测到类路径下有rabbit相关的类,就配置了rabbit相关的Bean
没有检测到spring.rabbitmq相关的配置,就使用了默认的配置,尝试连接localhost:5672,然后就出现了org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
具体想详细了解的话可以看一下源码(org.springframework.boot.actuate.amqp.RabbitHealthIndicator)
11.心得
遇见困难,就要解决困难,被bug挡住了很难受,但是解决了就非常的爽,加油!
以上是关于springcloud Stream整合rabbitmq消息驱动生产者踩坑的主要内容,如果未能解决你的问题,请参考以下文章
#yyds干货盘点# springcloud整合stream实现同一通道根据消息内容分发不同的消费逻辑
#yyds干货盘点# springcloud整合stream,rabbitmq实现消息驱动功能
SpringCloud-Stream3.x版本使用教程及如何整合rabbitmq
SpringCloud-Stream3.x版本使用教程及如何整合rabbitmq