node.js连接Kafka

Posted 奔跑的猴

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了node.js连接Kafka相关的知识,希望对你有一定的参考价值。

        前段时间搭建了zookeeper+kafka环境,但是呢,有了环境一定要使用才会有它的意义。然而,作为一个躁动的猴,不可能按部就班的去使用Java去连接。剑走偏锋,尝试一下node去连接环境。

        需要注意的一点,kafka-node ^2.0版本支持的kafka环境是0.9以上,而我在搭建环境的时候使用的kafka是0.8.2.1,故,在安装依赖包时选择较低版本的kafka-node

        npm install kafka-node@1

        首先是生产者。

const kafka = require('kafka-node')

var client = new kafka.Client('192.168.56.121:2181',"kafka-node-client");
var Producer = kafka.Producer;

var producerOption = {
   // Configuration for when to consider a message as acknowledged, default 1
   requireAcks: 1,
   // The amount of time in milliseconds to wait for all acks before considered, default 100ms
   ackTimeoutMs: 100,
   // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
   partitionerType: 2
}

var producer = new Producer(client,producerOption);
// 要发送的消息队列,topic为主题,message为消息,支持String和[String]
var payloads = [
       { topic: 'mykafka', messages: 'hi', partition: 0 },
       { topic: 'mykafka', messages: ['hello', 'world'] }
   ];

producer.on('ready',()=>{
   console.log('ready');
   producer.send(payloads,(err,data)=>{
       console.log(data);
       producer.close();
   })
})

producer.on('error',(err)=>{
   console.log('err');
   console.log(err);
})

         然后是消费者。

var kafka = require('kafka-node'),
   Consumer = kafka.Consumer,
   client = new kafka.Client("192.168.56.121:2181,192.168.56.122:2181,192.168.56.123:2181","client"),
   consumer = new Consumer(
       client,
       [
           { topic: 'mykafka', partition: 0 }
       ],
       {
           groupId: 'kafka-node-group',//consumer group id, default `kafka-node-group`
           // Auto commit config
           autoCommit: true,
           autoCommitIntervalMs: 5000,
           // The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued, default 100ms
           fetchMaxWaitMs: 100,
           // This is the minimum number of bytes of messages that must be available to give a response, default 1 byte
           fetchMinBytes: 1,
           // The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
           fetchMaxBytes: 1024 * 1024,
           // If set true, consumer will fetch message from the given offset in the payloads
           fromOffset: false,
           // If set to 'buffer', values will be returned as raw buffer objects.
           encoding: 'utf8'
       }
   );

consumer.on('message', function (message) {
   console.log(message);
});
consumer.on('error', function (error) {
   console.log('error:');
   console.log(error);
});


以上是关于node.js连接Kafka的主要内容,如果未能解决你的问题,请参考以下文章

澄清 node.js + promises 片段

为啥建议不要在 Node.js 代码的任何地方关闭 MongoDB 连接?

为啥建议不要在 Node.js 代码的任何地方关闭 MongoDB 连接?

基于 Koa平台Node.js开发的KoaHub.js连接打印机的代码

正式版来袭!Kafka 1.0 发布,告别4位数版本号;Angular 5.0 和 Node.js 9.0 迎重大更新

vscode代码片段建议bug