将 Pandas Python 转换为 Pyspark

Posted

技术标签:

【中文标题】将 Pandas Python 转换为 Pyspark【英文标题】:Converting Pandas Python to Pyspark 【发布时间】:2020-12-03 19:03:23 【问题描述】:

我有用 pandas 编写的代码,我被要求转换为 pyspark,但我对 pyspark 不是很熟悉。我想我已经掌握了大部分内容,但有几行无法转换。

第一个查找ID字段的下一个开始日期(如果存在)(对数据进行排序以使其具有顺序性)

addmaxdate['next_start'] = pd.NaT
addmaxdate.loc[addmaxdate.ID_combo.eq(addmaxdate.shift(-1).ID_combo), 'next_start'] = addmaxdate.shift(-1).startdate
addmaxdate.loc[addmaxdate.startdate.eq(addmaxdate.next_start), 'next_start'] = pd.NaT

下一个 sn-p 代码通过结束日期列减去下一个开始列来创建一个间隙列。

addmaxdate['gap'] = pd.NaT
addmaxdate.loc[addmaxdate.gap.isna(), 'gap'] = addmaxdate.loc[~addmaxdate.next_start.isna(), 'next_start'] - addmaxdate.loc[~addmaxdate.next_start.isna(), 'stopdate']

就我的研究而言,pyspark 没有 shift() 等效项,所以我不确定如何在 pyspark 中完成此操作并获得相同的结果。我不想使用 toPandas(),因为它非常耗费资源。

有人可以帮忙吗?谢谢!

【问题讨论】:

我建议查看考拉:koalas.readthedocs.io/en/latest - 在 PySpark 之上提供与 Pandas 兼容的 API 可能会有很大帮助 【参考方案1】:

df.shift(-1).column 在 pandas 中相当于

import pyspark.sql.functions as F
from pyspark.sql.window import Window

F.lag('column').over(Window.orderBy('another_column'))

您需要指定排序,因为 Spark 没有 pandas 那样的索引概念。正如你所说的

(数据按顺序排序)

你可以在上面的代码sn-p中使用排序列为'another_column'

举个例子,

addmaxdate['next_start'] = pd.NaT
addmaxdate.loc[addmaxdate.ID_combo.eq(addmaxdate.shift(-1).ID_combo), 'next_start'] = addmaxdate.shift(-1).startdate

可以翻译成

addmaxdate.withColumn('next_start',
    F.when(
        F.col('ID_combo') == F.lag(F.col('ID_combo')).over(Window.orderBy('ordering_column')), 
        F.lag(F.col('startdate')).over(Window.orderBy('ordering_column'))
    )
)

【讨论】:

嗨!这会创建我要查找的列,但如果我包含 .otherwise( F.col('next_start') 我会收到无法解析列 'next_start" given input" 错误。该部分有什么作用? @Jw007 哦,对不起,这是一个错误,它不应该在那里。我会编辑我的帖子

以上是关于将 Pandas Python 转换为 Pyspark的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 python 和 pandas 将 Csv 文件转换为 libsvm?

如何使用 PANDAS / Python 将矩阵转换为列数组

将 Pandas Python 转换为 Pyspark

python 将Numpy数组转换为Pandas Dataframe

如何将字节数据转换为 python pandas 数据框?

通过 Python 中的 pandas 将每日库存数据转换为每周