SpringCloud学习—— SpringCloud Stream 消息驱动
Posted Johnny*
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud学习—— SpringCloud Stream 消息驱动相关的知识,希望对你有一定的参考价值。
Spring Cloud Stream消息驱动
为什么要引进Cloud Stream
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
什么是 Spring Cloud Stream
Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中的binder对象交互。
通过 配置来binding(绑定),而Spring Cloud Stream的binder对象负责消息 中间件交互。
目前仅支持RabbitMQ和Kafaka
通过 定义绑定器Binder作为中间层,实现了应用程序与 消息中间件 细节之间的隔离。
概念
Binder: 很方便的连接中间件,屏蔽差异
Channel: 通道,是队列Queue的一种抽象,在消息通讯 系统中就是实现存储 和转发的媒介,通过Channel对队列进行配置。
Stream的标准流程
编码API和常用注解
消费者cloud-stream-rabbitmq-provider8801
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
application.yml配置
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称 表示这是一个消息生产者
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com # 在信息列表时显示主机名称
prefer-ip-address: true # 访问的路径变为IP地址
主启动类:
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(SpringBootApplication.class,args);
}
}
定义接口及其实现类,该 接口拥有send()方法,该实现类用于与rabbitmq交互,不需要@Service注解
public interface MsgProviderService {
public String send();
}
package com.johnny.springcloud.service.impl;
import com.johnny.springcloud.service.MsgProviderService;
import com.netflix.discovery.converters.Auto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author Johnny Lin
* @date 2021/6/30 19:45
*/
@EnableBinding(Source.class) //定义消息的推送管道 不再是@Service
public class MsgProviderServiceImpl implements MsgProviderService {
@Resource
private MessageChannel output; //消息发送管道
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("serial" + serial);
return null;
}
}
rabbitmq的通讯是从队列中获得消息,不是通过生产者方法的return
分组消费与 持久化
依照8802,clone出来一份8803
存在的问题:
重复消费,8802和8003都收到了
导致原因: 默认分组group是不同的,流水号不一样,被认为是不同组,可以消费。
自定义配置分为 同一组,可以解决重复消费问题。这样8802/8803实现了轮询分组,每次只有一个消费者,8801模块的发的信息只能被8802或8803其中一个接收到,避免了重复消费。
持久化
无分组属性配置的会发生消息丢失,有分组属性的可以打印出MQ上的消息。
以上是关于SpringCloud学习—— SpringCloud Stream 消息驱动的主要内容,如果未能解决你的问题,请参考以下文章
Spring-Boot:Spring Cloud构建微服务架构
企业级 SpringCloud 教程 断路器(Hystrix)