How to write a unit test for @KafkaListener?
Published: 2019-03-17 21:27:24
Question: I am trying to figure out whether I can write a unit test for a @KafkaListener using spring-kafka and spring-kafka-test.
My listener class:
public class MyKafkaListener {
    @Autowired
    private MyMessageProcessor myMessageProcessor;

    @KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
    public void myMessageListener(MyMessage message) {
        myMessageProcessor.process(message);
        log.info("MyMessage processed");
    }
}
My test class:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = "I1.Topic.json.001")
@ContextConfiguration(classes = TestKafkaConfig.class)
public class MyMessageConsumersTest {
    @Autowired
    private MyMessageProcessor myMessageProcessor;
    @Value("${kafka.topic.01}")
    private String TOPIC_01;
    @Autowired
    private KafkaTemplate<String, MyMessage> messageProducer;

    @Test
    public void testSalesforceMessageListner() {
        MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
        messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
        verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
    }
}
My test configuration class:
@Configuration
@EnableKafka
public class TestKafkaConfig {
    @Bean
    public MyMessageProcessor myMessageProcessor() {
        return mock(MyMessageProcessor.class);
    }

    @Bean
    public KafkaEmbedded kafkaEmbedded() {
        return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
    }

    // Consumer
    @Bean
    public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(myMessageConsumerFactory());
        return factory;
    }

    // Producer
    @Bean
    public ProducerFactory<String, MyMessage> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, MyMessage> messageProducer() {
        return new KafkaTemplate<>(producerFactory());
    }
}
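Is there a simple way to make this work?
Or should I test the @KafkaListener in some other way? In a unit test, how do I make sure that the @KafkaListener is invoked when a new message arrives in Kafka?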
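Answer 1:
"How do I make sure that the @KafkaListener is invoked when a new message arrives in Kafka?"
Well, testing that functionality is essentially the framework's responsibility. In your case you only need to concentrate on the business logic and unit test your own custom code, not the code that ships with the framework. There is also little value in testing a @KafkaListener method that just logs incoming messages; finding a hook for the test case to verify against would be too hard.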
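On the other hand, I am sure the business logic in your @KafkaListener method is much more complicated than what you show. So it is probably better to verify the custom code called from that method (for example a DB insert or another service call) rather than trying to find a hook for exactly myMessageListener().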
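The advice above, verifying the custom code that the listener method calls, can be sketched without Kafka or Spring at all. The following is only an illustration under stated assumptions: `MyMessage`, `MyMessageProcessor` and `MyKafkaListener` are simplified stand-ins that mirror the question's names, the listener uses constructor injection (the question uses field injection), the `@KafkaListener` annotation is left out, and `RecordingProcessor` is a hypothetical hand-written fake used in place of a Mockito mock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the question's message type.
class MyMessage {
    final String id;

    MyMessage(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for the question's processor.
interface MyMessageProcessor {
    void process(MyMessage message);
}

// Same shape as the question's listener, minus the @KafkaListener annotation.
// Constructor injection is an assumption made so a test can pass in a fake.
class MyKafkaListener {
    private final MyMessageProcessor myMessageProcessor;

    MyKafkaListener(MyMessageProcessor myMessageProcessor) {
        this.myMessageProcessor = myMessageProcessor;
    }

    void myMessageListener(MyMessage message) {
        myMessageProcessor.process(message);
    }
}

// Hand-written fake that records every message it is asked to process.
class RecordingProcessor implements MyMessageProcessor {
    final List<MyMessage> received = new ArrayList<>();

    @Override
    public void process(MyMessage message) {
        received.add(message);
    }
}

class ListenerUnitTestSketch {
    // Calls the listener method directly, as a plain unit test would.
    static RecordingProcessor deliver(String id) {
        RecordingProcessor processor = new RecordingProcessor();
        new MyKafkaListener(processor).myMessageListener(new MyMessage(id));
        return processor;
    }

    public static void main(String[] args) {
        RecordingProcessor processor = deliver("MessageID");
        System.out.println("calls=" + processor.received.size());
    }
}
```

Because the listener method is called directly, this checks only the hand-off to the processor; whether the framework delivers records to the method is left to the framework's own tests, as the answer suggests.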
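What you do with mock(MyMessageProcessor.class) is a good way to verify the business logic. The only thing wrong in your code is the duplicated embedded Kafka: you use the @EmbeddedKafka annotation and you also declare a KafkaEmbedded @Bean in the config. You should remove one of them. It is not clear where your production code is, though; it really should be free of the embedded Kafka. Otherwise, if everything is in the test scope, I don't see any problem with your consumer and producer factory configuration. You already have the minimal possible config for the @KafkaListener and the KafkaTemplate. All you need to do is remove the @EmbeddedKafka so that the broker is not started twice.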
Answer 2:
You can wrap the listener in your test case.
Given
@SpringBootApplication
public class So52783066Application {
    public static void main(String[] args) {
        SpringApplication.run(So52783066Application.class, args);
    }

    @KafkaListener(id = "so52783066", topics = "so52783066")
    public void listen(String in) {
        System.out.println(in);
    }
}
then
@RunWith(SpringRunner.class)
@SpringBootTest
public class So52783066ApplicationTests {
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "so52783066");
    @Autowired
    private KafkaListenerEndpointRegistry registry;
    @Autowired
    private KafkaTemplate<String, String> template;

    @Before
    public void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry
                .getListenerContainer("so52783066");
        // Stop the container so its listener can be replaced by a wrapper.
        container.stop();
        @SuppressWarnings("unchecked")
        AcknowledgingConsumerAwareMessageListener<String, String> messageListener = (AcknowledgingConsumerAwareMessageListener<String, String>) container
                .getContainerProperties().getMessageListener();
        CountDownLatch latch = new CountDownLatch(1);
        container.getContainerProperties()
                .setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, String>() {
                    @Override
                    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment,
                            Consumer<?, ?> consumer) {
                        // Delegate to the real listener, then signal the test.
                        messageListener.onMessage(data, acknowledgment, consumer);
                        latch.countDown();
                    }
                });
        container.start();
        template.send("so52783066", "foo");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }
}
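The core of the test above is a decorator: the original listener is wrapped by one that delegates and then counts down a latch, so the test thread can wait until the listener has actually run. A minimal framework-free sketch of that idea follows. `RecordListener` is a hypothetical, simplified stand-in for spring-kafka's listener interface, and a single-thread executor plays the role of the container's consumer thread; neither is part of the original answer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for a message listener interface.
interface RecordListener<T> {
    void onMessage(T data);
}

// Decorator: run the real listener first, then signal the waiting test.
class LatchingListener<T> implements RecordListener<T> {
    private final RecordListener<T> delegate;
    private final CountDownLatch latch;

    LatchingListener(RecordListener<T> delegate, CountDownLatch latch) {
        this.delegate = delegate;
        this.latch = latch;
    }

    @Override
    public void onMessage(T data) {
        delegate.onMessage(data);
        latch.countDown();
    }
}

class LatchWrapperSketch {
    // Delivers one payload on another thread and reports whether the
    // wrapped listener finished within the timeout.
    static boolean deliverAndAwait(RecordListener<String> original, String payload, long timeoutSeconds) {
        CountDownLatch latch = new CountDownLatch(1);
        RecordListener<String> wrapped = new LatchingListener<>(original, latch);
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        try {
            consumerThread.execute(() -> wrapped.onMessage(payload));
            return latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            consumerThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean done = deliverAndAwait(System.out::println, "foo", 10);
        System.out.println("listener completed: " + done);
    }
}
```

As in the answer's test, the latch is counted down only after the delegate returns normally, so a listener that throws shows up as a timeout rather than as a success.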
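Comments:
What is KafkaEmbedded?
Don't ask questions on old answers; ask a new question instead. It is the old name for EmbeddedKafkaBroker; see the documentation.
Hi Gary, how does the consumer here get the embedded Kafka broker ID? The consumer configuration is created before the test case runs.
Don't ask new questions on old answers; it won't help others find the answer. See the documentation. @EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers") will override the application property. If that is not what you mean, ask a new question.
Answer 3: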
Here is my working solution for the consumer, based on your code. Thanks :-)
The configuration is as follows:
@TestConfiguration
@EnableKafka
@Profile("kafka_test")
public class KafkaTestConfig {
    private static Logger log = LoggerFactory.getLogger(KafkaTestConfig.class);
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    @Primary
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        log.info("Consumer TEST config = {}", props);
        return props;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        log.info("Producer TEST config = {}", props);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<String>());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
        return pf;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.setConcurrency(2);
        return factory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }

    @Bean
    public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry = new KafkaListenerEndpointRegistry();
        return kafkaListenerEndpointRegistry;
    }
}
Put all the beans you need to include in the test in a separate class:
@TestConfiguration
@Profile("kafka_test")
@EnableKafka
public class KafkaBeansConfig {
    @Bean
    public MyProducer myProducer() {
        return new MyProducer();
    }
    // more beans
}
I created a BaseKafkaConsumerTest class so it can be reused:
@ExtendWith(SpringExtension.class)
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@TestInstance(Lifecycle.PER_CLASS)
@DirtiesContext
@ContextConfiguration(classes = KafkaTestConfig.class)
@ActiveProfiles("kafka_test")
public class BaseKafkaConsumerTest {
    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;
    @Value("${spring.embedded.kafka.brokers}")
    private String brokerAddresses;
    @Autowired
    protected KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    @Autowired
    protected KafkaTemplate<String, String> senderTemplate;

    // Called from the subclass's @BeforeAll method.
    public void setUp() {
        embeddedKafka.brokerProperty("controlled.shutdown.enable", true);
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            System.err.println(messageListenerContainer.getContainerProperties().toString());
            // Block until each listener container has its partitions assigned.
            ContainerTestUtils.waitForAssignment(messageListenerContainer, embeddedKafka.getPartitionsPerTopic());
        }
    }

    @AfterAll
    public void tearDown() {
        for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
                .getListenerContainers()) {
            messageListenerContainer.stop();
        }
        embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
        embeddedKafka.getKafkaServers().forEach(b -> b.awaitShutdown());
    }
}
Extend the base class to test your consumer:
@EmbeddedKafka(topics = MyConsumer.TOPIC_NAME)
@Import(KafkaBeansConfig.class)
public class MYKafkaConsumerTest extends BaseKafkaConsumerTest {
    private static Logger log = LoggerFactory.getLogger(MYKafkaConsumerTest.class);
    @Autowired
    private MyConsumer myConsumer;
    // mocks with @MockBean

    @Configuration
    @ComponentScan("com.myfirm.kafka")
    static class KafkaLocalTestConfig {
    }

    @BeforeAll
    public void setUp() {
        super.setUp();
    }

    @Test
    public void testMessageIsReceived() throws Exception {
        // mocks
        String jsonPayload = "{\"id\":\"12345\",\"cookieDomain\":\"helloworld\"}";
        ListenableFuture<SendResult<String, String>> future =
                senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);
        // Give the consumer time to receive and handle the message.
        Thread.sleep(10000);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("successfully sent message='{}' with offset={}", jsonPayload,
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("unable to send message='{}'", jsonPayload, ex);
            }
        });
        Mockito.verify(myService, Mockito.times(1))
                .update(Mockito.any(MyDetails.class));
    }
}
As I have read in other posts, do not test the business logic this way; only verify that the calls are made.
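The test in this answer waits a fixed 10 seconds with Thread.sleep before verifying that the call was made. One possible alternative, shown here only as a sketch and not as part of the answer, is to poll for the expected effect and stop as soon as it is observed. `awaitCondition` is a hypothetical helper, and the scheduled task merely simulates a listener that handles the message some time after it was sent. Mockito's `timeout(...)` verification mode serves a similar purpose when a mock is being verified.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class AwaitSketch {
    // Hypothetical helper: polls until the condition holds or the timeout elapses.
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Simulates a consumer that handles a message 200 ms after it was sent,
    // then waits for that effect instead of sleeping for a fixed time.
    static boolean demo() {
        AtomicInteger updateCalls = new AtomicInteger();
        ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor();
        try {
            consumer.schedule(() -> { updateCalls.incrementAndGet(); }, 200, TimeUnit.MILLISECONDS);
            return awaitCondition(() -> updateCalls.get() == 1, 10_000, 50);
        } finally {
            consumer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("call observed: " + demo());
    }
}
```

The upper bound stays at 10 seconds, as in the answer, but a passing run returns as soon as the call is seen.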
Comments:
What is myService in your example?
Maybe it is MyConsumer. I don't remember; I replaced the company's variable names.
OK, thanks. I am struggling to write tests for a Kafka listener.