如何实现大数据量数据库的历史数据归档

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何实现大数据量数据库的历史数据归档相关的知识,希望对你有一定的参考价值。

使用工具pt-archiver
原理解析
作为mysql DBA,可以说应该没有不知道pt-archiver了,作为pt-toolkit套件中的重要成员,往往能够轻松帮助DBA解决数据归档的问题。例如线上一个流水表,业务仅仅只需要存放最近3个月的流水数据,三个月前的数据做归档即可,那么pt-archiver就可以轻松帮你完成这件事情,甚至你可以配置成自动任务,无需人工干预。

作为DBA,我们应该知其然更应该知其所以然,这样我们也能够放心地使用pt工具。相信很多DBA都研究过pt-online-schema-change的原理,那么今天我们深入刨一刨pt-archiver的工作原理。
一、原理观察
土人有土办法,我们直接开启general log来观察pt-archiver是如何完成归档的。
命令
pt-archiver --source h=127.0.0.1,u=xucl,p=xuclxucl,P=3306,D=xucl,t=t1 --dest h=127.0.0.1,P=3306,u=xucl,p=xuclxucl,D=xucl_archive,t=t1 --progress 5000 \
--statistics --charset=utf8mb4 --limit=10000 --txn-size 1000 --sleep 30
常用选项
--analyze
指定工具完成数据归档后对表执行'ANALYZE TABLE'操作。指定方法如'--analyze=ds',s代表源端表,d代表目标端表,也可以单独指定。
--ask-pass
命令行提示密码输入,保护密码安全,前提需安装模块perl-TermReadKey。
--buffer
指定缓冲区数据刷新到选项'--file'指定的文件并且在提交时刷新。
只有当事务提交时禁用自动刷新到'--file'指定的文件和刷新文件到磁盘,这意味着文件是被操作系统块进行刷新,因此在事务进行提交之前有一些数据隐式刷新到磁盘。默认是每一行操作后进行文件刷新到磁盘。
--bulk-delete
指定单个语句删除chunk的方式来批量删除行,会隐式执行选项'--commit-each'。
使用单个DELETE语句删除每个chunk对应的表行,通常的做法是通过主键进行逐行的删除,批量删除在速度上会有很大的提升,但如果有复杂的'WHERE'条件就可能会更慢。
--[no]bulk-delete-limit
默认值:yes
指定添加选项'--bulk-delete'和'--limit'到进行归档的语句中。
--bulk-insert
使用LOAD DATA LOCAL INFILE的方法,通过批量插入chunk的方式来插入行(隐式指定选项'--bulk-delete'和'--commit-each')
而不是通过逐行单独插入的方式进行,它比单行执行INSERT语句插入的速度要快。通过隐式创建临时表来存储需要批量插入的行(chunk),而不是直接进行批量插入操作,当临时表中完成每个chunk之后再进行统一数据加载。为了保证数据的安全性,该选项会强制使用选项'--bulk-delete',这样能够有效保证删除是在插入完全成功之后进行的。
--channel
指定当主从复制环境是多源复制时需要进行归档哪个主库的数据,适用于多源复制中多个主库对应一个从库的情形。
--charset,-A
指定连接字符集。
--[no]check-charset
默认值:yes
指定检查确保数据库连接时字符集和表字符集相同。
--[no]check-columns
默认值:yes
指定检查确保选项'--source'指定的源端表和'--dest'指定的目标表具有相同的字段。
不检查字段在表的排序和字段类型,只检查字段是否在源端表和目标表当中都存在,如果有不相同的字段差异,则工具报错退出。如果需要禁用该检查,则指定'--no-check-columns'。
--check-slave-lag
指定主从复制延迟大于选项'--max-lag'指定的值之后暂停归档操作。默认情况下,工具会检查所有的从库,但该选项只作用于指定的从库(通过DSN连接方式)。
--check-interval
默认值:1s
如果同时指定了选项'--check-slave-lag',则该选项指定的时间为工具发现主从复制延迟时暂停的时间。每进行操作100行时进行一次检查。
--columns,-c
指定需要归档的表字段,如有多个则用','(逗号)隔开。
--commit-each
指定按每次获取和归档的行数进行提交,该选项会禁用选项'--txn-size'。
在每次获取表数据并进行归档之后,在获取下一次数据和选项'--sleep'指定的休眠时间之前,进行事务提交和刷新选项'--file'指定的文件,通过选项'--limit'控制事务的大小。
--host,-h
指定连接的数据库IP地址。
--port,-P
指定连接的数据库Port端口。
--user,-u
指定连接的数据库用户。
--password,-p
指定连接的数据库用户密码。
--socket,-S
指定使用SOCKET文件连接。
--databases,-d
指定连接的数据库
--source
指定需要进行归档操作的表,该选项是必须指定的选项,使用DSN方式表示。
--dest
指定要归档到的目标端表,使用DSN方式表示。
如果该选项没有指定的话,则默认与选项'--source'指定源端表为相同表。

--where
指定通过WHERE条件语句指定需要归档的数据,该选项是必须指定的选项。不需要加上'WHERE'关键字,如果确实不需要WHERE条件进行限制,则指定'--where 1=1'。
--file
指定表数据需要归档到的文件。使用类似MySQL DATE_FORMAT()格式化命名方式。
文件内容与MySQL中SELECT INTO OUTFILE语句使用相同的格式,文件命名选项如下所示:
%Y:年,4位数(Year, numeric, four digits)
%m:月,2位数(Month, numeric (01..12))
%d:日,2位数(Day of the month, numeric (01..31))
%H:小时(Hour (00..23))
%i:分钟(Minutes, numeric (00..59))
%s:秒(Seconds (00..59))
%D:数据库名(Database name)
%t:表名(Table name)
二、原理解析
根据general log的输出,我们整理出时序表格如下
三、其他说明
咋一看这个过程貌似也没有什么问题,但是,假如在原表扫描出数据,插入到新表的过程中,旧数据发生了变化怎么办?
带着这个疑问,我们进行了源码的跟踪,我们在pt-archiver的6839行打上了断点
然后我分别在几个session窗口做了如下动作
最后pt-archiver输出如下:
# A software update is available:
TIME ELAPSED COUNT
2020-04-08T09:13:21 0 0
2020-04-08T09:13:21 0 1
Started at 2020-04-08T09:13:21, ended at 2020-04-08T09:13:51
Source: A=utf8mb4,D=xucl,P=3306,h=127.0.0.1,p=...,t=t1,u=xucl
Dest: A=utf8mb4,D=xucl_archive,P=3306,h=127.0.0.1,p=...,t=t1,u=xucl
SELECT 1
INSERT 1
DELETE 1
Action Count Time Pct
sleep 1 30.0002 99.89
inserting 1 0.0213 0.07
commit 2 0.0080 0.03
select 2 0.0017 0.01
deleting 1 0.0005 0.00
other 0 0.0008 0.00
很明显,id=3这条记录并没有进行归档(我们这里是改了条件列,实际生产中可能是更改了其他列,造成归档数据不准确)
那么如何来解决这种情况的发生呢?
显然,数据库在数据库中可以通过加排它锁来防止其他程序修改对应的数据,pt-archiver其实早就已经帮我们考虑到了这样的情况,pt-archiver提供了两种选择
--for-update:Adds the FOR UPDATE modifier to SELECT statements
--share-lock:Adds the LOCK IN SHARE MODE modifier to SELECT statements
四、总结
pt-archiver作为归档工具无疑是MySQL DBA日常运维的大利器之一,在使用过程中在知道如何使用的基础上也能够知晓其原理
归档过程中最好能对归档记录进行加锁操作,以免造成归档数据不准确
在主从环境中,归档过程最好控制速度,以免造成主从延迟
尽量控制好chunk的大小,不要过大,造成大事务
参考技术A //打开数据库
con.Open();
//读取数据
OdbcDataReader reader = cmd.ExecuteReader();
//把数据加载到临时表
dt.Load(reader);
//在使用完毕之后,一定要关闭,要不然会出问题
reader.Close();
参考技术B 处理办法
生产库仅对活跃数据做备份,减少了直接备份8T数据库的IO等资源消耗,非活跃数据,考虑建立和生产库表相同表结构的分区表,历史表可以按照月进行分区,周期性的把非活跃数据导入到历史表,同时历史备份的备份策略调整为完整+差异模式,这样不仅提高了历史数据的查询效率,而且可以避免每有新增数据进来时,导致需要完整备份,增加了备份时间,而且不利于数据的灵活恢复。
参考技术C 历史数据归档的前提是数据进入了终态,也就是不在改变。在符合自己的业务逻辑的情况下,可以选择每天或者每周或每月(视你数据量和业务逻辑而定),对不再常用的终态数据归入历史表,以保证当前表的数据量大小。 参考技术D 程序: public class Move public static void main(String args[]) System.out.println(-1<<2); System.out.println(-1>>2); System.out.println(-1>>>2); 输出答案: -4 -1 1073741823

spark 实现大表数据合并

在做 mysql 或其他数据迁移的时候,有时候需要将两份或者多份数据进行合并,生产一份新的数据后进行使用,对于数据量较小的场景下,可以直接使用 sql 语句进行关联,但是对于两张或者多张千万级记录的表进行合并时,使用 sql 进行 join 操作是不现实的,在这些场景下,需要使用 spark 或者 hive 进行操作。本文介绍如何使用 spark 进行大数据量的合并。

本文中提到的大表,数据量一般在几百万或者千万甚至是亿级别。小表的数据量一般在 1万条记录以内。

1. 大数据关联表一般会遇到三种情形:

  1. 一张大数据表关联一张字典表(小表)
  2. 一张大数据表关联另一张大数据表,两张表存在id进行唯一对应,既 a 表中的一条记录在 b表中最多只有一条进行对应。
  3. 一张大数据表关联另一张大数据表,a 表中的一条记录在 b 表中有多条记录进行对应;或者是 a 表中的多条记录对应于 b 表中的一条记录。

2. 处理步骤:

  1. 使用 sqoop 工具将 mysql 数据导出到 hdfs

    在机器内存和处理器资源有限的情况下,使用 spark-sql 直接读取 mysql 数据库,可能会导致 mysql 支撑不住,导致在数据读取过程中会发生连接中断,从而导致读取数据失败。将数据导入到 hdfs 中,可以很容易支持亿级别数据读取。

  2. 编写 spark core 算法进行关联操作

说明:采用 spark-sql 直接进行 sql 关联查询或者两个 DataFrame 进行 join 操作和在 mysql 用 sql 进行关联查询一样,会导致消耗大量服务器内存和 cpu 资源,很容易出现 OOM 异常,从而导致任务失败。

3. 算法实现

3.1 一张大表关联一张字典表(小表)

示例:有一份行业信息表,数据量在 7000万 左右,有一份行业编码表,数据量只有 2000 条,现在需要将编码表中的字典信息合并到行业信息表中,则可以采用以下处理方式

1. 加载编码字典表,将其作为广播变量
2. 循环企业信息表,从广播变量中取出映射信息
 // 示例代码

// 加载字典表
val industryCodeMap = sc.textFile(industryCodePath)
    .map(x => {
    val params = x.split(","-1)
    val industryCode = new IndustryCode
    industryCode.industry_code = params(1)
    industryCode.industry_name = params(2)
    industryCode.industry_standard = params(4)

    (industryCode.industry_code, industryCode)
    })
    .collect()
    .toMap

// 将字典表信息设置为广播变量
val bcIndustryCodeMap = sc.broadcast(industryCodeMap)

// 加载行业信息,循环读取映射值
val lastIndustryRdd = sc.textFile(lastIndustryPath)
    .map(item => {
    val params = item.split("\\0001"-1)

    val lastIndustry = new LastIndustry
    lastIndustry.eid = params(2)
    lastIndustry.industry_code = params(3)

    val industryCodeMap = bcIndustryCodeMap.value
    val industryCode = industryCodeMap.get(lastIndustry.industry_code)
    if (!industryCode.isEmpty) {
        lastIndustry.industry_name = industryCode.get.industry_name
        .industry_standard = industryCode.get.industry_standard
    }

    lastIndustry
    })

3.2 一对一的两张大表

示例:一张企业信息表,约有 6600万记录,一张行业信息表,约有 7000万记录,行业信息表中每一条记录都含义企业 id 字典,现在需要将企业所在的行业信息合并到企业信息表中,可以采用以下方式:

1. 将企业信息映射为 (id,(企业信息,空的行业信息))
2. 将行业信息表映射为 (id,(空的企业信息, 行业信息))
3. 将两份数据进行合并
4. 根据 id 进行 reduce 计算,生产 (id,企业信息,行业信息)
5. 组合需要的结果信息返回

这样可以使用 reduce 计算替换 join 计算,极大减少数据 shuffle,从而加快计算速度。

// 示例代码

// 映射行业信息
val lastIndustryRdd = sc.textFile(lastIndustryPath)
    .map(item => {
    val params = item.split("\\0001"-1)

    val lastIndustry = new LastIndustry
    lastIndustry.eid = params(2)
    lastIndustry.industry_code = params(3)

    val industryCodeMap = bcIndustryCodeMap.value
    val industryCode = industryCodeMap.get(lastIndustry.industry_code)
    if (!industryCode.isEmpty) {
        lastIndustry.industry_name = industryCode.get.industry_name
        lastIndustry.industry_standard = industryCode.get.industry_standard
    }

    (lastIndustry.eid, (new Enterprise, lastIndustry))
    })


// 映射企业信息
val enterpriseRdd = sc.textFile(enterprisePath)
    .map(item => {
    val params = item.split("\\0001"-1)
    if (params.length < 60) {
        ("", (new Enterprisenew LastIndustry))
    } else {
        val enterprise = Enterprise.convertToEnterprise(params)
        (enterprise.eid, (enterprise, new LastIndustry))
    }
    })
    .filter(x => x._1 != "")


// 两份数据进行合并,按照 key 进行 reduce 计算
enterpriseRdd.union(lastIndustryRdd)
    .reduceByKey((a, b) => {
    var enterprise = a._1
    var lastIndustry = a._2

    if (a._1.eid == "" && b._1.eid != "") {
        enterprise = b._1
    }

    if (a._2.eid == "" && b._2.eid != "") {
        lastIndustry = b._2
    }

    enterprise.industry_code = lastIndustry.industry_code
    enterprise.industry_name = lastIndustry.industry_name
    enterprise.industry_standard = lastIndustry.industry_standard

    (enterprise, lastIndustry)
    })
    .map(x => x._2._1)
    .repartition(repartitionCount)
    .saveAsTextFile(savePath)

3.3 一对多的两张大表

示例:一张企业投资表,约有 500万记录,一张企业信息表,约有 6600万记录,一家企业会进行多项投资,既一条企业信息会对应多条企业投资信息,现在需要将企业信息关联到企业投资表中,可以采用以下方式:

1. 将企业信息表映射为 (id,(企业信息,空的企业投资信息,标识位1))
2. 将企业投资表映射为 (id,(空的企业信息,企业投资信息,标识位2))
3. 合并两份数据
4. 按照 id 进行分组
5. 对分组中的数据进行计算,查询分组中标识位为 2 的企业信息和标识位为 1 的企业投资信息,
将相关信息进行关联,也可以使用 reduceByKey 计算替代 groupByKey,加快分析
// 示例代码

//映射企业信息表
val enterpriseDistributionRdd = sc.textFile(enterpriseDistributionPath)
    .map(x => {
    val params = x.split("@"-1)
    val enterprise = Enterprise.convertToEnterprise(params)
    (enterprise.eid, (enterprise, new Investment1))
    })

// 映射企业投资信息
val investmentRdd = sc.textFile(investmentPath)
    .map(x => {
    val params = x.split("\\0001"-1)
    val investment = Investment.convertToInvestment(params)
    (investment.eid, (new Enterprise, investment, 2))
    })
    .filter(_._1 != "")

// 关联两份信息并进行分组计算
investmentRdd.union(enterpriseDistributionRdd)
    .groupByKey()
    .flatMap(item => {
    val list = ListBuffer[Investment]()
    val result = ListBuffer[Investment]()
    var enterprise: Enterprise = new Enterprise

    for (x <- item._2) {
        if (x._3 == 1) {
        enterprise = x._1
        } else if (x._3 == 2) {
        list.append(x._2)
        }
    }

    for (investment <- list) {
        investment.industry_code = enterprise.industry_code
        investment.industry_name = enterprise.industry_name
        investment.regist_capi_new = enterprise.regist_capi_new
        investment.start_date = enterprise.start_date
        investment.new_status_code = enterprise.new_status_code

        result.append(investment)
    }

    result
    })
    .repartition(repartitionCount)
    .saveAsTextFile(savePath)


以上是关于如何实现大数据量数据库的历史数据归档的主要内容,如果未能解决你的问题,请参考以下文章

MYSQL数据量过亿了,如何提高查询效率

手动实现MySQL归档

Mysql数据归档如何实现利用Java

并发5多线程并发解析单文件大数据了量解析入库,1800万数据8线程5分钟入库

查看oracle每天及每小时产生归档日志的数据量

C#大数据量问题