为每组 pyspark RDD/dataframe 选择随机列

Posted

技术标签:

【中文标题】为每组 pyspark RDD/dataframe 选择随机列【英文标题】:Selecting random columns for each group of pyspark RDD/dataframe 【发布时间】:2016-10-27 19:19:23 【问题描述】:

我的数据框有 10,0000 列,我必须对每个组应用一些逻辑(键是区域和部门)。每个组将使用 10k 列中的最多 30 列,30 列列表来自第二个数据集列“colList”。每个组将有 2-3 百万行。我的方法是按键分组并调用如下函数。但它失败了 - 1. shuffle 和 2.data group 超过 2G(可以通过重新分区来解决,但成本很高),3. 非常慢

def testfunc(iter):
   <<got some complex business logic which cant be done in spark API>>

resRDD = df.rdd.groupBy(region, dept).map(lambda x: testfunc(x))

输入:

region dept week val0 val1  val2  val3 ... val10000   
 US    CS   1     1    2    1     1   ...  2 
 US    CS   2     1.5  2    3     1   ...  2
 US    CS   3     1    2    2     2.1      2
 US    ELE  1     1.1  2    2     2.1      2
 US    ELE  2     2.1  2    2     2.1      2
 US    ELE  3     1    2    1     2   .... 2
 UE    CS   1     2    2    1     2   .... 2

为每个组选择的列:(数据集 2)

region dept colList   
 US    CS   val0,val10,val100,val2000 
 US    ELE  val2,val5,val800,val900
 UE    CS   val21,val54,val806,val9000

我的第二个解决方案是从只有 30 列的输入数据创建一个新数据集,并将这些列重命名为 col1 到 col30。然后为每个列和组使用一个映射列表。然后我可以应用 groupbyKey(假设),这将比 10K 列的原始输入更 Skinner。

region dept week col0 col1  col2  col3 ... col30   
 US    CS   1     1    2    1     1   ...  2 
 US    CS   2     1.5  2    3     1   ...  2
 US    CS   3     1    2    2     2.1      2
 US    ELE  1     1.1  2    2     2.1      2
 US    ELE  2     2.1  2    2     2.1      2
 US    ELE  3     1    2    1     2   .... 2
 UE    CS   1     2    2    1     2   .... 2

任何人都可以帮助将 10K 的输入转换为 30 列吗?或者任何其他替代方案都可以避免分组。

【问题讨论】:

【参考方案1】:

您可以使用 create_map 函数将所有 10k 列转换为每行映射。现在使用 UDF 获取地图、区域和部门,并将地图稀释到 30 列,并确保所有 30 列始终具有相同的名称。 最后,您可以包装您的复杂函数以接收地图而不是原始的 10K 列。希望这将使它足够小以正常工作。

如果没有,您可以获得不同的区域和部门,并假设您可以循环遍历一个并按另一个分组。

【讨论】:

感谢 Assaf,是的,我也遵循同样的方法,创建了一个地图字典并创建了 30 列,这些列对于所有组合都是相同的。我不能循环,因为我有 2K 组合。每个循环不会使用整个集群。每行映射的一个问题是 5M 行需要 30 分钟。有什么建议吗? 如果你能分享你的代码,我可以试着找出一些东西。

以上是关于为每组 pyspark RDD/dataframe 选择随机列的主要内容,如果未能解决你的问题,请参考以下文章

[Spark][Python][RDD][DataFrame]从 RDD 构造 DataFrame 例子

[Spark][Python][DataFrame][RDD]DataFrame中抽取RDD例子

在 PySpark 中为每一行查找最新的非空值

spark为每组动态创建struct/json

为每组选择随机行

为每组值创建 Div