Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入

Posted MateCloud微服务

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入相关的知识,希望对你有一定的参考价值。

一、背景描述

自Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,目前最新版本为3.1.3。
自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。下面就介绍下以函数式编程的方式代替StreamListener的方法

二、再叨叨几句为什么?

来自Spring的博客文章(https://spring.io/blog/2019/10/17/spring-cloud-stream-functional-and-reactive)上面写着a functional programming model in Spring Cloud Stream (SCSt). It’s less code, less configuration. Most importantly, though, your code is completely decoupled and independent from the internals of SCSt。这有利于使用Project Reactor提供的事件流抽象(如Flux和Mono)(https://projectreactor.io/). 命令函数在每个单独的事件上触发,而reactive函数只触发一次。

三、使用方法

3.1 如何集成

官方文档目前只集成了RabbitMQ和Kafka,查看Spring Cloud Alibaba的官方示例,没有提供集成方法。所以就按照RabbitMQ的方式,尝试在RocketMQ上测试验证,发现完全可用,说明rocketmq已经实现了Stream的函数式编辑的相关方法。这样就好办,直接开搞。

3.2 添加依赖

 <dependency>
     <groupId>com.alibaba.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
 </dependency>

没有带版本号,需要先依赖

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>vip.mate</groupId>
            <artifactId>mate-starter-dependencies</artifactId>
            <version>3.3.8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

3.3 写配置

spring:
  cloud:
    stream:
      bindings:
        sms-out-0:
          destination: sms-topic
          content-type: application/json
        sms-in-0:
          destination: sms-topic
          content-type: text/plain
          group: sms-group

注意:
消费者和生产者的使用方式与之前有变化:采用名称-out-数字的方式,用于生产者,名称-in-数字的方式用于消费者。

参考:https://docs.spring.io/spring-cloud-stream-binder-rabbit/docs/3.1.3/reference/html/spring-cloud-stream-binder-rabbit.html

以下摘录rabbitmq的官方示例:

	@Autowired
	private StreamBridge bridge;

	@Bean
	Consumer<List<String>> input() 
		return list -> 
			List<MyCorrelationData> results = new ArrayList<>();
			list.forEach(str -> 
				log.info("Received: " + str);
				MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
				results.add(corr);
				this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
						.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
						.build());
			);
			results.forEach(correlation -> 
				try 
					Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
					log.info(confirm + " for " + correlation.getPayload());
					if (correlation.getReturnedMessage() != null) 
						log.error("Message for " + correlation.getPayload() + " was returned ");

						// throw some exception to invoke binder retry/error handling

					
				
				catch (InterruptedException e) 
					Thread.currentThread().interrupt();
					throw new IllegalStateException(e);
				
				catch (ExecutionException | TimeoutException e) 
					throw new IllegalStateException(e);
				
		;
	

3.3 写代码

3.3.1 生产消息

package vip.mate.message.service.impl;

import lombok.AllArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;
import vip.mate.core.rocketmq.constant.MessageConstant;
import vip.mate.message.service.ISmsService;

/**
 * 发送短信实现类
 *
 * @author xuzhanfu
 */
@Service
@AllArgsConstructor
public class SmsServiceImpl implements ISmsService 

	private final StreamBridge streamBridge;

	/**
	 * 采用StreamBridge的发送方式
	 *
	 * @param message  短消息
	 * @link https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_binding_and_binding_names
	 */
	@Override
	public void sendSms(String message) 
		streamBridge.send(MessageConstant.SMS_MESSAGE_OUTPUT, message);
	


3.3.2 消费消息

package vip.mate.message.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import java.util.function.Consumer;

/**
 * 短信消费者业务
 *
 * @author xuzhanfu
 */
@Slf4j
@Service
public class SmsConsumerService 

	/**
	 * 函数式编辑接收消息
	 *
	 * @return
	 */
	@Bean
	public Consumer<String> sms() 
		return message -> 
			log.info("接收的普通消息为:", message);
		;
	


这就是函数式编程的方式,其中方法名,要与通道名的名称一致。

四、完整源码

项目GITHUB码云
MateCloud后端源码https://github.com/matevip/matecloudhttps://gitee.com/matevip/matecloud
Artemis前端源码https://github.com/matevip/artemishttps://gitee.com/matevip/artemis

以上是关于Spring Cloud Stream 3.1.x版本,弃用@StreamListener而采用函数式编程实现RocketMQ的接入的主要内容,如果未能解决你的问题,请参考以下文章

spring-cloud-stream 请求-回复消息模式

spring cloud stream

spring cloud-stream 和 spring cloud-bus 有啥区别?

Spring Cloud(12)——基于Kafka的Stream实现

Spring Cloud 2020.0.0 中的 Spring Cloud Bus/Stream 问题

spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)