如何在并行火花中运行转换
Posted
技术标签:
【中文标题】如何在并行火花中运行转换【英文标题】:How to run transformation in parallel spark 【发布时间】:2021-04-30 15:22:48 【问题描述】:我正在尝试读取 text.gz 文件,对其进行重新分区并进行一些转换,但是当我看到 DAG 时,stag1 正在读取数据并仅对 1 个任务进行转换,因此需要时间。
大部分时间都花在第 1 阶段。
我不明白为什么只对一项任务进行处理,我如何在读取数据后拆分它。 stag1 的代码
df1 =spark.read.text("text-04-14.log.gz")
df1 = df1.repartition(20)
text_to_json_udf = udf(text_to_json, ArrayType(StringType()))
df1 = df1.select(text_to_json_udf(df1.value).alias("arr_cols"))
df1 = df1.dropna()
cols = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k7', 'k8', 'k9']
df1 = df1.select([df1.arr_cols[i].alias(k) for i, k in enumerate(cols)])
【问题讨论】:
【参考方案1】:.gz 文件不可拆分,因此 Spark 在 1 个内核上读取整个文件 - 速度很慢。在该阶段之后,可以应用重新分区来增加处理的吞吐量。
【讨论】:
没错。根据用例,可能还值得以另一种格式重新压缩,例如bzip2、lz4、lzo、snappy 等 - 或者只是保持未压缩。当然取决于数据的大小和解压时间 @UninformedUser 当然,这也是一个选项 @thebluephantom 我知道 .gz 文件不可拆分,因此我在读取后重新分区数据。但之后的 udf 函数也在单核上运行。我不确定为什么会这样。我应该怎么做才能并行运行它。 那么你需要更好地提出这个问题。【参考方案2】:因为 .gz 是 gzip 文件(GNU zip,一种开源文件压缩程序)的文件扩展名。
还有压缩文件,分割前要解压。 当您将解压缩数据缓存在内存中或将其保存在磁盘上时,您可以在您的客户分区器或默认分区器中对其进行重新分区。还有,为什么stage1只有1个任务是因为spark job stage依赖; stage1的源代码如下。也就是说,下面的代码是stage1;所以stage2应该有20个task并行;
df1 =spark.read.text("text-04-14.log.gz")
df1 = df1.repartition(20)
所以stage1是shuffle map stage,如果输入数据小于128MB,它只会读取一个任务中的数据。
【讨论】:
我没有得到你的第二点。如果您可以参考我上面的代码,我在阅读后重新分区了如何改进它以并行运行 UDF 函数? 我添加了一些额外的信息来澄清这个问题。 在 stag1 中,如果只读取数据和重新分区就可以了,但在我的情况下,udf 函数也在 1core 上运行,而不是在 20 上并行运行。以上是关于如何在并行火花中运行转换的主要内容,如果未能解决你的问题,请参考以下文章