Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步
Posted
技术标签:
【中文标题】Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步【英文标题】:Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction 【发布时间】:2020-03-07 08:17:00 【问题描述】:我阅读了大量 Gary Russell 的答案和帖子,但没有找到以下序列同步常见用例的实际解决方案:
recieve from topic A => save to DB via Spring-data => send to topic B
据我了解:在这种情况下无法保证完全原子处理,我需要在客户端处理消息重复数据删除,但主要问题是 ChainedKafkaTransactionManager 不与 JpaTransactionManager 同步强>(见下文@KafkaListener
)
卡夫卡配置:
@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig
private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);
@Bean
public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("$kafka.broker") String bootstrapServer)
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(AUTO_OFFSET_RESET_CONFIG, 'earliest');
props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(MAX_POLL_RECORDS_CONFIG, 10);
props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
props.put(ISOLATION_LEVEL_CONFIG, 'read_committed');
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
@Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
@Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
@Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
@Value("$kafka.concurrency:#T(java.lang.Runtime).getRuntime().availableProcessors()") Integer concurrency
)
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setTransactionManager(chainedKafkaTM);
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
arbp.setCommitRecovered(true);
arbp.setKafkaTemplate(kafkaTemplate);
factory.setAfterRollbackProcessor(arbp);
factory.setConcurrency(concurrency);
factory.afterPropertiesSet();
return factory;
@Bean
public ProducerFactory<String, byte[]> producerFactory(@Value("$kafka.broker") String bootstrapServer)
Map<String, Object> configProps = new HashMap<>();
configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configProps.put(BATCH_SIZE_CONFIG, 16384);
configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
kafkaProducerFactory.setTransactionIdPrefix('kafka-tx-');
return kafkaProducerFactory;
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory)
return new KafkaTemplate<>(producerFactory);
@Bean
public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory)
KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return ktm;
@Bean
public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
KafkaTransactionManager kafkaTransactionManager)
return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
@Bean(name = "transactionManager")
public JpaTransactionManager transactionManager(EntityManagerFactory em)
return new JpaTransactionManager(em);
卡夫卡监听器:
@KafkaListener(groupId = "$group.id", idIsGroup = false, topics = "$topic.name.import")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset)
for (byte[] record : records)
// cause infinity rollback (perhaps due to batch listener)
if (true)
throw new RuntimeExcetion("foo");
// spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data can't determine TM among transactionManager, chainedKafkaTM, kafkaTransactionManager
var result = storageService.persist(record);
kafkaTemplate.send(result);
Spring-kafka 版本:2.3.3 春季启动版本:2.2.1
实现这种用例的正确方法是什么? Spring-kafka 文档仅限于小型/特定示例。
P.s.当我在 @KafkaListener
方法上使用 @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class)
时,我面临无限循环回滚,但是设置了 FixedBackOff(1000L, 3L)
。
编辑:我计划通过可配置的重试次数在侦听器、生产者和数据库之间实现最大负担得起的同步。
编辑: 上面的代码 sn-ps 已根据建议的配置进行了编辑。使用 ARBP 并不能解决我的无限回滚循环,因为第一条语句的谓词始终为 false (SeekUtils.doSeeks
):
DefaultAfterRollbackProcessor
...
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
boolean recoverable)
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
getSkipPredicate((List) records, exception), LOGGER)
&& isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional())
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
new OffsetAndMetadata(skipped.offset() + 1)));
值得一提的是,Kafka Consumer方法(TransactionSynchronizationManager.isActualTransactionActive()
)中没有活跃的事务。
【问题讨论】:
【参考方案1】:是什么让您认为它不同步?你真的不需要@Transactional
,因为容器会启动两个事务。
您不应该在事务中使用SeekToCurrentErrorHandler
,因为这发生在事务中。改为配置后回滚处理器。默认 ARBP 使用 FixedBackOff(0L, 9)
(10 次尝试)。
这对我来说很好;并在 4 次交付尝试后停止:
@SpringBootApplication
public class So58804826Application
public static void main(String[] args)
SpringApplication.run(So58804826Application.class, args);
@Bean
public JpaTransactionManager transactionManager()
return new JpaTransactionManager();
@Bean
public ChainedKafkaTransactionManager<?, ?> chainedTxM(JpaTransactionManager jpa,
KafkaTransactionManager<?, ?> kafka)
kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager<>(kafka, jpa);
@Autowired
private Saver saver;
@KafkaListener(id = "so58804826", topics = "so58804826")
public void listen(String in)
System.out.println("Storing: " + in);
this.saver.save(in);
@Bean
public NewTopic topic()
return TopicBuilder.name("so58804826")
.partitions(1)
.replicas(1)
.build();
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template)
return args ->
// template.executeInTransaction(t -> t.send("so58804826", "foo"));
;
@Component
class ContainerFactoryConfigurer
ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
ChainedKafkaTransactionManager<?, ?> tm)
factory.getContainerProperties().setTransactionManager(tm);
factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L, 3)));
@Component
class Saver
@Autowired
private MyEntityRepo repo;
private final AtomicInteger ids = new AtomicInteger();
@Transactional("chainedTxM")
public void save(String in)
this.repo.save(new MyEntity(in, this.ids.incrementAndGet()));
throw new RuntimeException("foo");
我从两个 TxM 中看到“参与现有交易”。
使用@Transactional("transactionManager")
,我只是从 JPATm 中得到它,正如人们所期望的那样。
编辑
批处理侦听器没有“恢复”的概念——框架不知道批处理中的哪条记录需要被跳过。在 2.3 中,我们为使用 MANUAL ack 模式时的批处理侦听器添加了一项新功能。
见Committing Offsets。
从 2.3 版本开始,Acknowledgment 接口有两个额外的方法 nack(long sleep) 和 nack(int index, long sleep)。第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发 IllegalStateException。
使用批处理侦听器时,您可以指定批处理中发生故障的索引。当调用
nack()
时,将为索引之前的记录提交偏移量,并在分区上为失败和丢弃的记录执行查找,以便在下一次 poll() 时重新传递它们。这是对 SeekToCurrentBatchErrorHandler 的改进,SeekToCurrentBatchErrorHandler 只能寻找整个批次进行重新投递。
但是,失败的记录仍然会无限期地重播。
您可以跟踪不断失败的记录,然后点击index + 1
跳过它。
但是,由于您的 JPA tx 已回滚;这对你不起作用。
使用批处理侦听器,您必须在侦听器代码中处理批处理问题。
【讨论】:
哇,非常感谢!!!我今晚去看看!但在我的示例中,记录在抛出 RuntimeException 之前就已保留。你在“你真的不需要@Transactional”下是什么意思——我不需要在存储库或消费者上使用@Transactional?哦,对不起,现在我明白了:我不需要消费者/侦听器上的事务,但我需要使用 ChainedTM 配置 Spring-data,而不仅仅是普通的 JpaTransactionManager,对吗? 否;我说@Transactional("chainedTxM")
是多余的,因为侦听器容器在调用侦听器之前启动事务。它没有伤害(因为我们得到Participating in existing transaction
),但它是不必要的。
我检查了设置,我遇到了两个问题: 1. Spring-data 无法在 3 TM (kafka, chain, jpa) 之间做出决定,所以它不会启动,所以我需要设置@Transactional("chainedTM") 在我的存储库类上,2. 如果在 listen 方法的开头(在任何模板或存储库使用之前)抛出运行时异常,它会无限循环并且 ARBP 不起作用。
在无限循环的情况下,无论ARBP如何,它总是寻求相同的偏移量:INFO Oakclients.consumer.KafkaConsumer - ....寻求偏移量 1...请看第二个编辑回答。
这没有意义;我的例子不管有没有@Transactional
都可以正常工作。在某处发布调试日志以显示您所看到的行为。将您的代码与我的代码进行比较。以上是关于Spring Kafka ChainedKafkaTransactionManager 不与 JPA Spring-data 事务同步的主要内容,如果未能解决你的问题,请参考以下文章
Spring Kafka 和 Spring Integration Kafka 的区别
kafka:spring集成 kafka(springboot集成客户端集成)