Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题

Posted

技术标签:

【中文标题】Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题【英文标题】:Kafka Connect Debezium with ms sql server. Key extract configuration problem 【发布时间】:2021-01-25 13:36:51 【问题描述】:

我正在使用 Debezium 和 kafka connect。我需要从下面的有效负载中提取“listid”,并能够将其分配给 kafka 消息键。使用我的连接器文件中的配置,我无法提取该值。感谢您为解决此问题提供的任何帮助。


    "before": null,
    "after": 
        "listid": 19,
        "billingid": "0",
        "userid": "test",

连接器属性

key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false


value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

transforms=unwrap,extract
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type==org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid


key.converter.schemas.enable=false
value.converter.schemas.enable=true

Sooooo 我正在继续实验,并希望分享对问题的更好解释以及我所实验的内容。

跟随 Robin 的博客https://www.confluent.fr/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/

1- 从终端运行 kafka-avro 消费者

"LIST_ID":10058

价值

"before":null,"after":"test.dbo.test.Value":"LIST_ID":10058,"billingid 
etc...

2- 我正在尝试获取值为 10058 的键

3 - 这就是我的转换配置

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
transforms=createKey,extractInt,
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=LIST_ID
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=LIST_ID

我在针对 mssql 服务器的管道中使用 debezium 连接器。

【问题讨论】:

我确实评论了confluent.fr/blog/…。当我运行 我确实评论了confluent.fr/blog/…。我修改了道具以支持文章配置。运行 kafka avro 消费者时,密钥现在显示为 "_LIST_ID":24892。有什么建议吗? 【参考方案1】:

看起来像是一个错字(extract vs extractkey):

transforms=unwrap,extractkey
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.extractkey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractkey.field=listid

【讨论】:

我进行了调整,但我仍然得到 "LIST_ID":4524 的键值诗句 4525。感谢您的回复。 尝试设置transforms.extractkey.field=LIST_ID 谢谢亚历克斯。仍然没有运气:-( 请再次显示消息密钥中的确切内容(LIST_ID_LIST_ID 嗨,Alex,我不断收到返回的密钥 "LIST_ID":1535。我无法仅提取整数值 1535 :-(

以上是关于Kafka 将 Debezium 与 ms sql 服务器连接起来。密钥提取配置问题的主要内容,如果未能解决你的问题,请参考以下文章

Debezium MS SQL Server 连接器问题

Kafka 连接 Debezium Postgres Cloud SQL

Debezium SQL Server Source Connector 设置 Kafka 代理

Kafka Connect Debezium postgres

Debezium SQL Server 连接器 Kafka 初始快照

当debezium连接器从你的sql服务器获取数据时,有没有办法限制kafka连接堆空间