Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步

Posted

技术标签:

【中文标题】Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步【英文标题】:Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction 【发布时间】:2020-03-07 08:17:00 【问题描述】:

我阅读了大量 Gary Russell 的答案和帖子,但没有找到以下序列同步常见用例的实际解决方案:

recieve from topic A => save to DB via Spring-data => send to topic B

据我了解:在这种情况下无法保证完全原子处理,我需要在客户端处理消息重复数据删除,但主要问题是 ChainedKafkaTransactionManager 不与 JpaTransactionManager 同步强>(见下文@KafkaListener

卡夫卡配置:

@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig 

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("$kafka.broker") String bootstrapServer) 

        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        props.put(AUTO_OFFSET_RESET_CONFIG, 'earliest');
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ISOLATION_LEVEL_CONFIG, 'read_committed');

        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("$kafka.concurrency:#T(java.lang.Runtime).getRuntime().availableProcessors()") Integer concurrency
    ) 

        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);

        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);

        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);

        factory.afterPropertiesSet();

        return factory;
    

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(@Value("$kafka.broker") String bootstrapServer) 

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);

        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
        kafkaProducerFactory.setTransactionIdPrefix('kafka-tx-');

        return kafkaProducerFactory;
    

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) 
        return new KafkaTemplate<>(producerFactory);
    

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) 
        KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    

    @Bean
    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
                                                         KafkaTransactionManager kafkaTransactionManager) 
        return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
    

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) 
        return new JpaTransactionManager(em);
    

卡夫卡监听器:

@KafkaListener(groupId = "$group.id", idIsGroup = false, topics = "$topic.name.import")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) 
    for (byte[] record : records) 
        // cause infinity rollback (perhaps due to batch listener)
        if (true)
            throw new RuntimeExcetion("foo");

        // spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data can't determine TM among transactionManager, chainedKafkaTM, kafkaTransactionManager
        var result = storageService.persist(record);

        kafkaTemplate.send(result);
    

Spring-kafka 版本:2.3.3 春季启动版本:2.2.1

实现这种用例的正确方法是什么? Spring-kafka 文档仅限于小型/特定示例。

P.s.当我在 @KafkaListener 方法上使用 @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) 时,我面临无限循环回滚,但是设置了 FixedBackOff(1000L, 3L)

编辑:我计划通过可配置的重试次数在侦听器、生产者和数据库之间实现最大负担得起的同步。

编辑: 上面的代码 sn-ps 已根据建议的配置进行了编辑。使用 ARBP 并不能解决我的无限回滚循环,因为第一条语句的谓词始终为 false (SeekUtils.doSeeks):

DefaultAfterRollbackProcessor
...
@Override
    public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
            boolean recoverable) 

        if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
                getSkipPredicate((List) records, exception), LOGGER)
                    && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) 
            ConsumerRecord<K, V> skipped = records.get(0);
            this.kafkaTemplate.sendOffsetsToTransaction(
                    Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                            new OffsetAndMetadata(skipped.offset() + 1)));
        
    

值得一提的是,Kafka Consumer方法(TransactionSynchronizationManager.isActualTransactionActive())中没有活跃的事务。

【问题讨论】:

【参考方案1】:

是什么让您认为它不同步?你真的不需要@Transactional,因为容器会启动两个事务。

您不应该在事务中使用SeekToCurrentErrorHandler,因为这发生在事务中。改为配置后回滚处理器。默认 ARBP 使用 FixedBackOff(0L, 9)(10 次尝试)。

这对我来说很好;并在 4 次交付尝试后停止:

@SpringBootApplication
public class So58804826Application 

    public static void main(String[] args) 
        SpringApplication.run(So58804826Application.class, args);
    

    @Bean
    public JpaTransactionManager transactionManager() 
        return new JpaTransactionManager();
    


    @Bean
    public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
            KafkaTransactionManager<?, ?> kafka) 

        kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(kafka, jpa);
    

    @Autowired
    private Saver saver;

    @KafkaListener(id = "so58804826", topics = "so58804826")
    public void listen(String in) 
        System.out.println("Storing: " + in);
        this.saver.save(in);
    

    @Bean
    public NewTopic topic() 
        return TopicBuilder.name("so58804826")
                .partitions(1)
                .replicas(1)
                .build();
    

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) 
        return args -> 
//          template.executeInTransaction(t -> t.send("so58804826", "foo"));
        ;
    



@Component
class ContainerFactoryConfigurer 

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            ChainedKafkaTransactionManager<?, ?> tm) 

        factory.getContainerProperties().setTransactionManager(tm);
        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));
    



@Component
class Saver 

    @Autowired
    private MyEntityRepo repo;

    private final AtomicInteger ids = new AtomicInteger();

    @Transactional("chainedTxM")
    public void save(String in) 
        this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
        throw new RuntimeException("foo");
    


我从两个 TxM 中看到“参与现有交易”。

使用@Transactional("transactionManager"),我只是从 JPATm 中得到它,正如人们所期望的那样。

编辑

批处理侦听器没有“恢复”的概念——框架不知道批处理中的哪条记录需要被跳过。在 2.3 中,我们为使用 MANUAL ack 模式时的批处理侦听器添加了一项新功能。

见Committing Offsets。

从 2.3 版本开始,Acknowledgment 接口有两个额外的方法 nack(long sleep) 和 nack(int index, long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发 IllegalStateException。

使用批处理侦听器时,您可以指定批处理中发生故障的索引。当调用nack() 时,将为索引之前的记录提交偏移量,并在分区上为失败和丢弃的记录执行查找,以便在下一次 poll() 时重新传递它们。这是对 SeekToCurrentBatchErrorHandler 的改进,SeekToCurrentBatchErrorHandler 只能寻找整个批次进行重新投递。

但是,失败的记录仍然会无限期地重播。

您可以跟踪不断失败的记录,然后点击index + 1 跳过它。

但是,由于您的 JPA tx 已回滚;这对你不起作用。

使用批处理侦听器,您必须在侦听器代码中处理批处理问题。

【讨论】:

哇,非常感谢!!!我今晚去看看!但在我的示例中,记录在抛出 RuntimeException 之前就已保留。你在“你真的不需要@Transactional”下是什么意思——我不需要在存储库或消费者上使用@Transactional?哦,对不起,现在我明白了:我不需要消费者/侦听器上的事务,但我需要使用 ChainedTM 配置 Spring-data,而不仅仅是普通的 JpaTransactionManager,对吗? 否;我说@Transactional("chainedTxM") 是多余的,因为侦听器容器在调用侦听器之前启动事务。它没有伤害(因为我们得到Participating in existing transaction),但它是不必要的。 我检查了设置,我遇到了两个问题: 1. Spring-data 无法在 3 TM (kafka, chain, jpa) 之间做出决定,所以它不会启动,所以我需要设置@Transactional("chainedTM") 在我的存储库类上,2. 如果在 listen 方法的开头(在任何模板或存储库使用之前)抛出运行时异常,它会无限循环并且 ARBP 不起作用。 在无限循环的情况下,无论ARBP如何,它总是寻求相同的偏移量:INFO Oakclients.consumer.KafkaConsumer - ....寻求偏移量 1...请看第二个编辑回答。 这没有意义;我的例子不管有没有@Transactional都可以正常工作。在某处发布调试日志以显示您所看到的行为。将您的代码与我的代码进行比较。

以上是关于Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步的主要内容,如果未能解决你的问题,请参考以下文章

Spring Kafka 和 Spring Integration Kafka 的区别

kafka:spring集成 kafka(springboot集成客户端集成)

Spring生态研习:Spring-kafka

解决 spring boot 访问 docker kafka 失败

spring boot引入kafka

用 spring 管理 Kafka 主题