Pyspark:如何在 HDFS 中并行化多 gz 文件处理

Posted

技术标签:

【中文标题】Pyspark:如何在 HDFS 中并行化多 gz 文件处理【英文标题】:Pyspark: How to parallelize multi gz file processing in HDFS 【发布时间】:2019-08-03 07:27:37 【问题描述】:

我有许多 gz 文件存储在 20 个节点 HDFS 集群中,需要按列聚合。 gz 文件非常大(每个 1GByte,总共 200 个文件)。 数据格式为key-value,2列值:['key','value1','value2'],需要按key分组,按列聚合:sum(value1),count(value2)

数据已经按key排序,每个gz文件都有专属的key值。

例如:

File 1:
k1,v1,u1
k1,v2,u1
k2,v2,u2
k3,v3,u3
k3,v4,u4

File 2:
k4,v5,u6
k4,v7,u8
k5,v9,v10

File 3:
k6,...
...
...

File 200:
k200,v200,u200
k201,v201,u201

我首先解析日期并将数据转换为(key, list of (values))结构。解析器输出将是这样的:

parser output
(k1,[v1,u1])
(k1,[v2,u1])
(k2,[v2,u2])
(k3,[v3,u3])
(k3,[v4,u4])

然后使用reduceByKey函数按键值分组,比groupByKey函数效率更高。

reducer output:
(k1,[[v1,u1],[v2,u1])
(k2,[[v2,u2]])
(k3,[[v3,u3],[v4,u4]])

然后使用处理函数聚合列:

process 
(k1, sum([v1,v2], len([u1,u3])))
(k2, sum([v2], len([u2])))
(k3, sum([v3,v4], len([u3,u4])))

这是流程的示例代码

import pyspark
from pyspark import SparkFiles

def parser(line):
    try:
        key,val=line.split('\t)
        return (key,[val1,val2])
    except:
        return None

def process(line):
    key,gr= line[0],line[1]
    vals=zip(*gr)

    val1=sum(vals[0])
    val2=len(vals[1])
    return ('\t'.join([key,val1,val2]))

sc = pyspark.SparkContext(appName="parse")
logs=sc.textFile("hdfs:///home/user1/*.gz")
proc=logs.map(parser).filter(bool).reduceByKey(lambda acc,x: acc+x).map(process)
proc.saveAsTextFile('hdfs:///home/user1/output1')

我认为这段代码没有充分利用 spark 集群。我喜欢优化代码以充分利用处理考虑。

1.在 HDFS 和 Pyspark 中处理 gz 文件的最佳方式是什么? -- 如何将 gz 文件处理完全分布到整个集群?

2。如何充分利用每个节点的所有CPU?进行聚合和解析过程

【问题讨论】:

将文件加载到spark fisrt的dataframe中。 【参考方案1】:

您至少应该考虑以下几点:

    如果您使用的是 YARN,则为您分配给 Spark 应用程序的执行程序数量和每个执行程序的核心数。它们可以由--num-executors 和--executor-cores 控制。如果您不使用 YARN,您的调度程序可能会有类似的机制来控制并行性,请尝试寻找它。 DataFrame 中的分区数量,直接影响工作的并行度。您可以使用repartition 和/或coalesce 进行控制。

两者都可以限制作业使用的核心,从而限制集群的使用。此外,请考虑使用更多的 CPU 并不一定意味着更好的性能(或执行时间)。这将取决于集群的大小和问题的大小,我不知道有什么简单的规则可以决定这一点。对我来说,这通常归结为试验不同的配置,看看哪一种性能更好。

【讨论】:

以上是关于Pyspark:如何在 HDFS 中并行化多 gz 文件处理的主要内容,如果未能解决你的问题,请参考以下文章

如何访问安装在 hdfs 头节点集群内的 pyspark

如何在pyspark中更改DataFrame的hdfs块大小

如何更改pyspark中的并行任务数

如何将存储在 HDFS 中包含行的文本文件转换为 Pyspark 中的数据框?

如何在 AWS Glue PySpark 中运行并行线程?

如何使用 Pyspark 并行处理多个镶木地板文件?