在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector
Posted
技术标签:
【中文标题】在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector【英文标题】:Getting encoded Values in JSON while using Kafka connect :JdbcSourceConnector 【发布时间】:2021-10-14 19:03:19 【问题描述】:我在独立配置中使用 Kafka Connect 从 Oracle 读取数据。
从主题读取消息时,我收到 JSON 格式的结果。我不需要在 JSON 对象中编码我的值。我该如何解决这个问题?
connect-standalone.properties
:
bootstrap.servers=********:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
jdbc-kafka-connect.properties
:
name=jdbctransconnector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@**.**.*.***:1521/****?verifyServerCertificate=false&useSSL=true&requireSSL=true
connection.user=*******
connection.password=*******
query=select * from(select cast(tab1.PARAM_INSTANCE_ID as NUMERIC(11,0)) AS PARAM_INSTANCE_ID,tab1.entity_id,tab2.param_name,tab1.value from table1 tab1,table2 tab2 where tab1.param_spec_id=tab2.param_basic_spec_id)
tasks.max= 1
auto.create= true
auto.evolve= true
mode=incrementing
incrementing.column.name=PARAM_INSTANCE_ID
topic.prefix=EGPV
来自 Kafka 消费者的示例消息:
"PARAM_INSTANCE_ID":"B6frdA==",
"ENTITY_ID":"AXpUSLCacxAucbLv12PipNeV7ag2XAFudUuzxrP1xDvtlnZOuxf8KxSAAAAAAAAAAAAAAAAAAAAA",
"PARAM_NAME":"Action",
"VALUE":null
param_instance_id
和 entity_id
是源数据库中的数字列。如何在 Kafka 中获得相同的格式?
【问题讨论】:
【参考方案1】:param_instance_id 和 entity_id 是源数据库中的数字列
默认的numeric.mapping
属性没有设置,所以数字是base64编码的。
根据文档中的描述进行调整,但您可以从best_fit
开始
https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html
【讨论】:
感谢您的回复@OneCricketeer,我现在可以获得实际值。当我使用 table.whitelist 而不是查询时,我面临同样的问题,因为我的源表具有数字数据类型。这numeric.mapping 仅适用于数字数据类型。如果您知道,请建议对数字数据类型进行任何更改 我不知道这样的配置。不过,我在 JDBC 连接器中看到了一些 Github 问题,要求类似的东西。替代解决方案可能包括使用 Debezium 或 Confluent 的 Oracle CDC Source 而不是 JDBC以上是关于在使用 Kafka 连接时获取 JSON 中的编码值:JdbcSourceConnector的主要内容,如果未能解决你的问题,请参考以下文章
在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?