这种在 Spring Boot 应用程序中启动无限循环的方式有啥问题吗?

Posted

技术标签:

【中文标题】这种在 Spring Boot 应用程序中启动无限循环的方式有啥问题吗?【英文标题】:Are there any problems with this way of starting an infinite loop in a Spring Boot application?这种在 Spring Boot 应用程序中启动无限循环的方式有什么问题吗? 【发布时间】:2019-06-09 05:23:06 【问题描述】:

我有一个 Spring Boot 应用程序,它需要处理一些 Kafka 流数据。我向CommandLineRunner 类添加了一个无限循环,该类将在启动时运行。里面有一个可以唤醒的 Kafka 消费者。我用Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup)); 添加了一个关机钩子。我会遇到什么问题吗?在 Spring 中是否有更惯用的方式来执行此操作?我应该改用@Scheduled 吗?下面的代码去除了特定的 Kafka 实现内容,但其他方面是完整的。

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Properties;


    @Component
    public class InfiniteLoopStarter implements CommandLineRunner 

        private final Logger logger = LoggerFactory.getLogger(this.getClass());

        @Override
        public void run(String... args) 
            Consumer<AccountKey, Account> consumer = new KafkaConsumer<>(new Properties());
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

            try 
                while (true) 
                    ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
                    //process records
                
             catch (WakeupException e) 
                logger.info("Consumer woken up for exiting.");
             finally 
                consumer.close();
                logger.info("Closed consumer, exiting.");
            
        
    

【问题讨论】:

你有没有考虑过使用Spring Cloud Stream,它实际上是为处理Kafka和类似工具而设计的? @jonrsharpe 不,我没有,我已经简要地查看了特定的 Spring for Kafka 库,但还没有遇到这个。我去看看,谢谢。 【参考方案1】:

为了回答我自己的问题,我查看了像 Spring-Kafka 和 Spring Cloud Stream 这样的 Kafka 集成库,但是与 Confluent 的 Schema Registry 的集成要么没有完成,要么我不太清楚。这对于原语来说已经足够了,但我们需要它用于由模式注册表验证的类型化 Avro 对象。我现在根据Spring Boot - Best way to start a background thread on deployment 的答案实现了一个与 Kafka 无关的解决方案

最终代码如下所示:

@Component
public class AccountStreamConsumer implements DisposableBean, Runnable 

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final AccountService accountService;
    private final KafkaProperties kafkaProperties;
    private final Consumer<AccountKey, Account> consumer;

    @Autowired
    public AccountStreamConsumer(AccountService accountService, KafkaProperties kafkaProperties,
                                 ConfluentProperties confluentProperties) 

        this.accountService = accountService;
        this.kafkaProperties = kafkaProperties;

        if (!kafkaProperties.getEnabled()) 
            consumer = null;
            return;
        

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentProperties.getSchemaRegistryUrl());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSecurityProtocolConfig());
        props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getSaslMechanism());
        props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required username=\"" + kafkaProperties.getUsername() + "\" password=\"" + kafkaProperties.getPassword() + "\";");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getAccountConsumerGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(kafkaProperties.getAccountsTopicName()));

        Thread thread = new Thread(this);
        thread.start();
    

    @Override
    public void run() 
        if (!kafkaProperties.getEnabled())
            return;

        logger.debug("Started account stream consumer");
        try 
            //noinspection InfiniteLoopStatement
            while (true) 
                ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
                List<Account> accounts = new ArrayList<>();
                records.iterator().forEachRemaining(record -> accounts.add(record.value()));
                if (accounts.size() != 0)
                    accountService.store(accounts);
            
         catch (WakeupException e) 
            logger.info("Account stream consumer woken up for exiting.");
         finally 
            consumer.close();
        
    

    @Override
    public void destroy() 
        if (consumer != null)
            consumer.wakeup();

        logger.info("Woke up account stream consumer, exiting.");
    

【讨论】:

【参考方案2】:

实现看起来不错,但不是为此而使用 CommandLineRunner。 CommandLineRunner 仅用于在启动时运行某些任务一次。从设计的角度来看,它不是很优雅。我宁愿使用带有kafka的spring集成适配器组件。你可以在这里找到例子https://github.com/raphaelbrugier/spring-integration-kafka-sample/blob/master/src/main/java/com/github/rbrugier/esb/consumer/Consumer.java。

【讨论】:

【参考方案3】:

我不确定你是否会在那里遇到任何问题,但它有点脏 - Spring 为使用 Kafka 提供了非常好的内置支持,所以我倾向于这样做(网上有很多关于这方面的文档,但一个不错的是:https://www.baeldung.com/spring-kafka)。

您需要以下依赖项:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

配置就像将@EnableKafka 注解添加到配置类然后设置Listener 和ConsumerFactory bean 一样简单

配置完成后,您可以轻松设置消费者,如下所示:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) 
      System.out.println("Received Message: " + message"+ "from partition: " + partition);

【讨论】:

以上是关于这种在 Spring Boot 应用程序中启动无限循环的方式有啥问题吗?的主要内容,如果未能解决你的问题,请参考以下文章

避免无限循环 Spring Boot

Azure App Service - Spring Boot 应用程序在启动时卡住

Postgres RDS 数据库数据库连接在星期六无限增加,导致 Spring Boot Java API 应用程序中出现“JDBCConnectionException”

spring boot怎么启动kafka

Spring Boot使用@Schedule启动、停止服务

spring-boot的三种启动方式