如何在并行火花中运行转换

Posted

技术标签:

【中文标题】如何在并行火花中运行转换【英文标题】:How to run transformation in parallel spark 【发布时间】:2021-04-30 15:22:48 【问题描述】:

我正在尝试读取 text.gz 文件,对其进行重新分区并进行一些转换,但是当我看到 DAG 时,stag1 正在读取数据并仅对 1 个任务进行转换,因此需要时间。

大部分时间都花在第 1 阶段。

我不明白为什么只对一项任务进行处理,我如何在读取数据后拆分它。 stag1 的代码

df1 =spark.read.text("text-04-14.log.gz")
df1 = df1.repartition(20)
text_to_json_udf = udf(text_to_json, ArrayType(StringType()))
df1 = df1.select(text_to_json_udf(df1.value).alias("arr_cols"))
df1 = df1.dropna()
cols = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k7', 'k8', 'k9']
df1 = df1.select([df1.arr_cols[i].alias(k) for i, k in enumerate(cols)])

【问题讨论】:

【参考方案1】:

.gz 文件不可拆分,因此 Spark 在 1 个内核上读取整个文件 - 速度很慢。在该阶段之后,可以应用重新分区来增加处理的吞吐量。

【讨论】:

没错。根据用例,可能还值得以另一种格式重新压缩,例如bzip2、lz4、lzo、snappy 等 - 或者只是保持未压缩。当然取决于数据的大小和解压时间 @UninformedUser 当然,这也是一个选项 @thebluephantom 我知道 .gz 文件不可拆分,因此我在读取后重新分区数据。但之后的 udf 函数也在单核上运行。我不确定为什么会这样。我应该怎么做才能并行运行它。 那么你需要更好地提出这个问题。【参考方案2】:

因为 .gz 是 gzip 文件(GNU zip,一种开源文件压缩程序)的文件扩展名。

还有压缩文件,分割前要解压。 当您将解压缩数据缓存在内存中或将其保存在磁盘上时,您可以在您的客户分区器或默认分区器中对其进行重新分区。

还有,为什么stage1只有1个任务是因为spark job stage依赖; stage1的源代码如下。也就是说,下面的代码是stage1;所以stage2应该有20个task并行;

df1 =spark.read.text("text-04-14.log.gz")
df1 = df1.repartition(20)

所以stage1是shuffle map stage,如果输入数据小于128MB,它只会读取一个任务中的数据。

【讨论】:

我没有得到你的第二点。如果您可以参考我上面的代码,我在阅读后重新分区了如何改进它以并行运行 UDF 函数? 我添加了一些额外的信息来澄清这个问题。 在 stag1 中,如果只读取数据和重新分区就可以了,但在我的情况下,udf 函数也在 1core 上运行,而不是在 20 上并行运行。

以上是关于如何在并行火花中运行转换的主要内容,如果未能解决你的问题,请参考以下文章

如何将火花数据帧数组转换为元组

如何在火花中将rdd对象转换为数据框

如何在火花中将数据帧转换为csv [重复]

如何在火花中将列转换为数组[长]

如何将python for循环从顺序转换为并行运行

如何在火花上将json字符串转换为数据帧