在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector

Posted

技术标签:

【中文标题】在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector【英文标题】:Getting encoded Values in JSON while using Kafka connect :JdbcSourceConnector 【发布时间】:2021-10-14 19:03:19 【问题描述】:

我在独立配置中使用 Kafka Connect 从 Oracle 读取数据。

从主题读取消息时,我收到 JSON 格式的结果。我不需要在 JSON 对象中编码我的值。我该如何解决这个问题?

connect-standalone.properties:

bootstrap.servers=********:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

jdbc-kafka-connect.properties:

name=jdbctransconnector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@**.**.*.***:1521/****?verifyServerCertificate=false&useSSL=true&requireSSL=true
connection.user=*******
connection.password=*******
query=select * from(select cast(tab1.PARAM_INSTANCE_ID as NUMERIC(11,0)) AS PARAM_INSTANCE_ID,tab1.entity_id,tab2.param_name,tab1.value from table1 tab1,table2 tab2 where tab1.param_spec_id=tab2.param_basic_spec_id)
tasks.max= 1
auto.create= true
auto.evolve= true
mode=incrementing
incrementing.column.name=PARAM_INSTANCE_ID
topic.prefix=EGPV

来自 Kafka 消费者的示例消息:


"PARAM_INSTANCE_ID":"B6frdA==",
"ENTITY_ID":"AXpUSLCacxAucbLv12PipNeV7ag2XAFudUuzxrP1xDvtlnZOuxf8KxSAAAAAAAAAAAAAAAAAAAAA",
"PARAM_NAME":"Action",
"VALUE":null

param_instance_identity_id 是源数据库中的数字列。如何在 Kafka 中获得相同的格式?

【问题讨论】:

【参考方案1】:

param_instance_id 和 entity_id 是源数据库中的数字列

默认的numeric.mapping属性没有设置,所以数字是base64编码的。

根据文档中的描述进行调整,但您可以从best_fit开始

https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html

【讨论】:

感谢您的回复@OneCricketeer,我现在可以获得实际值。当我使用 table.whitelist 而不是查询时,我面临同样的问题,因为我的源表具有数字数据类型。这numeric.mapping 仅适用于数字数据类型。如果您知道,请建议对数字数据类型进行任何更改 我不知道这样的配置。不过,我在 JDBC 连接器中看到了一些 Github 问题,要求类似的东西。替代解决方案可能包括使用 Debezium 或 Confluent 的 Oracle CDC Source 而不是 JDBC

以上是关于在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector的主要内容,如果未能解决你的问题,请参考以下文章

kafka s3融合连接器 - 将json上传为字符串

在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?

kafka连接器不会自动启动

Kafka Connect:将消息从字节转换为 Json

MongoDB Sink 连接器:Apache Kafka 中的消息被截断

即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败