用 spring 管理 Kafka 主题

Posted

技术标签:

【中文标题】用 spring 管理 Kafka 主题【英文标题】:Managing Kafka Topic with spring 【发布时间】:2018-11-27 07:10:44 【问题描述】:

我们计划在我们的应用程序中使用 Kafka 进行排队。我在 RabbitMQ 和 Spring 方面有一些经验。

使用 RabbitMQ 和 Spring,我们曾经在启动 spring 服务时管理队列创建。

对于 Kafka,我不确定创建主题的最佳方式是什么?有没有办法用 Spring 管理主题。

或者,我们是否应该编写一个单独的脚本来帮助创建主题?维护一个单独的脚本来创建主题对我来说似乎有点奇怪。

任何建议将不胜感激。

【问题讨论】:

kafka 1.1.0 默认将auto.create.topics.enable 设置为true,如果可以满足您的生产要求,您已经设置好了:D 是否建议在生产环境中启用它? 一般来说,我看不到保持启用它的问题,因为 99% 的时间 kafka 被配置为只能通过本地网络访问,除此之外,您可以添加身份验证,因此这取决于您的架构和安全要求 是的。它只能通过本地网络访问 我宁愿禁用它以防止应用程序意外创建错误/不必要的主题。如果它在本地网络上,pre-prod 应用程序启动可能能够访问集群?取决于您的 NW 设置/安全性,但恕我直言,禁用可能是明智的选择。 【参考方案1】:

在春季,可以在应用程序启动期间使用 bean 创建主题:

@Bean
public KafkaAdmin admin() 
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);


@Bean
public NewTopic topic1() 
    return new NewTopic("foo", 10, (short) 2);

或者,您可以通过自动装配AdminClient 来编写自己的创建主题,例如从输入文件中读取列表或指定分区号等高级属性:

@Autowired
private KafkaAdmin admin;
//...your implementation

另请注意,由于 Kafka 1.1.0 默认启用auto.create.topics.enable (see Broker configs)。

更多信息参考spring-kafkadocs

【讨论】:

Spring Boot 自动配置了一个KafkaAdmin,所以你只需要在启动应用程序中添加NewTopic @Beans。【参考方案2】:

要在 Spring Boot 中自动创建 Kafka 主题,只有这个是必需的:

@Bean
public NewTopic topic1() 
    return new NewTopic("foo", 10, (short) 2);

    //foo: topic name
    //10: number of partitions
    //2: replication factor

Kafka Admin 正在由 Spring Boot 自动创建和配置。 Spring Kafka 2.3 版本引入了TopicBuilder 类,使构建主题更加流畅和直观:

@Bean
public NewTopic topic()
    return TopicBuilder.name("foo")
        .partitions(10)
        .replicas(2)
        .build();

【讨论】:

以上是关于用 spring 管理 Kafka 主题的主要内容,如果未能解决你的问题,请参考以下文章

使用 spring kafka 中的注释为每个主题单独的 Kafka 侦听器

如何在使用 Spring 创建期间配置 kafka 主题保留策略?

使用 Spring Kafka 反序列化来自同一 Kafka 主题的不同 JSON 有效负载

Kafka主题管理

如何在spring cloud stream和kafka中从同一主题发送和接收

kafka如何动态消费新增topic主题