Spring Kafka - 事务回滚后重试时,通过侦听器容器事务发布消息和提交记录偏移失败

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Kafka - 事务回滚后重试时,通过侦听器容器事务发布消息和提交记录偏移失败相关的知识,希望对你有一定的参考价值。

我在Apache Kafka v2.0.1中使用spring-boot&spring-kafka(请参阅pom.xml了解特定版本),并且在通过Listener Container使用事务时遇到了一个奇怪的问题。

当Kafka在尝试在消费过程产生周期上发布新消息时发生错误时,会引发此问题。为了模仿它,我将主题的min.insync.replicas设置(4)设置为大于Kafka的可用代理(3)。

我期待由于factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1));,每次不成功的尝试和无限重试都会有事务回滚。

正如您在output.log上看到的,当第一次尝试(消息接收)处理失败时,侦听器容器启动事务回滚,然后它再次接收相同的消息(如预期的那样)。

然而,当第二次尝试处理和发布新消息时,它会永久阻止kafkaTemplate.send(record).get();并且没有启动事务回滚...顺便说一句,那时,如果我将min.insync.replicas设置设置为可接受的值(<=代理),进程正常继续并且事务已提交。

为了模拟它,只需向主题发布一个字符串消息,将主题的min.insync.replicas增加到不可接受的值并运行应用程序。

pom.hml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
    </parent>
    <groupId>com.project.test</groupId>
    <artifactId>kafka-transactions</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.Java

package com.project.test;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@EnableKafka
@EnableTransactionManagement
@SpringBootApplication
public class Application {

    private String kafkaBootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, "1");

        DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configProps);
        producerFactory.setTransactionIdPrefix("tx.");

        return producerFactory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        KafkaTransactionManager<String, String> manager = new KafkaTransactionManager<String, String>(producerFactory());
        return manager;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-kafka-tx-consumer");

        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);

        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        factory.getContainerProperties().setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level.INFO);

        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, String>(-1));

        return factory;
    }

}

Kafka transactions.Java

package com.project.test;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaTransactions {

    private static final Logger log = LoggerFactory.getLogger(KafkaTransactions.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "test-kafka-transactions")
    public void messageListener(String value) throws Exception {

        log.info("Received message");

        ProducerRecord<String, String> record = new ProducerRecord<>("test-kafka-transactions", null, value);

        Thread.sleep(2000);

        log.info("Adding new message on Kafka transaction (commit is handled by the Listener Container)");
        kafkaTemplate.send(record).get();

        log.info("Processed message");
    }

}

output.log

2019-04-04 10:21:25.741  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test-kafka-transactions-3, test-kafka-transactions-2, test-kafka-transactions-1, test-kafka-transactions-0]
2019-04-04 10:23:03.240  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Received message
2019-04-04 10:23:05.245  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Adding new message on Kafka transaction (commit is handled by the Listener Container)
2019-04-04 10:23:05.247  INFO 19316 --- [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: gvDhAK6YRsWzh2FrxukHnA
2019-04-04 10:23:05.267  WARN 19316 --- [kafka-producer-network-thread | producer-1] o.a.k.clients.producer.internals.Sender  : [Producer clientId=producer-1, transactionalId=tx.test-kafka-tx-consumer.test-kafka-transactions.3] Got error produce response with correlation id 12 on topic-partition test-kafka-transactions-3, retrying (0 attempts left). Error: NOT_ENOUGH_REPLICAS
2019-04-04 10:23:05.372 ERROR 19316 --- [kafka-producer-network-thread | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='1' and payload='1' to topic test-kafka-transactions:

org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

2019-04-04 10:23:05.372 ERROR 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.project.test.KafkaTransactions.messageListener(java.lang.String) throws java.lang.Exception' threw exception; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:302) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1224) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1217) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1600(KafkaMessageListenerContainer.java:384) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$3.doInTransactionWithoutResult(KafkaMessageListenerContainer.java:1128) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1118) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:934) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:750) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) [spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_191]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_191]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_191]
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119) ~[spring-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.project.test.KafkaTransactions.messageListener(KafkaTransactions.java:29) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    ... 17 common frames omitted
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$0(KafkaTemplate.java:396) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1235) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:635) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:604) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:561) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:485) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:700) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:532) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:524) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-2.0.1.jar:na]
    ... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

2019-04-04 10:23:05.372  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Received message
2019-04-04 10:23:07.391  INFO 19316 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.project.test.KafkaTransactions       : Adding new message on Kafka transaction (commit is handled by the Listener Container)

答案

将Spring从图片中取出后,我能够重现它。

我提交了一个bug KAFKA-8195

以上是关于Spring Kafka - 事务回滚后重试时,通过侦听器容器事务发布消息和提交记录偏移失败的主要内容,如果未能解决你的问题,请参考以下文章

Spring Data:重试时回滚事务

Kafka 事务和幂等详解

MySQL事务回滚后自增键不连续

为什么mysql事务回滚后, 自增ID依然自增

休眠:java.sql.SQLException:事务回滚后尝试继续工作

Spring kafka记录标题未正确填充