当我使用 Kotlin 功能类型而不是 Java 功能接口来使用来自 Kafka 的消息时,缓存不起作用
Posted
技术标签:
【中文标题】当我使用 Kotlin 功能类型而不是 Java 功能接口来使用来自 Kafka 的消息时,缓存不起作用【英文标题】:Caching doesn't work when I use Kotlin Functional Types instead of Java Functional Interface for consuming messages from Kafka 【发布时间】:2022-01-17 04:19:30 【问题描述】:根据这篇文章https://spring.io/blog/2018/09/11/kotlin-support-in-spring-cloud-function,Spring Cloud Function 中已经添加了对 Kotlin 函数类型的支持。这意味着 Spring Cloud Function 现在可以识别与 Java 的供应商、函数或消费者之一有效匹配的 Kotlin 函数类型,并将它们视为此类。因此,例如,我们可以使用 Consumer<String>
和 (String) -> Unit
互换。但是如果我想使用缓存机制,我似乎无法使用 Kotlin 函数类型。
这是我的应用程序示例(源代码:https://github.com/grolegor/kotlin-function-type-for-spring-cloud-function):
@SpringBootApplication
class Application
@Bean
fun embeddedKafkaBroker(): EmbeddedKafkaBroker = EmbeddedKafkaBroker(1).kafkaPorts(9092)
fun main(args: Array<String>)
runApplication<Application>(*args)
在这里,我使用 Embedded Kafka Broker 创建了一个 bean,只是为了使该应用程序可重现。在现实世界中,我使用单独的 Kafka 集群。
ConsumerService
使用 Kotlin 函数从 Kafka 读取数据并使用 cachedSevice
进行处理:
@Service
class ConsumerService(
private val cachedService: CachedService
)
fun receive(event: String)
cachedService.cachedMethod()
// use Kotlin Functional Types instead of Java's Consumer Interface
@Bean
fun consumer(): (String) -> Unit = ::receive
CachedService
只有一种可缓存的方法。如果已调用此方法,则此方法将打印“缓存未命中”。因此,如果缓存机制正常工作,即使我们多次调用此函数,我们也只会看到一条这样的消息。
@Service
@EnableCaching
class CachedService
@Cacheable(cacheNames = ["cache"])
fun cachedMethod(): String
println("cache miss")
return "answer"
最后,我们有一个简单的控制器,它依赖于cachedService
和ConsumerService
:
@RestController
class Controller(
private val cachedService: CachedService
)
@GetMapping("/")
fun get() =
cachedService.cachedMethod()
依赖关系:
Spring Boot 2.6.1 Kotlin 1.6.0 Spring Cloud Stream Kafka 3.1.3我的 pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kotlin-function-type-for-spring-cloud-function</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kotlin-function-type-for-spring-cloud-function</name>
<description>kotlin-function-type-for-spring-cloud-function</description>
<properties>
<java.version>11</java.version>
<kotlin.version>1.6.0</kotlin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-kotlin</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
</dependency>
</dependencies>
<build>
<sourceDirectory>$project.basedir/src/main/kotlin</sourceDirectory>
<testSourceDirectory>$project.basedir/src/test/kotlin</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<configuration>
<args>
<arg>-Xjsr305=strict</arg>
</args>
<compilerPlugins>
<plugin>spring</plugin>
</compilerPlugins>
</configuration>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-allopen</artifactId>
<version>$kotlin.version</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
当我运行这个应用程序时,我在输出中看到以下消息:
@Bean method KotlinLambdaToFunctionAutoConfiguration.kotlinToFunctionTransformerOld is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
Bean 'cachedService' of type [com.example.kotlinfunctiontypeforspringcloudfunction.CachedService] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
Bean 'consumerService' of type [com.example.kotlinfunctiontypeforspringcloudfunction.ConsumerService] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
表示我不能使用@Cacheable等代理机制。 所以,如果我使用这个 curl 多次调用我的方法使用休息控制器
curl --location --request GET 'http://localhost:8080/'
我将多次调用cachedMethod
。
但如果我在 ConsumerService
中使用功能接口 Consumer
创建消费者 bean,如下所示:
@Bean
fun consumer(): Consumer<String> = Consumer(::receive)
一切都正确!
我没有看到任何关于自动代理的消息,@Cacheable
按预期工作。所以,如果我多次运行 curl 方法 cachedMethod
将只被调用一次。
【问题讨论】:
有点好奇,但是这里与 spring-cloud-function 和/或 Kotlin 有什么关系?我的意思是,你确实在使用一个函数(消费者)并从中调用你的服务。对于 spring-cloud-function 它只是另一个 bean,所以缓存与否我们不会在 spring-cloud-function 级别知道一种或另一种方式。 . .我错过了什么? @OlegZhurakousky 这个bean基于另一个用于处理事件的服务,所以消费者调用缓存的服务,一切都很好,但是当我使用kotlin函数类型而不是消费者函数接口'@Cacheable ' 停止工作,因为它变得“不符合自动代理的条件”。 我还没有完全关注。能否请您提供一个重现问题的小原型并将其推送到 github 以便我们查看 @OlegZhurakousky 不幸的是,我已经提供了一个可重现的例子,它不是那么小,但我不知道如何让它更小。我用我的例子提供了一个 repo 的链接。如果您运行此代码,您将看到当您使用 Kotlin 功能类型而不是消费者接口时,您将在日志中收到有关无法为 bean 制作代理的消息,因此“@Cacheable”不起作用(以及为“@Transactional”,因为 Spring 为此类 bean 创建了代理)。 我的错,对不起,错过了。会看看 【参考方案1】:修复确实在 3.2.2-SNAPSHIT 但它也在 3.1.6 中,所以请将你的依赖更新到
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.1.6</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-kotlin</artifactId>
<version>3.1.6</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
<version>3.1.6</version>
</dependency>
确保 spring-cloud-function-context
是 3.1.6 并更新 Spring Boot 顶部 2.4.13
【讨论】:
以上是关于当我使用 Kotlin 功能类型而不是 Java 功能接口来使用来自 Kafka 的消息时,缓存不起作用的主要内容,如果未能解决你的问题,请参考以下文章