当我使用 Kotlin 功能类型而不是 Java 功能接口来使用来自 Kafka 的消息时,缓存不起作用

Posted

技术标签:

【中文标题】当我使用 Kotlin 功能类型而不是 Java 功能接口来使用来自 Kafka 的消息时,缓存不起作用【英文标题】:Caching doesn't work when I use Kotlin Functional Types instead of Java Functional Interface for consuming messages from Kafka 【发布时间】:2022-01-17 04:19:30 【问题描述】:

根据这篇文章https://spring.io/blog/2018/09/11/kotlin-support-in-spring-cloud-function,Spring Cloud Function 中已经添加了对 Kotlin 函数类型的支持。这意味着 Spring Cloud Function 现在可以识别与 Java 的供应商、函数或消费者之一有效匹配的 Kotlin 函数类型,并将它们视为此类。因此,例如,我们可以使用 Consumer<String>(String) -> Unit 互换。但是如果我想使用缓存机制,我似乎无法使用 Kotlin 函数类型。

这是我的应用程序示例(源代码:https://github.com/grolegor/kotlin-function-type-for-spring-cloud-function):

@SpringBootApplication
class Application 
    @Bean
    fun embeddedKafkaBroker(): EmbeddedKafkaBroker = EmbeddedKafkaBroker(1).kafkaPorts(9092)


fun main(args: Array<String>) 
    runApplication<Application>(*args)

在这里,我使用 Embedded Kafka Broker 创建了一个 bean,只是为了使该应用程序可重现。在现实世界中,我使用单独的 Kafka 集群。

ConsumerService 使用 Kotlin 函数从 Kafka 读取数据并使用 cachedSevice 进行处理:

@Service
class ConsumerService(
    private val cachedService: CachedService
) 

    fun receive(event: String) 
        cachedService.cachedMethod()
    

    // use Kotlin Functional Types instead of Java's Consumer Interface
    @Bean
    fun consumer(): (String) -> Unit = ::receive

CachedService 只有一种可缓存的方法。如果已调用此方法,则此方法将打印“缓存未命中”。因此,如果缓存机制正常工作,即使我们多次调用此函数,我们也只会看到一条这样的消息。

@Service
@EnableCaching
class CachedService 

    @Cacheable(cacheNames = ["cache"])
    fun cachedMethod(): String 
        println("cache miss")
        return "answer"
    


最后,我们有一个简单的控制器,它依赖于cachedServiceConsumerService

@RestController
class Controller(
    private val cachedService: CachedService
) 

    @GetMapping("/")
    fun get() =
        cachedService.cachedMethod()

依赖关系:

Spring Boot 2.6.1 Kotlin 1.6.0 Spring Cloud Stream Kafka 3.1.3

我的 pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kotlin-function-type-for-spring-cloud-function</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kotlin-function-type-for-spring-cloud-function</name>
    <description>kotlin-function-type-for-spring-cloud-function</description>
    <properties>
        <java.version>11</java.version>
        <kotlin.version>1.6.0</kotlin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-reflect</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib-jdk8</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-function-kotlin</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>$project.basedir/src/main/kotlin</sourceDirectory>
        <testSourceDirectory>$project.basedir/src/test/kotlin</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-maven-plugin</artifactId>
                <configuration>
                    <args>
                        <arg>-Xjsr305=strict</arg>
                    </args>
                    <compilerPlugins>
                        <plugin>spring</plugin>
                    </compilerPlugins>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.jetbrains.kotlin</groupId>
                        <artifactId>kotlin-maven-allopen</artifactId>
                        <version>$kotlin.version</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

</project>

当我运行这个应用程序时,我在输出中看到以下消息:

@Bean method KotlinLambdaToFunctionAutoConfiguration.kotlinToFunctionTransformerOld is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
Bean 'cachedService' of type [com.example.kotlinfunctiontypeforspringcloudfunction.CachedService] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
Bean 'consumerService' of type [com.example.kotlinfunctiontypeforspringcloudfunction.ConsumerService] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)

表示我不能使用@Cacheable等代理机制。 所以,如果我使用这个 curl 多次调用我的方法使用休息控制器

curl --location --request GET 'http://localhost:8080/'

我将多次调用cachedMethod

但如果我在 ConsumerService 中使用功能接口 Consumer 创建消费者 bean,如下所示:

@Bean
fun consumer(): Consumer<String> = Consumer(::receive)

一切都正确!

我没有看到任何关于自动代理的消息,@Cacheable 按预期工作。所以,如果我多次运行 curl 方法 cachedMethod 将只被调用一次。

【问题讨论】:

有点好奇,但是这里与 spring-cloud-function 和/或 Kotlin 有什么关系?我的意思是,你确实在使用一个函数(消费者)并从中调用你的服务。对于 spring-cloud-function 它只是另一个 bean,所以缓存与否我们不会在 spring-cloud-function 级别知道一种或另一种方式。 . .我错过了什么? @OlegZhurakousky 这个bean基于另一个用于处理事件的服务,所以消费者调用缓存的服务,一切都很好,但是当我使用kotlin函数类型而不是消费者函数接口'@Cacheable ' 停止工作,因为它变得“不符合自动代理的条件”。 我还没有完全关注。能否请您提供一个重现问题的小原型并将其推送到 github 以便我们查看 @OlegZhurakousky 不幸的是,我已经提供了一个可重现的例子,它不是那么小,但我不知道如何让它更小。我用我的例子提供了一个 repo 的链接。如果您运行此代码,您将看到当您使用 Kotlin 功能类型而不是消费者接口时,您将在日志中收到有关无法为 bean 制作代理的消息,因此“@Cacheable”不起作用(以及为“@Transactional”,因为 Spring 为此类 bean 创建了代理)。 我的错,对不起,错过了。会看看 【参考方案1】:

修复确实在 3.2.2-SNAPSHIT 但它也在 3.1.6 中,所以请将你的依赖更新到

<dependency>
    <groupId>org.springframework.cloud</groupId> 
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-kotlin</artifactId>
    <version>3.1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-context</artifactId>
    <version>3.1.6</version>
</dependency>

确保 spring-cloud-function-context 是 3.1.6 并更新 Spring Boot 顶部 2.4.13

【讨论】:

以上是关于当我使用 Kotlin 功能类型而不是 Java 功能接口来使用来自 Kafka 的消息时,缓存不起作用的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin返回值UnitNothing与Any

在 Kotlin 中用复杂类型覆盖 Java 方法

kotlin和java区别

Kotlin的Any 和Java的Object对比

Kotlin的Any 和Java的Object对比

你知道为什么用户选择 kotlin 而不是 java 进行后端开发吗?