升级到 Spring Boot 2.6.1 和 Spring Cloud 2021.0.0 后 Spring Cloud Stream 启动错误

Posted

技术标签:

【中文标题】升级到 Spring Boot 2.6.1 和 Spring Cloud 2021.0.0 后 Spring Cloud Stream 启动错误【英文标题】:Startup error in Spring Cloud Stream after upgrading to Spring Boot 2.6.1 and Spring Cloud 2021.0.0 【发布时间】:2022-01-09 10:31:04 【问题描述】:

我刚刚升级了一个使用 Spring Cloud Stream Kafka 生产者和消费者的 Spring Boot 应用程序

plugins 
    id("org.springframework.boot") version "2.6.1"
    ...

extra["springCloudVersion"] = "2021.0.0"
extra["springCloudStreamVersion"] = "3.2.1"

应用程序不再启动,但出现以下异常:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
...
Caused by: java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorFluxStream(TraceFunctionAroundWrapper.java:187)
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorStream(TraceFunctionAroundWrapper.java:120)
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:97)
    at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:47)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:256)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:512)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:418)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
    ... 16 common frames omitted

我错过了任何升级指南还是一个错误?

制片人

@Component
class EventProducer(@Qualifier("eventSink") private val eventProcessor: Sinks.Many<Message<EventReceived>>) 

    private val logger = LoggerFactory.getLogger(javaClass)

    fun send(event: EventReceived): Mono<EventReceived> 
        return Mono.defer 
            val message = MessageBuilder.withPayload(event)
                .setHeader(MESSAGE_KEY, event.id)
                .setHeader(TIMESTAMP, OffsetDateTime.now().toInstant().toEpochMilli())
                .build()
            logger.info("Sending event ", event)
            while (eventProcessor.tryEmitNext(message).isFailure) 
                LockSupport.parkNanos(10)
            
            event.toMono()
        .subscribeOn(Schedulers.boundedElastic())
    

消费者

@Configuration
class MetricConsumer(...) 

    private val logger = LoggerFactory.getLogger(javaClass)

    @Bean
    fun consumeMetricUpdated(): Function<Flux<Message<MetricUpdated>>, Mono<Void>> 
        ...
    

【问题讨论】:

你能展示一下你的函数是什么样子的吗? 【参考方案1】:

这看起来像是 s-c-sleuth 中的一个错误。我将就此与 Marcin 进行跟进。 您能否也发布您的功能签名,需要确认一些事情吗? 同时,您可以通过将spring.sleuth.function.enabled设置为false来暂时断开sleuth的TraceFunctionAroundWrapper

【讨论】:

嗨奥列格,我更新了有关生产者和消费者的详细信息的问题。感谢您的关注。 嗨,奥列格,有什么更新吗? 我已经回答过了。您的应用程序所依赖的 spring-cloud-sleuth 中有一个错误。该错误已在 sleuth 中修复,但要到下一个版本才能使用。因此,请按照我之前的回答中所述禁用侦探。

以上是关于升级到 Spring Boot 2.6.1 和 Spring Cloud 2021.0.0 后 Spring Cloud Stream 启动错误的主要内容,如果未能解决你的问题,请参考以下文章

TypeNotPresentExceptionProxy,将Spring Boot Starter父级从2.1.8升级到2.2.1

Spring Cloud 2021.0.0 正式发布,第一个支持Spring Boot 2.6的版本!

Spring Boot 版本从 2.1.6 升级到 2.2.1 和 spring-cloud 问题

升级到 Spring Boot 2.6 时 Spring Security 和 org.springdoc.ui.SwaggerConfig 之间的循环引用

Spring Boot 2.0干货系列:Spring Boot1.5X升级到2.0指南

将 Spring Boot 升级到 2.4.1