如何在 DataFrame 中跨组使用 LinearRegression?
Posted
技术标签:
【中文标题】如何在 DataFrame 中跨组使用 LinearRegression?【英文标题】:How to use LinearRegression across groups in DataFrame? 【发布时间】:2017-05-03 21:18:54 【问题描述】:假设我的 spark DataFrame (DF) 看起来像
id | age | earnings| health
----------------------------
1 | 34 | 65 | 8
2 | 65 | 12 | 4
2 | 20 | 7 | 10
1 | 40 | 75 | 7
. | .. | .. | ..
我想对 DF 进行分组,应用一个函数(比如线性 依赖于多列的回归 - 在这种情况下为两列 - 在每个聚合 DF 上的聚合 DF)并获得类似
的输出id | intercept| slope
----------------------
1 | ? | ?
2 | ? | ?
from sklearn.linear_model import LinearRegression
lr_object = LinearRegression()
def linear_regression(ith_DF):
# Note: for me it is necessary that ith_DF should contain all
# data within this function scope, so that I can apply any
# function that needs all data in ith_DF
X = [i.earnings for i in ith_DF.select("earnings").rdd.collect()]
y = [i.health for i in ith_DF.select("health").rdd.collect()]
lr_object.fit(X, y)
return lr_object.intercept_, lr_object.coef_[0]
coefficient_collector = []
# following iteration is not possible in spark as 'GroupedData'
# object is not iterable, please consider it as pseudo code
for ith_df in df.groupby("id"):
c, m = linear_regression(ith_df)
coefficient_collector.append((float(c), float(m)))
model_df = spark.createDataFrame(coefficient_collector, ["intercept", "slope"])
model_df.show()
【问题讨论】:
这个问题似乎与***.com/q/43742926/1305344 相似,唯一的区别是使用LinearRegression
而不是QuantileDiscretizer
。这些天看起来像一个非常热门的话题。
@JacekLaskowski 感谢您的意见。但问题是应用自定义聚合函数,该函数需要 2 列数据帧的分组部分,这与 xxxx.agg('colA': sum) 不同,其中 sum 仅适用于一列 colA。谢谢。
自定义聚合函数要做什么?
如果我能够获取 ith_df,那么我可以在每个模型上运行各种预测模型(特征列:x1、x2、x3、.....xn-1 和目标列:xn) ith_df.
哦...谢谢。但是在将每个数据帧连接起来形成单个数据帧时,它会对性能产生影响。这就是为什么我想对每个数据帧进行内联聚合。
【参考方案1】:
我认为这可以从 Spark 2.3 开始使用 pandas_UDF 完成。其实pandas_UDFs的公告在这里有一个拟合分组回归的例子:
Introducing Pandas UDF for Python
【讨论】:
【参考方案2】:我要做的是 filter
主 DataFrame 创建较小的 DataFrame 并进行处理,比如线性回归。
然后您可以并行执行线性回归(在不同的线程上使用相同的SparkSession
,这是线程安全的)并缓存主 DataFrame。
这应该为您提供 Spark 的全部功能。
附言我对 Spark 的那部分了解有限,因此我认为 Spark MLlib 中的 grid search-based model selection 和 TensorFrames 使用了非常相似的方法,即“Scala 和 Apache Spark 的实验性 TensorFlow 绑定”。
【讨论】:
以上是关于如何在 DataFrame 中跨组使用 LinearRegression?的主要内容,如果未能解决你的问题,请参考以下文章
当 XG(跨组)事务可以做同样的工作时,为啥要选择实体组事务?
App Engine Java JDO 中的单元测试 XG 跨组事务