将 Pandas 最佳拟合函数转换为 pyspark

Posted

技术标签:

【中文标题】将 Pandas 最佳拟合函数转换为 pyspark【英文标题】:Convert Pandas best fit function into pyspark 【发布时间】:2019-03-18 14:47:41 【问题描述】:

我一直在使用这个函数在 Pandas 中创建时间序列特征,它返回给定点范围的(OLS?)最佳拟合斜率:

def best_fit(X, Y):
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X) 
    numer = sum([xi*yi for xi,yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    b = numer / denum
    return b

这是一个显示结果的简单示例(请参阅下面的最终 df):

import pandas as pd
import numpy as np
import random
cols = ['x_vals','y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0,20):
  df.loc[i,'x_vals'] = i
  df.loc[i,'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1,1) #some random parabolic points

然后我应用 best_fit 函数得到前面 5 个点的斜率:

for i,row in df.iterrows():
  if i>=5:
    X = df['x_vals'][i-5:i]
    Y = df['y_vals'][i-5:i]
    df.loc[i,'slope'] = best_fit(X, Y)
df

这给了我这个:

x_vals  y_vals  slope
0   -0.648205   NaN
1   0.282729    NaN
2   0.785474    NaN
3   1.48546     NaN
4   0.408165    NaN
5   1.61244     0.331548
6   2.60868     0.228211
7   3.77621     0.377338
8   4.08937     0.678201
9   4.34625     0.952618
10  5.47554     0.694832
11  7.90902     0.630377
12  8.83912     0.965180
13  9.01195     1.306227
14  11.8244     1.269497
15  13.3199     1.380057
16  15.2751     1.380692
17  15.3959     1.717981
18  18.454      1.621861
19  20.0773     1.533528

我需要从 pyspark 数据框而不是 Pandas 中获取相同的斜率列,只是我正在努力寻找一个起点(pyspark 窗口?OLS 内置函数?udf?)。

【问题讨论】:

应该结合使用pyspark window和udf,这个参考(基于scala)将帮助你实现***.com/questions/23402303/… 【参考方案1】:

使用 Pyspark 窗口,收集前 5 个 col 值作为列表并调用 best_fit_udf

#moodified this function to handle 0 division and size of elements
def best_fit(X, Y):
    xbar = sum(X)/len(X)
    ybar = sum(Y)/len(Y)
    n = len(X)
    if n < 6 :
       return None
    numer = sum([xi*yi for xi,yi in zip(X, Y)]) - n * xbar * ybar
    denum = sum([xi**2 for xi in X]) - n * xbar**2
    if denum == 0:
       return None
    else:
       return numer / denum

best_fit_udf = udf(best_fit, DoubleType())

cols = ['x_vals','y_vals']
df = pd.DataFrame(columns=cols)
for i in range(0,20):
  df.loc[i,'x_vals'] = i
  df.loc[i,'y_vals'] = 0.05 * i**2 + 0.1 * i + random.uniform(-1,1) #some random parabolic points

spark_df = spark.createDataFrame(df)

w = Window.orderBy("x_vals").rowsBetween(-5, 0)

df = spark_df.select("x_vals","y_vals",(F.collect_list('x_vals')).over(w).alias("x_list"), (F.collect_list('y_vals')).over(w).alias("y_list"))

df.withColumn("slope", best_fit_udf('x_list','y_list') ).drop('x_list','y_list').show()

这给了我这个

+------+--------------------+------------------+
|x_vals|              y_vals|             slope|
+------+--------------------+------------------+
|     0|-0.05626232194330516|              null|
|     1|  1.0626613654187942|              null|
|     2|-0.18870622421238525|              null|
|     3|  1.7106172105001147|              null|
|     4|  1.9398571272258158|              null|
|     5|  2.3632022124308474| 0.475092382628695|
|     6|  1.7264493731921893|0.3201115790149247|
|     7|   3.298712278452215|0.5116552596172641|
|     8|  4.3179382280764305|0.4707547914949186|
|     9|    4.00691449276564|0.5077645079970263|
|    10|   6.085792506183289|0.7563877936316236|
|    11|   7.272669055040746|1.0223232959178614|
|    12|    8.70598472345308| 1.085126649123283|
|    13|  10.141576882812515|1.2686365861314373|
|    14|  11.170519757896672| 1.411962717827295|
|    15|  11.999868557507794|1.2199864149871311|
|    16|   14.86294824152797|1.3960568659909833|
|    17|  16.698964370210007| 1.570238888844051|
|    18|   18.71951724368806|1.7810890092953742|
|    19|  20.428078271618062|1.9509358501665701|
+------+--------------------+------------------+

【讨论】:

【参考方案2】:

感谢@Ranga Vure。我根据原始 best_fit 函数(当然是你的值)测试了你的函数,我得到了不同的斜率值。 这是best_fit 函数给出的(y_vals 值看起来是四舍五入的,但实际上不是):

x_vals  y_vals      slope_py
0       -0.0562623  NaN
1       1.06266     NaN
2       -0.188706   NaN
3       1.71062     NaN
4       1.93986     NaN
5       2.3632      0.464019
6       1.72645     0.472965
7       3.29871     0.448290
8       4.31794     0.296278
9       4.00691     0.569167
10      6.08579     0.587891
11      7.27267     0.942689
12      8.70598     0.971577
13      10.1416     1.204185
14      11.1705     1.488952
15      11.9999     1.303836
16      14.8629     1.191893
17      16.699      1.417222
18      18.7195     1.680720
19      20.4281     1.979709

我将best_fit 函数翻译成sqlContext,这给了我与原始best_fit 函数相同的值:

spark_df.createOrReplaceTempView('tempsql')
df_sql = sqlContext.sql("""
SELECT *,
(((sum(x_vals*y_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(y_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5) /
((sum(x_vals*x_vals) over (order by x_vals rows between 5 preceding and 1 preceding)) - 5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5 * (sum(x_vals) over (order by x_vals rows between 5 preceding and 1 preceding))/5)) as slope_sql
from tempsql 
""")

这给出了与原始 best_fit 函数相同的值,除了第 3-5 点,因为它计算了预期开始之前的斜率(即第 6 点) - 一个小怪癖,但我可以忍受:

+------+-------------------+-------------------+--------------------+
|x_vals|             y_vals|           slope_py|           slope_sql|
+------+-------------------+-------------------+--------------------+
|     0|-0.0562623219433051|                NaN|                null|
|     1|   1.06266136541879|                NaN|                null|
|     2| -0.188706224212385|                NaN|  1.0767269459046163|
|     3|   1.71061721050011|                NaN|0.060822882948800006|
|     4|   1.93985712722581|                NaN|  0.4092836048203674|
|     5|   2.36320221243084| 0.4640194743419549|  0.4640194743419549|
|     6|   1.72644937319218| 0.4729645045462295|  0.4729645045462295|
|     7|   3.29871227845221|0.44828961967398656|  0.4482896196739862|
|     8|   4.31793822807643| 0.2962782381870575| 0.29627823818705823|
|     9|   4.00691449276564|  0.569167226772261|  0.5691672267722595|

【讨论】:

@Ranga Vure 啊,斜率输出的差异是由于您的 rowsBetween 包括当前行,而我希望它排除当前行(即最多但不包括观察点)。如果我修改原始函数以包含当前行,我会得到您的号码,因此我接受您的回答,谢谢!如何排除当前行,如果我做 .rowsBetween(-5, -1),它会倒下。

以上是关于将 Pandas 最佳拟合函数转换为 pyspark的主要内容,如果未能解决你的问题,请参考以下文章

数据可视化实例: 带线性回归最佳拟合线的散点图(matplotlib,pandas)

pyspark pandas 对象作为数据框 - TypeError

如何将 Spark 数据帧转换为 Pandas 并返回 Kedro?

想用matlab确定拟合函数的最佳次数?

将 dict 构造函数转换为 Pandas MultiIndex 数据帧

如何防止 pandas.to_datetime() 函数将 0001-01-01 转换为 2001-01-01