如何在 kafka sink 连接器中设置特定表?

Posted

技术标签:

【中文标题】如何在 kafka sink 连接器中设置特定表?【英文标题】:How do I set a specific table in kafka sink connector? 【发布时间】:2021-10-13 15:35:25 【问题描述】:

我正在使用融合的 kafka 连接器。 我想将数据插入到接收器连接器中的特定表 TB_TEST_KAFKA。 我已经创建了表格。

auto.create=false。

不知道sink连接器的一个properties表名key。 我正在尝试插入特定表 TB_TEST_KAFKA,

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
errors.log.include.messages=true
dialect.name=SqlServerDatabaseDialect
connection.password=####
tasks.max=1
topics=test-topic
auto.evolve=false
connection.user=kafkauser
auto.create=false
connection.url=jdbc:sqlserver://####:1433;databaseName=TEST
errors.log.enable=true
insert.mode=insert
db.name=TB_TEST_KAFKA

但是错误

Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more

有什么办法可以放入表中??

【问题讨论】:

【参考方案1】:

JDBC Sink 连接器使用主题名称作为其填充的表的命名基础。您可以使用RegExRouter 自定义表的名称(将这些行添加到您的连接器属性):

transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=test-topic
transforms.renameTopic.replacement=TB_TEST_KAFKA

【讨论】:

如果it(TB_TEST_KAFKA)是同义词,是否需要另外设置?? 主题名称是否为TB_TEST_KAFKA 如果您不需要实际的正则表达式模式,InsertField 转换会更高效 除了@OneCricketeer 所说的,您可以在Confluent documentation about SMTs找到更多信息 @IskuskovAlexander 我的意思是 sysnonyms 是数据库术语 (docs.microsoft.com/en-us/sql/t-sql/statements/…)

以上是关于如何在 kafka sink 连接器中设置特定表?的主要内容,如果未能解决你的问题,请参考以下文章

提取和转换 jdbc sink 连接器的 kafka 消息特定字段

为jdbc sink连接器提取和转换kafka消息的特定字段。

如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?

在 Kafka 连接器中设置分区策略

Kafka Sink 如何将字段映射到具有不同主题和表模式名称的数据库

无法使用镜头 kudu sink 连接器将数据从 kafka 主题插入或更新到 kudu 表