如何在 kafka sink 连接器中设置特定表?
Posted
技术标签:
【中文标题】如何在 kafka sink 连接器中设置特定表?【英文标题】:How do I set a specific table in kafka sink connector? 【发布时间】:2021-10-13 15:35:25 【问题描述】:我正在使用融合的 kafka 连接器。 我想将数据插入到接收器连接器中的特定表 TB_TEST_KAFKA。 我已经创建了表格。
auto.create=false。
不知道sink连接器的一个properties表名key。 我正在尝试插入特定表 TB_TEST_KAFKA,
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
errors.log.include.messages=true
dialect.name=SqlServerDatabaseDialect
connection.password=####
tasks.max=1
topics=test-topic
auto.evolve=false
connection.user=kafkauser
auto.create=false
connection.url=jdbc:sqlserver://####:1433;databaseName=TEST
errors.log.enable=true
insert.mode=insert
db.name=TB_TEST_KAFKA
但是错误
Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more
有什么办法可以放入表中??
【问题讨论】:
【参考方案1】:JDBC Sink 连接器使用主题名称作为其填充的表的命名基础。您可以使用RegExRouter
自定义表的名称(将这些行添加到您的连接器属性):
transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=test-topic
transforms.renameTopic.replacement=TB_TEST_KAFKA
【讨论】:
如果it(TB_TEST_KAFKA)是同义词,是否需要另外设置?? 主题名称是否为TB_TEST_KAFKA
?
如果您不需要实际的正则表达式模式,InsertField
转换会更高效
除了@OneCricketeer 所说的,您可以在Confluent documentation about SMTs找到更多信息
@IskuskovAlexander 我的意思是 sysnonyms 是数据库术语 (docs.microsoft.com/en-us/sql/t-sql/statements/…)以上是关于如何在 kafka sink 连接器中设置特定表?的主要内容,如果未能解决你的问题,请参考以下文章
提取和转换 jdbc sink 连接器的 kafka 消息特定字段
为jdbc sink连接器提取和转换kafka消息的特定字段。
如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?