Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题
Posted
技术标签:
【中文标题】Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题【英文标题】:Kafka Connect Debezium with ms sql server. Key extract configuration problem 【发布时间】:2021-01-25 13:36:51 【问题描述】:我正在使用 Debezium 和 kafka connect。我需要从下面的有效负载中提取“listid”,并能够将其分配给 kafka 消息键。使用我的连接器文件中的配置,我无法提取该值。感谢您为解决此问题提供的任何帮助。
"before": null,
"after":
"listid": 19,
"billingid": "0",
"userid": "test",
连接器属性
key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type==org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid
key.converter.schemas.enable=false
value.converter.schemas.enable=true
Sooooo 我正在继续实验,并希望分享对问题的更好解释以及我所实验的内容。
跟随 Robin 的博客https://www.confluent.fr/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/
1- 从终端运行 kafka-avro 消费者
键
"LIST_ID":10058
价值
"before":null,"after":"test.dbo.test.Value":"LIST_ID":10058,"billingid
etc...
2- 我正在尝试获取值为 10058 的键
3 - 这就是我的转换配置
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=createKey,extractInt,
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=LIST_ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=LIST_ID
我在针对 mssql 服务器的管道中使用 debezium 连接器。
【问题讨论】:
我确实评论了confluent.fr/blog/…。当我运行 我确实评论了confluent.fr/blog/…。我修改了道具以支持文章配置。运行 kafka avro 消费者时,密钥现在显示为 "_LIST_ID":24892。有什么建议吗? 【参考方案1】:看起来像是一个错字(extract
vs extractkey
):
transforms=unwrap,extractkey
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid
【讨论】:
我进行了调整,但我仍然得到 "LIST_ID":4524 的键值诗句 4525。感谢您的回复。 尝试设置transforms.extractkey.field=LIST_ID
谢谢亚历克斯。仍然没有运气:-(
请再次显示消息密钥中的确切内容(LIST_ID
或_LIST_ID
)
嗨,Alex,我不断收到返回的密钥 "LIST_ID":1535。我无法仅提取整数值 1535 :-(以上是关于Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 连接 Debezium Postgres Cloud SQL
Debezium SQL Server Source Connector 设置 Kafka 代理
Kafka Connect Debezium postgres