Confent Kafka Python生产者未使用ACKS =所有配置进行生产

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Confent Kafka Python生产者未使用ACKS =所有配置进行生产相关的知识,希望对你有一定的参考价值。

我有一些将在kafka主题中产生的python代码,在默认设置acks=1下可以正常工作,但是当我更改为acks=allacks=2时,消息不会出现在主题中。主题上的min.insync.replicas配置设置为2。运行令人困惑的代码后没有返回错误消息?集群中有3个代理。

这里是代码

from confluent_kafka import Producer
from kafka.errors import KafkaError

def get_producer_config():
    return Producer(get_config())


def get_config():
    conf = 
        'bootstrap.servers': 'localhost:9092',
        'acks': '2',
    

    return conf

try:
    producer = get_producer_config()
    producer.produce('test', 'test message from local app')
    producer.flush()
except KafkaError as error:
    get_logger().error(str(error))

这源于调试卡夫卡生产者lambda的错误消息KafkaErrorcode=NOT_ENOUGH_REPLICAS,val=19,str="Broker: Not enough in-sync replicas"我正在尝试在本地进行顶级复制,但是没有收到错误,但是在尝试时注意到了这一点。

谢谢

答案

生产者要求领导者在确认请求完成之前已收到的确认数。允许的值:0表示无确认,1表示仅领导者,-1表示完整的ISR。

2)您必须检查主题级别的配置。有两个参数是相关的。min.insync.replicas(ISR) <= replication-factor
另一答案

[acks = 1:这是默认值,其中仅领导者将消息写入其日志,但是将响应而无需等待所有关注者的完全确认。”]

    acks = 0:生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。
  1. acks =all:这意味着领导者将等待整套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。
  2. 此外,请在min.insyn.replica中指定一个有效值,该值应包含编号。包括领导者的副本。
  • 例如,如果您的复制因子为4,并且min.insync.replica = 3,则它是acks = all的有效配置。
  • 以上是关于Confent Kafka Python生产者未使用ACKS =所有配置进行生产的主要内容,如果未能解决你的问题,请参考以下文章

    python 使用 kafka

    Kafka 简单实验二(Python实现简单生产者消费者)

    一对kafka消息生产者-消息消费者,python

    python生产消费Kafka

    Kafka 通过python简单的生产消费实现

    使用Python读写Kafka