在 Kafka Connect 分布式模式下为多个主题配置连接器

Posted

技术标签:

【中文标题】在 Kafka Connect 分布式模式下为多个主题配置连接器【英文标题】:Configuring connectors for multiple topics on Kafka Connect Distributed Mode 【发布时间】:2021-12-21 13:43:43 【问题描述】:

我们有向 Kafka 发送以下内容的生产者:

topic=syslog,每天约 25,000 个事件 topic=nginx,每天约 5,000 个事件 topic=zeek.xxx.log,每天约 100,000 个事件(总计)。在最后一种情况下,有 20 个不同的 zeek 主题,例如 zeek.conn.log 和 zeek.http.log

kafka-connect-elasticsearch 实例充当消费者,将数据从 Kafka 传送到 Elasticsearch。 kafka-connect-elasticsearch 的 hello-world Sink 配置可能如下所示:

# elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=24
topics=syslog,nginx,zeek.broker.log,zeek.capture_loss.log,zeek.conn.log,zeek.dhcp.log,zeek.dns.log,zeek.files.log,zeek.http.log,zeek.known_services.log,zeek.loaded_scripts.log,zeek.notice.log,zeek.ntp.log,zeek.packet_filtering.log,zeek.software.log,zeek.ssh.log,zeek.ssl.log,zeek.status.log,zeek.stderr.log,zeek.stdout.log,zeek.weird.log,zeek.x509.log
topic.creation.enable=true
key.ignore=true
schema.ignore=true
...

并且可以使用bin/connect-standalone.sh 调用。我意识到在单个进程中执行工作时运行或尝试运行tasks.max=24 并不理想。我知道使用分布式模式会是更好的选择,但不清楚将连接器提交到分布式模式的性能最佳方式。即,

在分布式模式下,我是否仍想通过单个 API 调用仅提交单个 elasticsearch.properties?还是最好拆分多个.properties configs + 连接器(例如,一个用于 syslog,一个用于 nginx,一个用于 zeek。**)并分别提交? 我知道tasks 等于主题数 x 分区数,但什么决定了工作人员的数量? 文档中是否有任何地方介绍了针对不同主题的吞吐量明显不平衡的情况的最佳实践?

【问题讨论】:

【参考方案1】:

在分布式模式下,我还想通过一个 API 调用提交一个 elasticsearch.properties 吗?

它应该是一个 JSON 文件,但是是的。

什么决定了工人的数量?

由你决定。 JVM 使用率是您可以监控和扩展的因素之一

据我所知,没有任何文档

【讨论】:

我想我对工人到底是什么感到困惑。我知道一个任务实际上是一个线程,但是提交一个 JSON 属性文件等于一个工人吗? 编辑上述评论:我的理解是我会在多个节点上运行类似于bin/connect-distributed.sh worker.properties 的东西,同时指定相同的group.id。我假设其中每一个都是一个工作进程,然后会使用连接器属性发出 single REST API 请求。对吗? 没错。您还可以使用 Kubernetes,例如启动某个 Connect 映像的多个 pod 以运行集群,然后为 API 公开负载均衡器

以上是关于在 Kafka Connect 分布式模式下为多个主题配置连接器的主要内容,如果未能解决你的问题,请参考以下文章

在Kafka Connect中,如何连接多个kafka集群?

Kafka-Connect:在分布式模式下创建新连接器就是创建新组

在分布式模式下使用 Kafka Connect,内部主题应该存在哪里

Kafka: Connect

Kafka Connect Details 详解

使用kafka connect,将数据批量写到hdfs完整过程