Can not get message from autocreated Kafka topic

Posted: 2021-01-22 11:29:39

Question:

I have Kafka deployed in OpenShift, and a topic is auto-created when I send a message from my application with spring-kafka. Everything worked fine before, but since I switched to Kafka transactions I can no longer get messages from Kafka, because of this error:

[/] 2020-10-05 09:20:44.589 ERROR --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] 1121:o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer : Transaction rolled back

org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:127)
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:52)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:137)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListenerInTx(KafkaMessageListenerContainer.java:1102)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1080)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:911)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:727)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction; nested exception is org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:150)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378)
    at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:69)
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:106)
    ... 10 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: TransactionalId tx-authorization-group-ms-c8f01388-4c6d-41ed-bce5-c382da07fd65.topic.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:758)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:216)
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:606)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:459)
    at brave.kafka.clients.TracingProducer.beginTransaction(TracingProducer.java:50)
    at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:63)
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:135)
    ... 13 common frames omitted

I also cannot read messages from the Kafka topic with kafka-console-consumer; it just hangs. (I checked, and the topic does contain some messages.)

The Kafka pod is started with the following settings:

--override log.dir=/var/lib/kafka/log
--override log.dirs=/var/lib/kafka/topics
--override auto.create.topics.enable=true
--override auto.leader.rebalance.enable=true
--override background.threads=10
--override compression.type=producer
--override delete.topic.enable=true
--override leader.imbalance.check.interval.seconds=300
--override leader.imbalance.per.broker.percentage=10
--override log.flush.interval.messages=9223372036854775807
--override log.flush.offset.checkpoint.interval.ms=60000
--override log.flush.scheduler.interval.ms=9223372036854775807
--override log.retention.bytes=-1
--override log.retention.hours=168
--override log.roll.hours=168
--override log.roll.jitter.hours=0
--override log.segment.bytes=1073741824
--override log.segment.delete.delay.ms=60000
--override message.max.bytes=1000012
--override min.insync.replicas=1
--override num.io.threads=8
--override num.network.threads=3
--override num.recovery.threads.per.data.dir=1
--override num.replica.fetchers=1
--override offset.metadata.max.bytes=4096
--override offsets.commit.required.acks=-1
--override offsets.commit.timeout.ms=5000
--override offsets.load.buffer.size=5242880
--override offsets.retention.check.interval.ms=600000
--override offsets.retention.minutes=1440
--override offsets.topic.compression.codec=0
--override offsets.topic.num.partitions=50
--override offsets.topic.replication.factor=3
--override offsets.topic.segment.bytes=104857600
--override queued.max.requests=500
--override quota.consumer.default=9223372036854775807
--override quota.producer.default=9223372036854775807
--override replica.fetch.min.bytes=1
--override replica.fetch.wait.max.ms=500
--override replica.high.watermark.checkpoint.interval.ms=5000
--override replica.lag.time.max.ms=10000
--override replica.socket.receive.buffer.bytes=65536
--override replica.socket.timeout.ms=30000
--override request.timeout.ms=30000
--override socket.receive.buffer.bytes=102400
--override socket.request.max.bytes=104857600
--override socket.send.buffer.bytes=102400
--override unclean.leader.election.enable=true
--override zookeeper.session.timeout.ms=6000
--override zookeeper.set.acl=false
--override broker.id.generation.enable=true
--override connections.max.idle.ms=600000
--override controlled.shutdown.enable=true
--override controlled.shutdown.max.retries=3
--override controlled.shutdown.retry.backoff.ms=5000
--override controller.socket.timeout.ms=30000
--override default.replication.factor=1
--override fetch.purgatory.purge.interval.requests=1000
--override group.max.session.timeout.ms=300000
--override group.min.session.timeout.ms=6000
--override log.cleaner.backoff.ms=15000
--override log.cleaner.dedupe.buffer.size=134217728
--override log.cleaner.delete.retention.ms=86400000
--override log.cleaner.enable=true
--override log.cleaner.io.buffer.load.factor=0.9
--override log.cleaner.io.buffer.size=524288
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308
--override log.cleaner.min.cleanable.ratio=0.5
--override log.cleaner.min.compaction.lag.ms=0
--override log.cleaner.threads=1
--override log.cleanup.policy=delete
--override log.index.interval.bytes=4096
--override log.index.size.max.bytes=10485760
--override log.message.timestamp.difference.max.ms=9223372036854775807
--override log.message.timestamp.type=CreateTime
--override log.preallocate=false
--override log.retention.check.interval.ms=300000
--override max.connections.per.ip=2147483647
--override num.partitions=1
--override producer.purgatory.purge.interval.requests=1000
--override replica.fetch.backoff.ms=1000
--override replica.fetch.max.bytes=1048576
--override replica.fetch.response.max.bytes=10485760
--override reserved.broker.max.id=1000

My guess is that something is misconfigured around the ISR. Can anyone tell me what is wrong?

P.S. Since I use spring-kafka, I have the following in my application.properties:

kafka:
  consumer:
    group-id: my-site
  producer:
    transaction-id-prefix: tx-
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: my.site.serializer.JacksonSerializer
    properties:
      interceptor.classes: my.site.interceptors.TokenInterceptor

Then I send messages with an autowired KafkaTemplate.
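For reference, in a Spring Boot application these properties conventionally sit under the spring.kafka prefix in application.yml; a sketch of the same configuration with that nesting made explicit (the topic names, serializer, and interceptor classes are the poster's own):

```yaml
spring:
  kafka:
    consumer:
      group-id: my-site
    producer:
      # Static prefix: every instance of this application derives its
      # transactional.id values from the same "tx-" string.
      transaction-id-prefix: tx-
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: my.site.serializer.JacksonSerializer
      properties:
        interceptor.classes: my.site.interceptors.TokenInterceptor
```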

Comments:

@mike, added to the post.

Answer 1:

I am not entirely sure about the transaction API in Spring.

Reading the exception stack trace, the problem seems to be in starting the transaction on the producer side. This happens when the transactional id is not unique. So you need to change the setting transaction-id-prefix: tx- to something else, because you probably have another producer using the same prefix.
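One way to make the prefix unique per application instance is to derive it at startup. A minimal sketch (the helper name and the use of a random UUID are my own choices, not the poster's code):

```java
import java.util.UUID;

public class TransactionIdPrefix {

    // Hypothetical helper: build a transactional-id prefix that is unique
    // per application instance, so two instances of the same service never
    // register the same transactional.id with the broker.
    static String uniquePrefix(String appName) {
        return "tx-" + appName + "-" + UUID.randomUUID() + "-";
    }

    public static void main(String[] args) {
        String first = uniquePrefix("my-site");
        String second = uniquePrefix("my-site");
        // Two instances (two calls here) get distinct prefixes.
        System.out.println(first.equals(second) ? "collision" : "unique");
    }
}
```

In a Spring Boot setup the same effect can be had without code, e.g. transaction-id-prefix: tx-${random.uuid}-, since Spring Boot's RandomValuePropertySource resolves ${random.uuid} once at startup.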

To read transactional data with kafka-console-consumer, make sure you pass --isolation-level read_committed (equivalently, --consumer-property isolation.level=read_committed). Otherwise the console consumer defaults to read_uncommitted and will also return messages from open or aborted transactions.
