Spring Kafka integration test: listener not working

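I'm trying to write an integration test using spring-boot and Spring embedded Kafka. I can produce messages to the embedded Kafka server, but the listener in the service class, rather than the test listener, is trying to consume the records.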

KafkaProducerConfigTest, the config class that contains all the beans:

@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {

    @Bean
    public EmbeddedKafkaBroker embeddedKafkaBroker() {
        return new EmbeddedKafkaBroker(1, false, 2, "test-events");
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }

    @Bean("consumerFactory")
    public ConsumerFactory<String, Object> createConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Object.class, false));
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.setBatchListener(true);
        factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        return factory;
    }

    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public class Listener {
        public final CountDownLatch latch = new CountDownLatch(1);

        @Getter
        public List<Professor> list;

        @KafkaListener(topics = "test-events", containerFactory = "kafkaListenerContainerFactory")
        public void listen1(List<Professor> foo) {
            list = foo;
            this.latch.countDown();
        }
    }

}

KafkaProducerServiceTest, the service test class. Here the listener does not consume any data. I know this test will fail, because this is not the right approach:

@EnableKafka
@SpringBootTest(classes = { KafkaProducerConfigTest.class })
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Autowired
    private Listener listener;

    @Test
    public void testReceive() throws Exception {
        kafkaConsumerService.professor(Arrays.asList(new Professor("Ajay", new Department("social", 1234))));
        System.out.println("The professor object is sent to kafka -----------------------------------");
        listener.latch.await();
        System.out.println(listener.getList());
    }

}

Error

2019-02-19 15:18:32.620 ERROR 22387 --- [ntainer#1-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:
ConsumerRecord(topic = test-events, partition = 1, offset = 0, CreateTime = 1550611112583, serialized key size = 9, serialized value size = 64, headers = RecordHeaders(headers = [], isReadOnly = false), key = professor, value = {name=Ajay, department={deptName=social, deptId=1234}})

 java.lang.IllegalStateException: Only String or byte[] supported
at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:140) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:134) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.support.converter.BatchMessagingMessageConverter.convert(BatchMessagingMessageConverter.java:217) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.support.converter.BatchMessagingMessageConverter.toMessage(BatchMessagingMessageConverter.java:165) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.toMessagingMessage(BatchMessagingMessageListenerAdapter.java:174) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:129) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:59) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:984) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:917) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:900) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:753) [spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]

Updated based on @Gray Russel's answer, but I still have the problem.

Answer

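In the container factory, use a StringDeserializer with the StringJsonMessageConverter (or a BytesDeserializer with the BytesJsonMessageConverter); the framework can then determine the target type from the listener method signature and pass it to the converter.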
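A minimal sketch of what that suggestion might look like for the consumer side of the question's configuration, assuming spring-kafka 2.2.x (the version in the stack trace) and an EmbeddedKafkaBroker bean defined elsewhere. The class name StringPayloadListenerConfig is hypothetical, and this is an illustration of the idea rather than a verified fix for the question. The producer can keep its JsonSerializer; only the consumer switches to plain strings so the converter receives a String, which is one of the two payload types the exception says it supports.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.test.EmbeddedKafkaBroker;

@EnableKafka
@TestConfiguration
public class StringPayloadListenerConfig { // hypothetical class name

    @Bean
    public ConsumerFactory<String, String> consumerFactory(EmbeddedKafkaBroker broker) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // Deliver the raw JSON text; conversion to the listener's type happens later.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        // The record converter now sees String payloads instead of already-deserialized maps.
        factory.setMessageConverter(new BatchMessagingMessageConverter(new StringJsonMessageConverter()));
        return factory;
    }
}
```

With this arrangement the @KafkaListener method can keep its List<Professor> parameter, since the converter is the component that is given the target type.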

The deserializer is too far down the stack for that inference to be made there.
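To illustrate why the listener layer is in a position to infer the type, here is a small standalone sketch using only java.lang.reflect. It is not Spring's actual code; the nested Professor and Listener classes are hypothetical stand-ins for the ones in the question. It shows that the element type of a List parameter can be read from a method signature, which is information a deserializer configured with Object.class never sees.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class SignatureTypeDemo {

    // Hypothetical stand-in for the question's payload class.
    public static class Professor { }

    // Hypothetical stand-in for the question's listener; only the signature matters.
    public static class Listener {
        public void listen1(List<Professor> foo) { }
    }

    // Returns the type argument of the first parameter (Professor for List<Professor>),
    // or the parameter type itself when it is not parameterized.
    public static Type elementTypeOfFirstParameter(Method method) {
        Type parameterType = method.getGenericParameterTypes()[0];
        if (parameterType instanceof ParameterizedType) {
            return ((ParameterizedType) parameterType).getActualTypeArguments()[0];
        }
        return parameterType;
    }

    public static void main(String[] args) throws NoSuchMethodException {
        Method method = Listener.class.getMethod("listen1", List.class);
        // Prints the type name recovered from the signature.
        System.out.println(elementTypeOfFirstParameter(method).getTypeName());
    }
}
```

A component that invokes the listener method has the Method object at hand and can do this kind of lookup; a Kafka Deserializer only receives the topic name and the raw bytes.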
