使用嵌入式 Kafka 进行春季 Kafka 集成测试

Posted

技术标签:

【中文标题】使用嵌入式 Kafka 进行春季 Kafka 集成测试【英文标题】:spring Kafka integration testing with embedded Kafka 【发布时间】:2019-07-12 06:33:04 【问题描述】:

我有一个 Spring Boot 应用程序,它有一个消费者从一个集群中的主题消费并生产到不同集群中的另一个主题。

现在我正在尝试使用 spring 嵌入式 Kafka 编写集成测试用例,但遇到问题KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource

消费类

@Service
public class KafkaConsumerService 

@Autowired
private KafkaProducerService kafkaProducerService;

@KafkaListener(topics = "$kafka.producer.topic")
public void professor(List<Professor> pro) 
    pro.forEach(kafkaProducerService::produce);
    
   


生产者类

@Service
public class KafkaProducerService 

@Value("$kafka.producer.topic")
private String topic;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void produce(Professor pro) 
    kafkaTemplate.send(topic,"professor",pro);
  

 

在我的测试用例中,我想覆盖KafkaTemplate,这样当我在Test 中调用kafkaConsumerService.professor 方法时,它应该将数据生成到嵌入式Kafka 中并且我应该对其进行验证。

测试配置

@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = "listeners=PLAINTEXT://localhost:3333", "port=3333")
public class KafkaProducerConfigTest 

@Autowired
 KafkaEmbedded kafkaEmbeded;

@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before
public void setUp() throws Exception 
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) 
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  


@Bean
public ProducerFactory<String, Object> producerFactory() 
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));


@Bean
public KafkaTemplate<String, Object> kafkaTemplate() 
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
   

 

测试类

@EnableKafka
@SpringBootTest(classes = KafkaProducerConfigTest.class)
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest 

@Autowired
private KafkaConsumerService kafkaConsumerService;

@Test
public void testReceive() throws Exception 
     kafkaConsumerService.professor(Arrays.asList(new Professor()));
     
     //How to check messages is sent to kafka?


 

错误

 The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered. 
 A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
 Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

还有人可以帮助我如何验证发送到嵌入式 Kafka 服务器的消息吗?

注意我有一些已弃用的警告

KafkaEmbedded 类型已弃用

KafkaEmbedded 类型的方法 getPartitionsPerTopic() 已弃用

KafkaTestUtils 类型的方法 producerProps(KafkaEmbedded) 已弃用

【问题讨论】:

【参考方案1】:

引导 2.1 disables bean overriding by default.

默认情况下禁用 Bean 覆盖以防止意外覆盖 bean。如果您依赖覆盖,则需要将spring.main.allow-bean-definition-overriding 设置为true

关于弃用;请参阅 @EmbeddedKafka 的 javadocs。替换为EmbeddedKafkaBroker

【讨论】:

感谢您的回复先生,我做到了,但现在不同的问题Field kafkaEmbeded in com.kafka.configuration.KafkaProducerConfigTest required a bean of type 'org.springframework.kafka.test.rule.KafkaEmbedded' that could not be found. 如我所说;现在是EmbeddedKafkaBroker@Autowired public EmbeddedKafkaBroker broker;.

以上是关于使用嵌入式 Kafka 进行春季 Kafka 集成测试的主要内容,如果未能解决你的问题,请参考以下文章

集成测试之嵌入式Kafka

春季启动生产者在kafka重启后无法发送任何消息

java 嵌入Kafka + Zookeeper用于测试目的。使用Apache Kafka 0.8进行测试

java 嵌入Kafka + Zookeeper用于测试目的。使用Apache Kafka 0.8进行测试

storm集成kafka的应用,从kafka读取,写入kafka

大数据Spark集成Kafka