如何在 DataFrame 中跨组使用 LinearRegression?

Posted

技术标签:

【中文标题】如何在 DataFrame 中跨组使用 LinearRegression?【英文标题】:How to use LinearRegression across groups in DataFrame? 【发布时间】:2017-05-03 21:18:54 【问题描述】:

假设我的 spark DataFrame (DF) 看起来像

id | age | earnings| health 
----------------------------
1  | 34  | 65      | 8
2  | 65  | 12      | 4
2  | 20  | 7       | 10
1  | 40  | 75      | 7
.  | ..  | ..      | ..

我想对 DF 进行分组,应用一个函数(比如线性 依赖于多列的回归 - 在这种情况下为两列 - 在每个聚合 DF 上的聚合 DF)并获得类似

的输出
id | intercept| slope 
----------------------
1  |   ?      |  ? 
2  |   ?      |  ? 
from sklearn.linear_model import LinearRegression
lr_object = LinearRegression()

def linear_regression(ith_DF):
    # Note: for me it is necessary that ith_DF should contain all 
    # data within this function scope, so that I can apply any 
    # function that needs all data in ith_DF

    X = [i.earnings for i in ith_DF.select("earnings").rdd.collect()]
    y = [i.health for i in ith_DF.select("health").rdd.collect()]

    lr_object.fit(X, y)
    return lr_object.intercept_, lr_object.coef_[0]

coefficient_collector = []

# following iteration is not possible in spark as 'GroupedData' 
# object is not iterable, please consider it as pseudo code

for ith_df in df.groupby("id"): 
    c, m = linear_regression(ith_df)
    coefficient_collector.append((float(c), float(m)))

model_df = spark.createDataFrame(coefficient_collector, ["intercept", "slope"])
model_df.show()

【问题讨论】:

这个问题似乎与***.com/q/43742926/1305344 相似,唯一的区别是使用LinearRegression 而不是QuantileDiscretizer。这些天看起来像一个非常热门的话题。 @JacekLaskowski 感谢您的意见。但问题是应用自定义聚合函数,该函数需要 2 列数据帧的分组部分,这与 xxxx.agg('colA': sum) 不同,其中 sum 仅适用于一列 colA。谢谢。 自定义聚合函数要做什么? 如果我能够获取 ith_df,那么我可以在每个模型上运行各种预测模型(特征列:x1、x2、x3、.....xn-1 和目标列:xn) ith_df. 哦...谢谢。但是在将每个数据帧连接起来形成单个数据帧时,它会对性能产生影响。这就是为什么我想对每个数据帧进行内联聚合。 【参考方案1】:

我认为这可以从 Spark 2.3 开始使用 pandas_UDF 完成。其实pandas_UDFs的公告在这里有一个拟合分组回归的例子:

Introducing Pandas UDF for Python

【讨论】:

【参考方案2】:

我要做的是 filter 主 DataFrame 创建较小的 DataFrame 并进行处理,比如线性回归。

然后您可以并行执行线性回归(在不同的线程上使用相同的SparkSession,这是线程安全的)并缓存主 DataFrame。

这应该为您提供 Spark 的全部功能。

附言我对 Spark 的那部分了解有限,因此我认为 Spark MLlib 中的 grid search-based model selection 和 TensorFrames 使用了非常相似的方法,即“Scala 和 Apache Spark 的实验性 TensorFlow 绑定”。

【讨论】:

以上是关于如何在 DataFrame 中跨组使用 LinearRegression?的主要内容,如果未能解决你的问题,请参考以下文章

在 Apache Spark 中跨执行程序共享数据

如何跨组实现功能并保存唯一输出

当 XG(跨组)事务可以做同样的工作时,为啥要选择实体组事务?

App Engine Java JDO 中的单元测试 XG 跨组事务

SQL Server:跨组(而不是组内)的领先/滞后分析功能

如何在 PHPUnit 中跨多个测试模拟测试 Web 服务?