SpringCloud——Stream(学习与使用)
Posted *^O^*—*^O^*
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud——Stream(学习与使用)相关的知识,希望对你有一定的参考价值。
什么是Spring Cloud Stream
Spring Cloud Stream是用于构建消息驱动微服务应用程序的框架,该框架提供了一个灵活的编程模型,提供了来自多家供应商的中间件的合理配置,包括publish-subscrbe ,消息分组和消息分区的支持。可以做到代码层面对于中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
工作原理
通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件的实现,当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的是更换对应的Binder绑定器而不需要修改任何应用逻辑
环境准备
需要下载一个消息中间件,这里我使用的是RabbitMQ,在docker中,进行安装,可参考以下博客
https://blog.csdn.net/qq_38066812/article/details/122476744
添加依赖
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-rabbit
implementation("org.springframework.cloud:spring-cloud-stream-binder-rabbit:3.0.12.RELEASE")
入门案例
消费者
server:
port: 8002
spring:
application:
name: stream-consumer
rabbitmq:
host: 192.168.10.120
port: 5672
username: guest
password: guest
virtual-host: /
cloud:
stream:
bindings:
#消息接收通道
input:
destination: stream.message #绑定交换机的名称
eureka:
instance:
prefer-ip-address: true
instance-id: $spring.cloud.client,ip-address:$server.port
client:
service-url:
defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.StreamListener
import org.springframework.cloud.stream.messaging.Sink
import org.springframework.stereotype.Component
@Component
@EnableBinding(Sink::class)
class MessageConsumer
/**
* 接收消息
*/
@StreamListener(Sink.INPUT)
fun receive(message:String)
println("message======$message")
生产者
server:
port: 8001
spring:
application:
name: stream-producer
rabbitmq:
host: 192.168.10.120
port: 5672
username: guest
password: guest
virtual-host: /
cloud:
stream:
bindings:
#消息发送通道
output:
destination: stream.message #绑定交换机的名称
eureka:
instance:
prefer-ip-address: true
instance-id: $spring.cloud.client,ip-address:$server.port
client:
service-url:
defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.messaging.Source
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
@Component
@EnableBinding(Source::class)
class MessageProducer
@Autowired
lateinit var source: Source
/**
* 发送消息
*/
fun send(message:String)
source.output().send(MessageBuilder.withPayload(message).build())
import com.stream_producer.util.MessageProducer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/producer")
class TestController
@Autowired
lateinit var messageProducer : MessageProducer
@GetMapping
fun testSend(message:String)
messageProducer.send(message)
约定大于配置的写法
自定义
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface MySource
/**
* Name of the output channel.
*/
String OUTPUT = "my_exchange";
/**
* @return output channel
*/
@Output(OUTPUT)
MessageChannel output();
import com.stream_producer.channel.MySource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
@Component
@EnableBinding(MySource::class)
class MyMessageProducer
@Autowired
lateinit var source: MySource
/**
* 发送消息
*/
fun send(message:String)
source.output().send(MessageBuilder.withPayload(message).build())
只需要在所有的@Output(OUTPUT)和@Input(OUTPUT),以及监听的地方换成自己的自定义的交换机,在配置文件中,如下一部分就不需要进行配置
cloud:
stream:
bindings:
#消息发送通道
output:
destination: stream.message #绑定交换机的名称
消息分组
防止一个信息被多个消费者调用
cloud:
stream:
bindings:
#消息接收通道
input:
destination: stream.message #绑定交换机的名称
group: group-A 将该服务放入group-A 组
消息分区
当消费者将消息发送给多个消费者时,保证同一个消息始终由同一个消费者实例接收和处理,消息分区是对消息分组的一种补充
消费者
cloud:
stream:
bindings:
#消息接收通道
input:
destination: stream.message #绑定交换机的名称
consumer:
partitioned: true #开启对分区的支持
instance-count: 2 #消费者总数
instance-index: 0 #当前消费者索引
生产者
cloud:
stream:
bindings:
#消息发送通道
output:
destination: stream.message #绑定交换机的名称
producer:
partition-key-expression: payload # 配置分区键的表达式规则
partition-count: 2 # 配置消息分区的数量(消费者)
以上是关于SpringCloud——Stream(学习与使用)的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)
SpringCloud第二季之Stream,Sleuth学习笔记
SpringCloud第二季之Stream,Sleuth学习笔记