如何在 Spark SQL 中以多列为中心?

Posted

技术标签:

【中文标题】如何在 Spark SQL 中以多列为中心?【英文标题】:How to pivot on multiple columns in Spark SQL? 【发布时间】:2017-12-15 14:21:42 【问题描述】:

我需要在 pyspark 数据框中旋转多个列。示例数据框,

 >>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
>>> mydf = spark.createDataFrame(d,['id','day','price','units'])
>>> mydf.show()
+---+---+-----+-----+
| id|day|price|units|
+---+---+-----+-----+
|100|  1|   23|   10|
|100|  2|   45|   11|
|100|  3|   67|   12|
|100|  4|   78|   13|
|101|  1|   23|   10|
|101|  2|   45|   13|
|101|  3|   67|   14|
|101|  4|   78|   15|
|102|  1|   23|   10|
|102|  2|   45|   11|
|102|  3|   67|   16|
|102|  4|   78|   18|
+---+---+-----+-----+

现在,如果我需要根据日期将每个 id 的价格列放入一行,那么我可以使用 pivot 方法,

>>> pvtdf = mydf.withColumn('combcol',F.concat(F.lit('price_'),mydf['day'])).groupby('id').pivot('combcol').agg(F.first('price'))
>>> pvtdf.show()
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|100|     23|     45|     67|     78|
|101|     23|     45|     67|     78|
|102|     23|     45|     67|     78|
+---+-------+-------+-------+-------+

所以当我需要将单位列作为价格转置时,我必须像上面那样为单位再创建一个数据框,然后使用 id 加入两者。但是,当我有更多列时,我尝试了一个函数去做吧,

>>> def pivot_udf(df,*cols):
...     mydf = df.select('id').drop_duplicates()
...     for c in cols:
...        mydf = mydf.join(df.withColumn('combcol',F.concat(F.lit('_'.format(c)),df['day'])).groupby('id').pivot('combcol').agg(F.first(c)),'id')
...     return mydf
...
>>> pivot_udf(mydf,'price','units').show()
+---+-------+-------+-------+-------+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|units_1|units_2|units_3|units_4|
+---+-------+-------+-------+-------+-------+-------+-------+-------+
|100|     23|     45|     67|     78|     10|     11|     12|     13|
|101|     23|     45|     67|     78|     10|     13|     14|     15|
|102|     23|     45|     67|     78|     10|     11|     16|     18|
+---+-------+-------+-------+-------+-------+-------+-------+-------+

需要建议,如果这样做是好的做法,以及是否有其他更好的方法。提前致谢!

【问题讨论】:

请参考此链接,希望对您有所帮助!! [***.com/questions/37486910/… 【参考方案1】:

这是一种涉及单个枢轴的非 UDF 方式(因此,只需进行单列扫描即可识别所有唯一日期)。

dff = mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'),F.first('units').alias('unit'))

这是结果(对于不匹配的排序和命名表示歉意):

+---+-------+------+-------+------+-------+------+-------+------+               
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100|     23|    10|     45|    11|     67|    12|     78|    13|
|101|     23|    10|     45|    13|     67|    14|     78|    15|
|102|     23|    10|     45|    11|     67|    16|     78|    18|
+---+-------+------+-------+------+-------+------+-------+------+

我们只是在当天旋转后在priceunit 列上进行聚合。

如果有问题需要命名,

dff.select([F.col(c).name('_'.join(x for x in c.split('_')[::-1])) for c in dff.columns]).show()

+---+-------+------+-------+------+-------+------+-------+------+
| id|price_1|unit_1|price_2|unit_2|price_3|unit_3|price_4|unit_4|
+---+-------+------+-------+------+-------+------+-------+------+
|100|     23|    10|     45|    11|     67|    12|     78|    13|
|101|     23|    10|     45|    13|     67|    14|     78|    15|
|102|     23|    10|     45|    11|     67|    16|     78|    18|
+---+-------+------+-------+------+-------+------+-------+------+

【讨论】:

【参考方案2】:

问题中的解决方案是我能得到的最好的解决方案。唯一的改进是cache 输入数据集以避免双重扫描,即

mydf.cache
pivot_udf(mydf,'price','units').show()

【讨论】:

【参考方案3】:

与 spark 1.6 版本一样,我认为这是唯一的方法,因为 pivot 只需要一列,并且有第二个属性值,您可以在其上传递该列的不同值,这将使您的代码运行得更快,因为否则 spark 必须运行那是给你的,所以是的,这是正确的做法。

【讨论】:

以上是关于如何在 Spark SQL 中以多列为中心?的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 的 REST API 程序中以多进程方式调用方法而苦苦挣扎

在 NestJS 中以多对一关系添加字段

如何使用部分在CollectionView中水平居中行

过滤以多列为条件的数据框,根据列值具有不同的条件

您可以在 DTrace 中以多 CPU 安全的方式比较探针之间的值吗?

在java中以多线程方式插入或更新数据库中的多条记录