Spring Kafka integration test: Error while writing to highwatermark file

Posted: 2019-07-15 17:18:00

I'm writing an integration test with spring-kafka-2.2.0 in a Spring Boot application. I've almost got it working: the test case passes, but I still see multiple errors afterwards.

2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645

org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)

Test configuration

@EnableKafka
@TestConfiguration
public class KafkaProducerConfigTest {

    @Bean
    public EmbeddedKafkaBroker embeddedKafkaBroker() {
        return new EmbeddedKafkaBroker(1, false, 2, "test-events");
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean("consumerFactory")
    public ConsumerFactory<String, Professor> createConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker().getBrokersAsString());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        JsonDeserializer<Professor> jsonDeserializer = new JsonDeserializer<>(Professor.class, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), jsonDeserializer);
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Professor> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Professor> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        return factory;
    }

    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public class Listener {
        public final CountDownLatch latch = new CountDownLatch(1);

        @Getter
        public List<Professor> list;

        @KafkaListener(topics = "test-events", containerFactory = "kafkaListenerContainerFactory")
        public void listen1(List<Professor> foo) {
            list = foo;
            this.latch.countDown();
        }
    }
}

Test class

@EnableKafka
@SpringBootTest(classes = { KafkaProducerConfigTest.class })
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

    @Autowired
    private KafkaConsumerService kafkaConsumerService;

    @Autowired
    private Listener listener;

    @Test
    public void testReceive() throws Exception {
        Professor professor = new Professor("Ajay", new Department("social", 1234));
        List<Professor> pro = new ArrayList<>();
        pro.add(professor);
        System.out.println(pro);
        kafkaConsumerService.professor(pro);
        System.out.println("The professor object is sent to kafka -----------------------------------");
        listener.latch.await();
        List<Professor> result = listener.getList();
        Professor resultPro = result.get(0);
        System.out.println(result);
        System.out.println(resultPro);

        assertEquals(pro.get(0).getName(), result.get(0).getName());
    }
}

The test case testReceive() passes, but there are still multiple error messages:

Stack trace error 1

2019-02-21 11:12:35.434 ERROR 5717 --- [       Thread-7] kafka.server.ReplicaManager              : [ReplicaManager broker=0] Error while writing to highwatermark file in directory /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645

org.apache.kafka.common.errors.KafkaStorageException: Error while writing to checkpoint file /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/replication-offset-checkpoint.tmp (No such file or directory)

Stack trace error 2

2019-02-21 11:12:35.446  WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$                   : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory)

java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/__consumer_offsets-4/00000000000000000000.index (No such file or directory)

Stack trace error 3

2019-02-21 11:12:35.451  WARN 5717 --- [pool-8-thread-1] kafka.utils.CoreUtils$                   : /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory)

java.io.FileNotFoundException: /var/folders/s3/rz83xz3n1j13lgy9mtwkln594g3x0g/T/kafka-1246121392091602645/test-events-0/00000000000000000000.timeindex (No such file or directory)
at java.io.RandomAccessFile.open0(Native Method) ~[na:1.8.0_191]


Answer 1:

I had a similar problem and, with Gary Russell's help, solved it by pointing the log directory at the build output directory: log.dir=out/embedded-kafka for a Gradle build, or log.dir=target/embedded-kafka in the Maven case.

The code snippet below shows how to do this with @EmbeddedKafka.

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, classes = Application.class)
@EmbeddedKafka(
        topics = "topic",
        partitions = 1,
        controlledShutdown = true,
        brokerProperties = {
                "log.dir=out/embedded-kafka"
        })
@TestPropertySource(
        properties = {
                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"
        })
public class OutboxEventsTest {
...
}

Comments:

Just a tip: you can also set properties with the @SpringBootTest(properties = "...") annotation.

Oh my god. I used the same configuration in my integration test and it worked like a charm. Wish I could give this 10 upvotes.
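As a small illustration of that tip, here is a minimal sketch (reusing the hypothetical Application, topic, and test class names from the answer above) that sets the same property through @SpringBootTest instead of @TestPropertySource:

@ExtendWith(SpringExtension.class)
@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.NONE,
        classes = Application.class,
        // same effect as the @TestPropertySource block in the answer above
        properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(topics = "topic", partitions = 1, brokerProperties = "log.dir=out/embedded-kafka")
public class OutboxEventsTest {
...
}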

Answer 2:

Do you actually have permission to write to /var/folders/s3 ...?

You can override the location with:
@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1, false, 2, "test-events")
            .brokerProperties(Collections.singletonMap(KafkaConfig.LogDirProp(), "/tmp/foo"));
}
Comments:

Sir, is there a way to pass a custom port for the embedded Kafka broker? @Gary Russell

Don't ask new questions in comments on old answers; always ask a new question. By default the broker starts on a random port (which is the best idea for CI builds). Use new EmbeddedKafkaBroker(1,false,2,"test-events").kafkaPorts(1234) to specify a particular port (or ports, if you are starting multiple brokers).
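Following Gary Russell's comment, a minimal sketch of a broker bean pinned to a fixed port (1234 is only an example; omitting kafkaPorts keeps the random-port default, which is better for CI builds):

@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    // kafkaPorts(...) pins the embedded broker to a fixed port instead of a random one
    return new EmbeddedKafkaBroker(1, false, 2, "test-events")
            .kafkaPorts(1234);
}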

Answer 3:

Just change the embedded Kafka broker properties:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = MyApplication.class)
@TestPropertySource(locations = "classpath:application-test.properties")
@EmbeddedKafka(
        topics = "my_topic_name",
        partitions = 1,
        brokerProperties = "log.dir=target/kafka"
        )
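This answer references an application-test.properties that is not shown; a minimal hypothetical version that points the Spring Kafka clients at the embedded broker might contain just:

# src/test/resources/application-test.properties (hypothetical content)
# spring.embedded.kafka.brokers is populated at runtime by @EmbeddedKafka
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}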
