将 Pandas Python 转换为 Pyspark
Posted
技术标签:
【中文标题】将 Pandas Python 转换为 Pyspark【英文标题】:Converting Pandas Python to Pyspark 【发布时间】:2020-12-03 19:03:23 【问题描述】:我有用 pandas 编写的代码,我被要求转换为 pyspark,但我对 pyspark 不是很熟悉。我想我已经掌握了大部分内容,但有几行无法转换。
第一个查找ID字段的下一个开始日期(如果存在)(对数据进行排序以使其具有顺序性)
addmaxdate['next_start'] = pd.NaT
addmaxdate.loc[addmaxdate.ID_combo.eq(addmaxdate.shift(-1).ID_combo), 'next_start'] = addmaxdate.shift(-1).startdate
addmaxdate.loc[addmaxdate.startdate.eq(addmaxdate.next_start), 'next_start'] = pd.NaT
下一个 sn-p 代码通过结束日期列减去下一个开始列来创建一个间隙列。
addmaxdate['gap'] = pd.NaT
addmaxdate.loc[addmaxdate.gap.isna(), 'gap'] = addmaxdate.loc[~addmaxdate.next_start.isna(), 'next_start'] - addmaxdate.loc[~addmaxdate.next_start.isna(), 'stopdate']
就我的研究而言,pyspark 没有 shift() 等效项,所以我不确定如何在 pyspark 中完成此操作并获得相同的结果。我不想使用 toPandas(),因为它非常耗费资源。
有人可以帮忙吗?谢谢!
【问题讨论】:
我建议查看考拉:koalas.readthedocs.io/en/latest - 在 PySpark 之上提供与 Pandas 兼容的 API 可能会有很大帮助 【参考方案1】:df.shift(-1).column
在 pandas 中相当于
import pyspark.sql.functions as F
from pyspark.sql.window import Window
F.lag('column').over(Window.orderBy('another_column'))
您需要指定排序,因为 Spark 没有 pandas 那样的索引概念。正如你所说的
(数据按顺序排序)
你可以在上面的代码sn-p中使用排序列为'another_column'
。
举个例子,
addmaxdate['next_start'] = pd.NaT
addmaxdate.loc[addmaxdate.ID_combo.eq(addmaxdate.shift(-1).ID_combo), 'next_start'] = addmaxdate.shift(-1).startdate
可以翻译成
addmaxdate.withColumn('next_start',
F.when(
F.col('ID_combo') == F.lag(F.col('ID_combo')).over(Window.orderBy('ordering_column')),
F.lag(F.col('startdate')).over(Window.orderBy('ordering_column'))
)
)
【讨论】:
嗨!这会创建我要查找的列,但如果我包含 .otherwise( F.col('next_start') 我会收到无法解析列 'next_start" given input" 错误。该部分有什么作用? @Jw007 哦,对不起,这是一个错误,它不应该在那里。我会编辑我的帖子以上是关于将 Pandas Python 转换为 Pyspark的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 python 和 pandas 将 Csv 文件转换为 libsvm?
如何使用 PANDAS / Python 将矩阵转换为列数组