这种在 Spring Boot 应用程序中启动无限循环的方式有啥问题吗?
Posted
技术标签:
【中文标题】这种在 Spring Boot 应用程序中启动无限循环的方式有啥问题吗?【英文标题】:Are there any problems with this way of starting an infinite loop in a Spring Boot application?这种在 Spring Boot 应用程序中启动无限循环的方式有什么问题吗? 【发布时间】:2019-06-09 05:23:06 【问题描述】:我有一个 Spring Boot 应用程序,它需要处理一些 Kafka 流数据。我向CommandLineRunner
类添加了一个无限循环,该类将在启动时运行。里面有一个可以唤醒的 Kafka 消费者。我用Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
添加了一个关机钩子。我会遇到什么问题吗?在 Spring 中是否有更惯用的方式来执行此操作?我应该改用@Scheduled
吗?下面的代码去除了特定的 Kafka 实现内容,但其他方面是完整的。
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Properties;
@Component
public class InfiniteLoopStarter implements CommandLineRunner
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void run(String... args)
Consumer<AccountKey, Account> consumer = new KafkaConsumer<>(new Properties());
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try
while (true)
ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
//process records
catch (WakeupException e)
logger.info("Consumer woken up for exiting.");
finally
consumer.close();
logger.info("Closed consumer, exiting.");
【问题讨论】:
你有没有考虑过使用Spring Cloud Stream,它实际上是为处理Kafka和类似工具而设计的? @jonrsharpe 不,我没有,我已经简要地查看了特定的 Spring for Kafka 库,但还没有遇到这个。我去看看,谢谢。 【参考方案1】:为了回答我自己的问题,我查看了像 Spring-Kafka 和 Spring Cloud Stream 这样的 Kafka 集成库,但是与 Confluent 的 Schema Registry 的集成要么没有完成,要么我不太清楚。这对于原语来说已经足够了,但我们需要它用于由模式注册表验证的类型化 Avro 对象。我现在根据Spring Boot - Best way to start a background thread on deployment 的答案实现了一个与 Kafka 无关的解决方案
最终代码如下所示:
@Component
public class AccountStreamConsumer implements DisposableBean, Runnable
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final AccountService accountService;
private final KafkaProperties kafkaProperties;
private final Consumer<AccountKey, Account> consumer;
@Autowired
public AccountStreamConsumer(AccountService accountService, KafkaProperties kafkaProperties,
ConfluentProperties confluentProperties)
this.accountService = accountService;
this.kafkaProperties = kafkaProperties;
if (!kafkaProperties.getEnabled())
consumer = null;
return;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentProperties.getSchemaRegistryUrl());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSecurityProtocolConfig());
props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getSaslMechanism());
props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required username=\"" + kafkaProperties.getUsername() + "\" password=\"" + kafkaProperties.getPassword() + "\";");
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getAccountConsumerGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(kafkaProperties.getAccountsTopicName()));
Thread thread = new Thread(this);
thread.start();
@Override
public void run()
if (!kafkaProperties.getEnabled())
return;
logger.debug("Started account stream consumer");
try
//noinspection InfiniteLoopStatement
while (true)
ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
List<Account> accounts = new ArrayList<>();
records.iterator().forEachRemaining(record -> accounts.add(record.value()));
if (accounts.size() != 0)
accountService.store(accounts);
catch (WakeupException e)
logger.info("Account stream consumer woken up for exiting.");
finally
consumer.close();
@Override
public void destroy()
if (consumer != null)
consumer.wakeup();
logger.info("Woke up account stream consumer, exiting.");
【讨论】:
【参考方案2】:实现看起来不错,但不是为此而使用 CommandLineRunner。 CommandLineRunner 仅用于在启动时运行某些任务一次。从设计的角度来看,它不是很优雅。我宁愿使用带有kafka的spring集成适配器组件。你可以在这里找到例子https://github.com/raphaelbrugier/spring-integration-kafka-sample/blob/master/src/main/java/com/github/rbrugier/esb/consumer/Consumer.java。
【讨论】:
【参考方案3】:我不确定你是否会在那里遇到任何问题,但它有点脏 - Spring 为使用 Kafka 提供了非常好的内置支持,所以我倾向于这样做(网上有很多关于这方面的文档,但一个不错的是:https://www.baeldung.com/spring-kafka)。
您需要以下依赖项:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
配置就像将@EnableKafka
注解添加到配置类然后设置Listener 和ConsumerFactory bean 一样简单
配置完成后,您可以轻松设置消费者,如下所示:
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition)
System.out.println("Received Message: " + message"+ "from partition: " + partition);
【讨论】:
以上是关于这种在 Spring Boot 应用程序中启动无限循环的方式有啥问题吗?的主要内容,如果未能解决你的问题,请参考以下文章
Azure App Service - Spring Boot 应用程序在启动时卡住
Postgres RDS 数据库数据库连接在星期六无限增加,导致 Spring Boot Java API 应用程序中出现“JDBCConnectionException”