toPandas() 会随着 pyspark 数据框变小而加快速度吗?
Posted
技术标签:
【中文标题】toPandas() 会随着 pyspark 数据框变小而加快速度吗?【英文标题】:Does toPandas() speed up as a pyspark dataframe gets smaller? 【发布时间】:2020-01-21 20:35:18 【问题描述】:我想我会问这个问题。我找到了一种巧妙的方法来减小 PySpark 数据框的大小并将其转换为 Pandas,我只是想知道,随着 pyspark 数据框的大小变小,toPandas 函数会变得更快吗?这是一些代码:
window = Window.partitionBy(F.lit('A')).orderBy(F.lit('A'))
eps_tfs =
while True:
pdf = toPandas(conn.select(F.col('*')).where(F.col('row_number') <= 2500))
n = len(pdf)
trigger = 0
for u in pdf['features']:
indices = [i for i, x in enumerate(u) if x == 1.0]
for idx in range(len(eps_columns)):
if idx in indices:
try:
eps_tfs[eps_columns[idx]].append(True)
except:
eps_tfs[eps_columns[idx]] = [True]
else:
try:
eps_tfs[eps_columns[idx]].append(False)
except:
eps_tfs[eps_columns[idx]] = [False]
full_view = full_view.append(pd.concat([pdf, pd.DataFrame(eps_tfs)], axis=1))
conn = conn.select(F.col('*')).where(F.col('row_number') > 2500)
conn = conn.drop("row_number")
conn = conn.select(F.col('*'), F.row_number().over(window).alias('row_number'))
eps_tfs =
del pdf
if n < 2500:
break
另外,下面的代码真的是一种将数据帧映射到 pandas 的更快方法吗?
def _map_to_pandas(rdds):
""" Needs to be here due to pickling issues """
return [pd.DataFrame(list(rdds))]
def toPandas(df, n_partitions=None):
"""
Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
repartitioned if `n_partitions` is passed.
:param df: pyspark.sql.DataFrame
:param n_partitions: int or None
:return: pandas.DataFrame
"""
if n_partitions is not None: df = df.repartition(n_partitions)
df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
df_pand = pd.concat(df_pand)
df_pand.columns = df.columns
return df_pand
有没有更好的方法来做到这一点?
【问题讨论】:
我会在这里问不同的问题 - 如果是,pandas
是否比 pyspark
更适合您的用例 - 为什么不在 pandas
中做所有事情,如果不是 - 为什么要转换?跨度>
【参考方案1】:
here是ToPandas的源代码,
首先,是的,如果您的 pyspark 数据框变小,toPandas 会更快,它的味道与 sdf.collect() 相似
不同之处在于 ToPandas 返回一个 pdf 并收集返回一个列表。
从源代码中可以看到pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
pdf 是从 List 中的 pd.DataFrame.from_records 生成的!
因此,如果您的 sdf 较小,则通过网络传输的数据也较小,from_record
使用驱动程序的 CPU 处理的数据也较少。
第二个代码的设计不同,sdf是分布式的,代码调用一个Mappartition,所以所有worker从数据子集生成一个Pandas数据帧,然后它调用collect,现在所有的Pandas数据帧都通过网络传输,带到了司机那里。然后代码调用 pd.concat 将所有数据帧连接在一起。
好处是:
-
在转换为 Pandas DataFrame 时,所有工作人员并行处理一小部分数据,这比将所有数据传送到驱动程序并烧毁驱动程序的 CPU 以将大量数据转换为 Pandas 更好。
repartition 正在进行,意味着如果你的数据集很大,而你的分区数很少,每个分区上的数据会很大,toPandas 会在序列化器的 OOM 上失败,而且速度也很慢收集数据
缺点是:
-
现在,当您收集时,您不是在收集本机 sdf 数据,而是在 pandas 数据帧中附加了更多元数据并且通常更大,这意味着对象的总大小更大
pd.concat
很慢,哈哈,但可能仍然比 from_record
好
所以没有普遍的结论说哪种方法更好,但要明智地选择使用哪种工具。就像在这个问题中一样,toPandas 可能比小 sdf 快,但对于大尺寸 sdf,代码 sn-p 肯定会更好。
【讨论】:
我认为“toPandas”的源代码应该是:spark.apache.org/docs/2.3.0/api/python/_modules/pyspark/sql/…【参考方案2】:在我们的案例中,我们发现不使用toPandas()
而使用pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
是最快的。我们无法使用arrow
选项,因为我们收到错误“使用基于文件的收集时不支持箭头”。
查看toPandas()
的源代码,它可能很慢的一个原因是它首先创建了pandas DataFrame
,然后将DataFrame
中的每个Series
复制到返回的@987654328 @。如果您知道所有列都有唯一的名称,并且通过让 pandas 推断 dtype
值可以很好地转换数据类型,则无需进行任何复制或 dtype 推断。
旁注:我们在 Databricks 上转换了一个 Spark DataFrame,它有大约 200 万行和 6 列,因此您的里程可能会因转换的大小而异。
【讨论】:
【参考方案3】:@EZY 的答案是正确的(您需要将所有行收集到驱动程序或客户端)。但是,使用 apache 箭头 integration 可以进行另一种优化。它为 numpy 和 pandas 数据类型提供了更快的库。默认情况下不启用,因此您需要通过设置 spark conf 来启用它,如下所示。
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
【讨论】:
以上是关于toPandas() 会随着 pyspark 数据框变小而加快速度吗?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 PySpark 中提取对长度敏感的特征而不使用 .toPandas() hack?
pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()
使用 toPandas 时强制将 null 一致转换为 nan