toPandas() 会随着 pyspark 数据框变小而加快速度吗?

Posted

技术标签:

【中文标题】toPandas() 会随着 pyspark 数据框变小而加快速度吗?【英文标题】:Does toPandas() speed up as a pyspark dataframe gets smaller? 【发布时间】:2020-01-21 20:35:18 【问题描述】:

我想我会问这个问题。我找到了一种巧妙的方法来减小 PySpark 数据框的大小并将其转换为 Pandas,我只是想知道,随着 pyspark 数据框的大小变小,toPandas 函数会变得更快吗?这是一些代码:

window = Window.partitionBy(F.lit('A')).orderBy(F.lit('A'))

eps_tfs = 
while True:
    pdf = toPandas(conn.select(F.col('*')).where(F.col('row_number') <= 2500))
    n = len(pdf)
    trigger = 0
    for u in pdf['features']:
        indices = [i for i, x in enumerate(u) if x == 1.0]
        for idx in range(len(eps_columns)):
            if idx in indices:
                try:
                    eps_tfs[eps_columns[idx]].append(True)
                except:
                    eps_tfs[eps_columns[idx]] = [True]
            else:
                try:
                    eps_tfs[eps_columns[idx]].append(False)
                except:
                    eps_tfs[eps_columns[idx]] = [False]
    full_view = full_view.append(pd.concat([pdf, pd.DataFrame(eps_tfs)], axis=1))
    conn = conn.select(F.col('*')).where(F.col('row_number') > 2500)
    conn = conn.drop("row_number")
    conn = conn.select(F.col('*'), F.row_number().over(window).alias('row_number'))
    eps_tfs = 
    del pdf
    if n < 2500:
        break

另外,下面的代码真的是一种将数据帧映射到 pandas 的更快方法吗?

def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.
    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand

有没有更好的方法来做到这一点?

【问题讨论】:

我会在这里问不同的问题 - 如果是,pandas 是否比 pyspark 更适合您的用例 - 为什么不在 pandas 中做所有事情,如果不是 - 为什么要转换?跨度> 【参考方案1】:

here是ToPandas的源代码,

首先,是的,如果您的 pyspark 数据框变小,toPandas 会更快,它的味道与 sdf.collect() 相似 不同之处在于 ToPandas 返回一个 pdf 并收集返回一个列表。 从源代码中可以看到pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) pdf 是从 List 中的 pd.DataFrame.from_records 生成的!

因此,如果您的 sdf 较小,则通过网络传输的数据也较小,from_record 使用驱动程序的 CPU 处理的数据也较少。

第二个代码的设计不同,sdf是分布式的,代码调用一个Mappartition,所以所有worker从数据子集生成一个Pandas数据帧,然后它调用collect,现在所有的Pandas数据帧都通过网络传输,带到了司机那里。然后代码调用 pd.concat 将所有数据帧连接在一起。

好处是:

    在转换为 Pandas DataFrame 时,所有工作人员并行处理一小部分数据,这比将所有数据传送到驱动程序并烧毁驱动程序的 CPU 以将大量数据转换为 Pandas 更好。 repartition 正在进行,意味着如果你的数据集很大,而你的分区数很少,每个分区上的数据会很大,toPandas 会在序列化器的 OOM 上失败,而且速度也很慢收集数据

缺点是:

    现在,当您收集时,您不是在收集本机 sdf 数据,而是在 pandas 数据帧中附加了更多元数据并且通常更大,这意味着对象的总大小更大 pd.concat 很慢,哈哈,但可能仍然比 from_record

所以没有普遍的结论说哪种方法更好,但要明智地选择使用哪种工具。就像在这个问题中一样,toPandas 可能比小 sdf 快,但对于大尺寸 sdf,代码 sn-p 肯定会更好。

【讨论】:

我认为“toPandas”的源代码应该是:spark.apache.org/docs/2.3.0/api/python/_modules/pyspark/sql/…【参考方案2】:

在我们的案例中,我们发现不使用toPandas() 而使用pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) 是最快的。我们无法使用arrow 选项,因为我们收到错误“使用基于文件的收集时不支持箭头”。

查看toPandas() 的源代码,它可能很慢的一个原因是它首先创建了pandas DataFrame,然后将DataFrame 中的每个Series 复制到返回的@987654328 @。如果您知道所有列都有唯一的名称,并且通过让 pandas 推断 dtype 值可以很好地转换数据类型,则无需进行任何复制或 dtype 推断。

旁注:我们在 Databricks 上转换了一个 Spark DataFrame,它有大约 200 万行和 6 列,因此您的里程可能会因转换的大小而异。

【讨论】:

【参考方案3】:

@EZY 的答案是正确的(您需要将所有行收集到驱动程序或客户端)。但是,使用 apache 箭头 integration 可以进行另一种优化。它为 numpy 和 pandas 数据类型提供了更快的库。默认情况下不启用,因此您需要通过设置 spark conf 来启用它,如下所示。

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

【讨论】:

以上是关于toPandas() 会随着 pyspark 数据框变小而加快速度吗?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark toPandas 函数正在改变列类型

如何在 PySpark 中提取对长度敏感的特征而不使用 .toPandas() hack?

pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()

使用 toPandas 时强制将 null 一致转换为 nan

为啥 toPandas() 会抛出错误,而 .show() 工作得很好?

toPandas() 在 Jupyter iPython Notebook 上工作,但提交失败 - AWS EMR