计一次node消费kafka消息的踩坑
Posted bbwolf80600
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了计一次node消费kafka消息的踩坑相关的知识,希望对你有一定的参考价值。
项目场景:
项目场景:node消费kafka消息,保存到mysql数据库。
实现方式
- 单数据库连接循环写入
- 线程池管理连接,循环写入
- 线程池管理连接,拼接操作语句批量写入
问题描述
按照正常分析逻辑方式一最慢,方式二居中,方式三最快。
测试代码如下(20个最大连接):
单数据源循环写入:
let connection: Connection;
/**
* 创建 mysqlClient 对象
* @returns
*/
export async function makeConnection()
if(connection)
return connection;
if (!getConnectionManager().has('default'))
const connectionOptions = await getConnectionOptions();
connection = await createConnection(connectionOptions);
else
connection = getConnection();
return connection;
线程池管理连接,循环写入
//mysql连接池
var poor = mysql.createPool(
host:config.MysqlHost,//"数据库服务器地址",
port: config.MysqlPort, // MySQL数据库端口号
database:config.MysqlDatabase,// "数据库名",
user:config.MysqlUsername,//"连接数据库的用户名",
password:config.MysqlPassword,//"连接数据库的密码",
connectionLimit: config.MysqlPoolConnectionLimit,//"指定连接池中最大的链接数,默认是10",
multipleStatements: true,//"是否运行执行多条sql语句,默认值为false"
)
/**
* 数据库的操作
* @param sql 执行的操作语句
* @param params 给sql语句的占位符进行赋值的参数数组
* @returns Promise<unknown>
*/
function poorQuery(sql, params = [])
return new Promise((resolve, reject) =>
poor.getConnection((err,conn) =>
if (err)
console.log(`数据库连接失败:$err.message`);
reject(code:500)
else
conn.query(sql, params, (err, res) =>
if (err)
console.log('数据库查询失败', err)
reject(code:500)
else
resolve(code:200,data:res)
)
conn.release();//释放连接
)
)
线程池管理连接,拼接操作语句批量写入
//mysql连接池
var poor = mysql.createPool(
host:config.MysqlHost,//"数据库服务器地址",
port: config.MysqlPort, // MySQL数据库端口号
database:config.MysqlDatabase,// "数据库名",
user:config.MysqlUsername,//"连接数据库的用户名",
password:config.MysqlPassword,//"连接数据库的密码",
connectionLimit: config.MysqlPoolConnectionLimit,//"指定连接池中最大的链接数,默认是10",
multipleStatements: true,//"是否运行执行多条sql语句,默认值为false"
)
/**
* 数据库的操作--开启事务
* @param sql 执行的操作语句
* @param params 给sql语句的占位符进行赋值的参数数组
* @returns Promise<unknown>
*/
function poorQuery(sql, params = [])
return new Promise((resolve, reject) =>
poor.getConnection((err,conn) =>
if (err)
console.log(`数据库连接失败:$err.message`);
reject(code:500)
else
conn.beginTransaction(function(err)
if (err)
conn.rollback(function()
throw err;
);
console.log(`数据库事务创建失败:$err.message`);
reject(code:500)
conn.query(sql, params, (err, res) =>
if (err)
conn.rollback(function()
throw err;
);
console.log('数据库查询失败', err)
//reject(err)
reject(code:500)
conn.commit(function(err)
if (err)
console.log('数据库事务提交失败', err)
reject(code:500)
else
resolve(code:200,data:res)
);
)
);
conn.release();//释放连接
)
)
//暂存kafka数据
export let dataArray = new Array();
//循环遍历list。顺序消费数据
let logger = makeLogger('mysql', uuidv4(), null, null, null);
async function creatTimeout()
let map = new Map<String,sql:string,params:Array<string>>()
let num = 0
while(true)
let line = dataArray.shift();
if(line)
let sql = '***';
let params = new Array();
params.push(line.xxx);
map.set("**_" + line.xxx,sql,params)
num++
if(!line || num >= 1000)
break;
if(num > 0)
let sql = '';
let params = new Array()
map.forEach(function(value)
sql += value.sql;
params = params.concat(value.params)
)
const res:any = await poorQuery(sql,params)
if(res.code != 500)
logger.debug('消费事件完成:' + JSON.stringify(res))
break;
setTimeout(await function()
creatTimeout();
,1);
运行结果
方式 | 执行速度 |
---|---|
方式1 | 100 |
方式2 | 1300 |
方式3 | 30 |
== 方式3慢的超乎想象 ==
原因分析:
第三种方式批量处理会先对每条数据做delete
操作。对应where条件不是索引
。造成每次删除都是全表扫描,数据越多速度越慢。
解决方案:
添加索引
KEY index_n (n)
最终效果
方式3平均速度3000+
以上是关于计一次node消费kafka消息的踩坑的主要内容,如果未能解决你的问题,请参考以下文章