计一次node消费kafka消息的踩坑

Posted bbwolf80600

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了计一次node消费kafka消息的踩坑相关的知识,希望对你有一定的参考价值。

项目场景:

项目场景:node消费kafka消息,保存到mysql数据库。


实现方式

  1. 单数据库连接循环写入
  2. 线程池管理连接,循环写入
  3. 线程池管理连接,拼接操作语句批量写入

问题描述

按照正常分析逻辑方式一最慢,方式二居中,方式三最快。
测试代码如下(20个最大连接):
单数据源循环写入:

let connection: Connection;
/**
 * 创建 mysqlClient 对象
 * @returns 
 */
export async function makeConnection() 
    if(connection)
        return connection;
      
    if (!getConnectionManager().has('default')) 
        const connectionOptions = await getConnectionOptions();
        connection = await createConnection(connectionOptions);
     else 
        connection = getConnection();
    
    return connection;

线程池管理连接,循环写入

//mysql连接池
var poor = mysql.createPool(
    host:config.MysqlHost,//"数据库服务器地址",
    port: config.MysqlPort, // MySQL数据库端口号
    database:config.MysqlDatabase,// "数据库名",
    user:config.MysqlUsername,//"连接数据库的用户名",
    password:config.MysqlPassword,//"连接数据库的密码",
    connectionLimit: config.MysqlPoolConnectionLimit,//"指定连接池中最大的链接数,默认是10",
    multipleStatements: true,//"是否运行执行多条sql语句,默认值为false"
)

/**
 * 数据库的操作
 * @param sql 执行的操作语句
 * @param params 给sql语句的占位符进行赋值的参数数组
 * @returns Promise<unknown>
 */
function poorQuery(sql, params = []) 
    return new Promise((resolve, reject) => 
        poor.getConnection((err,conn) => 
            if (err) 
                console.log(`数据库连接失败:$err.message`);
                reject(code:500)
            else
                conn.query(sql, params, (err, res) => 
                    if (err) 
                        console.log('数据库查询失败', err)
                        reject(code:500)
                     else 
                        resolve(code:200,data:res)
                    
                )
                conn.release();//释放连接
            
        )
    )

线程池管理连接,拼接操作语句批量写入

//mysql连接池
var poor = mysql.createPool(
    host:config.MysqlHost,//"数据库服务器地址",
    port: config.MysqlPort, // MySQL数据库端口号
    database:config.MysqlDatabase,// "数据库名",
    user:config.MysqlUsername,//"连接数据库的用户名",
    password:config.MysqlPassword,//"连接数据库的密码",
    connectionLimit: config.MysqlPoolConnectionLimit,//"指定连接池中最大的链接数,默认是10",
    multipleStatements: true,//"是否运行执行多条sql语句,默认值为false"
)

/**
 * 数据库的操作--开启事务
 * @param sql 执行的操作语句
 * @param params 给sql语句的占位符进行赋值的参数数组
 * @returns Promise<unknown>
 */
function poorQuery(sql, params = []) 
    return new Promise((resolve, reject) => 
        poor.getConnection((err,conn) => 
            if (err) 
                console.log(`数据库连接失败:$err.message`);
                reject(code:500)
            else
                conn.beginTransaction(function(err) 
                    if (err) 
                        conn.rollback(function() 
                            throw err;
                        );
                        console.log(`数据库事务创建失败:$err.message`);
                        reject(code:500)
                    
                    conn.query(sql, params, (err, res) => 
                        if (err) 
                            conn.rollback(function() 
                                throw err;
                            );
                            console.log('数据库查询失败', err)
                            //reject(err)
                            reject(code:500)
                        
                        conn.commit(function(err) 
                            if (err) 
                                console.log('数据库事务提交失败', err)
                                reject(code:500)
                            else
                                resolve(code:200,data:res)
                            
                        );
                    )
                );
                conn.release();//释放连接
            
        )
    )


//暂存kafka数据
export let dataArray = new Array();
//循环遍历list。顺序消费数据
let logger = makeLogger('mysql', uuidv4(), null, null, null);
async function creatTimeout()
    let map = new Map<String,sql:string,params:Array<string>>()
    let num = 0
    while(true)
        let line = dataArray.shift();
        if(line)
            let sql = '***';
            let params = new Array();
            params.push(line.xxx);
            map.set("**_" + line.xxx,sql,params)
            num++
        
        if(!line || num >= 1000)
            break;
        
    
    if(num > 0)
        let sql = '';
        let params = new Array()
        map.forEach(function(value)
            sql += value.sql;
            params = params.concat(value.params)
        )
        const res:any = await poorQuery(sql,params)
        if(res.code != 500)
            logger.debug('消费事件完成:' + JSON.stringify(res))
            break;
        
    
    
    setTimeout(await function()
        creatTimeout();
    ,1);

运行结果

方式执行速度
方式1100
方式21300
方式330

== 方式3慢的超乎想象 ==


原因分析:

第三种方式批量处理会先对每条数据做delete操作。对应where条件不是索引。造成每次删除都是全表扫描,数据越多速度越慢。


解决方案:

添加索引

KEY index_n (n)

最终效果

方式3平均速度3000+

以上是关于计一次node消费kafka消息的踩坑的主要内容,如果未能解决你的问题,请参考以下文章

科目一考试一次必过的踩坑笔记 All In One

记一次pm2的踩坑

一次php foreach 变量作用域的踩坑记录

踩坑adb——我的一次使用adb命令的踩坑之旅

记一次webpack3升级webpack4的踩坑

angular脚手架安装的踩坑记录