Unable to deserialise dynamic json with Jackson using generics

【Posted】2021-05-28 18:16:21 【Question】:

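I am trying to parse Debezium CDC messages in Java using Jackson, but I get a ClassCastException when deserialising. I am using generics because the objects are dynamic and differ between Kafka topics: each MySQL table has its own topic.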

JSON input

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"Anne","last_name":"old and new","email":"annek@noanswer.org"},"source":{"version":"1.4.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1614335758000,"snapshot":"false","db":"inventory","table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2150,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1614335758726,"transaction":null}}

Root POJO

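import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

// Generic envelope: S is the schema type, D is the row type carried in the payload.
@JsonIgnoreProperties(ignoreUnknown = true)
public class DebeziumCDCMessage<S, D> {

    private S schema;
    private DebeziumPayload<D> payload;

    @JsonCreator
    DebeziumCDCMessage(@JsonProperty("schema") S _schema,
                       @JsonProperty("payload") DebeziumPayload<D> _payload) {
        this.schema = _schema;
        this.payload = _payload;
    }

    public S getSchema() {
        return schema;
    }

    public void setSchema(S schema) {
        this.schema = schema;
    }

    public DebeziumPayload<D> getPayload() {
        return payload;
    }

    public void setPayload(DebeziumPayload<D> payload) {
        this.payload = payload;
    }
}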

DebeziumPayload Pojo

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

// Payload of a change event: D is the row type used for both "before" and "after".
@JsonIgnoreProperties(ignoreUnknown = true)
public class DebeziumPayload<D> {

    private D before;
    private D after;
    private String op;
    private String ts_ms;
    private Object source;

    @JsonCreator
    DebeziumPayload(@JsonProperty("before") D _before,
                    @JsonProperty("after") D _after,
                    @JsonProperty("op") String _op,
                    @JsonProperty("ts_ms") String _ts_ms,
                    @JsonProperty("source") Object _source) {
        this.before = _before;
        this.after = _after;
        this.op = _op;
        this.ts_ms = _ts_ms;
        this.setSource(_source);
    }

    public D getBefore() {
        return before;
    }

    public void setBefore(D before) {
        this.before = before;
    }

    public D getAfter() {
        return after;
    }

    public void setAfter(D after) {
        this.after = after;
    }

    public String getOp() {
        return op;
    }

    public void setOp(String op) {
        this.op = op;
    }

    public String getTs_ms() {
        return ts_ms;
    }

    public void setTs_ms(String ts_ms) {
        this.ts_ms = ts_ms;
    }

    public Object getSource() {
        return source;
    }

    public void setSource(Object source) {
        this.source = source;
    }
}

POJO for "before" and "after" (this one applies only to the JSON shared above; each topic will have a different target object)

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

// Row type for the dbserver1.inventory.customers topic.
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer {

    private Integer id;
    private String first_name;
    private String last_name;
    private String email;

    @JsonCreator
    Customer(@JsonProperty("id") Integer _id,
             @JsonProperty("first_name") String _first_name,
             @JsonProperty("last_name") String _last_name,
             @JsonProperty("email") String _email) {
        this.id = _id;
        this.first_name = _first_name;
        this.last_name = _last_name;
        this.email = _email;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getFirst_name() {
        return first_name;
    }

    public void setFirst_name(String first_name) {
        this.first_name = first_name;
    }

    public String getLast_name() {
        return last_name;
    }

    public void setLast_name(String last_name) {
        this.last_name = last_name;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }
}

Final deserialisation code

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class CustomersCDCConsumer {

    @SuppressWarnings("unchecked")
    @KafkaListener(topics = "dbserver1.inventory.customers", groupId = "group_id")
    public void listenGroupFoo(String message) {
        try {
            DebeziumCDCMessage<Object,Customer> respo=new ObjectMapper().readValue(message, DebeziumCDCMessage.class);
            DebeziumPayload<Customer> customer=respo.getPayload();
            System.out.println("data as recieved="+customer.getAfter().getLast_name());
        } catch (JsonMappingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (JsonProcessingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Error: this is what I get when I run the code.

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.licious.kafa2sfwrapper.kafkacosumers.CustomersCDCConsumer.listenGroupFoo(java.lang.String)' threw exception; nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.licious.kafa2sfwrapper.model.tables.Customer (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.licious.kafa2sfwrapper.model.tables.Customer is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @5010be6); nested exception is java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.licious.kafa2sfwrapper.model.tables.Customer (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.licious.kafa2sfwrapper.model.tables.Customer is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @5010be6)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2114) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2102) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2001) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.licious.kafa2sfwrapper.model.tables.Customer (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.licious.kafa2sfwrapper.model.tables.Customer is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @5010be6)
    at com.licious.kafa2sfwrapper.kafkacosumers.CustomersCDCConsumer.listenGroupFoo(CustomersCDCConsumer.java:22) ~[classes!/:0.0.1-SNAPSHOT]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.3.4.jar!/:5.3.4]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.3.4.jar!/:5.3.4]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:330) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.6.6.jar!/:2.6.6]
    ... 8 common frames omitted

【Answer 1】:

OK, I was very close; I just needed to give Jackson the type information for the generic target class. Basically I had to replace

DebeziumCDCMessage<Object,Customer> respo=new ObjectMapper().readValue(message, DebeziumCDCMessage.class);

with

DebeziumCDCMessage<Object,Customer> respo=new ObjectMapper().readValue(message, new TypeReference<DebeziumCDCMessage<Object,Customer>>() {});
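Why the replacement helps can be illustrated without Jackson. The sketch below uses only the JDK; `Box`, `Customer`, `TypeToken` and the method names are hypothetical stand-ins, not Jackson classes. It shows two things under those assumptions: with only a raw class available, a generic field can end up holding a `LinkedHashMap`, and the failure surfaces later where the caller uses the value (as in the stack trace above); and an anonymous subclass keeps its generic superclass signature readable at runtime, which is the trick Jackson's `TypeReference` builds on.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.Map;

class TypeTokenDemo {

    // Hypothetical stand-in for a generic envelope such as DebeziumPayload<D>.
    static class Box<T> {
        T value;
    }

    // Hypothetical row type, playing the role of Customer.
    static class Customer {
        String lastName;
    }

    // Hypothetical, simplified type token: the anonymous subclass created at
    // the call site records T in its generic superclass signature.
    abstract static class TypeToken<T> {
        final Type type;

        protected TypeToken() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }
    }

    // Mimics binding with only the raw class known: the element type is
    // erased, so a generic map is stored where a Customer was expected.
    @SuppressWarnings("unchecked")
    static <T> Box<T> readWithRawClass() {
        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("last_name", "Kretchmar");
        Box<T> box = new Box<>();
        box.value = (T) (Object) parsed; // unchecked: no runtime check happens here
        return box;
    }

    // The ClassCastException appears only where the value is used as a Customer.
    static boolean castFailsAtUseSite() {
        Box<Customer> box = readWithRawClass();
        try {
            Customer c = box.value; // compiler-inserted cast to Customer runs here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // The full parameterized type survives inside the anonymous subclass.
    static Type capturedType() {
        return new TypeToken<Box<Customer>>() {}.type;
    }

    public static void main(String[] args) {
        System.out.println("cast fails at use site: " + castFailsAtUseSite());
        System.out.println("captured type: " + capturedType().getTypeName());
    }
}
```

Since each topic has its own row type, a listener per topic can pass its own type reference (for example one parameterized with `Customer` for the customers topic) while reusing the same generic envelope classes.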

【Answer 2】:

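Instead of generic data types,

I suggest you create a base class that holds the common fields, plus multiple child POJO classes for the different messages.

On the base class, add the

@JsonSubTypes

annotation and list all the subclasses in it, so that Jackson can find the appropriate subtype and deserialise the JSON.

You can find an example here: https://www.tutorialspoint.com/jackson_annotations/jackson_annotations_jsonsubtypes.htm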

【Comments】:

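"before", "after" and "schema" will have a different structure for each function and are not subclasses of a known type. The structure changes depending on which tables are being monitored with Debezium.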
