Pyspark:如何在 HDFS 中并行化多 gz 文件处理
Posted
技术标签:
【中文标题】Pyspark:如何在 HDFS 中并行化多 gz 文件处理【英文标题】:Pyspark: How to parallelize multi gz file processing in HDFS 【发布时间】:2019-08-03 07:27:37 【问题描述】:我有许多 gz
文件存储在 20 个节点 HDFS
集群中,需要按列聚合。 gz
文件非常大(每个 1GByte,总共 200 个文件)。
数据格式为key-value,2列值:['key','value1','value2']
,需要按key分组,按列聚合:sum(value1)
,count(value2)
。
数据已经按key排序,每个gz文件都有专属的key值。
例如:
File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4
File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10
File 3:
k6,...
...
...
File 200:
k200,v200,u200
k201,v201,u201
我首先解析日期并将数据转换为(key, list of (values))
结构。解析器输出将是这样的:
parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])
然后使用reduceByKey
函数按键值分组,比groupByKey
函数效率更高。
reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])
然后使用处理函数聚合列:
process
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))
这是流程的示例代码
import pyspark
from pyspark import SparkFiles
def parser(line):
try:
key,val=line.split('\t)
return (key,[val1,val2])
except:
return None
def process(line):
key,gr= line[0],line[1]
vals=zip(*gr)
val1=sum(vals[0])
val2=len(vals[1])
return ('\t'.join([key,val1,val2]))
sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')
我认为这段代码没有充分利用 spark 集群。我喜欢优化代码以充分利用处理考虑。
1.在 HDFS 和 Pyspark 中处理 gz 文件的最佳方式是什么? -- 如何将 gz 文件处理完全分布到整个集群?
2。如何充分利用每个节点的所有CPU?进行聚合和解析过程
【问题讨论】:
将文件加载到spark fisrt的dataframe中。 【参考方案1】:您至少应该考虑以下几点:
-
如果您使用的是 YARN,则为您分配给 Spark 应用程序的执行程序数量和每个执行程序的核心数。它们可以由--num-executors 和--executor-cores 控制。如果您不使用 YARN,您的调度程序可能会有类似的机制来控制并行性,请尝试寻找它。
DataFrame 中的分区数量,直接影响工作的并行度。您可以使用repartition 和/或coalesce 进行控制。
两者都可以限制作业使用的核心,从而限制集群的使用。此外,请考虑使用更多的 CPU 并不一定意味着更好的性能(或执行时间)。这将取决于集群的大小和问题的大小,我不知道有什么简单的规则可以决定这一点。对我来说,这通常归结为试验不同的配置,看看哪一种性能更好。
【讨论】:
以上是关于Pyspark:如何在 HDFS 中并行化多 gz 文件处理的主要内容,如果未能解决你的问题,请参考以下文章
如何在pyspark中更改DataFrame的hdfs块大小