如何在 spring-boot 应用程序启动期间创建许多 kafka 主题?
Posted
技术标签:
【中文标题】如何在 spring-boot 应用程序启动期间创建许多 kafka 主题?【英文标题】:How can I create many kafka topics during spring-boot application start up? 【发布时间】:2019-08-15 14:45:59 【问题描述】:我有这个配置:
@Configuration
public class KafkaTopicConfig
private final TopicProperties topics;
public KafkaTopicConfig(TopicProperties topics)
this.topics = topics;
@Bean
public NewTopic newTopicImportCharge()
TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CHARGES.name());
return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
@Bean
public NewTopic newTopicImportPayment()
TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_PAYMENTS.name());
return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
@Bean
public NewTopic newTopicImportCatalog()
TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CATALOGS.name());
return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
我可以向TopicProperties
添加 10 个不同的主题。而且我不想手动创建每个类似的bean。在 spring-kafka 或仅 spring 中创建所有主题是否存在某种方式?
【问题讨论】:
【参考方案1】:直接使用管理客户端;您可以从 Boot 的KafkaAdmin
获取预先构建的属性映射。
@SpringBootApplication
public class So55336461Application
public static void main(String[] args)
SpringApplication.run(So55336461Application.class, args);
@Bean
public ApplicationRunner runner(KafkaAdmin kafkaAdmin)
return args ->
AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties());
List<NewTopic> topics = new ArrayList<>();
// build list
admin.createTopics(topics).all().get();
;
编辑
要检查它们是否已经存在,或者是否需要增加分区,KafkaAdmin
有这个逻辑......
private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics)
if (topics.size() > 0)
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
DescribeTopicsResult topicInfo = adminClient
.describeTopics(topics.stream()
.map(NewTopic::name)
.collect(Collectors.toList()));
List<NewTopic> topicsToAdd = new ArrayList<>();
Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
if (topicsToAdd.size() > 0)
addTopics(adminClient, topicsToAdd);
if (topicsToModify.size() > 0)
modifyTopics(adminClient, topicsToModify);
private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> topicNameToTopic,
DescribeTopicsResult topicInfo, List<NewTopic> topicsToAdd)
Map<String, NewPartitions> topicsToModify = new HashMap<>();
topicInfo.values().forEach((n, f) ->
NewTopic topic = topicNameToTopic.get(n);
try
TopicDescription topicDescription = f.get(this.operationTimeout, TimeUnit.SECONDS);
if (topic.numPartitions() < topicDescription.partitions().size())
if (LOGGER.isInfoEnabled())
LOGGER.info(String.format(
"Topic '%s' exists but has a different partition count: %d not %d", n,
topicDescription.partitions().size(), topic.numPartitions()));
else if (topic.numPartitions() > topicDescription.partitions().size())
if (LOGGER.isInfoEnabled())
LOGGER.info(String.format(
"Topic '%s' exists but has a different partition count: %d not %d, increasing "
+ "if the broker supports it", n,
topicDescription.partitions().size(), topic.numPartitions()));
topicsToModify.put(n, NewPartitions.increaseTo(topic.numPartitions()));
catch (@SuppressWarnings("unused") InterruptedException e)
Thread.currentThread().interrupt();
catch (TimeoutException e)
throw new KafkaException("Timed out waiting to get existing topics", e);
catch (@SuppressWarnings("unused") ExecutionException e)
topicsToAdd.add(topic);
);
return topicsToModify;
【讨论】:
很好的答案,但是当我第二次启动应用程序时出现错误:Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'my-topic' already exists.
所以,先检查它们是否存在。我添加了管理员用来执行此操作的代码。【参考方案2】:
目前我们只能使用KafkaAdmin.NewTopics
Spring Doc
【讨论】:
以上是关于如何在 spring-boot 应用程序启动期间创建许多 kafka 主题?的主要内容,如果未能解决你的问题,请参考以下文章
如何在不依赖 MongoDB 的情况下启动 spring-boot 应用程序?
在我的 spring-boot 应用程序启动之前,我如何等待数据库容器启动
spring-boot 应用程序不会将夹具加载到多个数据源之一