如何有效地将 PySpark 数据框中的行相乘?

Posted

技术标签:

【中文标题】如何有效地将 PySpark 数据框中的行相乘?【英文标题】:How to efficiently multiply rows in PySpark dataframe? 【发布时间】:2021-08-05 23:33:03 【问题描述】:

我正在尝试通过获取现有的小数据集并将其放大得多来制作合成数据集。我希望目标大小为 20M 行。 我目前的方法:

for i in range(int(log(130000, 2))): 
    table_copy = table_copy.unionAll(table_copy)

但这在第 12 次迭代(共 17 次)之后会减慢很多。有没有更快的方法将由 150 行组成的数据帧变成 20M?

【问题讨论】:

【参考方案1】:

这个效果最好: (5 秒 = 20M 行)

df = spark.range(150)
factor = 135000
df = df.withColumn('a', F.expr(f'explode(array_repeat(0,factor))')).drop('a')

this smart guy提出的想法

在你的情况下,它可能只是

table_copy = table_copy.withColumn('a', F.expr('explode(array_repeat(0,135000))')).drop('a')

其他经过测试的选项

(16 秒 = 150 万行)

import pyspark.sql.functions as F
df = spark.range(150)
df = df.withColumn('array', F.explode(F.array(*map(F.lit, range(1000)))))
df = df.drop('array')

(11 秒 = 38k 行):

def union_self(df, p):
    if p:
        df = union_self(df, p - 1)
        return df.union(df)
    return df

df = spark.range(150)
df = union_self(df, 8)

(16 秒 = 38k 行):

from functools import reduce
df = spark.range(150)
df = reduce(lambda df1, df2: df1.union(df2), [df] * 256)

【讨论】:

嘿,谢谢!你能帮我理解为什么我原来的方法比这个方法要花这么长时间吗? 显然,拆分比许多联合更有效。我不知道原因,但您可以看到 Spark 为两个版本创建了完全不同的物理计划。您可以在两个版本之后使用table_copy.explain() 来查看它们。【参考方案2】:

如果我理解正确的话。您希望扩展或放大具有相同数据的多个数据集:

val replicas = 5 // calcu yourself and i've never try 20M
val dsReplicated = ds.flatMap(a => 0 until replicas map ((a, _))).map(_._1)

或者对于数据框:

val dfReplicated = df
      .withColumn("__temporarily__", functions.typedLit((0 until replicas).toArray))
      .withColumn("idx", functions.explode($"__temporarily__"))
      .drop($"__temporarily__")
      .drop($"idx")

【讨论】:

以上是关于如何有效地将 PySpark 数据框中的行相乘?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark如何将一列与数据框中另一列的结果相乘?

PySpark - 如何根据列中的两个值从数据框中过滤出连续的行块

动态填充pyspark数据框中列中的行

如何使用 pyspark 2.1.0 选择另一个数据框中不存在的行?

Pyspark 基于另一个类似的数据框添加或删除数据框中的行

过滤 pyspark 数据框中的行并创建一个包含结果的新列