无法使用 kafka-node 向 kafka Producer 发送消息
Posted
技术标签:
【中文标题】无法使用 kafka-node 向 kafka Producer 发送消息【英文标题】:Unable to send message to kafka Producer using kafka-node 【发布时间】:2021-11-09 15:09:46 【问题描述】:我使用的是 Kafka 框架提供的默认 server.properties/zookeeper.properties 文件。
我正在尝试创建一个简单的 NodeJS 应用程序,它将向生产者发送消息并使用它们。
下面是 NodeJS 代码。
config.js
module.exports =
kafka_topic: 'catalog',
kafka_server: 'localhost:9092',
;
nodejs-producer.js
const kafka = require('kafka-node');
const config = require('./config');
try
// set the desired timeout in options
const options =
timeout: 5000,
;
const Producer = kafka.Producer;
const client = new kafka.KafkaClient(kafkaHost: config.kafka_server, requestTimeout: 5000);
const producer = new Producer(client);
const kafka_topic = config.kafka_topic;
let payloads = [
topic: kafka_topic,
messages: 'This is test message'
];
producer.on('ready', async function()
let push_status = producer.send(payloads, (err, data) =>
if (err)
console.log(err.toString());
console.log('[kafka-producer -> '+kafka_topic+']: broker update failed');
else
console.log(data.toString());
console.log('[kafka-producer -> '+kafka_topic+']: broker update success');
);
);
producer.on('error', function(err)
console.log(err);
console.log('[kafka-producer -> '+kafka_topic+']: connection errored');
throw err;
);
catch(e)
console.log(e);
kafka 版本 = 2.8.0 kafka-node 版本 = 5.0.0
我收到错误 - 错误:LeaderNotAvailable
如何解决这个问题?我尝试在 server.properties 文件中使用不同的值,例如adverted.listeners,但没有得到解决方案。
【问题讨论】:
这能回答你的问题吗? Leader Not Available Kafka in Console Producer 【参考方案1】:我在发送消息时也遇到了同样的问题。我通过在有效负载中添加一个分区解决了这个问题,并且消费者也使用了相同的分区。
Code I have used
【讨论】:
【参考方案2】:我已经answered this problem here
简而言之:在尝试向不存在的主题生成消息时会出现此问题。
您可以将您的 kafka 安装配置为在这种情况下自动创建主题:接下来会发生什么 - 按顺序:您仍然会收到错误消息并且框架将创建主题。在我的情况下,我不得不再次重新生成相同的消息,但这是在旧版本的 Kafka 上。
编辑: here a link 到一篇解释如何设置您的 kafka 配置以自动创建 kafka 主题的帖子。
【讨论】:
我已经通过 CLI 命令创建了主题。主题详情如下: -> 命令:/zookeeper-shell.sh ZooKeeper -server localhost:2181 get /brokers/topics/catalog -> 结果:"partitions":"0":[3,1], "1":[1,2],"2":[2,3],"topic_id":"1nKW2XXXXXXmtkNQ_A","adding_replicas":,"removing_replicas":,"version":3以上是关于无法使用 kafka-node 向 kafka Producer 发送消息的主要内容,如果未能解决你的问题,请参考以下文章