node.js+mysql实现分库分表存查数据:

Posted 苦海123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了node.js+mysql实现分库分表存查数据:相关的知识,希望对你有一定的参考价值。

node.js+mysql实现分库分表:

1.分库分表使用场景:

互联网项目中常用到的关系型数据库,如MySQL,随着用户和业务的增长,传统的单库单表模式难以满足大量的业务数据存储以及查询,单库单表中大量的数据会使写入、查询效率非常之慢,此时可以采取分库分表策略来解决,下面将以仪表采集数据项目为例来介绍。

2.业务需求:

手上有一个项目,需要按秒或按分钟采集数据存到数据库中,不同的用户有N多台不同的仪表,每个仪表有很多基础的配置信息和采集的数据,此时要分库分表(按年分库)存数据来优化性能。

3.分表方式(水平分表和垂直分表):

水平分表: 按数据量的范围对数据进行分表存储,例如:11000000为一个表,10000012000000为一个表,一次类推。

优点: 扩容简单,提前建好库、表就好;

缺点:操作数据会有IO瓶颈,单表热点访问量过大问题;

针对这种问题可以使用HASH取模的方式将数据平均的落到每一个表中(仪表id取模表的数据得到的余数作为实时采集表的后缀如:data_0表、data_1表)

HASH取模方法优点: 能保证数据较均匀的分散落在不同的库、表中,减轻了数据库表压力

HASH取模方法缺点: 扩容麻烦、迁移数据时每次都需要重新计算hash值分配到不同的库和表

垂直分表:指将某个表中某些业务数据分到多个表中,如:之前的用户订单表拆分为用户表和订单表。

4.代码演示:

实际项目中可根据项目业务结合两种分表方法使用,按年分库按数量分表,具体如下:

1.对递增幅度比较大的表进行水平拆分: 这种方法需要估算单表在不分表的情况下大概数据,而mysql单表数据量推荐300万条以下,具体还要看每条数据所占的字节数。比如一年的数据量有1亿条,此时按每个表存100W条计算,那么需要分表数量为:100000000 / 1000000 = 10,此时我们可以提前将一年的数据表建好,表名可以使用前缀加后缀索引的方式,如:data_0、data_1、…、data_9

2.将数据均匀地落到对应的表中: 上面的表已经分好了,此时需要将仪表的数据存到对应的表中,每一个表都有一个唯一的id,当然你的项目可能不是仪表,比如某个用户也是有唯一id的,此时可以用仪表的id 取模 表的数量,得到的余数就是要落入到对应表的后缀,如:仪表id为9,那么数据应该落到:9%10 = 9 ,即:data_9, 注意:查某个仪表的数据也是通过仪表id取模表的数量找到对应的表。

3.分库: 上面已经对数据进行了分表处理,那么接下来就是分库处理,需要知道的是,不能使用同一条SQL语句查询多个库中的记录数据(查阅资料大概可以垮库查询,市面上很多程序也不支持垮库,这里我不做过多纠结),因此我们只需要将数据库有规律的命名查询即可,这里推荐按年份分库,如:platformdatadb2022、platformdatadb2023、platformdatadb2024,需要注意的是每个库中应提前建好data_索引表,此时我们写入和读数据只需要切换到对应的库即可。

4.默认库: 项目中应该有个默认数据变化幅度比较小的库,我们此时可以在这个默认的库上使用切换库查询,数据写入应该每年切换到当前年所在的库,而切换库时应该确保库的存在,此时查询当前年库是否存在也是需要在默认库的基础上查询的,默认库配置如下:

// 引入mysql数据库模块
const mysql = require('mysql')
// 默认数据库配置:
let mysqlHost = 
  host : 'localhost',
  user : 'root',
  database : 'platf***defaultdb',
  password : '*********',
  timezone: "08:00",
  connectionLimit: 2

const connection = mysql.createConnection(mysqlHost)
//导出默认数据库模块:
module.exports = connection

5.定时事务: 了解上面后,你可能会有个问题,那么谁每年去创建存大数量的数据库呢?此时你可能想到定时器,但是想想简单的定时器也不能满足要求,因为简单的定时器是隔一段时间执行一次或不断的隔一段时间执行某段代码,那么这个定时器是需要创建的,可能是某个不确定的时间,那么这样就不能控制什么时间去执行某段代码,为解决上面问题,我们可以使用定时事务,所谓定时事务是指,在某个时间点去执行某段代码,比如一天中的某个时刻、一年中的某个时刻,node.js中定时事务我们可以装一个node-schedule包来完成,具体如下(此模块将导出每年自动切换的存大量数据的数据库,其他基础数据应该从默认库中操作):

// 引入mysql数据库模块,供创建新数据库连接
const mysql = require('mysql')
// 导入定时任务模块:
const schedule = require('node-schedule')
// 导入默认数据库:
const connection = require('./mysqldbconfig')

// 1.创建数据库方法:将每年创建数据库的业务封装到一个函数中:
function createDataBaseHandle (year) 
  return new Promise((resolve)=>
    // 1.创建库:
    function createDataBase() 
      return new Promise((resolve1)=>
        let createDataBaceSql = 'CREATE DATABASE platformdatadb'+year+''
        connection.query(createDataBaceSql,(error)=>
          try 
            if (error) 
              throw error
             else 
              console.log(year+'年数据库已创建,开始创建报警表:')
              resolve1()
            
           catch 
            console.log('1.创建数据库错误:'+error)
          
        )
      )
    
    // 2.创建报警表:
    async function createWarnTable () 
      // 创建数据库完成:
      await createDataBase()
      return new Promise((resolve2)=>
        let createWarnTableSql = 'CREATE TABLE platformdatadb'+year+'.`sms_record`( `sms_id` INT NOT NULL AUTO_INCREMENT, `subtime` TIMESTAMP NOT NULL, `equipment_id` INT NOT NULL, `user_id` INT NOT NULL, `content` VARCHAR(255), `sxdx` INT NOT NULL, `number` VARCHAR(255), `sffcg` VARCHAR(255) NOT NULL, `tongji` INT DEFAULT 0, PRIMARY KEY (`sms_id`) ); '
        connection.query(createWarnTableSql,(error)=>
          try 
            if (error) 
              throw error
             else 
              console.log('创建报警表已完成,开始创建实时值表:')
              resolve2()
            
           catch 
            console.log('2.创建报警表错误:'+error)
          
        )
      )
    
     // 3.创建订单表:
    async function createOrderTable () 
      // 创建报警表完成
      await createWarnTable()
      return new Promise((resolve) => 
        let createOrderTableSql = 'CREATE TABLE platformdatadb'+year+'.`recharge_order`(`id` INT NOT NULL AUTO_INCREMENT, `order_private` VARCHAR(20) NOT NULL, `uid` INT NOT NULL, `pid` INT NOT NULL, `total_money` DECIMAL(19,2) NOT NULL, `create_time` TIMESTAMP NOT NULL, `mark` VARCHAR(100), `action_time` TIMESTAMP, `state` INT NOT NULL DEFAULT 0, `pay_time` TIMESTAMP NOT NULL, PRIMARY KEY (`id`) ); '
        defaultConnection.query(createOrderTableSql,(error)=>
          try 
            if (error) 
              throw error
             else 
              console.log('订单表创建已完成,开始创建实时值表:')
              resolve()
            
           catch 
            console.log('创建订单表错误:'+error)
          
        )
      )
    
    // 4.创建实时值表:
    async function createDataTable () 
      // 创建订单表完成
      await createOrderTable()
      // 递归的方式创建实时值表:
      return new Promise((resolve3)=>
        let i = 0
        function start() 
          if (i > 999) 
            console.log('实时值表创建完成,开始创建累计值表:')
            resolve3()
            return
           else 
            console.log('正在创建表:data_'+i)
          
          let createDataTableSql = 'CREATE TABLE platformdatadb'+year+'.`data_'+i+'`( `id` INT NOT NULL AUTO_INCREMENT,`pid` INT NOT NULL, `D1` VARCHAR(50), `D2` VARCHAR(50), `D3` VARCHAR(50), `D4` VARCHAR(50), `D5` VARCHAR(50), `D6` VARCHAR(50), `D7` VARCHAR(50), `D8` VARCHAR(50), `D9` VARCHAR(50), `D10` VARCHAR(50), `D11` VARCHAR(50), `D12` VARCHAR(50), `D13` VARCHAR(50), `D14` VARCHAR(50), `D15` VARCHAR(50), `D16` VARCHAR(50), `D17` VARCHAR(50), `D18` VARCHAR(50), `D19` VARCHAR(50), `D20` VARCHAR(50), `D21` VARCHAR(50), `D22` VARCHAR(50), `D23` VARCHAR(50), `D24` VARCHAR(50), `D25` VARCHAR(50), `D26` VARCHAR(50), `D27` VARCHAR(50), `D28` VARCHAR(50), `D29` VARCHAR(50), `D30` VARCHAR(50), `D31` VARCHAR(50), `D32` VARCHAR(50), `D33` VARCHAR(50), `D34` VARCHAR(50), `D35` VARCHAR(50), `D36` VARCHAR(50), `D37` VARCHAR(50), `D38` VARCHAR(50), `D39` VARCHAR(50), `D40` VARCHAR(50), `D41` VARCHAR(50), `D42` VARCHAR(50), `D43` VARCHAR(50), `D44` VARCHAR(50), `D45` VARCHAR(50), `D46` VARCHAR(50), `D47` VARCHAR(50), `D48` VARCHAR(50), `D49` VARCHAR(50), `D50` VARCHAR(50) ,`subTime` TIMESTAMP NOT NULL, PRIMARY KEY (`id`) )'
          connection.query(createDataTableSql,(error)=>
            try 
              if (error) 
                throw error
               else 
                i++
                start()
              
             catch 
              console.log('3创建实时值表错误:'+error)
            
          )
        
        start()
      )
    
    // 5. 创建累计值表
    async function createCumulativeTable () 
      // 创建实时值表完成
      await createDataTable()
      // 递归的方式创建实时值表:
      let i = 0
      function start() 
        if (i > 999) 
          console.log('数据库和表创建已完成!')
          resolve()
          return
         else 
          console.log('正在创建表:cumulative_'+i)
        
        let createDataTableSql = 'CREATE TABLE platformdatadb'+year+'.`cumulative_'+i+'`( `id` INT NOT NULL AUTO_INCREMENT,`pid` INT NOT NULL, `accum1` VARCHAR(50), `accum2` VARCHAR(50), `accum3` VARCHAR(50), `accum4` VARCHAR(50), `accum5` VARCHAR(50), `accum6` VARCHAR(50), `accum7` VARCHAR(50), `accum8` VARCHAR(50), `accum9` VARCHAR(50), `accum10` VARCHAR(50), `accum11` VARCHAR(50), `accum12` VARCHAR(50), `accum13` VARCHAR(50), `accum14` VARCHAR(50), `accum15` VARCHAR(50), `accum16` VARCHAR(50), `accum17` VARCHAR(50), `accum18` VARCHAR(50), `accum19` VARCHAR(50), `accum20` VARCHAR(50), `accum21` VARCHAR(50), `accum22` VARCHAR(50), `accum23` VARCHAR(50), `accum24` VARCHAR(50), `accum25` VARCHAR(50), `accum26` VARCHAR(50), `accum27` VARCHAR(50), `accum28` VARCHAR(50), `accum29` VARCHAR(50), `accum30` VARCHAR(50), `accum31` VARCHAR(50), `accum32` VARCHAR(50), `accum33` VARCHAR(50), `accum34` VARCHAR(50), `accum35` VARCHAR(50), `accum36` VARCHAR(50), `accum37` VARCHAR(50), `accum38` VARCHAR(50), `accum39` VARCHAR(50), `accum40` VARCHAR(50), `accum41` VARCHAR(50), `accum42` VARCHAR(50), `accum43` VARCHAR(50), `accum44` VARCHAR(50), `accum45` VARCHAR(50), `accum46` VARCHAR(50), `D47` VARCHAR(50), `accum48` VARCHAR(50), `accum49` VARCHAR(50), `accum50` VARCHAR(50) ,`subtime` TIMESTAMP NOT NULL, PRIMARY KEY (`id`) )'
        connection.query(createDataTableSql,(error)=>
          try 
            if (error) 
              throw error
             else 
              i++
              start()
            
           catch 
            console.log('4.创建实时值表错误:'+error)
          
        )
      
      start()
    
    createCumulativeTable()
  )


// 2.每年12月1日自动创建下一年数据库:(利用定时事务每年的12月1日0点0时0分开始创建下一年数据库)
schedule.scheduleJob('0 0 0 1 12 *', async function () 
  let date = new Date()
  let yearstr = date.getFullYear()
  // 调用创建数据库方法创建数据库:
  let nextYear = parseInt(yearstr) + 1
  console.log('自动开始创建'+nextYear+'年数据库')
  await createDataBaseHandle(nextYear)
)

// 3.创建动态数据库配置(首次启动程序连接当前年份数据库,下一年自动切换到新一年数据库):
function createMysqlObj() 
  let date = new Date()
  let yearstr = date.getFullYear()
  // 动态数据库配置:
  let mysqlHost = 
    host : 'localhost',
    user : 'root',
    database : 'platf***datadb' + yearstr,
    password : '*********',
    timezone: "08:00",
    connectionLimit: 100
  
  return mysqlHost


// 4.程序每次启动获取当前时间并切换到当前时间数据库:
let mysqlConfig = createMysqlObj()

// 5.每年1月1日自动切换到最新数据库(当前年的数据库):
schedule.scheduleJob('0 0 0 1 1 *', function () 
  let date = new Date()
  let yearstr = date.getFullYear()
  // 如果新的一年数据库创建了那么立马切换到新创建的数据库,否则还是沿用旧数据库
  let sql = 'SELECT table_name FROM `INFORMATION_SCHEMA`.`TABLES` WHERE table_schema = platformdatadb'+yearstr
  connection.query(sql,async (error,result)=>
    try 
      if (error) 
        throw error
       else 
        if (result.length !== 0) 
          mysqlConfig.database = 'platformdatadb' + yearstr
         else 
          // 如果发现新的数据库没有创建,那么自动创建数据库,创建完后再切换到新的数据库:(此问题一般不会出现,除非在创建数据库的时候服务器突然掉线,这种概率很小的事情即使发生,我们也可以手动创建库,手动创建的代码我会粘在后面)
          await createDataBaseHandle(yearstr)
          mysqlConfig.database = 'platformdatadb' + yearstr
        
      
     catch 
      console.log('查询新创建数据库是否存在失败')
    
  )
)

const dataConnection = mysql.createConnection(mysqlConfig)
//导出动态按年切换数据库模块:
module.exports = dataConnection

6.接口中使用默认库和动态库:

创建完上面默认库和动态库后,你可能还是不太会,那么下面我将我项目中某个接口代码粘到下面供大家参考,下面是我历史查询数据业务的接口代码(因为之前按年分库,如果时间段跨年此时就会有查不到的数据,我在这里做出提示,时间段不能跨年,不能跨年指同一时间段不能有两个年份出现,单个年份是可以查到数据的!)

const express = require('express')
const router = express.Router()
// 引入默认数据库,用来查基础数据
const connection = require('../config/mysqldbconfig')
// 引入存数据的动态数据库配置:
const dataConnection = require('../config/mysqldbdataconfig')
//引入token工具
const verifyToken = require('../commethods/creattoken')

// 根据时间进行查询历史数据的接口:
router.post('/api/getdailycount',(request,response)=>
  let equipId,selectday,type = request.body
  const authorization = request.headers
  let isOk = verifyToken(authorization)
  isOk.then(() => 
    // 当selectday为空字符或空数组时返回cod:201,msg:'超出查询范围,数据库不存在!'
    if (selectday == '' || selectday == []) 
      response.send(cod:201,msg:'超出查询范围,数据库不存在!')
      return
    
    // 通道名称容器:
    let arrnam = []
    // 仪表id取模表的数量得到要查询的表后缀索引(我项目中表data_表有1000个)
    let tableIndex = equipId % 1000
    // 查询要显示通道数量和通道名称:
    function queryCount() 
      let sql = 'SELECT qp.equipment_d, cn.* FROM `equipment` qp INNER JOIN `channe_name` cn ON cn.pid = "'+equipId+'" WHERE qp.id = "'+equipId+'" LIMIT 1'
      return new Promise((resolve)=>
        // 从默认库中查基础数据:
        connection.query(sql,(error,result) => 
          try 
            if (error) 
                throw error
             else 
              if (result.length !== 0) 
                let counts = result[0].equipment_d
                for (let i = 1; i <= counts -2; i++) 
                  //  遍历对象拿到通道名称:
                  for (let key in result[0])
                    if(key === ('named'+i))
                      // 将通道名称和绑定的D值存到对象中
                      arrnam.push(nam:result[0][key],dat:'accum'+i)
                    
                  
                
                resolve(result)
               else 
                response.send(cod:201,msg:'查询历史仪表名称数据失败')
              
            
           catch(err)
            console.log('查询设备通道名称和显示通道数错误:'+err)
          
        )
      )
    
    // 动态库中查统计值
    async function selectdata()
      let countAndName = await queryCount()
      // 生成要查询的通道值字段:
      let counts = parseInt(countAndName[0].equipment_d) - 2
      let nameArray = []
      for (let i = 1; i <= counts; i++) 
        nameArray.push('accum'+i)
      
      let dStr = nameArray.toString()
      // 判断查询时间是否为当前年份,如果是的话,直接查当前时间默认数据库,否则以输入时间为根据查对应的时间数据库
      let startYear = selectday.slice(0,4)
      let nowYear = new Date().getFullYear()
      // 要查询数据的sql语句
      let sql = ''
      // 判断查询类型切换到对应的sql:
      if (type === 'daily') 
        if (startYear == nowYear) 
          // 判断查询时间是否为当前年份,如果不是则从对应的数据库中查询数据
          sql = 'SELECT '+dStr+', subtime FROM cumulative_'+tableIndex+' WHERE pid = '+equipId+' AND DATE_FORMAT(subtime,"\\%Y-\\%m-\\%d") ="'+selectday+'" ORDER BY id DESC'
         else 
          sql = 'SELECT '+dStr+', subtime FROM platformdatadb'+startYear+'.cumulative_'+tableIndex+' WHERE pid ='+equipId+' AND DATE_FORMAT(`subtime`,"\\%Y-\\%m-\\%d") ="'+selectday+'" ORDER BY id DESC'
        
      
      if (type === 'monthly') 
        if (startYear == nowYear) 
          // 判断查询时间是否为当前年份,如果不是则从对应的数据库中查询数据
          sql = 'SELECT '+dStr+', subtime FROM cumulative_'+tableIndex+' WHERE pid = '+equipId+' AND DATE_FORMAT(subtime,"\\%Y-\\%m") ="'+selectday+'" ORDER BY id DESC'
         else 
          sql = 'SELECT '+dStr+',subtime FROM platformdatadb'+startYear+'.cumulative_'+tableIndex+' WHERE pid ='+equipId+' AND DATE_FORMAT(`subtime`,"\\%Y-\\%m") ="'+selectday+'" ORDER BY id DESC'
        
      
      以上是关于node.js+mysql实现分库分表存查数据:的主要内容,如果未能解决你的问题,请参考以下文章

如何实现mysql的分库分表

MySQL分库分表之MyCat实现

浅谈mysql数据库分库分表那些事-亿级数据存储方案

MySQL分库分表之MyCat实现

oracle分库分表

MySQL 高可用:mysql+mycat实现数据库分片(分库分表)