springcloud笔记七Stream

Posted 今夜月色很美

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springcloud笔记七Stream相关的知识,希望对你有一定的参考价值。

stream介绍

为什么使用springcloud stream


屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

Binder Implementations 绑定器

通过绑定器Binder作为中间件,实现了应用程序与消息中间件细节的解耦。

Input对应消息生产者

Output对应消息消费者

常用注解

消息生产者

pom.xml

		<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

application.yaml

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
               rabbitmq:
                  host:
                  port: 5672
                  username:
                  password:
      bindings: # 服务的整合处理
        output: # 通道的名字
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
  client:
    register-with-eureka: true
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
    fetch-registry: true
  instance:
    instance-id: stream-provider8801
    prefer-ip-address: true
    lease-renewal-interval-in-seconds: 10
    lease-expiration-duration-in-seconds: 30

启动类

添加@SpringBootApplication

controller

@RestController
public class MessageController {

    @Resource
    private MessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

service

package com.fox.springcloud.service.impl;


import com.fox.springcloud.service.MessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class)//定义消息的推送管道
public class MessageProviderImpl implements MessageProvider {
    @Resource
    private MessageChannel output;//变量名必须是output,应该是来自配置文件中通道的名字

    @Override
    public String send() {
        String uuid = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(uuid).build());
        return null;
    }
}

启动项目


消息消费者

pom.xml

		<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

application.yaml

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
               rabbitmq:
                  host:
                  port: 5672
                  username:
                  password:
      bindings: # 服务的整合处理
        input: # 通道的名字
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
  client:
    register-with-eureka: true
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
    fetch-registry: true
  instance:
    instance-id: stream-consumer8802
    prefer-ip-address: true
    lease-renewal-interval-in-seconds: 10
    lease-expiration-duration-in-seconds: 30

启动类

添加@SpringBootApplication

listener

package com.fox.springcloud.listener;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    @Value("${server.port}")
    private String port;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1接收到消息:" + message.getPayload() + "\\t port:" + port);
    }
}

消息重复消费问题

不同分组对同一条消息会重复消费,同一个分组一个消息只会被一个消费者消费。

持久化

添加group后自动支持持久化,消费者重启不会丢失消息。

问题:消息无法正常发送消费

启动8801后没有如尚硅谷课程中的那样在rabbit中出现配置文件中的studyExchange,但是有一个output名字的exchange,发送消息时看overview,消息也可以发送到rabbitmq,消费者无法正常接收消息。

问题原因:

yaml文件对缩减要求严格,因此需要注意yaml文件中的key层级关系是否正确,博主由于binders和bindings是同级,不小心在bindings前面加了缩进,导致属性无法正确绑定,发生故障

以上是关于springcloud笔记七Stream的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud第二季之Stream,Sleuth学习笔记

SpringCloud Stream消息驱动设计思想以及整合rabbitmq消息队列案例--学习笔记

SpringCloud Stream消息驱动

#yyds干货盘点# springcloud整合stream消费自己生产的消息

#yyds干货盘点# springcloud整合stream实现同一通道根据消息内容分发不同的消费逻辑

SpringCloud——Stream(学习与使用)