我的node之kafka实践

Posted yxz1025

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了我的node之kafka实践相关的知识,希望对你有一定的参考价值。

最近公司基于微信做了一个微信发红包的工具,此工具需要对接授权给微信公众号,然后通过关键词实现发红包

一、整体架构

 系统主要构成模块如下:

一、微信转发Middle服务模块(负责接受用户关键词等请求转发到对应的前端服务器)

1)、微信转发server首先查询当前用户是否已经有过一次域名授权转发,若从在则直接获取之前的域名进行转发



二、前端+微信授权模块服务(可同时支持多个微信公众号授权,一个用户只能授权一种微信公众号token+openid


1)、通过微信转发server回复给用户对应公众号的图文消息带有授权公众号的域名

2)、授权成功保存用户信息,同时保存openidsession


3)、同一个用户只能抢一次,一旦抢过,则在session中记录用户抢记录,避免同一用户多次点击抢红包按钮,减少后端请求


4)、对于参与过抽奖的用户,二次之后的请求结果页面直接缓存,减少不必要的请求


三、红包模块(不同公众号对应创建一个红包队列)


1)、同一个公众号只能创建一个红包队列

2)、红包可进行须发



四、发放红包模块


1)、通过对应的用户授权公众号token区分不同微信支付



五、通用授权公众号hash配置

1)、通过授权域名做为键,对应的值则为公众号的基本信息、支付配置信息、授权转发域名等



六、消息日志记录采用kafka+zookeeper

单台服务器最高同时在线人数达到5000的并发写,发红包的记录通过脚本每五分钟从kafka中拉去消费数据,保存到数据库,做为后期的数据分析


以下为node发送消息和读取消息例子:

producer:

var kafka = require('kafka-node');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var argv = 
    topic: "topic1"
;
var topic = argv.topic || 'topic10';
var p = argv.p || 0;
var a = argv.a || 0;
var producer = new Producer(client, 
    requireAcks: 1
);
producer.on('ready', function() 
    var args = 
        appid: 'wx238c28839a133d0e',
        createTime: 'ddd',
        toUserName: 'wx238c28839a133d0e',
        fromUserName: 'wx238c28839a133d0e'
    ;
    // var keyedMessage = new KeyedMessage('keyed', 'a keyed message');
    producer.send([
        topic: topic,
        partition: p,
        messages: [JSON.stringify(args)],
        attributes: a
    ], function(err, result) 
        console.log(err || result);
        process.exit();
    );

    //create topics
    // producer.createTopics(['t1'], function (err, data) 
    //     console.log(data);
    // );
);

consumer:

'use strict';

var kafka = require('kafka-node');
var events = require('events');
var emitter = new events.EventEmitter();
//var HighLevelConsumer = kafka.HighLevelConsumer;
var Consumer = kafka.Consumer;
var Offset = kafka.Offset;
var Client = kafka.Client;
var argv = 
    topic: "hongbao_xxx"
;
var topic = argv.topic || 'topic1';

var client = new Client('localhost:2181');
var topics = [
        topic: topic,
        partition: 0,
        offset: 8000
    ],
    options = 
        autoCommit: false,
        fetchMaxWaitMs: 1000,
        fetchMaxBytes: 1024 * 1024,
        fromOffset: true
    ;

var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);

//consumer.setOffset(topic, 0, 36);
consumer.on('message', function(message) 
    var obj = message;
    var message = JSON.parse(message.value);
    var args = [];
    args.push(message.openId);
    args.push(message.fromUserName);
    args.push(message.toUserName);
    args.push(message.money);
    args.push(message.attach);
    args.push(message.appId);
    args.push(message.cTime);
    emitter.emit('load', args);

);

consumer.on('error', function(err) 
    console.log('error', err);
);

emitter.on('load', function(args) 
        console.log('listener2', args);

        // function insert(args) 
        //     middleconsumer.saveRecord(args)
        //         .then(function(data) 
        //             insert(args);
        //         , function(er) 

        //         );
        // 
        // insert(args);
);
    /*
     * If consumer get `offsetOutOfRange` event, fetch data from the smallest(oldest) offset
     */
consumer.on('offsetOutOfRange', function(topic) 
    topic.maxNum = 2;
    offset.fetch([topic], function(err, offsets) 
        var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
        consumer.setOffset(topic.topic, topic.partition, min);
    );
);

由于时间原因,写的比较粗糙,欢迎吐槽,另外附上公司红包工具平台,http://gatao.cn

以上是关于我的node之kafka实践的主要内容,如果未能解决你的问题,请参考以下文章

公共安全领域 Kafka 应用实践

KAFKA EAGLE 监控MRS kafka之操作实践

kafka基本命令和实践

kafka之kafka的伪分布式安装

5分钟spark streaming实践之 与kafka联姻

夯实Kafka知识体系及基本功「实践操作篇」单机部署实践手册(2.8.0)