无法使用 kafka-node 向 kafka Producer 发送消息

Posted

技术标签:

【中文标题】无法使用 kafka-node 向 kafka Producer 发送消息【英文标题】:Unable to send message to kafka Producer using kafka-node 【发布时间】:2021-11-09 15:09:46 【问题描述】:

我使用的是 Kafka 框架提供的默认 server.properties/zookeeper.properties 文件。

我正在尝试创建一个简单的 NodeJS 应用程序,它将向生产者发送消息并使用它们。

下面是 NodeJS 代码。

config.js

module.exports = 
  kafka_topic: 'catalog',
  kafka_server: 'localhost:9092',
;

nodejs-producer.js

const kafka = require('kafka-node');
const config = require('./config');

try 

    // set the desired timeout in options
    const options = 
        timeout: 5000,
    ;
    const Producer = kafka.Producer;
    const client = new kafka.KafkaClient(kafkaHost: config.kafka_server, requestTimeout: 5000);
    const producer = new Producer(client);
    const kafka_topic = config.kafka_topic;
    let payloads = [
        
            topic: kafka_topic,
            messages: 'This is test message'
        
    ];

    producer.on('ready', async function() 
        let push_status = producer.send(payloads, (err, data) => 
            if (err) 
                console.log(err.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update failed');
             else 
                console.log(data.toString());
                console.log('[kafka-producer -> '+kafka_topic+']: broker update success');
            
        );
    );

    producer.on('error', function(err) 
        console.log(err);
        console.log('[kafka-producer -> '+kafka_topic+']: connection errored');
        throw err;
    );

catch(e) 
    console.log(e);

kafka 版本 = 2.8.0 kafka-node 版本 = 5.0.0

我收到错误 - 错误:LeaderNotAvailable

如何解决这个问题?我尝试在 server.properties 文件中使用不同的值,例如adverted.listeners,但没有得到解决方案。

【问题讨论】:

这能回答你的问题吗? Leader Not Available Kafka in Console Producer 【参考方案1】:

我在发送消息时也遇到了同样的问题。我通过在有效负载中添加一个分区解决了这个问题,并且消费者也使用了相同的分区。

Code I have used

【讨论】:

【参考方案2】:

我已经answered this problem here

简而言之:在尝试向不存在的主题生成消息时会出现此问题。

您可以将您的 kafka 安装配置为在这种情况下自动创建主题:接下来会发生什么 - 按顺序:您仍然会收到错误消息并且框架将创建主题。在我的情况下,我不得不再次重新生成相同的消息,但这是在旧版本的 Kafka 上。

编辑: here a link 到一篇解释如何设置您的 kafka 配置以自动创建 kafka 主题的帖子。

【讨论】:

我已经通过 CLI 命令创建了主题。主题详情如下: -> 命令:/zookeeper-shell.sh ZooKeeper -server localhost:2181 get /brokers/topics/catalog -> 结果:"partitions":"0":[3,1], "1":[1,2],"2":[2,3],"topic_id":"1nKW2XXXXXXmtkNQ_A","adding_replicas":,"removing_replicas":,"version":3

以上是关于无法使用 kafka-node 向 kafka Producer 发送消息的主要内容,如果未能解决你的问题,请参考以下文章

node.js连接Kafka

带键的 kafka 消息似乎总是去同一个分区

无法通过java代码向kafka主题发送消息

无法从 docker 中运行的服务向 kafka 生成消息

无法向 Kafka 集群生成 PubNub 数据流

春季启动生产者在kafka重启后无法发送任何消息