Pandas对Pyspark函数的迭代。
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pandas对Pyspark函数的迭代。相关的知识,希望对你有一定的参考价值。
给定一个数据集,我试图创建一个逻辑,在两个列中,我需要强制执行连续性,最后一个目的地(进入列)是每个id的确切下一个起点(从列)。例如这个表
+----+-------+-------+
| id | from | to |
+----+-------+-------+
| 1 | A | B |
| 1 | C | A |
| 2 | D | D |
| 2 | F | G |
| 2 | F | F |
+----+-------+-------+
理想情况下应该是这样的。
+----+-------+-------+
| id | from | to |
+----+-------+-------+
| 1 | A | B |
| 1 | B | C |
| 1 | C | A |
| 2 | D | D |
| 2 | D | F |
| 2 | F | G |
| 2 | G | F |
| 2 | F | F |
+----+-------+-------+
我用Pandas做了这样的工作,我按行循环检查 previous_row['to'] == current_row['from'],同时检查id,也许可以用groupby来避免,你可以看到下面的内容
for i in range(len(df)):
if (i < (len(df)-1)):
if (new.ix[i,"to"] != new.ix[i+1,"from"]) & (new.ix[i,"id"] == new.ix[i+1,"id"]):
new_index = i + 0.5
line = pd.DataFrame({"id":new.ix[i,"id"],
"from":new.ix[i,"to"],"to":new.ix[i+1,"from"],}, index = [new_index])
appendings = pd.concat([appendings,line])
else:
pass
else:
pass
是否有可能将其 "翻译 "为pyspark rdds?
我知道在Pyspark中,循环远不是最佳的复制循环和if-else逻辑。
我考虑过通过分组和从列和到列的zipping,并在单列上工作。主要问题在于,我可以在 "有问题 "的行上产生一个标志,但没有办法在不使用索引操作的情况下插入新行。
答案
这不是一个 pyspark
答案,但只是一个部分答案,以告诉你如何实现没有循环的任务在 pandas
.
你可以试试。
def f(sub_df):
return sub_df.assign(to_=np.roll(sub_df.To, 1))
.apply(lambda x: [[x.From, x.To]] if x.to_ == x.From else [[x.to_, x.From], [x.From, x.To]], axis=1)
.explode()
.apply(pd.Series)
out = df.groupby('id').apply(f)
.reset_index(level=1, drop=True)
.rename(columns={0: "from", 1: "to"})
工作流。
- 通过以下方式对数据帧进行分组
id
使用groupby
- 为每个组。
- 创建一个新的列(这里的名字是
to_
)来拥有前一行。np.roll
执行 圆班 以便保留最后的数值。 - 根据如果 潮流 == 之前:返回当前行或添加新行进行过渡。
- 使用
explode
爆炸list
的list
变成每行一个列表。 - 将该列转换成两列,使用
apply(pd.Series)
- 创建一个新的列(这里的名字是
- 然后,对于输出数据帧,删除了 第一级 用指数
reset_index
- 并使用以下方法重命名列
rename
完整代码
# Import module
import pandas as pd
import numpy as np
# create dataset
df = pd.DataFrame({"id": [1,1,2,2,2], "From": ["A", "C", "D", "F", "F"], "To": ["B", "D", "D", "G", "F"]})
# print(df)
def f(sub_df):
return sub_df.assign(to_=np.roll(sub_df.To, 1))
.apply(lambda x: [[x.From, x.To]] if x.to_ == x.From else [[x.to_, x.From], [x.From, x.To]], axis=1)
.explode()
.apply(pd.Series)
out = df.groupby('id').apply(f)
.reset_index(level=1, drop=True)
.rename(columns={0: "from", 1: "to"})
print(out)
# from to
# id
# 1 D A
# 1 A B
# 1 B C
# 1 C D
# 2 F D
# 2 D D
# 2 D F
# 2 F G
# 2 G F
# 2 F F
下一步是将其转化为 PySpark
. 尝试一下,并随时打开一个新的问题与你的尝试。
以上是关于Pandas对Pyspark函数的迭代。的主要内容,如果未能解决你的问题,请参考以下文章
PySpark 中是不是有与 Pandas 聚合函数 any() 等效的函数?