如何通过 debezium CDC 机制反序列化从 kafka 代理收到的 BigDecimal 值?

Posted

技术标签:

【中文标题】如何通过 debezium CDC 机制反序列化从 kafka 代理收到的 BigDecimal 值?【英文标题】:How to deserialize BigDecimal value received from kafka broker through debezium CDC mechanism? 【发布时间】:2019-10-23 10:32:09 【问题描述】:

我有几个使用 Spring Boot 开发的微服务,每个都有自己的 Postgres 数据库。这些微服务通过 kafka broker 和 kafka connect 与 debezium 平台提供的 CDC 机制交换数据。我有一个微服务 A,它存储了一些具有 BigDecimal 属性的实体。另一个微服务 B 依赖于 A 存储的数据,因此它通过 kafka 主题作为消息获取数据,如下所示:

"after":"id":"267e8ba0-4986-447d-8328-315c839875c3","coefficient":"AZA=","created_at":1559950327672000,"label":"External Agent","updated_at":1559950327672000

系数属性是 BigDecimal,它作为 BigDecimal (4.00) 存储在微服务 A 数据库中。

4.00怎么会转成“AZA=”? “AZA=”是某种编码格式来保持 BigDecimal 精度吗?如何再次从“AZA=”升级到 4.0?

请注意,jackson 无法将 "AZA=" 的字符串值反序列化为 BigDecimal 值,但以下情况除外:

com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `java.math.BigDecimal` from String "AZA=": not a valid representation
at [Source: (String)""id":"267e8ba0-4986-447d-8328-315c839875c3","coefficient":"AZA=","created_at":1559950327672000,"label":"External Agent","updated_at":1559950327672000"; line: 1, column: 60] (through reference chain: org.perfometer.performanceservice.entities.ActorTypeEntity["coefficient"])
at com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
at com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1549)
at com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:911)
at com.fasterxml.jackson.databind.deser.std.NumberDeserializers$BigDecimalDeserializer.deserialize(NumberDeserializers.java:955)
at com.fasterxml.jackson.databind.deser.std.NumberDeserializers$BigDecimalDeserializer.deserialize(NumberDeserializers.java:922)
at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:127)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:288)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3004)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl.consumeActorTypeMessages(ActorTypeServiceImpl.java:123)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl$$FastClassBySpringCGLIB$$944d568c.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl$$EnhancerBySpringCGLIB$$167173df.consumeActorTypeMessages(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1198)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1118)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

任何提示或任何帮助将不胜感激!谢谢!

【问题讨论】:

【参考方案1】:

这是 Java 的解决方案 - https://debezium.io/docs/faq/#how_to_retrieve_decimal_field_from_binary_representation

另外请查看decimal.handling.mode 选项以了解如何将BigDecimal 编码到消息中。

【讨论】:

【参考方案2】:

如果您使用的是 Java 8,则可以将 Base64 字符串转换为字节数组,然后通过一些黑魔法得到一个 bigDecimal:

BigDecimal bigDecimal = new BigDecimal(new BigInteger(Base64.getDecoder().decode("BfXhAA==")), scale);

https://github.com/confluentinc/kafka-connect-storage-cloud/issues/48#issuecomment-395206864

【讨论】:

以上是关于如何通过 debezium CDC 机制反序列化从 kafka 代理收到的 BigDecimal 值?的主要内容,如果未能解决你的问题,请参考以下文章

FlinkCDC自定义反序列化器

使用带有 Avro 序列化的 Debezium mongodb CDC 创建的模式太多

如何使用 Debezium (cdc) 将从 mysql 获取的更改接收到另一个 mysql db

多个表之间 CDC 事件的 Debezium 排序

成功创建 Always On SQL Server 快照后,Debezium 未跟踪 CDC

SpringBoot整合Debezium CDC同步数据至目标数据库