Custom spring-kafka deserializer for LocalDateTime

Posted: 2019-05-07 06:20:22

Question: I'm building a simple project with Spring Boot and spring-kafka, and I can't get the configuration right so that it works. It's a simple application that creates notes (author, content, createdDateTime, lastModifiedDateTime) and sends an event whenever a note is created.
I've been playing with it for two days, and I think I'm just not getting it.

Here is my configuration. I'm pretty sure there is a lot of boilerplate in it, since I've been pulling from several examples to get this far.

I have two producer and consumer factories because I need a default one. Is that right? Do I need to create a custom factory for every type of message I want to send?

My application.yml:
spring.datasource.url: jdbc:mysql://localhost:3306/notes
spring.datasource.username: root
spring.datasource.password:
logging.level.org.hibernate.SQL: debug
spring.jpa.database: MYSQL
spring.jpa.open-in-view: true
spring.jpa.show-sql: true
spring.data.jpa.repositories.bootstrap-mode: default
spring.jpa.database-platform: org.hibernate.dialect.MySQL5Dialect
spring.jpa.hibernate.ddl-auto: update
logging.level.org.springframework: DEBUG
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
spring.kafka.bootstrap-servers: 192.168.169.22:9092
spring.kafka.consumer.group-id: noteGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.properties.spring.json.trusted.packages: com.remusrd.notesample.domain.event
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.add.type.headers: true
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
My producer:
package com.remusrd.notesample.service
import arrow.core.Option
import arrow.core.getOrElse
import arrow.data.NonEmptyList
import com.remusrd.notesample.data.NoteRepository
import com.remusrd.notesample.domain.Note
import com.remusrd.notesample.domain.event.NoteEvent
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional
@Service
@Transactional
class JpaNoteService : NoteService {

    val TOPIC_NAME = "notes"

    @Autowired
    private lateinit var noteRepository: NoteRepository

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, NoteEvent>

    override fun getAllNotes(): Option<NonEmptyList<Note>> =
        NonEmptyList.fromList(noteRepository.findAll())

    override fun createNote(note: Option<Note>): Note {
        note.map { kafkaTemplate.send(TOPIC_NAME, NoteEvent.Created(it)) }
        return note.getOrElse { Note(id = 0) }
    }

    @Transactional(readOnly = true)
    override fun getNotesByAuthor(author: String): Option<NonEmptyList<Note>> {
        val noteList = noteRepository.findByAuthor(author)
        return NonEmptyList.fromList(noteList)
    }
}
My consumer:
package com.remusrd.notesample.service
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
@Component
class CreatedNotesConsumer {

    @KafkaListener(topics = ["notes"], groupId = "noteGroup")
    fun receive(noteEvent: Message<Any>) {
        println("received " + noteEvent + " " + noteEvent.javaClass)
    }
}
The entity:
package com.remusrd.notesample.domain
import java.time.LocalDateTime
import javax.persistence.*
@Entity
@Table(name = "note")
data class Note(
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long,
val content: String = "",
val creationDate: LocalDateTime = LocalDateTime.now(),
val lastModified: LocalDateTime = LocalDateTime.now(),
val author: String = ""
)
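The two LocalDateTime fields are what trip Jackson up later: out of the box, jackson-databind has no (de)serializer for java.time types, so it falls back to looking for a String constructor and fails with exactly the InvalidDefinitionException shown in the trace below. Registering the JavaTimeModule from the jackson-datatype-jsr310 artifact teaches the mapper about them. A minimal sketch outside of Spring, assuming jackson-databind and jackson-datatype-jsr310 are on the classpath:

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import java.time.LocalDateTime

fun main() {
    // Without registerModule(JavaTimeModule()), readValue below would throw
    // InvalidDefinitionException: "Cannot construct instance of java.time.LocalDateTime".
    val mapper = ObjectMapper().registerModule(JavaTimeModule())

    val json = mapper.writeValueAsString(LocalDateTime.of(2018, 12, 5, 16, 45, 59))
    val back = mapper.readValue(json, LocalDateTime::class.java)
    println(back) // round-trips successfully once the module is registered
}
```

Spring Boot's Jackson auto-configuration registers this module for the ObjectMapper it builds, which is why wiring the auto-configured mapper (or the Jackson2ObjectMapperBuilder) into the Kafka JsonDeserializer matters.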
And my build.gradle:
buildscript {
    ext {
        kotlinVersion = "1.3.10"
        springBootVersion = "2.1.1.RELEASE"
        springCloudVersion = "Greenwich.M3"
        arrow_version = "0.8.1"
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
        classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion")
        classpath("org.jetbrains.kotlin:kotlin-allopen:$kotlinVersion")
        classpath("org.jetbrains.kotlin:kotlin-noarg:$kotlinVersion")
        classpath("io.spring.gradle:dependency-management-plugin:1.0.2.RELEASE")
    }
}

apply plugin: "kotlin"
apply plugin: "kotlin-spring"
apply plugin: "org.springframework.boot"
apply plugin: "io.spring.dependency-management"
apply plugin: "kotlin-allopen"
apply plugin: "kotlin-noarg"
apply plugin: "kotlin-jpa"

group "com.remusrd"
version "0.0.1-SNAPSHOT"
sourceCompatibility = 1.8

repositories {
    mavenCentral()
    maven { url 'http://repo.spring.io/milestone' }
}

noArg {
    annotation("com.remusrd.notesample.domain.annotation.NoArg")
}

allOpen {
    annotation("com.remusrd.notesample.domain.annotation.Open")
}

dependencies {
    // Kotlin
    implementation "org.jetbrains.kotlin:kotlin-stdlib"
    implementation "org.jetbrains.kotlin:kotlin-reflect"
    implementation "com.fasterxml.jackson.module:jackson-module-kotlin"
    implementation "io.arrow-kt:arrow-core:$arrow_version"
    implementation "io.arrow-kt:arrow-data:$arrow_version"
    // Spring Boot
    implementation "org.springframework.cloud:spring-cloud-starter-netflix-eureka-client"
    implementation "org.springframework.boot:spring-boot-starter-web:$springBootVersion"
    implementation "org.springframework.boot:spring-boot-starter-data-jpa:$springBootVersion"
    implementation "org.springframework.kafka:spring-kafka"
    implementation "com.fasterxml.jackson.module:jackson-module-kotlin:2.9.7"
    // BBDD
    implementation "mysql:mysql-connector-java:8.0.13"
    implementation "com.h2database:h2:1.4.197"
    // Test
    testImplementation "junit:junit:4.12"
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.boot:spring-boot-dependencies:$springBootVersion"
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion"
    }
}

compileKotlin {
    kotlinOptions {
        freeCompilerArgs = ["-Xjsr305=strict"]
        jvmTarget = "1.8"
    }
}

compileTestKotlin {
    kotlinOptions {
        freeCompilerArgs = ["-Xjsr305=strict"]
        jvmTarget = "1.8"
    }
}
This is the stack trace I get:
2018-12-05 16:48:56.884 ERROR 8331 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition notes-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 110, 111, 116, 101, 34, 58, 123, 34, 105, 100, 34, 58, 48, 44, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 72, 111, 108, 97, 32, 113, 117, -61, -87, 32, 116, 97, 108, 34, 44, 34, 99, 114, 101, 97, 116, 105, 111, 110, 68, 97, 116, 101, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 108, 97, 115, 116, 77, 111, 100, 105, 102, 105, 101, 100, 34, 58, 34, 50, 48, 49, 56, 45, 49, 50, 45, 48, 53, 32, 49, 54, 58, 52, 53, 58, 53, 57, 34, 44, 34, 97, 117, 116, 104, 111, 114, 34, 58, 34, 82, 105, 99, 104, 97, 114, 100, 34, 125, 125]] from topic [notes]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `java.time.LocalDateTime` (no Creators, like default construct, exist): no String-argument constructor/factory method to deserialize from String value ('2018-12-05 16:45:59')
at [Source: (byte[])""note":"id":0,"content":"yo","creationDate":"2018-12-05 16:45:59","lastModified":"2018-12-05 16:45:59","author":"Richard""; line: 1, column: 58] (through reference chain: com.remusrd.notesample.domain.event.NoteEvent$Modified["note"]->com.remusrd.notesample.domain.Note["creationDate"])
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1452) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1028) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:371) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:323) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1373) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:171) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.7.jar:2.9.7]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.7.jar:2.9.7]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:328) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154) ~[kafka-clients-2.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
Edit: https://github.com/RemusRD/notesample is the repo. If you have any suggestions to improve the code, please let me know.
Edit 2: this is the new KafkaConfig:
package com.remusrd.notesample.configuration
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.jackson.Jackson2ObjectMapperBuilderCustomizer
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder
import org.springframework.kafka.core.*
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerializer
@Configuration
class KafkaConfig {

    @Autowired
    lateinit var jackson2ObjectMapperBuilder: Jackson2ObjectMapperBuilder

    @Autowired
    lateinit var kafkaProperties: KafkaProperties

    @Bean
    fun kafkaTemplate(): KafkaTemplate<Any, Any> {
        return KafkaTemplate<Any, Any>(defaultKafkaProducerFactory())
    }

    @Bean
    fun defaultKafkaConsumerFactory(): ConsumerFactory<Any, Any> {
        val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
        val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
        jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)
        val kafkaConsumerFactory = DefaultKafkaConsumerFactory<Any, Any>(
            kafkaProperties.buildConsumerProperties(),
            jsonDeserializer,
            jsonDeserializer
        )
        kafkaConsumerFactory.setValueDeserializer(jsonDeserializer)
        return kafkaConsumerFactory
    }

    @Bean
    fun defaultKafkaProducerFactory(): ProducerFactory<Any, Any> {
        val jsonSerializer = JsonSerializer<Any>(jackson2ObjectMapperBuilder.build())
        jsonSerializer.configure(kafkaProperties.buildProducerProperties(), false)
        val factory = DefaultKafkaProducerFactory<Any, Any>(
            kafkaProperties.buildProducerProperties(),
            jsonSerializer,
            jsonSerializer
        )
        val transactionIdPrefix = kafkaProperties.producer.transactionIdPrefix
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix)
        }
        return factory
    }
}
Comments on the question:
Can you share a simple Spring Boot project on GitHub so we can play with it and reproduce the issue? So far, looking at the code, I don't see any problem. The kafkaConsumerFactory overrides the one auto-configured in KafkaAutoConfiguration, so kafkaListenerContainerFactory should pick up the one with the JavaTimeModule for deserialization. It's not clear what causes the default JsonDeserializer to be used. I could suggest removing the spring.kafka.consumer.value-deserializer property, but I'm not sure how that would cause the override. So, a project to investigate would help.
After you get your simple project working, we'd be glad to ask you to contribute it to our Spring for Apache Kafka samples: github.com/spring-projects/spring-kafka/tree/master/samples
@ArtemBilan Sure, no problem. Edit: I've just added the repo to the post.
Answer 1:
Your problem is that you set your custom JsonDeserializer as the keyDeserializer on the ConsumerFactory:
@Bean
fun defaultKafkaConsumerFactory(): ConsumerFactory<Any, Any> {
    val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
    objectMapper.registerModule(JavaTimeModule())
    val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
    jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)
    val kafkaConsumerFactory = DefaultKafkaConsumerFactory<Any, Any>(
        kafkaProperties.buildConsumerProperties()
    )
    kafkaConsumerFactory.setKeyDeserializer(jsonDeserializer) // <-- the configured deserializer ends up on the KEY
    return kafkaConsumerFactory
}
Use setValueDeserializer() instead, or just remove this bean in favor of the auto-configured kafkaConsumerFactory.
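For reference, the corrected bean might look like the following. This is a sketch based on the answer, not necessarily the exact code that ended up in the repo: the JsonDeserializer is wired in as the value deserializer, and the key is left to the factory's default:

```kotlin
@Bean
fun defaultKafkaConsumerFactory(): ConsumerFactory<Any, Any> {
    // Build the deserializer from the Boot-configured ObjectMapper,
    // which already has the JavaTimeModule registered.
    val objectMapper = jackson2ObjectMapperBuilder.build() as ObjectMapper
    val jsonDeserializer = JsonDeserializer<Any>(objectMapper)
    jsonDeserializer.configure(kafkaProperties.buildConsumerProperties(), false)

    val kafkaConsumerFactory = DefaultKafkaConsumerFactory<Any, Any>(
        kafkaProperties.buildConsumerProperties()
    )
    // The JSON payload is the record *value*; the key keeps the default deserializer.
    kafkaConsumerFactory.setValueDeserializer(jsonDeserializer)
    return kafkaConsumerFactory
}
```

Note that spring.kafka.consumer.value-deserializer in application.yml becomes redundant once the deserializer instance is set programmatically here.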
I also don't see at all why you have an extra pair of ConsumerFactory and ProducerFactory beans...
Comments on the answer:
Awesome! Got it working. One of the problems was the one you mentioned; I also removed one of each of the duplicated consumer and producer beans (I'm editing the post to add that). Also, DateTimeFormatter.ISO_DATE turned out to be unnecessary. If you don't mind, I have a couple more questions: 1. Configuring the factories as ... 2. ...

1. Yes, that's fine. As long as you deal with JSON, the target listener can be of whatever type is expected. 2. No, that one is relevant: the producer needs a serializer, and the consumer needs a deserializer. They are different objects. You can, however, use the same ObjectMapper for both.
Thanks, you've been very helpful. I'd like to ask one more question; it's a bit unrelated, but I couldn't find information about it, so maybe I should open another question on SO. The question is: when, and by whom, should events be created/sent in a microservice architecture? I guess it depends on the context, but in a simple Spring Boot application, should they be created in the service layer? I mean, should I call the repository to persist the note and then produce the event?
Yes, that is definitely a new SO question; provide more information there to define your problem.