Pandas对Pyspark函数的迭代。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Pandas对Pyspark函数的迭代。相关的知识,希望对你有一定的参考价值。

给定一个数据集,我试图创建一个逻辑,在两个列中,我需要强制执行连续性,最后一个目的地(进入列)是每个id的确切下一个起点(从列)。例如这个表

+----+-------+-------+
| id | from  | to    |
+----+-------+-------+
|  1 | A     | B     |
|  1 | C     | A     |
|  2 | D     | D     |
|  2 | F     | G     |
|  2 | F     | F     |
+----+-------+-------+

理想情况下应该是这样的。

+----+-------+-------+
| id | from  | to    |
+----+-------+-------+
|  1 | A     | B     |
|  1 | B     | C     |
|  1 | C     | A     |
|  2 | D     | D     |
|  2 | D     | F     |
|  2 | F     | G     |
|  2 | G     | F     |
|  2 | F     | F     |
+----+-------+-------+

我用Pandas做了这样的工作,我按行循环检查 previous_row['to'] == current_row['from'],同时检查id,也许可以用groupby来避免,你可以看到下面的内容

for i in range(len(df)):
    if (i < (len(df)-1)):
        if (new.ix[i,"to"] != new.ix[i+1,"from"]) & (new.ix[i,"id"] == new.ix[i+1,"id"]): 
            new_index = i + 0.5
            line = pd.DataFrame({"id":new.ix[i,"id"],
                             "from":new.ix[i,"to"],"to":new.ix[i+1,"from"],}, index = [new_index])
            appendings = pd.concat([appendings,line])
        else:
            pass
    else:
        pass

是否有可能将其 "翻译 "为pyspark rdds?

我知道在Pyspark中,循环远不是最佳的复制循环和if-else逻辑。

我考虑过通过分组和从列和到列的zipping,并在单列上工作。主要问题在于,我可以在 "有问题 "的行上产生一个标志,但没有办法在不使用索引操作的情况下插入新行。

答案

这不是一个 pyspark 答案,但只是一个部分答案,以告诉你如何实现没有循环的任务在 pandas.

你可以试试。

def f(sub_df):
    return sub_df.assign(to_=np.roll(sub_df.To, 1)) 
                .apply(lambda x: [[x.From, x.To]] if x.to_ == x.From else [[x.to_, x.From], [x.From, x.To]], axis=1) 
                .explode() 
                .apply(pd.Series)


out = df.groupby('id').apply(f) 
        .reset_index(level=1, drop=True) 
        .rename(columns={0: "from", 1: "to"})

工作流。

  • 通过以下方式对数据帧进行分组 id 使用 groupby
  • 为每个组。
    • 创建一个新的列(这里的名字是 to_)来拥有前一行。np.roll 执行 圆班 以便保留最后的数值。
    • 根据如果 潮流 == 之前:返回当前行或添加新行进行过渡。
    • 使用 explode 爆炸 listlist 变成每行一个列表。
    • 将该列转换成两列,使用 apply(pd.Series)
  • 然后,对于输出数据帧,删除了 第一级 用指数 reset_index
  • 并使用以下方法重命名列 rename

完整代码

# Import module
import pandas as pd
import numpy as np

# create dataset
df = pd.DataFrame({"id": [1,1,2,2,2], "From": ["A", "C", "D", "F", "F"], "To": ["B", "D", "D", "G", "F"]})
# print(df)


def f(sub_df):
    return sub_df.assign(to_=np.roll(sub_df.To, 1)) 
                .apply(lambda x: [[x.From, x.To]] if x.to_ == x.From else [[x.to_, x.From], [x.From, x.To]], axis=1) 
                .explode() 
                .apply(pd.Series)


out = df.groupby('id').apply(f) 
        .reset_index(level=1, drop=True) 
        .rename(columns={0: "from", 1: "to"})
print(out)
#    from to
# id
# 1     D  A
# 1     A  B
# 1     B  C
# 1     C  D
# 2     F  D
# 2     D  D
# 2     D  F
# 2     F  G
# 2     G  F
# 2     F  F

下一步是将其转化为 PySpark. 尝试一下,并随时打开一个新的问题与你的尝试。

以上是关于Pandas对Pyspark函数的迭代。的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 中是不是有与 Pandas 聚合函数 any() 等效的函数?

Pandas 到 pyspark cumprod 函数

pyspark 中的 Pandas UDF

使用 pyspark 的 toPandas() 错误:'int' 对象不可迭代

将 Pandas Python 转换为 Pyspark

将 Pandas 最佳拟合函数转换为 pyspark