Apache kafka 生产集群设置问题

Posted

技术标签:

【中文标题】Apache kafka 生产集群设置问题【英文标题】:Apache kafka production cluster setup problems 【发布时间】:2019-05-14 06:58:00 【问题描述】:

我们一直在尝试在 AWS Linux 机器上建立一个生产级别的 Kafka 集群,直到现在我们都没有成功。

卡夫卡版本: 2.1.0

机器:

5 r5.xlarge machines for 5 Kafka brokers.
3 t2.medium zookeeper nodes
1 t2.medium node for schema-registry and related tools. (a Single instance of each)
1 m5.xlarge machine for Debezium.

默认代理配置:

num.partitions=15
min.insync.replicas=1
group.max.session.timeout.ms=2000000 
log.cleanup.policy=compact
default.replication.factor=3
zookeeper.session.timeout.ms=30000

我们的问题主要与海量数据有关。 我们正在尝试使用 debezium 将我们现有的表转移到 kafka 主题中。其中许多表非常庞大,有超过 50000000 行。

到目前为止,我们已经尝试了很多事情,但我们的集群每次都因一个或多个原因而失败。

错误计划任务“isr-expiration”中未捕获的异常(kafka.utils.KafkaScheduler) org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = /brokers/topics/__consumer_offsets/partitions/0/state 的会话已过期 在 org.apache.zookeeper.KeeperException.create(KeeperException.java:130) 在 org.apache.zookeeper.KeeperException.create(KeeperException.java:54)..

错误2:

] INFO [Partition xxx.public.driver_operation-14 broker=3] 缓存的 zkVersion [21] 不等于 zookeeper 中的,跳过更新 ISR (kafka.cluster.Partition) [2018-12-12 14:07:26,551] INFO [Partition xxx.public.hub-14 broker=3] 将 ISR 从 1,3 缩小到 3 (kafka.cluster.Partition) [2018-12-12 14:07:26,556] INFO [Partition xxx.public.hub-14 broker=3] 缓存的 zkVersion [3] 不等于 zookeeper 中的,跳过更新 ISR (kafka.cluster.Partition) [2018-12-12 14:07:26,556] INFO [Partition xxx.public.field_data_12_2018-7 broker=3] 将 ISR 从 1,3 缩小到 3 (kafka.cluster.Partition)

错误3:

isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=888665879, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread) java.io.IOException:在读取响应之前断开与 3 的连接 在 org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)

还有一些错误:

    代理之间经常断开连接,这可能是原因 在没有自动恢复的情况下不断缩减和扩展 ISR。 架构注册表超时。 我不知道架构注册表是如何受到影响的。我没有看到该服务器上的负载过多。我错过了什么吗?我应该为架构注册表的多个实例使用负载平衡器作为故障转移吗?。 __schemas 主题中只有 28 条消息。 确切的错误消息是 RestClientException: Register operation timed out。错误代码:50002

    有时消息传输速率超过 100000 条消息/秒,有时它会下降到 2000 条消息/秒?消息大小可能会导致这种情况?

    为了解决上面的一些问题,我们增加了broker的数量,增加了zookeeper.session.timeout.ms=30000,但我不确定它是否真的解决了我们的问题,如果解决了,是怎么解决的?

我有几个问题:

    我们的集群是否足以处理这么多数据。 我们有什么明显的遗漏吗? 如何在进入生产级别之前对我的设置进行负载测试? 什么可能导致代理和架构注册表之间的会话超时。 处理架构注册表问题的最佳方法。

我们的一位经纪人的网络负载。

请随时询问更多信息。

【问题讨论】:

2.11 听起来像是 Scala 版本,而不是 Kafka 版本。 Kafka 版本更像是 2.0.0、2.0.1 或最新的 2.1.0。 我的意思是kafka.apache.org/downloads#2.1.0 你给了 Kafka 多少堆?您是否更改了任何其他设置? 这还不够...如果您read over this section,您会注意到LinkedIn 提到了6G(以及主机本身的更多内存)。我在一个环境中,主机上有10G 和 252G 总 RAM。此外,请参阅 Confluent docs.confluent.io/current/kafka/deployment.html 的文档。另外,如果进行这样的分布式部署,可能是use Ansible 或CloudFormation 我再次将其调整为 Xmx3G,因为它对我们最有效。真正的问题似乎在于 io wait。我们的应用程序被 io wait 占用。我目前正在朝那个方向看。 【参考方案1】:

请为您的集群使用Confluent的最新官方版本。

实际上,您可以通过增加主题的分区数量并将tasks.max(当然在您的接收器连接器中)增加超过 1 来使它变得更好在您的连接器中更高效地同时工作。

请增加 Kafka-Connect 主题的数量,并使用 Kafka-Connect 分布式模式来提高您的 Kafka-connect 集群的高可用性。您可以通过在 Kafka-ConnectSchema-Registry 配置中设置复制因子的数量来实现,例如:

config.storage.replication.factor=2
status.storage.replication.factor=2
offset.storage.replication.factor=2

请为您的大桌子将topic compression 设置为snappy。它将增加主题的吞吐量,这有助于 Debezium 连接器更快地工作,并且不要使用 JSON 转换器 建议使用 Avro 转换器!

另外,请为您的 Schema Registry

使用负载平衡器

为了测试集群,您可以使用database.whitelist 创建一个只有一个表(我的意思是一个大表!)的连接器,并将snapshot.mode 设置为initial

关于模式注册表! KafkaZookeeper 的 Schema-registry 用户设置这些配置:

bootstrap.servers
kafkastore.connection.url

这就是你的 shema-registry 集群停机的原因

【讨论】:

以上是关于Apache kafka 生产集群设置问题的主要内容,如果未能解决你的问题,请参考以下文章

kafka集群配置和java编写生产者消费者操作例子

spring整合kafka项目生产和消费测试结果记录

如何使两个DC之间的kafka集群中的生产者幂等?

生产环境一键创建kafka集群

Apache Kafka核心概念

Kafka--Tomcat中的Kafka生产者认证Kerberos出现Could not find a 'KafkaClient' entry