如何在 apache kafka 中创建主题?

Posted

技术标签:

【中文标题】如何在 apache kafka 中创建主题?【英文标题】:How to create topics in apache kafka? 【发布时间】:2016-07-26 07:17:05 【问题描述】:

在 kafka 中创建主题的最佳方式是什么?

创建主题时要定义多少个副本/分区?

在新的 producer API 中,当我尝试将消息发布到不存在的主题时,它第一次失败,然后成功发布。

我想知道,副本、分区和集群节点数之间的关系。 我们需要在发布消息之前创建主题吗?

【问题讨论】:

【参考方案1】:

您可以通过编程方式创建主题。

public class CreateTopic 
  public static void main(String[] args) throws ExecutionException, InterruptedException 
      Properties config = new Properties();
      config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      AdminClient admin = AdminClient.create(config);
      //creating new topic
      System.out.println("-- creating --");
      NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
      admin.createTopics(Collections.singleton(newTopic));

      //listing
      System.out.println("-- listing --");
      admin.listTopics().names().get().forEach(System.out::println);
  

【讨论】:

【参考方案2】:

当您启动 Kafka 代理时,您可以在 conf/server.properties 文件中定义一组属性。该文件只是键值属性文件。其中一个属性是auto.create.topics.enable,如果设置为 true(默认情况下),当您向不存在的主题发送消息时,Kafka 会自动创建主题。

您可以找到的所有配置选项都定义为here。恕我直言,创建主题的简单规则如下:副本数不能超过您拥有的节点数。主题和分区的数量不受集群中节点数量的影响

例如:

你有 9 个节点的集群 您的主题可以有 9 个分区和 9 个副本或 18 个分区和 9 个副本或 36 个分区和 9 个副本等等......

【讨论】:

会不会要求副本数等于节点数会使您的集群非常脆弱?一个节点出现故障,突然您的集群不再响应,因为它必须等待正确数量的副本。 @SethPaulson 没有等待,因为节点出现故障。在这种情况下,领导者将从“同步”副本列表中删除它,并在它返回时尝试恢复它。有关详细说明,请参阅Kafka Documentation on Replication。 @Seth Paulson 在复制因子为 3 且 acks=all 时更改 min.insync.replicas=2。不等待所有 3 个副本都启动。在这种情况下,如果有 2 个副本可用,集群将继续不间断地工作。【参考方案3】:

我想分享我最近在我的博客The Side Effect of Fetching Kafka Topic Metadata 上描述的经历,并对这里提出的某些问题给出我的答案。

1) 在 kafka 中创建主题的最佳方式是什么?我们需要在发布消息之前创建主题吗?

我认为如果我们提前知道我们将使用一个固定名称的 Kafka 主题,我们最好在写入或读取消息之前创建该主题。这通常可以通过使用 bin/kafka-topics.sh 在启动后脚本中完成,例如参见official documentation。或者我们可以使用 Kafka 0.11.0.0 中引入的KafkaAdminClient。

另一方面,我确实看到某些情况下我们需要即时生成主题名称。在这些情况下,我们将无法知道固定的主题名称,我们可以依赖“auto.create.topics.enable”属性。启用后,将自动创建一个主题。这就引出了第二个问题:

2) 当 auto.create.topics.enable 为 true 时,哪些操作会导致创建

实际上正如@Lan 已经指出的那样

如果设置为 true,当应用程序尝试生产、消费、 或获取不存在主题的元数据,Kafka 将自动 使用默认复制因子和数量创建主题 分区。

我想说得更简单:

如果为 Kafka 代理启用了自动主题创建,则每当 Kafka 代理看到特定主题名称时,如果该主题尚不存在,则会创建该主题

此外,获取元数据会自动创建主题这一事实经常被包括我在内的人们所忽视。一个具体的例子是使用 consumer.partitionFor(topic) API,如果给定的主题不存在,此方法将创建它。

任何对我上面提到的更多细节感兴趣的人,你也可以看看我自己关于同一主题的博客文章The Side Effect of Fetching Kafka Topic Metadata。

【讨论】:

【参考方案4】:

Kafka 的基本并行级别是分区。在生产者和代理端,对不同分区的写入可以完全并行完成。

注意事项

更多分区需要更多打开文件句柄 更多分区可能会增加不可用性 更多分区可能会增加端到端延迟

根据经验,将每个代理的分区数限制为100 x b x r 可能是个好主意, 其中b 是代理数量,r 是复制因子。

例如: 如果您的集群中有 9 个代理/节点,您的主题可能有

1800 个分区,3 个副本,或 900 个分区和 2 个副本

编辑:更多详情请参阅文章How to choose the number of topics/partitions in a Kafka cluster?(已从那里获取答案)

【讨论】:

感谢您的回答。当您的答案基于一个作为对原作者的礼貌时,请引用外部资源。我为你添加了链接。 关于您分享的示例,您是否假设集群中有一个主题?那么,是 1800 个分区,每个主题有 3 个副本,还是总共有 3 个副本?我猜是后者?【参考方案5】:

设置属性 auto.create.topics.enable=true 在您的 server.properties 文件中,如果您有多个代理,请对所有 server*.properties 文件执行相同操作并重新启动您的 kafka-server。 但是一定要在服务器*.propertiesnum.partitions=int中设置合适的分区数,否则以后再增加分区会出现性能问题。

【讨论】:

【参考方案6】:

分区号决定了主题的并行度,因为一个分区只能被一个消费者组中的一个消费者消费。比如一个topic只有10个partition,一个consumer group有20个consumer,那么有10个consumer是空闲的,没有收到任何消息。这个数字真的取决于你的应用程序,但 1-1000 都是合理的。

副本数由您的耐用性要求决定。对于复制因子为 N 的主题,Kafka 最多可以容忍 N-1 个服务器故障,而不会丢失任何提交到日志的消息。 3个副本是常见的配置。当然,副本号必须小于或等于您的代理号。

auto.create.topics.enable 属性控制 Kafka 何时在服务器上启用自动创建主题。如果设置为 true,当应用程序尝试为不存在的主题生成、使用或获取元数据时,Kafka 将自动创建具有默认复制因子和分区数的主题。我建议在生产中将其关闭并提前创建主题。

【讨论】:

我不确定你是否会获得在消费或获取时创建的主题,根据这个线程 - mail-archive.com/users@kafka.apache.org/msg09182.html - “主题可以由生产者自动创建,但不能消费者”。 kafka.apache.org/documentation.html#brokerconfigs 的最新文档只是说“在服务器上启用自动创建主题”,而没有说明哪些操作会导致创建。 @Lan - 您能否进一步说明您在生产环境中关闭自动创建的建议?即相同的原因?如果您可以分享详细介绍此内容的任何参考资料,那就太好了。提前致谢。 @Lalit - 虽然我不能代表 Lan,但我对他的评论的解释是关闭生产中不需要的东西。这称为硬化。这是一个最佳实践。如果您的应用程序需要动态创建主题,那么您可能会想要使用该功能。 @Nick - 感谢您的回复。我基本上只是在寻找一些标准模式或可能推动自动启用决定的因素,以及它们是技术性的还是操作性的。我正在考虑将其发布为一个问题,希望能征求社区的意见。 @Lalit - 听起来不错。我可以从昨晚的漫长夜晚告诉您,如果您尝试立即使用主题,自动创建主题会导致速度变慢。就我而言,我创建了一个动态数据管道。我怀疑这是由于没有使用与经纪人匹配的分区。默认主题创建仅使用 1 个分区(也许我做错了?)。将 meta_data_max_age_ms 降低到 5000 之类的小值,而不是默认的 300000。

以上是关于如何在 apache kafka 中创建主题?的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 Java 在 Kafka 中创建主题

我们如何使用 API 从 IDE 在 Kafka 中创建主题

如何从具有小数类型列的主题在 ksql 中创建流

如何在 kafka 中创建自定义序列化程序?

如何从kafka主题为ksqldb创建主题

如何在 Spring Boot 中从 Mongodb 读取集合数据并定期发布到 kafka 主题中