node.js连接Kafka
Posted 奔跑的猴
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了node.js连接Kafka相关的知识,希望对你有一定的参考价值。
前段时间搭建了zookeeper+kafka环境,但是呢,有了环境一定要使用才会有它的意义。然而,作为一个躁动的猴,不可能按部就班的去使用Java去连接。剑走偏锋,尝试一下node去连接环境。
需要注意的一点,kafka-node ^2.0版本支持的kafka环境是0.9以上,而我在搭建环境的时候使用的kafka是0.8.2.1,故,在安装依赖包时选择较低版本的kafka-node
npm install kafka-node@1
首先是生产者。
// Kafka producer example (kafka-node v1.x): connects through ZooKeeper and
// publishes test messages to topic 'mykafka', then closes the producer.
const kafka = require('kafka-node')
// Client(zookeeperConnectionString, clientId) — v1.x connects via ZooKeeper.
var client = new kafka.Client('192.168.56.121:2181',"kafka-node-client");
var Producer = kafka.Producer;
var producerOption = {
// Configuration for when to consider a message as acknowledged, default 1
requireAcks: 1,
// The amount of time in milliseconds to wait for all acks before considered, default 100ms
ackTimeoutMs: 100,
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
partitionerType: 2
}
var producer = new Producer(client,producerOption);
// Message payloads to send: `topic` is the topic name; `messages` accepts a
// single String or an array of Strings; `partition` is optional.
var payloads = [
{ topic: 'mykafka', messages: 'hi', partition: 0 },
{ topic: 'mykafka', messages: ['hello', 'world'] }
];
producer.on('ready',()=>{
console.log('ready');
producer.send(payloads,(err,data)=>{
// FIX: the original callback ignored `err`, silently swallowing send
// failures; report it so a failed send is visible.
if (err) {
console.log('error:');
console.log(err);
} else {
console.log(data);
}
// Close regardless of outcome so the process can exit.
producer.close();
})
})
producer.on('error',(err)=>{
// FIX: log 'error:' (was the misleading literal 'err'), matching the
// consumer script's error handler.
console.log('error:');
console.log(err);
})
然后是消费者。
// Kafka consumer example (kafka-node v1.x): connects through a three-node
// ZooKeeper ensemble, subscribes to topic 'mykafka' partition 0, and prints
// every message (or error) that arrives.
const kafka = require('kafka-node');
const Consumer = kafka.Consumer;

// Client(zookeeperConnectionString, clientId) — v1.x connects via ZooKeeper.
const client = new kafka.Client(
    "192.168.56.121:2181,192.168.56.122:2181,192.168.56.123:2181",
    "client"
);

// Fetch/commit tuning; values match the kafka-node defaults unless noted.
const consumerOptions = {
    // Consumer group id, default `kafka-node-group`.
    groupId: 'kafka-node-group',
    // Commit consumed offsets automatically every 5 seconds.
    autoCommit: true,
    autoCommitIntervalMs: 5000,
    // Max ms the broker blocks waiting when insufficient data is available, default 100ms.
    fetchMaxWaitMs: 100,
    // Minimum bytes of messages that must be available before the broker responds, default 1.
    fetchMinBytes: 1,
    // Upper bound on the message set returned for this partition per fetch.
    fetchMaxBytes: 1024 * 1024,
    // When true, consumption starts from the offsets given in the payloads.
    fromOffset: false,
    // Set to 'buffer' to receive raw Buffer objects instead of strings.
    encoding: 'utf8'
};

const topicPayloads = [
    { topic: 'mykafka', partition: 0 }
];

const consumer = new Consumer(client, topicPayloads, consumerOptions);

consumer.on('message', function (message) {
    console.log(message);
});
consumer.on('error', function (error) {
    console.log('error:');
    console.log(error);
});
以上是关于node.js连接Kafka的主要内容,如果未能解决你的问题,请参考以下文章
为啥建议不要在 Node.js 代码的任何地方关闭 MongoDB 连接?
为啥建议不要在 Node.js 代码的任何地方关闭 MongoDB 连接?
基于 Koa平台Node.js开发的KoaHub.js连接打印机的代码