Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入
Posted MateCloud微服务
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入相关的知识,希望对你有一定的参考价值。
一、背景描述
自Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,目前最新版本为3.1.3。
自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。下面就介绍下以函数式编程的方式代替StreamListener的方法
二、再叨叨几句为什么?
来自Spring的博客文章(https://spring.io/blog/2019/10/17/spring-cloud-stream-functional-and-reactive)上面写着a functional programming model in Spring Cloud Stream (SCSt). It’s less code, less configuration. Most importantly, though, your code is completely decoupled and independent from the internals of SCSt。这有利于使用Project Reactor提供的事件流抽象(如Flux和Mono)(https://projectreactor.io/). 命令函数在每个单独的事件上触发,而reactive函数只触发一次。
三、使用方法
3.1 如何集成
官方文档目前只集成了RabbitMQ和Kafka,查看Spring Cloud Alibaba的官方示例,没有提供集成方法。所以就按照RabbitMQ的方式,尝试在RocketMQ上测试验证,发现完全可用,说明rocketmq已经实现了Stream的函数式编辑的相关方法。这样就好办,直接开搞。
3.2 添加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
没有带版本号,需要先依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>vip.mate</groupId>
<artifactId>mate-starter-dependencies</artifactId>
<version>3.3.8</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
3.3 写配置
spring:
cloud:
stream:
bindings:
sms-out-0:
destination: sms-topic
content-type: application/json
sms-in-0:
destination: sms-topic
content-type: text/plain
group: sms-group
注意:
消费者和生产者的使用方式与之前有变化:采用名称-out-数字的方式,用于生产者,名称-in-数字的方式用于消费者。
以下摘录rabbitmq的官方示例:
@Autowired
private StreamBridge bridge;
@Bean
Consumer<List<String>> input()
return list ->
List<MyCorrelationData> results = new ArrayList<>();
list.forEach(str ->
log.info("Received: " + str);
MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
results.add(corr);
this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
);
results.forEach(correlation ->
try
Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
log.info(confirm + " for " + correlation.getPayload());
if (correlation.getReturnedMessage() != null)
log.error("Message for " + correlation.getPayload() + " was returned ");
// throw some exception to invoke binder retry/error handling
catch (InterruptedException e)
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
catch (ExecutionException | TimeoutException e)
throw new IllegalStateException(e);
;
3.3 写代码
3.3.1 生产消息
package vip.mate.message.service.impl;
import lombok.AllArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;
import vip.mate.core.rocketmq.constant.MessageConstant;
import vip.mate.message.service.ISmsService;
/**
* 发送短信实现类
*
* @author xuzhanfu
*/
@Service
@AllArgsConstructor
public class SmsServiceImpl implements ISmsService
private final StreamBridge streamBridge;
/**
* 采用StreamBridge的发送方式
*
* @param message 短消息
* @link https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_binding_and_binding_names
*/
@Override
public void sendSms(String message)
streamBridge.send(MessageConstant.SMS_MESSAGE_OUTPUT, message);
3.3.2 消费消息
package vip.mate.message.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import java.util.function.Consumer;
/**
* 短信消费者业务
*
* @author xuzhanfu
*/
@Slf4j
@Service
public class SmsConsumerService
/**
* 函数式编辑接收消息
*
* @return
*/
@Bean
public Consumer<String> sms()
return message ->
log.info("接收的普通消息为:", message);
;
这就是函数式编程的方式,其中方法名,要与通道名的名称一致。
四、完整源码
项目 | GITHUB | 码云 |
---|---|---|
MateCloud后端源码 | https://github.com/matevip/matecloud | https://gitee.com/matevip/matecloud |
Artemis前端源码 | https://github.com/matevip/artemis | https://gitee.com/matevip/artemis |
以上是关于Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入的主要内容,如果未能解决你的问题,请参考以下文章
spring cloud-stream 和 spring cloud-bus 有啥区别?
Spring Cloud(12)——基于Kafka的Stream实现