在 Pyspark 中将复杂的数据帧行划分为简单的行

Posted

技术标签:

【中文标题】在 Pyspark 中将复杂的数据帧行划分为简单的行【英文标题】:Dividing complex rows of dataframe to simple rows in Pyspark 【发布时间】:2016-07-11 05:50:06 【问题描述】:

我有这个代码:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext()
sqlContext = SQLContext(sc)
documents = sqlContext.createDataFrame([
    Row(id=1, title=[Row(value=u'cars', max_dist=1000)]),
    Row(id=2, title=[Row(value=u'horse bus',max_dist=50), Row(value=u'normal bus',max_dist=100)]),
    Row(id=3, title=[Row(value=u'Airplane', max_dist=5000)]),
    Row(id=4, title=[Row(value=u'Bicycles', max_dist=20),Row(value=u'Motorbikes', max_dist=80)]),
    Row(id=5, title=[Row(value=u'Trams', max_dist=15)])])

documents.show(truncate=False)
#+---+----------------------------------+
#|id |title                             |
#+---+----------------------------------+
#|1  |[[1000,cars]]                     |
#|2  |[[50,horse bus], [100,normal bus]]|
#|3  |[[5000,Airplane]]                 |
#|4  |[[20,Bicycles], [80,Motorbikes]]  |
#|5  |[[15,Trams]]                      |
#+---+----------------------------------+

我需要将所有复合行(例如 2 和 4)拆分为多行,同时保留“id”,以获得如下结果:

#+---+----------------------------------+
#|id |title                             |
#+---+----------------------------------+
#|1  |[1000,cars]                       |
#|2  |[50,horse bus]                    |
#|2  |[100,normal bus]                  |
#|3  |[5000,Airplane]                   |
#|4  |[20,Bicycles]                     |
#|4  |[80,Motorbikes]                   |
#|5  |[15,Trams]                        |
#+---+----------------------------------+

【问题讨论】:

我没有时间写出答案(对不起!),但这是我正在构建的想法:将 ID 放入“标题”类别中的每个元素中(比如让每个复合行包含valuemax_distid),然后执行flatMap。您将无法使用map,因为map 期望输入和输出之间存在一对一的关系。如果以后有时间会充实一个答案! 谢谢@Katya Handler,我会试试你的想法。如果我不能,我会通过评论请求你的帮助:-) 【参考方案1】:

好的,这就是我想出的。不幸的是,我不得不离开Row 对象的世界并进入list 对象的世界,因为我找不到将appendRow 对象的方法。

这意味着这个方法有点乱。如果您能找到将新列添加到 Row 对象的方法,那么这不是可行的方法。

def add_id(row):
    it_list = []
    for i in range(0, len(row[1])):
        sm_list = []
        for j in row[1][i]:
            sm_list.append(j)
        sm_list.append(row[0])
        it_list.append(sm_list)
    return it_list

with_id = documents.flatMap(lambda x: add_id(x))

df = with_id.map(lambda x: Row(id=x[2], title=Row(value=x[0], max_dist=x[1]))).toDF()

当我运行df.show() 时,我得到:

+---+----------------+
| id|           title|
+---+----------------+
|  1|     [cars,1000]|
|  2|  [horse bus,50]|
|  2|[normal bus,100]|
|  3| [Airplane,5000]|
|  4|   [Bicycles,20]|
|  4| [Motorbikes,80]|
|  5|      [Trams,15]|
+---+----------------+

【讨论】:

如果我可以建议:def flatten(row): id_, items = row; return ((id_, item) for item in items) 后跟 documents.rdd.flatMap(flatten).toDF(["id", "title"]) 再次感谢@Katya Handler,我喜欢你的原始想法 :-)【参考方案2】:

explode吧:

from pyspark.sql.functions import explode

documents.withColumn("title", explode("title"))
## +---+----------------+
## | id|           title|
## +---+----------------+
## |  1|     [1000,cars]|
## |  2|  [50,horse bus]|
## |  2|[100,normal bus]|
## |  3| [5000,Airplane]|
## |  4|   [20,Bicycles]|
## |  4| [80,Motorbikes]|
## |  5|      [15,Trams]|
## +---+----------------+

【讨论】:

这是一个更好的答案@K.Ali 请再问一个问题@zero323,是否也可以通过结构的字段(col_1=1000,col_2='cars' 等)垂直分解结果列'title'? @K.Ali 这只是一个简单的选择(select("title.value", "title.max_dist"))。 @zero323 想象一下,您想对结果做更多​​explode。目标是两个有两列数字title_num 和另一个title_typedocuments.withColumn("title", explode("title")) 还应该添加什么?这样的任务需要两次爆炸吗?【参考方案3】:

我正在使用 Spark Dataset API,以下解决了我的“爆炸”要求:

Dataset<Row> explodedDataset = initialDataset.selectExpr("ID","explode(finished_chunk) as chunks");

注意:Dataset API 的 explode 方法在 Spark 2.4.5 中已弃用,文档建议使用 Select(如上所示)或 FlatMap。

【讨论】:

以上是关于在 Pyspark 中将复杂的数据帧行划分为简单的行的主要内容,如果未能解决你的问题,请参考以下文章

在 Pyspark 中将结构转换为数组

Spark:如何从 Spark 数据帧行解析和转换 json 字符串

在 Pyspark 中将 Pandas 数据帧转换为 Spark 数据帧的 TypeError

将 spark 数据帧行写入 dynamoDB 表中的项目

如何在pyspark中将字符串值转换为arrayType

在 Pyspark 中将字典转换为数据框