spark 实现大表数据合并
Posted 编程修仙传
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 实现大表数据合并相关的知识,希望对你有一定的参考价值。
在做 mysql 或其他数据迁移的时候,有时候需要将两份或者多份数据进行合并,生产一份新的数据后进行使用,对于数据量较小的场景下,可以直接使用 sql 语句进行关联,但是对于两张或者多张千万级记录的表进行合并时,使用 sql 进行 join 操作是不现实的,在这些场景下,需要使用 spark 或者 hive 进行操作。本文介绍如何使用 spark 进行大数据量的合并。
本文中提到的大表,数据量一般在几百万或者千万甚至是亿级别。小表的数据量一般在 1万条记录以内。
1. 大数据关联表一般会遇到三种情形:
-
一张大数据表关联一张字典表(小表) -
一张大数据表关联另一张大数据表,两张表存在id进行唯一对应,既 a 表中的一条记录在 b表中最多只有一条进行对应。 -
一张大数据表关联另一张大数据表,a 表中的一条记录在 b 表中有多条记录进行对应;或者是 a 表中的多条记录对应于 b 表中的一条记录。
2. 处理步骤:
-
使用 sqoop 工具将 mysql 数据导出到 hdfs
在机器内存和处理器资源有限的情况下,使用 spark-sql 直接读取 mysql 数据库,可能会导致 mysql 支撑不住,导致在数据读取过程中会发生连接中断,从而导致读取数据失败。将数据导入到 hdfs 中,可以很容易支持亿级别数据读取。
-
编写 spark core 算法进行关联操作
说明:采用 spark-sql 直接进行 sql 关联查询或者两个 DataFrame 进行 join 操作和在 mysql 用 sql 进行关联查询一样,会导致消耗大量服务器内存和 cpu 资源,很容易出现 OOM 异常,从而导致任务失败。
3. 算法实现
3.1 一张大表关联一张字典表(小表)
示例:有一份行业信息表,数据量在 7000万 左右,有一份行业编码表,数据量只有 2000 条,现在需要将编码表中的字典信息合并到行业信息表中,则可以采用以下处理方式
1. 加载编码字典表,将其作为广播变量
2. 循环企业信息表,从广播变量中取出映射信息
// 示例代码
// 加载字典表
val industryCodeMap = sc.textFile(industryCodePath)
.map(x => {
val params = x.split(",", -1)
val industryCode = new IndustryCode
industryCode.industry_code = params(1)
industryCode.industry_name = params(2)
industryCode.industry_standard = params(4)
(industryCode.industry_code, industryCode)
})
.collect()
.toMap
// 将字典表信息设置为广播变量
val bcIndustryCodeMap = sc.broadcast(industryCodeMap)
// 加载行业信息,循环读取映射值
val lastIndustryRdd = sc.textFile(lastIndustryPath)
.map(item => {
val params = item.split("\\0001", -1)
val lastIndustry = new LastIndustry
lastIndustry.eid = params(2)
lastIndustry.industry_code = params(3)
val industryCodeMap = bcIndustryCodeMap.value
val industryCode = industryCodeMap.get(lastIndustry.industry_code)
if (!industryCode.isEmpty) {
lastIndustry.industry_name = industryCode.get.industry_name
.industry_standard = industryCode.get.industry_standard
}
lastIndustry
})
3.2 一对一的两张大表
示例:一张企业信息表,约有 6600万记录,一张行业信息表,约有 7000万记录,行业信息表中每一条记录都含义企业 id 字典,现在需要将企业所在的行业信息合并到企业信息表中,可以采用以下方式:
1. 将企业信息映射为 (id,(企业信息,空的行业信息))
2. 将行业信息表映射为 (id,(空的企业信息, 行业信息))
3. 将两份数据进行合并
4. 根据 id 进行 reduce 计算,生产 (id,企业信息,行业信息)
5. 组合需要的结果信息返回
这样可以使用 reduce 计算替换 join 计算,极大减少数据 shuffle,从而加快计算速度。
// 示例代码
// 映射行业信息
val lastIndustryRdd = sc.textFile(lastIndustryPath)
.map(item => {
val params = item.split("\\0001", -1)
val lastIndustry = new LastIndustry
lastIndustry.eid = params(2)
lastIndustry.industry_code = params(3)
val industryCodeMap = bcIndustryCodeMap.value
val industryCode = industryCodeMap.get(lastIndustry.industry_code)
if (!industryCode.isEmpty) {
lastIndustry.industry_name = industryCode.get.industry_name
lastIndustry.industry_standard = industryCode.get.industry_standard
}
(lastIndustry.eid, (new Enterprise, lastIndustry))
})
// 映射企业信息
val enterpriseRdd = sc.textFile(enterprisePath)
.map(item => {
val params = item.split("\\0001", -1)
if (params.length < 60) {
("", (new Enterprise, new LastIndustry))
} else {
val enterprise = Enterprise.convertToEnterprise(params)
(enterprise.eid, (enterprise, new LastIndustry))
}
})
.filter(x => x._1 != "")
// 两份数据进行合并,按照 key 进行 reduce 计算
enterpriseRdd.union(lastIndustryRdd)
.reduceByKey((a, b) => {
var enterprise = a._1
var lastIndustry = a._2
if (a._1.eid == "" && b._1.eid != "") {
enterprise = b._1
}
if (a._2.eid == "" && b._2.eid != "") {
lastIndustry = b._2
}
enterprise.industry_code = lastIndustry.industry_code
enterprise.industry_name = lastIndustry.industry_name
enterprise.industry_standard = lastIndustry.industry_standard
(enterprise, lastIndustry)
})
.map(x => x._2._1)
.repartition(repartitionCount)
.saveAsTextFile(savePath)
3.3 一对多的两张大表
示例:一张企业投资表,约有 500万记录,一张企业信息表,约有 6600万记录,一家企业会进行多项投资,既一条企业信息会对应多条企业投资信息,现在需要将企业信息关联到企业投资表中,可以采用以下方式:
1. 将企业信息表映射为 (id,(企业信息,空的企业投资信息,标识位1))
2. 将企业投资表映射为 (id,(空的企业信息,企业投资信息,标识位2))
3. 合并两份数据
4. 按照 id 进行分组
5. 对分组中的数据进行计算,查询分组中标识位为 2 的企业信息和标识位为 1 的企业投资信息,
将相关信息进行关联,也可以使用 reduceByKey 计算替代 groupByKey,加快分析
// 示例代码
//映射企业信息表
val enterpriseDistributionRdd = sc.textFile(enterpriseDistributionPath)
.map(x => {
val params = x.split("@", -1)
val enterprise = Enterprise.convertToEnterprise(params)
(enterprise.eid, (enterprise, new Investment, 1))
})
// 映射企业投资信息
val investmentRdd = sc.textFile(investmentPath)
.map(x => {
val params = x.split("\\0001", -1)
val investment = Investment.convertToInvestment(params)
(investment.eid, (new Enterprise, investment, 2))
})
.filter(_._1 != "")
// 关联两份信息并进行分组计算
investmentRdd.union(enterpriseDistributionRdd)
.groupByKey()
.flatMap(item => {
val list = ListBuffer[Investment]()
val result = ListBuffer[Investment]()
var enterprise: Enterprise = new Enterprise
for (x <- item._2) {
if (x._3 == 1) {
enterprise = x._1
} else if (x._3 == 2) {
list.append(x._2)
}
}
for (investment <- list) {
investment.industry_code = enterprise.industry_code
investment.industry_name = enterprise.industry_name
investment.regist_capi_new = enterprise.regist_capi_new
investment.start_date = enterprise.start_date
investment.new_status_code = enterprise.new_status_code
result.append(investment)
}
result
})
.repartition(repartitionCount)
.saveAsTextFile(savePath)
以上是关于spark 实现大表数据合并的主要内容,如果未能解决你的问题,请参考以下文章
Spark调优大表join大表,少数key导致数据倾斜解决方案