Kafka JDBC Sink Connector,批量插入值
Posted
技术标签:
【中文标题】Kafka JDBC Sink Connector,批量插入值【英文标题】:Kafka JDBC Sink Connector, insert values in batches 【发布时间】:2020-03-21 19:16:34 【问题描述】:我每秒(50000 - 100000)收到很多消息(通过 http 协议),并希望将它们保存到 PostgreSql。为此,我决定使用 Kafka JDBC Sink。
消息按一条记录保存到数据库中,而不是批量保存。我想在 PostgreSQL 中批量插入 500-1000 条记录的记录。
我在 issue 中找到了一些关于这个问题的答案:How to use batch.size?
我尝试在配置中使用相关选项,但似乎没有任何效果。
我的 Kafka JDBC Sink PostgreSql 配置 (etc/kafka-connect-jdbc/postgres.properties
):
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs
connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false
insert.mode=insert
connection.user=postgres
table.name.format=$topic
connection.password=pwd
batch.size=500
# based on 500*3000byte message size
fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000
我还为connect-distributed.properties
添加了选项:
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
虽然每个分区每秒获取超过 1000 条记录,但记录被保存到 PostgreSQL 中。
编辑:消费者选项已添加到名称正确的其他文件中
我还为etc/schema-registry/connect-avro-standalone.properties
添加了选项:
# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000
【问题讨论】:
顺便说一下应该是consumer.max.poll.records
我尝试更改 max.poll.records -> consumer.max.poll.records,但收到相同的结果。
当然。我只是说,这是正确的属性名称。无论如何,记录应该在单独的查询中发送,我不确定是否有围绕批次的交易规则
在 Kafka Connect 中你可以,是的 @Miguel
@Miguel 错误的部分。搜索“可以使用相同的参数,但需要分别前缀producer.和consumer.”
【参考方案1】:
我意识到我误解了文档。记录被一一插入到数据库中。在一个事务中插入的记录数取决于batch.size
和consumer.max.poll.records
。我希望批量插入是以另一种方式实现的。我想有一个选项来插入这样的记录:
INSERT INTO table1 (First, Last)
VALUES
('Fred', 'Smith'),
('John', 'Smith'),
('Michael', 'Smith'),
('Robert', 'Smith');
但这似乎是不可能的。
【讨论】:
以上是关于Kafka JDBC Sink Connector,批量插入值的主要内容,如果未能解决你的问题,请参考以下文章
Kafka JDBC Sink Connector 在雪花中找不到表
kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)
Confluent Kafka Connect MySQL Sink Connector 的开源替代方案?
无法将 Kafka 与 InfluxDB Sink Connector 连接