SpringCloud——Stream(学习与使用)

Posted *^O^*—*^O^*

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud——Stream(学习与使用)相关的知识,希望对你有一定的参考价值。

什么是Spring Cloud Stream

Spring Cloud Stream是用于构建消息驱动微服务应用程序的框架,该框架提供了一个灵活的编程模型,提供了来自多家供应商的中间件的合理配置,包括publish-subscrbe ,消息分组和消息分区的支持。可以做到代码层面对于中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

工作原理

通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件的实现,当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的是更换对应的Binder绑定器而不需要修改任何应用逻辑

环境准备

需要下载一个消息中间件,这里我使用的是RabbitMQ,在docker中,进行安装,可参考以下博客

https://blog.csdn.net/qq_38066812/article/details/122476744

添加依赖

    // https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-rabbit
    implementation("org.springframework.cloud:spring-cloud-stream-binder-rabbit:3.0.12.RELEASE")

入门案例

消费者

server:
  port: 8002

spring:
  application:
    name: stream-consumer
  rabbitmq:
    host: 192.168.10.120
    port: 5672
    username: guest
    password: guest
    virtual-host: /

  cloud:
    stream:
      bindings:
        #消息接收通道
        input:
          destination: stream.message  #绑定交换机的名称
eureka:
  instance:
    prefer-ip-address: true
    instance-id: $spring.cloud.client,ip-address:$server.port
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.StreamListener
import org.springframework.cloud.stream.messaging.Sink
import org.springframework.stereotype.Component


@Component
@EnableBinding(Sink::class)
class MessageConsumer 

    /**
     * 接收消息
     */
    @StreamListener(Sink.INPUT)
    fun receive(message:String)
        println("message======$message")
    

生产者

server:
  port: 8001

spring:
  application:
    name: stream-producer
  rabbitmq:
    host: 192.168.10.120
    port: 5672
    username: guest
    password: guest
    virtual-host: /

  cloud:
    stream:
      bindings:
        #消息发送通道
        output:
          destination: stream.message  #绑定交换机的名称
eureka:
  instance:
    prefer-ip-address: true
    instance-id: $spring.cloud.client,ip-address:$server.port
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.messaging.Source
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component


@Component
@EnableBinding(Source::class)
class MessageProducer 

    @Autowired
    lateinit var source: Source

    /**
     * 发送消息
     */
    fun send(message:String)
        source.output().send(MessageBuilder.withPayload(message).build())
    

import com.stream_producer.util.MessageProducer
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/producer")
class TestController 

    @Autowired
    lateinit var messageProducer : MessageProducer

    @GetMapping
    fun testSend(message:String)
        messageProducer.send(message)
    

约定大于配置的写法

自定义

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource 

    /**
     * Name of the output channel.
     */
    String OUTPUT = "my_exchange";

    /**
     * @return output channel
     */
    @Output(OUTPUT)
    MessageChannel output();

import com.stream_producer.channel.MySource
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component

@Component
@EnableBinding(MySource::class)
class MyMessageProducer 
    @Autowired
    lateinit var source: MySource

    /**
     * 发送消息
     */
    fun send(message:String)
        source.output().send(MessageBuilder.withPayload(message).build())
    

只需要在所有的@Output(OUTPUT)和@Input(OUTPUT),以及监听的地方换成自己的自定义的交换机,在配置文件中,如下一部分就不需要进行配置

  cloud:
    stream:
      bindings:
        #消息发送通道
        output:
          destination: stream.message  #绑定交换机的名称

消息分组

防止一个信息被多个消费者调用

  cloud:
    stream:
      bindings:
        #消息接收通道
        input:
          destination: stream.message  #绑定交换机的名称
          group: group-A 将该服务放入group-A 组

消息分区

当消费者将消息发送给多个消费者时,保证同一个消息始终由同一个消费者实例接收和处理,消息分区是对消息分组的一种补充
消费者

  cloud:
    stream:
      bindings:
        #消息接收通道
        input:
          destination: stream.message  #绑定交换机的名称
          consumer:
            partitioned: true  #开启对分区的支持
      instance-count: 2 #消费者总数
      instance-index: 0 #当前消费者索引

生产者

  cloud:
    stream:
      bindings:
        #消息发送通道
        output:
          destination: stream.message  #绑定交换机的名称
          producer:
            partition-key-expression: payload  # 配置分区键的表达式规则
            partition-count: 2   # 配置消息分区的数量(消费者)

以上是关于SpringCloud——Stream(学习与使用)的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud——Stream(学习与使用)

SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)

SpringCloud第二季之Stream,Sleuth学习笔记

SpringCloud第二季之Stream,Sleuth学习笔记

SpringCloud学习—— SpringCloud Stream 消息驱动

SpringCloud Stream消息驱动设计思想以及整合rabbitmq消息队列案例--学习笔记