spring boot怎么启动kafka

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring boot怎么启动kafka相关的知识,希望对你有一定的参考价值。

spring boot配置开发模式为什么启动会有错误 日志采集。线上数据一般主要是落地文件或者通过socket传输给另外一个系统。这种情况下,你很难推动线上应用或服务去修改接口,直接向kafka里写数据。这时候你可能就需要flume这样的系统帮你去做传输 参考技术A spring boot配置开发模式为什么启动会有错误 日志采集。线上数据一般主要是落地文件或者通过socket传输给另外一个系统。这种情况下,你很难推动线上应用或服务去修改接口,直接向kafka里写数据。这时候你可能就需要flume这样的系统帮你去做传输

如何在 spring-boot 应用程序启动期间创建许多 kafka 主题?

【中文标题】如何在 spring-boot 应用程序启动期间创建许多 kafka 主题?【英文标题】:How can I create many kafka topics during spring-boot application start up? 【发布时间】:2019-08-15 14:45:59 【问题描述】:

我有这个配置:

@Configuration
public class KafkaTopicConfig 

    private final TopicProperties topics;

    public KafkaTopicConfig(TopicProperties topics) 
        this.topics = topics;
    

    @Bean
    public NewTopic newTopicImportCharge() 
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CHARGES.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    

    @Bean
    public NewTopic newTopicImportPayment() 
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_PAYMENTS.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    

    @Bean
    public NewTopic newTopicImportCatalog() 
        TopicProperties.Topic topic = topics.getTopicNameByType(MessageType.IMPORT_CATALOGS.name());
        return new NewTopic(topic.getTopicName(), topic.getNumPartitions(), topic.getReplicationFactor());
    

我可以向TopicProperties 添加 10 个不同的主题。而且我不想手动创建每个类似的bean。在 spring-kafka 或仅 spring 中创建所有主题是否存在某种方式?

【问题讨论】:

【参考方案1】:

直接使用管理客户端;您可以从 Boot 的KafkaAdmin 获取预先构建的属性映射。

@SpringBootApplication
public class So55336461Application 

    public static void main(String[] args) 
        SpringApplication.run(So55336461Application.class, args);
    

    @Bean
    public ApplicationRunner runner(KafkaAdmin kafkaAdmin) 
        return args -> 
            AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties());
            List<NewTopic> topics = new ArrayList<>();
            // build list
            admin.createTopics(topics).all().get();
        ;
    

编辑

要检查它们是否已经存在,或者是否需要增加分区,KafkaAdmin 有这个逻辑......

private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) 
    if (topics.size() > 0) 
        Map<String, NewTopic> topicNameToTopic = new HashMap<>();
        topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
        DescribeTopicsResult topicInfo = adminClient
                .describeTopics(topics.stream()
                        .map(NewTopic::name)
                        .collect(Collectors.toList()));
        List<NewTopic> topicsToAdd = new ArrayList<>();
        Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
        if (topicsToAdd.size() > 0) 
            addTopics(adminClient, topicsToAdd);
        
        if (topicsToModify.size() > 0) 
            modifyTopics(adminClient, topicsToModify);
        
    


private Map<String, NewPartitions> checkPartitions(Map<String, NewTopic> topicNameToTopic,
        DescribeTopicsResult topicInfo, List<NewTopic> topicsToAdd) 

    Map<String, NewPartitions> topicsToModify = new HashMap<>();
    topicInfo.values().forEach((n, f) -> 
        NewTopic topic = topicNameToTopic.get(n);
        try 
            TopicDescription topicDescription = f.get(this.operationTimeout, TimeUnit.SECONDS);
            if (topic.numPartitions() < topicDescription.partitions().size()) 
                if (LOGGER.isInfoEnabled()) 
                    LOGGER.info(String.format(
                        "Topic '%s' exists but has a different partition count: %d not %d", n,
                        topicDescription.partitions().size(), topic.numPartitions()));
                
            
            else if (topic.numPartitions() > topicDescription.partitions().size()) 
                if (LOGGER.isInfoEnabled()) 
                    LOGGER.info(String.format(
                        "Topic '%s' exists but has a different partition count: %d not %d, increasing "
                        + "if the broker supports it", n,
                        topicDescription.partitions().size(), topic.numPartitions()));
                
                topicsToModify.put(n, NewPartitions.increaseTo(topic.numPartitions()));
            
        
        catch (@SuppressWarnings("unused") InterruptedException e) 
            Thread.currentThread().interrupt();
        
        catch (TimeoutException e) 
            throw new KafkaException("Timed out waiting to get existing topics", e);
        
        catch (@SuppressWarnings("unused") ExecutionException e) 
            topicsToAdd.add(topic);
        
    );
    return topicsToModify;

【讨论】:

很好的答案,但是当我第二次启动应用程序时出现错误:Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'my-topic' already exists. 所以,先检查它们是否存在。我添加了管理员用来执行此操作的代码。【参考方案2】:

目前我们只能使用KafkaAdmin.NewTopics

Spring Doc

【讨论】:

以上是关于spring boot怎么启动kafka的主要内容,如果未能解决你的问题,请参考以下文章

KafkaContainer - 如何在启动()之后在 Spring Boot 中读取 kafka 容器端口作为属性 / 如何在启动之前配置 Kafka 端口

Spring Boot Kafka:由于 NoSuchBeanDefinitionException 而无法启动消费者

如何在 spring-boot 应用程序启动期间创建许多 kafka 主题?

spring boot 配置使用kafka

Spring Boot Kafka

启动spring boot报错,怎么解决