Transpose rows to columns in Spark SQL (pyspark)

【Posted】: 2017-10-25 07:46:05

【Question】:

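I want to perform the transformation below in Spark. My goal is to produce the Output table. I am hoping that if I can first build the Intermediate Output, the final output will be easy to derive. Any ideas on how to turn rows into columns would be very helpful.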

RowID  Name  Place
1      Gaga India,US,UK
1      Katy UK,India,Europe
1      Bey  Europe
2      Gaga Null
2      Katy India,Europe
2      Bey  US
3      Gaga Europe
3      Katy US
3      Bey  Null

Output:

RowID   Id  Gaga    Katy    Bey
1       1   India   UK      Europe
1       2   US      India   Null
1       3   UK      Europe  Null
2       1   Null    India   US
2       2   Null    Europe  Null
3       1   Europe  US      Null


Intermediate Output:

RowID   Gaga         Katy               Bey
1       India,US,UK  UK,India,Europe    Europe
2       Null         India,Europe       US
3       Europe       US                 Null
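
To make the target concrete, here is a plain-Python model of the two stages (not Spark code, just a sketch of the logic). The helper names `to_intermediate` and `to_output` are hypothetical and exist only for illustration. The sketch assumes `Null` means a missing value and that items are lined up by their position in the comma-separated list.

```python
from itertools import zip_longest

NAMES = ["Gaga", "Katy", "Bey"]

rows = [
    (1, "Gaga", "India,US,UK"), (1, "Katy", "UK,India,Europe"), (1, "Bey", "Europe"),
    (2, "Gaga", None), (2, "Katy", "India,Europe"), (2, "Bey", "US"),
    (3, "Gaga", "Europe"), (3, "Katy", "US"), (3, "Bey", None),
]


def to_intermediate(rows):
    """Pivot (RowID, Name, Place) rows into {RowID: {Name: Place}}."""
    pivoted = {}
    for row_id, name, place in rows:
        pivoted.setdefault(row_id, {})[name] = place
    return pivoted


def to_output(intermediate, names=NAMES):
    """Split each Place string and line the pieces up by list position."""
    result = []
    for row_id in sorted(intermediate):
        lists = []
        for name in names:
            place = intermediate[row_id].get(name)
            lists.append(place.split(",") if place is not None else [])
        # zip_longest pads the shorter lists with None, like the Null cells above
        for idx, values in enumerate(zip_longest(*lists), start=1):
            result.append((row_id, idx) + values)
    return result


for line in to_output(to_intermediate(rows)):
    print(line)
```

Each printed tuple is `(RowID, Id, Gaga, Katy, Bey)`, which is the shape of the Output table.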

【Comments】:

【Answer 1】:

I tried this using DataFrame functions and a UDF. Hope it helps.

>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import IntegerType
>>> from functools import reduce
>>> from pyspark.sql import DataFrame
>>> from pyspark.sql import Window
>>> l = [(1,'Gaga','India,US,UK'),(1,'Katy','UK,India,Europe'),(1,'Bey','Europe'),(2,'Gaga',None),(2,'Katy','India,Europe'),(2,'Bey','US'),(3,'Gaga','Europe'),
... (3,'Katy','US'),(3,'Bey',None)]
>>> df = spark.createDataFrame(l,['RowID','Name','Place'])
>>> df = df.withColumn('Placelist',F.split(df.Place,','))
>>> df.show()
+-----+----+---------------+-------------------+
|RowID|Name|          Place|          Placelist|
+-----+----+---------------+-------------------+
|    1|Gaga|    India,US,UK|    [India, US, UK]|
|    1|Katy|UK,India,Europe|[UK, India, Europe]|
|    1| Bey|         Europe|           [Europe]|
|    2|Gaga|           null|               null|
|    2|Katy|   India,Europe|    [India, Europe]|
|    2| Bey|             US|               [US]|
|    3|Gaga|         Europe|           [Europe]|
|    3|Katy|             US|               [US]|
|    3| Bey|           null|               null|
+-----+----+---------------+-------------------+

>>> udf1 = F.udf(lambda x : len(x) if x is not None else 0,IntegerType())
>>> maxlen = df.agg(F.max(udf1('Placelist'))).first()[0]
>>> df1 = df.groupby('RowID').pivot('Name').agg(F.first('Placelist'))
>>> df1.show()
+-----+--------+---------------+-------------------+
|RowID|     Bey|           Gaga|               Katy|
+-----+--------+---------------+-------------------+
|    1|[Europe]|[India, US, UK]|[UK, India, Europe]|
|    3|    null|       [Europe]|               [US]|
|    2|    [US]|           null|    [India, Europe]|
+-----+--------+---------------+-------------------+

>>> finaldf = reduce(
...     DataFrame.unionAll,
...     (df1.select("RowID", F.col("Bey").getItem(i), F.col("Gaga").getItem(i),F.col("Katy").getItem(i) )
...         for i in range(maxlen))
... ).toDF(*df1.columns).na.drop('all',subset=df1.columns[1:]).orderBy('RowID')
>>> w = Window.partitionBy('RowID').orderBy('Bey')
>>> finaldf = finaldf.withColumn('ID',F.row_number().over(w))
>>> finaldf.select('RowID','ID','Gaga','Katy','Bey').show()
+-----+---+------+------+------+
|RowID| ID|  Gaga|  Katy|   Bey|
+-----+---+------+------+------+
|    1|  1|    US| India|  null|
|    1|  2|    UK|Europe|  null|
|    1|  3| India|    UK|Europe|
|    2|  1|  null|Europe|  null|
|    2|  2|  null| India|    US|
|    3|  1|Europe|    US|  null|
+-----+---+------+------+------+
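
One thing to watch in the result above: `ID` comes from `row_number()` over a window ordered by `Bey`, so it ranks rows by that column, not by position in the original list. For RowID 1, the India / UK / Europe row gets ID 3, while the question's expected output has it at Id 1. The plain-Python model below (not Spark code) mirrors the `getItem(i)` loop but keeps the loop index as the Id. The names `get_item` and `expand_with_position` are hypothetical. In PySpark the equivalent would presumably be to add something like `F.lit(i + 1).alias('ID')` to each `select` instead of using the window, but that variant is an untested assumption.

```python
# Model of df1 (the pivoted DataFrame): RowID -> {Name: list of places or None}
pivoted = {
    1: {"Bey": ["Europe"], "Gaga": ["India", "US", "UK"], "Katy": ["UK", "India", "Europe"]},
    2: {"Bey": ["US"], "Gaga": None, "Katy": ["India", "Europe"]},
    3: {"Bey": None, "Gaga": ["Europe"], "Katy": ["US"]},
}
COLUMNS = ["Gaga", "Katy", "Bey"]


def get_item(values, i):
    """Model of getItem(i): None for a null list or an out-of-range index,
    matching the nulls visible in the output above."""
    if values is None or i >= len(values):
        return None
    return values[i]


def expand_with_position(pivoted, columns=COLUMNS):
    """One pass per list position; the position itself becomes the Id."""
    maxlen = max(len(v) for cols in pivoted.values() for v in cols.values() if v is not None)
    out = []
    for i in range(maxlen):
        for row_id, cols in pivoted.items():
            picked = tuple(get_item(cols[c], i) for c in columns)
            # Same effect as na.drop('all', ...): skip rows where every value is null
            if any(p is not None for p in picked):
                out.append((row_id, i + 1) + picked)
    return sorted(out, key=lambda r: (r[0], r[1]))


for line in expand_with_position(pivoted):
    print(line)
```

Because the Id is taken from the loop index, each row keeps the position its values had in the original lists.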

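【Comments】:

Thanks for your help, mate! This code is really hard for me to understand because I am new to Python, but it works :-)

I will try to explain the steps at some point. Please accept the answer if it helped you.

【Answer 2】:

An alternative solution that does not use a UDF:
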
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import create_map, explode, struct, split, row_number, to_json
from functools import reduce

# DataFrame schema

dfSchema = StructType([
    StructField('RowID', IntegerType()),
    StructField('Name', StringType()),
    StructField('Place', StringType())
])

# Raw data

rowID_11 = Row(1, 'Gaga', 'India,US,UK')
rowID_12 = Row(1, 'Katy', 'UK,India,Europe')
rowID_13 = Row(1, 'Bey', 'Europe')
rowID_21 = Row(2, 'Gaga', None)
rowID_22 = Row(2, 'Katy', 'India,Europe')
rowID_23 = Row(2, 'Bey', 'US')
rowID_31 = Row(3, 'Gaga', 'Europe')
rowID_32 = Row(3, 'Katy', 'US')
rowID_33 = Row(3, 'Bey', None)

rowList = [rowID_11, rowID_12, rowID_13, 
           rowID_21, rowID_22, rowID_23, 
           rowID_31, rowID_32, rowID_33]

# Create the initial DataFrame

df = spark.createDataFrame(rowList, dfSchema)
df.show()

+-----+----+---------------+
|RowID|Name|          Place|
+-----+----+---------------+
|    1|Gaga|    India,US,UK|
|    1|Katy|UK,India,Europe|
|    1| Bey|         Europe|
|    2|Gaga|           null|
|    2|Katy|   India,Europe|
|    2| Bey|             US|
|    3|Gaga|         Europe|
|    3|Katy|             US|
|    3| Bey|           null|
+-----+----+---------------+

# Use create_map, struct and to_json to create the intermediate output

jsonDFCol = df.select(
                 to_json(
                 create_map('Name', 
                            struct('RowID', 'Place')))\
                                .alias('name_place'))

jsonList = [js[0] for js in jsonDFCol.rdd.collect()]
jsonDF = spark.read.json(spark.sparkContext.parallelize(jsonList))

# One DataFrame per name: pull RowID and Place out of that name's struct column
intermediateList = [jsonDF.selectExpr(f'{name}.RowID', f'{name}.Place AS {name}')\
    .where('RowID is not Null') for name in jsonDF.columns]

intermediateDF = reduce(lambda curr, nxt: 
                        curr.join(nxt, on='RowID'), 
                        intermediateList).sort('RowID')\
                        .select('RowID', 'Gaga', 'Katy', 'Bey')

intermediateDF.show()

+-----+-----------+---------------+------+
|RowID|       Gaga|           Katy|   Bey|
+-----+-----------+---------------+------+
|    1|India,US,UK|UK,India,Europe|Europe|
|    2|       null|   India,Europe|    US|
|    3|     Europe|             US|  null|
+-----+-----------+---------------+------+

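# Use a window to create the Id column.
# Note: the window is ordered by its own partition key, so Spark does not
# guarantee that row_number() follows the position of each item in the list.

rowWindow = Window.partitionBy('RowID').orderBy('RowID')

# Use the split and explode functions to obtain the final output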

# One exploded DataFrame per name column, each with its own Id
finalDFList = [
    intermediateDF
        .select('RowID',
                explode(split(intermediateDF[col_], ',')).alias(col_))
        .withColumn('Id', row_number().over(rowWindow))
    for col_ in intermediateDF.columns[1:]]

# Every (RowID, Id) pair that appears in any of the exploded DataFrames
finalDFID = reduce(lambda curr, nxt: curr.select('RowID', 'Id')\
    .unionAll(nxt.select('RowID', 'Id')), finalDFList)

# Left-join each name column onto the full set of (RowID, Id) pairs
finalDF = reduce(lambda curr, nxt:
                        curr.join(nxt, on=['RowID', 'Id'], how='left'),
                        finalDFList, finalDFID).distinct()\
                        .sort('RowID', 'Id')\
                        .select('RowID', 'Id',
                                'Gaga', 'Katy', 'Bey')

finalDF.show()

+-----+---+------+------+------+
|RowID| Id|  Gaga|  Katy|   Bey|
+-----+---+------+------+------+
|    1|  1| India|    UK|Europe|
|    1|  2|    US| India|  null|
|    1|  3|    UK|Europe|  null|
|    2|  1|  null| India|    US|
|    2|  2|  null|Europe|  null|
|    3|  1|Europe|    US|  null|
+-----+---+------+------+------+
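
The `to_json` / `create_map` step is the least obvious part of this answer, so here is a plain-Python model of what it appears to do (not Spark code). Each row becomes a one-key JSON object keyed by the name. Reading those objects back gives one nested "column" per name, and the per-name pieces are then joined on `RowID`. The variable names are hypothetical. The exact JSON text Spark emits may differ in detail, for example in how null fields are written. Only the shape matters here.

```python
import json

rows = [
    (1, "Gaga", "India,US,UK"), (1, "Katy", "UK,India,Europe"), (1, "Bey", "Europe"),
    (2, "Gaga", None), (2, "Katy", "India,Europe"), (2, "Bey", "US"),
    (3, "Gaga", "Europe"), (3, "Katy", "US"), (3, "Bey", None),
]

# Model of to_json(create_map('Name', struct('RowID', 'Place'))):
# one JSON object per row, keyed by the name.
json_lines = [
    json.dumps({name: {"RowID": row_id, "Place": place}})
    for row_id, name, place in rows
]

# Model of reading the JSON back: one mapping RowID -> Place per name.
per_name = {}
for line in json_lines:
    for name, record in json.loads(line).items():
        per_name.setdefault(name, {})[record["RowID"]] = record.get("Place")

# Model of the inner joins on RowID: keep RowIDs present for every name.
row_ids = sorted(set.intersection(*(set(m) for m in per_name.values())))
intermediate = [
    (rid, per_name["Gaga"][rid], per_name["Katy"][rid], per_name["Bey"][rid])
    for rid in row_ids
]

for line in intermediate:
    print(line)
```

The resulting tuples are `(RowID, Gaga, Katy, Bey)`, the same shape as the intermediate table shown earlier in this answer.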

【Comments】:
