springcloud笔记七Stream
Posted 今夜月色很美
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springcloud笔记七Stream相关的知识,希望对你有一定的参考价值。
stream介绍
为什么使用springcloud stream
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
Binder Implementations 绑定器
通过绑定器Binder作为中间件,实现了应用程序与消息中间件细节的解耦。
Input对应消息生产者
Output对应消息消费者
常用注解
消息生产者
pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
application.yaml
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host:
port: 5672
username:
password:
bindings: # 服务的整合处理
output: # 通道的名字
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
register-with-eureka: true
service-url:
defaultZone: http://eureka7001.com:7001/eureka
fetch-registry: true
instance:
instance-id: stream-provider8801
prefer-ip-address: true
lease-renewal-interval-in-seconds: 10
lease-expiration-duration-in-seconds: 30
启动类
添加@SpringBootApplication
controller
@RestController
public class MessageController {
@Resource
private MessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
service
package com.fox.springcloud.service.impl;
import com.fox.springcloud.service.MessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class)//定义消息的推送管道
public class MessageProviderImpl implements MessageProvider {
@Resource
private MessageChannel output;//变量名必须是output,应该是来自配置文件中通道的名字
@Override
public String send() {
String uuid = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(uuid).build());
return null;
}
}
启动项目
消息消费者
pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
application.yaml
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host:
port: 5672
username:
password:
bindings: # 服务的整合处理
input: # 通道的名字
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
register-with-eureka: true
service-url:
defaultZone: http://eureka7001.com:7001/eureka
fetch-registry: true
instance:
instance-id: stream-consumer8802
prefer-ip-address: true
lease-renewal-interval-in-seconds: 10
lease-expiration-duration-in-seconds: 30
启动类
添加@SpringBootApplication
listener
package com.fox.springcloud.listener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1接收到消息:" + message.getPayload() + "\\t port:" + port);
}
}
消息重复消费问题
不同分组对同一条消息会重复消费,同一个分组一个消息只会被一个消费者消费。
持久化
添加group后自动支持持久化,消费者重启不会丢失消息。
问题:消息无法正常发送消费
启动8801后没有如尚硅谷课程中的那样在rabbit中出现配置文件中的studyExchange,但是有一个output名字的exchange,发送消息时看overview,消息也可以发送到rabbitmq,消费者无法正常接收消息。
问题原因:
yaml文件对缩减要求严格,因此需要注意yaml文件中的key层级关系是否正确,博主由于binders和bindings是同级,不小心在bindings前面加了缩进,导致属性无法正确绑定,发生故障
以上是关于springcloud笔记七Stream的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud第二季之Stream,Sleuth学习笔记
SpringCloud Stream消息驱动设计思想以及整合rabbitmq消息队列案例--学习笔记
#yyds干货盘点# springcloud整合stream消费自己生产的消息