spring 总是需要 KafkaTemplate 吗?
Posted
技术标签:
【中文标题】spring 总是需要 KafkaTemplate 吗?【英文标题】:Does spring always require KafkaTemplate? 【发布时间】:2022-01-19 05:46:05 【问题描述】:问题:springboot 是否总是需要创建 KafkaTemplate 类型的 bean? 下面的详细信息/堆栈跟踪/代码库,请告诉我我做错了什么。谢谢
-
我一直在向一个 Spring Boot 项目的主题发布消息
为了创建回调机制,我使用了 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord
我的 Kafka 配置如下
@Configuration
public class KafkaEventConfig
private final KafkaProperties kafkaProperties;
@Value("$client.id")
private String clientId;
@Value("$topic.movie.name")
private String movieTopicName;
@Value("$retry.backoff.ms")
private int retryBackoffMilliseconds;
@Value("$request.timeout.ms")
private int requestTimeoutMilliseconds;
public KafkaEventConfig(KafkaProperties kafkaProperties)
this.kafkaProperties = kafkaProperties;
@Bean
public ProducerFactory<String, Movie> producerFactory()
Map<String, Object> props = kafkaProperties.buildProducerProperties();
populateCommonProperties(props);
return new DefaultKafkaProducerFactory<>(props);
private void populateCommonProperties(Map<String, Object> props)
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
@Bean
public KafkaProducer<String, Movie> movieKafkaProducer()
return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
@Bean
public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
MeterRegistry registry)
return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
我的 Kafka 回调如下
@Slf4j
public class KafkaProducerCallBack<K, V> implements Callback
private ProducerRecord<K, V> producerRecord;
public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord)
this.producerRecord = producerRecord;
@Override
public void onCompletion(RecordMetadata metadata, Exception exception)
String topicName= metadata.topic();
long offset= metadata.offset();
if (exception != null)
log.error("Failed to produce message [] to topic with exception ", producerRecord, topicName, exception);
else
log.info("Sucessfully published message [] to topic to offset ", producerRecord, topicName , offset);
我是这样发布消息的
movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));
请注意我在 KafkaEventConfig 中添加以下行的那一刻一切正常
@Bean
public KafkaTemplate<String, Movie> movieKafkaTemplate()
return new KafkaTemplate<String, Movie>(producerFactory());
【问题讨论】:
不,它没有,但是你是手动配置 Kafka 的部分,Spring Boot 也会尝试做一些自动配置。如果禁用 Kafka 的自动配置,则不需要KafkaTemplate
。或者您利用自动配置并使用KafkaTemplate
(可以由 Spring Boot 自动提供)来发送消息并简化您自己的配置。
谢谢,太棒了..这两个工作完美.. @EnableConfigurationProperties(KafkaProperties.class) @SpringBootApplication(exclude=KafkaAutoConfiguration.class)
【参考方案1】:
仔细查看您的异常堆栈跟踪会发现问题。
Error creating bean with name 'kafkaTemplate' defined in class path resource
[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]:
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations:
错误来自无法在 Spring Boot 中应用基于 Kafka 的自动配置。 KafkaAutoConfiguration
类期望并配置一些bean,如果找到某些bean,则退出。当您正在配置一些 bean 时,这将部分回退,因此无法自动配置 Kafka 类。
要修复,您可以排除KafkaAutoConfiguration
。您可以在 @SpringBootApplication
注释中执行此操作,就像这样
@SpringBootApplication(exclude=KafkaAutoConfiguration.class
或者您可以利用自动配置,让 Spring Boot 进行配置,然后您使用提供的 KafkaTemplate
或 ProducerFactory
来做您想做的事情。
后者将简化您自己的配置。我对 Kafka 自动配置和您的用例知之甚少,无法提供更有用的代码 sn-p,但您应该能够自己弄清楚,或者只是排除 KafkaAutoConfiguration
并使用您现在拥有的.
【讨论】:
【参考方案2】:除了@M.Deinum 提到的后者:
看看KafkaAutoConfiguration
类:
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener,
ObjectProvider<RecordMessageConverter> messageConverter)
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
如果您不创建自己的,Springboot 将为您创建一个 KafkaTemplate
bean。这个自动配置的 bean 依赖于 ProducerFactory<Object, Object>
bean,因为你声明了一个 ProducerFactory<String, Movie>
。如您所见,类型不合适,这就是您收到错误的原因。
我这样做的原因是因为使用 KafkaTemplate 时可听的future 仅在失败时提供异常(并且我想在我的所有用例中将回调注册为一个单独的可重用类)
您的情况,您仍然可以获得使用KafkaTemplate
的优势。您可以实现自己的ProducerListener<K, V>
并将其绑定到您的KafkaTemple
,而不是实现Callback
。例如:
FullLoggingProducerListener.class
public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V>
@Override
public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata)
log.info("Successful!");
@Override
public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception)
log.error("Error!");
YourConfigration.class
@Bean
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener)
KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
kafkaTemplate.setProducerListener(kafkaProducerListener);
return kafkaTemplate;
现在,每次您使用KafkaTemplate
发送记录时,您都会看到日志。
【讨论】:
以上是关于spring 总是需要 KafkaTemplate 吗?的主要内容,如果未能解决你的问题,请参考以下文章
spring-kafka整合:KafkaTemplate-kafka模板类介绍
Kafka问题 02KafkaTemplate 报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected 问题解决