如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

Posted

技术标签:

【中文标题】如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]【英文标题】:How to get org.apache.kafka.connect.data.Decimal value from Kafka JSON message [duplicate] 【发布时间】:2019-11-07 08:04:07 【问题描述】:

我使用 debizium 将 postgresql 数据流式传输到 Kafka,并使用 Java 订阅 Kafka 主题。

我收到 Kafka 消息并得到一个 JSON 字符串,但无法识别一个数值。

JSON 是:


    "schema":
    
        "type": "struct",
        "fields": [
        
            "type": "struct",
            "fields": [
            
                "type": "string",
                "optional": true,
                "field": "creator"
            ,
            
                "type": "int64",
                "optional": true,
                "name": "io.debezium.time.MicroTimestamp",
                "version": 1,
                "field": "createtime"
            ,
            
                "type": "bytes",
                "optional": true,
                "name": "org.apache.kafka.connect.data.Decimal",
                "version": 1,
                "parameters":
                
                    "scale": "5",
                    "connect.decimal.precision": "32"
                ,
                "field": "familyprice"
            ],
            "optional": true,
            "name": "pssdev.public.order.Value",
            "field": "before"
        ,
        
            "type": "struct",
            "fields": [
            
                "type": "string",
                "optional": true,
                "field": "creator"
            ,
            
                "type": "int64",
                "optional": true,
                "name": "io.debezium.time.MicroTimestamp",
                "version": 1,
                "field": "createtime"
            ,
            
                "type": "bytes",
                "optional": true,
                "name": "org.apache.kafka.connect.data.Decimal",
                "version": 1,
                "parameters":
                
                    "scale": "5",
                    "connect.decimal.precision": "32"
                ,
                "field": "familyprice"
            ],
            "optional": true,
            "name": "pssdev.public.order.Value",
            "field": "after"
        ,
        
            "type": "struct",
            "fields": [
            
                "type": "string",
                "optional": true,
                "field": "version"
            ,
            
                "type": "string",
                "optional": true,
                "field": "connector"
            ,
            
                "type": "string",
                "optional": false,
                "field": "name"
            ,
            
                "type": "string",
                "optional": false,
                "field": "db"
            ,
            
                "type": "int64",
                "optional": true,
                "field": "ts_usec"
            ,
            
                "type": "int64",
                "optional": true,
                "field": "txId"
            ,
            
                "type": "int64",
                "optional": true,
                "field": "lsn"
            ,
            
                "type": "string",
                "optional": true,
                "field": "schema"
            ,
            
                "type": "string",
                "optional": true,
                "field": "table"
            ,
            
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            ,
            
                "type": "boolean",
                "optional": true,
                "field": "last_snapshot_record"
            ,
            
                "type": "int64",
                "optional": true,
                "field": "xmin"
            ],
            "optional": false,
            "name": "io.debezium.connector.postgresql.Source",
            "field": "source"
        ,
        
            "type": "string",
            "optional": false,
            "field": "op"
        ,
        
            "type": "int64",
            "optional": true,
            "field": "ts_ms"
        ],
        "optional": false,
        "name": "pssdev.public.order.Envelope"
    ,
    "payload":
    
        "before":
        
            "creator": null,
            "createtime": null,
            "familyprice": null
        ,
        "after":
        
            "creator": "USER1E",
            "createtime": 1554292597815000,
            "familyprice": "W42A"
        ,
        "source":
        
            "version": "0.9.5.Final",
            "connector": "postgresql",
            "name": "pssdev",
            "db": "pf",
            "ts_usec": 1561459811737920,
            "txId": 771604,
            "lsn": 88282458880,
            "schema": "public",
            "table": "order",
            "snapshot": false,
            "last_snapshot_record": null,
            "xmin": null
        ,
        "op": "u",
        "ts_ms": 1561459811747
    

familyprice 值为W42A 我不知道如何转换它。

数据库中familyprice的实际值为60.00000,列类型为numeric(32,5)

【问题讨论】:

【参考方案1】:

有关 Java 客户端,请参阅 https://debezium.io/docs/faq/#how_to_retrieve_decimal_field_from_binary_representation。

您还可以将 decimal.handling.mode 设置为不同的值,这样您就可以将 Decimal 接收为字符串或双精度值(如果对您来说更方便)。

【讨论】:

这就是我需要的。非常感谢。

以上是关于如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector

Kafka Connect:将消息从字节转换为 Json

如何使用 JAVA API 从 Kafka 获取每个主题的消息数量 [重复]

需要使用 Kafka Connect 将小型 JSON 消息从 Kafka 移动到 HDFS,但不使用 Confluent 库,如果不是完全免费的话

从源码分析如何优雅的使用 Kafka 生产者

Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中